diff --git a/satrs-example/pyclient/.gitignore b/satrs-example/pyclient/.gitignore index 894ba5d..d994678 100644 --- a/satrs-example/pyclient/.gitignore +++ b/satrs-example/pyclient/.gitignore @@ -6,3 +6,4 @@ __pycache__ !/.idea/runConfigurations /seqcnt.txt +/.tmtc-history.txt diff --git a/satrs-example/pyclient/common.py b/satrs-example/pyclient/common.py index c2d0777..8f57e54 100644 --- a/satrs-example/pyclient/common.py +++ b/satrs-example/pyclient/common.py @@ -44,10 +44,6 @@ class AcsHkIds(enum.IntEnum): MGM_SET = 1 -class HkOpCodes: - GENERATE_ONE_SHOT = ["0", "oneshot"] - - def make_addressable_id(target_id: int, unique_id: int) -> bytes: byte_string = bytearray(struct.pack("!I", target_id)) byte_string.extend(struct.pack("!I", unique_id)) diff --git a/satrs-example/pyclient/main.py b/satrs-example/pyclient/main.py index c749e26..66a41e4 100755 --- a/satrs-example/pyclient/main.py +++ b/satrs-example/pyclient/main.py @@ -4,6 +4,8 @@ import logging import sys import time from typing import Optional +from prompt_toolkit.history import History +from prompt_toolkit.history import FileHistory import tmtccmd from spacepackets.ecss import PusTelemetry, PusVerificator @@ -11,16 +13,16 @@ from spacepackets.ecss.pus_17_test import Service17Tm from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm from spacepackets.ccsds.time import CdsShortTimestamp -from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper +from tmtccmd import TcHandlerBase, ProcedureParamsWrapper from tmtccmd.core.base import BackendRequest from tmtccmd.pus import VerificationWrapper from tmtccmd.tmtc import CcsdsTmHandler, SpecificApidHandlerBase from tmtccmd.com import ComInterface from tmtccmd.config import ( + CmdTreeNode, default_json_path, SetupParams, HookBase, - TmtcDefinitionWrapper, params_to_procedure_conversion, ) from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper @@ -39,12 +41,11 @@ from tmtccmd.tmtc import ( DefaultPusQueueHelper, QueueWrapper, ) -from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider +from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util.obj_id import ObjectIdDictT import pus_tc -import tc_definitions from common import EXAMPLE_PUS_APID, TM_PACKET_IDS, EventU32 _LOGGER = logging.getLogger() @@ -54,25 +55,29 @@ class SatRsConfigHook(HookBase): def __init__(self, json_cfg_path: str): super().__init__(json_cfg_path=json_cfg_path) - def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: + def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: from tmtccmd.config.com import ( create_com_interface_default, create_com_interface_cfg_default, ) + assert self.cfg_path is not None cfg = create_com_interface_cfg_default( com_if_key=com_if_key, json_cfg_path=self.cfg_path, space_packet_ids=TM_PACKET_IDS, ) + assert cfg is not None return create_com_interface_default(cfg) - def get_tmtc_definitions(self) -> TmtcDefinitionWrapper: - return tc_definitions.tc_definitions() + def get_command_definitions(self) -> CmdTreeNode: + """This function should return the root node of the command definition tree.""" + return pus_tc.create_cmd_definition_tree() - def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int): - _LOGGER.info("Mode operation hook was called") - pass + def get_cmd_history(self) -> Optional[History]: + """Optionlly return a history class for the past command paths which will be used + when prompting a command path from the user in CLI mode.""" + return FileHistory(".tmtc-history.txt") def get_object_ids(self) -> ObjectIdDictT: from tmtccmd.config.objects import get_core_object_ids @@ -94,15 +99,12 @@ class PusHandler(SpecificApidHandlerBase): def handle_tm(self, packet: bytes, _user_args: any): try: - tm_packet = PusTelemetry.unpack( - packet, time_reader=CdsShortTimestamp.empty() - ) + pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) except ValueError as e: _LOGGER.warning("Could not generate PUS TM object from raw data") _LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") raise e - service = tm_packet.service - dedicated_handler = False + service = pus_tm.service if service == 1: tm_packet = Service1Tm.unpack( data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2) @@ -119,8 +121,7 @@ class PusHandler(SpecificApidHandlerBase): else: self.verif_wrapper.log_to_console(tm_packet, res) self.verif_wrapper.log_to_file(tm_packet, res) - dedicated_handler = True - if service == 3: + elif service == 3: _LOGGER.info("No handling for HK packets implemented") _LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) @@ -129,8 +130,7 @@ class PusHandler(SpecificApidHandlerBase): raise ValueError("No addressable ID in HK packet") json_str = pus_tm.source_data[8:] _LOGGER.info(json_str) - dedicated_handler = True - if service == 5: + elif service == 5: tm_packet = PusTelemetry.unpack( packet, time_reader=CdsShortTimestamp.empty() ) @@ -139,11 +139,10 @@ class PusHandler(SpecificApidHandlerBase): _LOGGER.info(f"Received event packet. Event: {event_u32}") if event_u32.group_id == 0 and event_u32.unique_id == 0: _LOGGER.info("Received test event") - if service == 17: + elif service == 17: tm_packet = Service17Tm.unpack( packet, time_reader=CdsShortTimestamp.empty() ) - dedicated_handler = True if tm_packet.subservice == 2: self.file_logger.info("Received Ping Reply TM[17,2]") _LOGGER.info("Received Ping Reply TM[17,2]") @@ -154,17 +153,14 @@ class PusHandler(SpecificApidHandlerBase): _LOGGER.info( f"Received Test Packet with unknown subservice {tm_packet.subservice}" ) - if tm_packet is None: + else: _LOGGER.info( f"The service {service} is not implemented in Telemetry Factory" ) tm_packet = PusTelemetry.unpack( packet, time_reader=CdsShortTimestamp.empty() ) - self.raw_logger.log_tm(tm_packet) - if not dedicated_handler and tm_packet is not None: - pass - # self.printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet) + self.raw_logger.log_tm(pus_tm) class TcHandler(TcHandlerBase): @@ -196,22 +192,18 @@ class TcHandler(TcHandlerBase): log_entry = entry_helper.to_log_entry() _LOGGER.info(log_entry.log_str) - def queue_finished_cb(self, helper: ProcedureWrapper): - if helper.proc_type == TcProcedureType.DEFAULT: - def_proc = helper.to_def_procedure() - _LOGGER.info( - f"Queue handling finished for service {def_proc.service} and " - f"op code {def_proc.op_code}" - ) + def queue_finished_cb(self, info: ProcedureWrapper): + if info.proc_type == TcProcedureType.DEFAULT: + def_proc = info.to_def_procedure() + _LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}") - def feed_cb(self, helper: ProcedureWrapper, wrapper: FeedWrapper): + def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): q = self.queue_helper q.queue_wrapper = wrapper.queue_wrapper - if helper.proc_type == TcProcedureType.DEFAULT: - def_proc = helper.to_def_procedure() - service = def_proc.service - op_code = def_proc.op_code - pus_tc.pack_pus_telecommands(q, service, op_code) + if info.proc_type == TcProcedureType.DEFAULT: + def_proc = info.to_def_procedure() + assert def_proc.cmd_path is not None + pus_tc.pack_pus_telecommands(q, def_proc.cmd_path) def main(): diff --git a/satrs-example/pyclient/pus_tc.py b/satrs-example/pyclient/pus_tc.py index 9996e5b..706d3ba 100644 --- a/satrs-example/pyclient/pus_tc.py +++ b/satrs-example/pyclient/pus_tc.py @@ -1,50 +1,84 @@ import datetime +import logging from spacepackets.ccsds import CdsShortTimestamp from spacepackets.ecss import PusTelecommand -from tmtccmd.config import CoreServiceList +from tmtccmd.config import CmdTreeNode from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd from tmtccmd.pus.tc.s3_fsfw_hk import create_request_one_hk_command from common import ( EXAMPLE_PUS_APID, - HkOpCodes, make_addressable_id, RequestTargetId, AcsHkIds, ) +_LOGGER = logging.getLogger(__name__) -def pack_pus_telecommands(q: DefaultPusQueueHelper, service: str, op_code: str): - if ( - service == CoreServiceList.SERVICE_17 - or service == CoreServiceList.SERVICE_17_ALT - ): - if op_code == "ping": + +def create_cmd_definition_tree() -> CmdTreeNode: + + root_node = CmdTreeNode.root_node() + + test_node = CmdTreeNode("test", "Test Node") + test_node.add_child(CmdTreeNode("ping", "Send PUS ping TC")) + test_node.add_child(CmdTreeNode("trigger_event", "Send PUS test to trigger event")) + root_node.add_child(test_node) + + scheduler_node = CmdTreeNode("scheduler", "Scheduler Node") + scheduler_node.add_child( + CmdTreeNode( + "schedule_ping_10_secs_ahead", "Schedule Ping to execute in 10 seconds" + ) + ) + root_node.add_child(scheduler_node) + + acs_node = CmdTreeNode("acs", "ACS Subsystem Node") + mgm_node = CmdTreeNode("mgms", "MGM devices node") + mgm_node.add_child(CmdTreeNode("one_shot_hk", "Request one shot HK")) + acs_node.add_child(mgm_node) + root_node.add_child(acs_node) + + return root_node + + +def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str): + cmd_path_list = cmd_path.split("/") + if len(cmd_path_list) == 0: + _LOGGER.warning("empty command path") + return + if cmd_path_list[0] == "test": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "ping": q.add_log_cmd("Sending PUS ping telecommand") return q.add_pus_tc(PusTelecommand(service=17, subservice=1)) - elif op_code == "trigger_event": + elif cmd_path_list[1] == "trigger_event": q.add_log_cmd("Triggering test event") return q.add_pus_tc(PusTelecommand(service=17, subservice=128)) - if service == CoreServiceList.SERVICE_11: - q.add_log_cmd("Sending PUS scheduled TC telecommand") - crt_time = CdsShortTimestamp.from_now() - time_stamp = crt_time + datetime.timedelta(seconds=10) - time_stamp = time_stamp.pack() - return q.add_pus_tc( - create_time_tagged_cmd( - time_stamp, - PusTelecommand(service=17, subservice=1), - apid=EXAMPLE_PUS_APID, - ) - ) - if service == CoreServiceList.SERVICE_3: - if op_code in HkOpCodes.GENERATE_ONE_SHOT: - q.add_log_cmd("Sending HK one shot request") - q.add_pus_tc( - create_request_one_hk_command( - make_addressable_id(RequestTargetId.ACS, AcsHkIds.MGM_SET) + if cmd_path_list[0] == "scheduler": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "schedule_ping_10_secs_ahead": + q.add_log_cmd("Sending PUS scheduled TC telecommand") + crt_time = CdsShortTimestamp.from_now() + time_stamp = crt_time + datetime.timedelta(seconds=10) + time_stamp = time_stamp.pack() + return q.add_pus_tc( + create_time_tagged_cmd( + time_stamp, + PusTelecommand(service=17, subservice=1), + apid=EXAMPLE_PUS_APID, ) ) - pass + if cmd_path_list[0] == "acs": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "mgm": + assert len(cmd_path_list) >= 3 + if cmd_path_list[2] == "one_shot_hk": + q.add_log_cmd("Sending HK one shot request") + q.add_pus_tc( + create_request_one_hk_command( + make_addressable_id(RequestTargetId.ACS, AcsHkIds.MGM_SET) + ) + ) diff --git a/satrs-example/pyclient/requirements.txt b/satrs-example/pyclient/requirements.txt index 485c76a..b3f6f2a 100644 --- a/satrs-example/pyclient/requirements.txt +++ b/satrs-example/pyclient/requirements.txt @@ -1,2 +1,2 @@ -tmtccmd == 7.0.0 +tmtccmd == 8.0.0rc1 # -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd