bump tmtccmd
Rust/sat-rs/pipeline/pr-main This commit looks good Details

This commit is contained in:
Robin Müller 2024-01-31 12:28:45 +01:00
parent e2086391bc
commit 134feeb1b4
Signed by: muellerr
GPG Key ID: A649FB78196E3849
5 changed files with 95 additions and 72 deletions

View File

@ -6,3 +6,4 @@ __pycache__
!/.idea/runConfigurations
/seqcnt.txt
/.tmtc-history.txt

View File

@ -44,10 +44,6 @@ class AcsHkIds(enum.IntEnum):
MGM_SET = 1
class HkOpCodes:
GENERATE_ONE_SHOT = ["0", "oneshot"]
def make_addressable_id(target_id: int, unique_id: int) -> bytes:
byte_string = bytearray(struct.pack("!I", target_id))
byte_string.extend(struct.pack("!I", unique_id))

View File

@ -4,6 +4,8 @@ import logging
import sys
import time
from typing import Optional
from prompt_toolkit.history import History
from prompt_toolkit.history import FileHistory
import tmtccmd
from spacepackets.ecss import PusTelemetry, PusVerificator
@ -11,16 +13,16 @@ from spacepackets.ecss.pus_17_test import Service17Tm
from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm
from spacepackets.ccsds.time import CdsShortTimestamp
from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper
from tmtccmd import TcHandlerBase, ProcedureParamsWrapper
from tmtccmd.core.base import BackendRequest
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tmtc import CcsdsTmHandler, SpecificApidHandlerBase
from tmtccmd.com import ComInterface
from tmtccmd.config import (
CmdTreeNode,
default_json_path,
SetupParams,
HookBase,
TmtcDefinitionWrapper,
params_to_procedure_conversion,
)
from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper
@ -39,12 +41,11 @@ from tmtccmd.tmtc import (
DefaultPusQueueHelper,
QueueWrapper,
)
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.obj_id import ObjectIdDictT
import pus_tc
import tc_definitions
from common import EXAMPLE_PUS_APID, TM_PACKET_IDS, EventU32
_LOGGER = logging.getLogger()
@ -54,25 +55,29 @@ class SatRsConfigHook(HookBase):
def __init__(self, json_cfg_path: str):
super().__init__(json_cfg_path=json_cfg_path)
def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
from tmtccmd.config.com import (
create_com_interface_default,
create_com_interface_cfg_default,
)
assert self.cfg_path is not None
cfg = create_com_interface_cfg_default(
com_if_key=com_if_key,
json_cfg_path=self.cfg_path,
space_packet_ids=TM_PACKET_IDS,
)
assert cfg is not None
return create_com_interface_default(cfg)
def get_tmtc_definitions(self) -> TmtcDefinitionWrapper:
return tc_definitions.tc_definitions()
def get_command_definitions(self) -> CmdTreeNode:
"""This function should return the root node of the command definition tree."""
return pus_tc.create_cmd_definition_tree()
def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int):
_LOGGER.info("Mode operation hook was called")
pass
def get_cmd_history(self) -> Optional[History]:
"""Optionlly return a history class for the past command paths which will be used
when prompting a command path from the user in CLI mode."""
return FileHistory(".tmtc-history.txt")
def get_object_ids(self) -> ObjectIdDictT:
from tmtccmd.config.objects import get_core_object_ids
@ -94,15 +99,12 @@ class PusHandler(SpecificApidHandlerBase):
def handle_tm(self, packet: bytes, _user_args: any):
try:
tm_packet = PusTelemetry.unpack(
packet, time_reader=CdsShortTimestamp.empty()
)
pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
except ValueError as e:
_LOGGER.warning("Could not generate PUS TM object from raw data")
_LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
raise e
service = tm_packet.service
dedicated_handler = False
service = pus_tm.service
if service == 1:
tm_packet = Service1Tm.unpack(
data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2)
@ -119,8 +121,7 @@ class PusHandler(SpecificApidHandlerBase):
else:
self.verif_wrapper.log_to_console(tm_packet, res)
self.verif_wrapper.log_to_file(tm_packet, res)
dedicated_handler = True
if service == 3:
elif service == 3:
_LOGGER.info("No handling for HK packets implemented")
_LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]")
pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
@ -129,8 +130,7 @@ class PusHandler(SpecificApidHandlerBase):
raise ValueError("No addressable ID in HK packet")
json_str = pus_tm.source_data[8:]
_LOGGER.info(json_str)
dedicated_handler = True
if service == 5:
elif service == 5:
tm_packet = PusTelemetry.unpack(
packet, time_reader=CdsShortTimestamp.empty()
)
@ -139,11 +139,10 @@ class PusHandler(SpecificApidHandlerBase):
_LOGGER.info(f"Received event packet. Event: {event_u32}")
if event_u32.group_id == 0 and event_u32.unique_id == 0:
_LOGGER.info("Received test event")
if service == 17:
elif service == 17:
tm_packet = Service17Tm.unpack(
packet, time_reader=CdsShortTimestamp.empty()
)
dedicated_handler = True
if tm_packet.subservice == 2:
self.file_logger.info("Received Ping Reply TM[17,2]")
_LOGGER.info("Received Ping Reply TM[17,2]")
@ -154,17 +153,14 @@ class PusHandler(SpecificApidHandlerBase):
_LOGGER.info(
f"Received Test Packet with unknown subservice {tm_packet.subservice}"
)
if tm_packet is None:
else:
_LOGGER.info(
f"The service {service} is not implemented in Telemetry Factory"
)
tm_packet = PusTelemetry.unpack(
packet, time_reader=CdsShortTimestamp.empty()
)
self.raw_logger.log_tm(tm_packet)
if not dedicated_handler and tm_packet is not None:
pass
# self.printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
self.raw_logger.log_tm(pus_tm)
class TcHandler(TcHandlerBase):
@ -196,22 +192,18 @@ class TcHandler(TcHandlerBase):
log_entry = entry_helper.to_log_entry()
_LOGGER.info(log_entry.log_str)
def queue_finished_cb(self, helper: ProcedureWrapper):
if helper.proc_type == TcProcedureType.DEFAULT:
def_proc = helper.to_def_procedure()
_LOGGER.info(
f"Queue handling finished for service {def_proc.service} and "
f"op code {def_proc.op_code}"
)
def queue_finished_cb(self, info: ProcedureWrapper):
if info.proc_type == TcProcedureType.DEFAULT:
def_proc = info.to_def_procedure()
_LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}")
def feed_cb(self, helper: ProcedureWrapper, wrapper: FeedWrapper):
def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
q = self.queue_helper
q.queue_wrapper = wrapper.queue_wrapper
if helper.proc_type == TcProcedureType.DEFAULT:
def_proc = helper.to_def_procedure()
service = def_proc.service
op_code = def_proc.op_code
pus_tc.pack_pus_telecommands(q, service, op_code)
if info.proc_type == TcProcedureType.DEFAULT:
def_proc = info.to_def_procedure()
assert def_proc.cmd_path is not None
pus_tc.pack_pus_telecommands(q, def_proc.cmd_path)
def main():

View File

@ -1,50 +1,84 @@
import datetime
import logging
from spacepackets.ccsds import CdsShortTimestamp
from spacepackets.ecss import PusTelecommand
from tmtccmd.config import CoreServiceList
from tmtccmd.config import CmdTreeNode
from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
from tmtccmd.pus.tc.s3_fsfw_hk import create_request_one_hk_command
from common import (
EXAMPLE_PUS_APID,
HkOpCodes,
make_addressable_id,
RequestTargetId,
AcsHkIds,
)
_LOGGER = logging.getLogger(__name__)
def pack_pus_telecommands(q: DefaultPusQueueHelper, service: str, op_code: str):
if (
service == CoreServiceList.SERVICE_17
or service == CoreServiceList.SERVICE_17_ALT
):
if op_code == "ping":
def create_cmd_definition_tree() -> CmdTreeNode:
root_node = CmdTreeNode.root_node()
test_node = CmdTreeNode("test", "Test Node")
test_node.add_child(CmdTreeNode("ping", "Send PUS ping TC"))
test_node.add_child(CmdTreeNode("trigger_event", "Send PUS test to trigger event"))
root_node.add_child(test_node)
scheduler_node = CmdTreeNode("scheduler", "Scheduler Node")
scheduler_node.add_child(
CmdTreeNode(
"schedule_ping_10_secs_ahead", "Schedule Ping to execute in 10 seconds"
)
)
root_node.add_child(scheduler_node)
acs_node = CmdTreeNode("acs", "ACS Subsystem Node")
mgm_node = CmdTreeNode("mgms", "MGM devices node")
mgm_node.add_child(CmdTreeNode("one_shot_hk", "Request one shot HK"))
acs_node.add_child(mgm_node)
root_node.add_child(acs_node)
return root_node
def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str):
cmd_path_list = cmd_path.split("/")
if len(cmd_path_list) == 0:
_LOGGER.warning("empty command path")
return
if cmd_path_list[0] == "test":
assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "ping":
q.add_log_cmd("Sending PUS ping telecommand")
return q.add_pus_tc(PusTelecommand(service=17, subservice=1))
elif op_code == "trigger_event":
elif cmd_path_list[1] == "trigger_event":
q.add_log_cmd("Triggering test event")
return q.add_pus_tc(PusTelecommand(service=17, subservice=128))
if service == CoreServiceList.SERVICE_11:
q.add_log_cmd("Sending PUS scheduled TC telecommand")
crt_time = CdsShortTimestamp.from_now()
time_stamp = crt_time + datetime.timedelta(seconds=10)
time_stamp = time_stamp.pack()
return q.add_pus_tc(
create_time_tagged_cmd(
time_stamp,
PusTelecommand(service=17, subservice=1),
apid=EXAMPLE_PUS_APID,
)
)
if service == CoreServiceList.SERVICE_3:
if op_code in HkOpCodes.GENERATE_ONE_SHOT:
q.add_log_cmd("Sending HK one shot request")
q.add_pus_tc(
create_request_one_hk_command(
make_addressable_id(RequestTargetId.ACS, AcsHkIds.MGM_SET)
if cmd_path_list[0] == "scheduler":
assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "schedule_ping_10_secs_ahead":
q.add_log_cmd("Sending PUS scheduled TC telecommand")
crt_time = CdsShortTimestamp.from_now()
time_stamp = crt_time + datetime.timedelta(seconds=10)
time_stamp = time_stamp.pack()
return q.add_pus_tc(
create_time_tagged_cmd(
time_stamp,
PusTelecommand(service=17, subservice=1),
apid=EXAMPLE_PUS_APID,
)
)
pass
if cmd_path_list[0] == "acs":
assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "mgm":
assert len(cmd_path_list) >= 3
if cmd_path_list[2] == "one_shot_hk":
q.add_log_cmd("Sending HK one shot request")
q.add_pus_tc(
create_request_one_hk_command(
make_addressable_id(RequestTargetId.ACS, AcsHkIds.MGM_SET)
)
)

View File

@ -1,2 +1,2 @@
tmtccmd == 7.0.0
tmtccmd == 8.0.0rc1
# -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd