From d1476eb7708afb043e2c14af08e41bf4b124ba3e Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 9 May 2024 11:41:11 +0200 Subject: [PATCH] added basic tests for pytmtc app --- satrs-example/pytmtc/main.py | 199 +-------------------- satrs-example/pytmtc/pytmtc/config.py | 47 +++++ satrs-example/pytmtc/pytmtc/pus_tc.py | 55 +++++- satrs-example/pytmtc/pytmtc/pus_tm.py | 98 ++++++++++ satrs-example/pytmtc/tests/__init__.py | 0 satrs-example/pytmtc/tests/test_tc_mods.py | 48 +++++ 6 files changed, 255 insertions(+), 192 deletions(-) create mode 100644 satrs-example/pytmtc/pytmtc/config.py create mode 100644 satrs-example/pytmtc/tests/__init__.py create mode 100644 satrs-example/pytmtc/tests/test_tc_mods.py diff --git a/satrs-example/pytmtc/main.py b/satrs-example/pytmtc/main.py index bf7223a..d8a55b8 100755 --- a/satrs-example/pytmtc/main.py +++ b/satrs-example/pytmtc/main.py @@ -3,27 +3,17 @@ import logging import sys import time -from typing import Any, Optional -from prompt_toolkit.history import History -from prompt_toolkit.history import FileHistory -from spacepackets.ccsds import PacketId, PacketType import tmtccmd -from spacepackets.ecss import PusTelemetry, PusVerificator -from spacepackets.ecss.pus_17_test import Service17Tm -from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm -from spacepackets.ccsds.time import CdsShortTimestamp +from spacepackets.ecss import PusVerificator -from tmtccmd import TcHandlerBase, ProcedureParamsWrapper +from tmtccmd import ProcedureParamsWrapper from tmtccmd.core.base import BackendRequest from tmtccmd.pus import VerificationWrapper -from tmtccmd.tmtc import CcsdsTmHandler, GenericApidHandlerBase -from tmtccmd.com import ComInterface +from tmtccmd.tmtc import CcsdsTmHandler from tmtccmd.config import ( - CmdTreeNode, default_json_path, SetupParams, - HookBase, params_to_procedure_conversion, ) from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper @@ -33,193 +23,20 @@ from tmtccmd.logging.pus import ( RawTmtcTimedLogWrapper, TimedLogWhen, ) -from tmtccmd.tmtc import ( - TcQueueEntryType, - ProcedureWrapper, - TcProcedureType, - FeedWrapper, - SendCbParams, - DefaultPusQueueHelper, - QueueWrapper, -) -from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider -from tmtccmd.util.obj_id import ObjectIdDictT +from spacepackets.seqcount import PusFileSeqCountProvider -from pytmtc.pus_tc import pack_pus_telecommands, create_cmd_definition_tree -from pytmtc.common import Apid, EventU32 +from pytmtc.config import SatrsConfigHook +from pytmtc.pus_tc import TcHandler +from pytmtc.pus_tm import PusHandler _LOGGER = logging.getLogger() -class SatRsConfigHook(HookBase): - def __init__(self, json_cfg_path: str): - super().__init__(json_cfg_path=json_cfg_path) - - def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: - from tmtccmd.config.com import ( - create_com_interface_default, - create_com_interface_cfg_default, - ) - - assert self.cfg_path is not None - packet_id_list = [] - for apid in Apid: - packet_id_list.append(PacketId(PacketType.TM, True, apid)) - cfg = create_com_interface_cfg_default( - com_if_key=com_if_key, - json_cfg_path=self.cfg_path, - space_packet_ids=packet_id_list, - ) - assert cfg is not None - return create_com_interface_default(cfg) - - def get_command_definitions(self) -> CmdTreeNode: - """This function should return the root node of the command definition tree.""" - return create_cmd_definition_tree() - - def get_cmd_history(self) -> Optional[History]: - """Optionlly return a history class for the past command paths which will be used - when prompting a command path from the user in CLI mode.""" - return FileHistory(".tmtc-history.txt") - - def get_object_ids(self) -> ObjectIdDictT: - from tmtccmd.config.objects import get_core_object_ids - - return get_core_object_ids() - - -class PusHandler(GenericApidHandlerBase): - def __init__( - self, - file_logger: logging.Logger, - verif_wrapper: VerificationWrapper, - raw_logger: RawTmtcTimedLogWrapper, - ): - super().__init__(None) - self.file_logger = file_logger - self.raw_logger = raw_logger - self.verif_wrapper = verif_wrapper - - def handle_tm(self, apid: int, packet: bytes, _user_args: Any): - try: - pus_tm = PusTelemetry.unpack( - packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE - ) - except ValueError as e: - _LOGGER.warning("Could not generate PUS TM object from raw data") - _LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") - raise e - service = pus_tm.service - if service == 1: - tm_packet = Service1Tm.unpack( - data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2) - ) - res = self.verif_wrapper.add_tm(tm_packet) - if res is None: - _LOGGER.info( - f"Received Verification TM[{tm_packet.service}, {tm_packet.subservice}] " - f"with Request ID {tm_packet.tc_req_id.as_u32():#08x}" - ) - _LOGGER.warning( - f"No matching telecommand found for {tm_packet.tc_req_id}" - ) - else: - self.verif_wrapper.log_to_console(tm_packet, res) - self.verif_wrapper.log_to_file(tm_packet, res) - elif service == 3: - _LOGGER.info("No handling for HK packets implemented") - _LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") - pus_tm = PusTelemetry.unpack( - packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE - ) - if pus_tm.subservice == 25: - if len(pus_tm.source_data) < 8: - raise ValueError("No addressable ID in HK packet") - json_str = pus_tm.source_data[8:] - _LOGGER.info(json_str) - elif service == 5: - tm_packet = PusTelemetry.unpack( - packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE - ) - src_data = tm_packet.source_data - event_u32 = EventU32.unpack(src_data) - _LOGGER.info( - f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}" - ) - if event_u32.group_id == 0 and event_u32.unique_id == 0: - _LOGGER.info("Received test event") - elif service == 17: - tm_packet = Service17Tm.unpack( - packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE - ) - if tm_packet.subservice == 2: - self.file_logger.info("Received Ping Reply TM[17,2]") - _LOGGER.info("Received Ping Reply TM[17,2]") - else: - self.file_logger.info( - f"Received Test Packet with unknown subservice {tm_packet.subservice}" - ) - _LOGGER.info( - f"Received Test Packet with unknown subservice {tm_packet.subservice}" - ) - else: - _LOGGER.info( - f"The service {service} is not implemented in Telemetry Factory" - ) - tm_packet = PusTelemetry.unpack( - packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE - ) - self.raw_logger.log_tm(pus_tm) - - -class TcHandler(TcHandlerBase): - def __init__( - self, - seq_count_provider: FileSeqCountProvider, - verif_wrapper: VerificationWrapper, - ): - super(TcHandler, self).__init__() - self.seq_count_provider = seq_count_provider - self.verif_wrapper = verif_wrapper - self.queue_helper = DefaultPusQueueHelper( - queue_wrapper=QueueWrapper.empty(), - tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, - seq_cnt_provider=seq_count_provider, - pus_verificator=self.verif_wrapper.pus_verificator, - default_pus_apid=None, - ) - - def send_cb(self, send_params: SendCbParams): - entry_helper = send_params.entry - if entry_helper.is_tc: - if entry_helper.entry_type == TcQueueEntryType.PUS_TC: - pus_tc_wrapper = entry_helper.to_pus_tc_entry() - raw_tc = pus_tc_wrapper.pus_tc.pack() - _LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}") - send_params.com_if.send(raw_tc) - elif entry_helper.entry_type == TcQueueEntryType.LOG: - log_entry = entry_helper.to_log_entry() - _LOGGER.info(log_entry.log_str) - - def queue_finished_cb(self, info: ProcedureWrapper): - if info.proc_type == TcProcedureType.TREE_COMMANDING: - def_proc = info.to_tree_commanding_procedure() - _LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}") - - def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): - q = self.queue_helper - q.queue_wrapper = wrapper.queue_wrapper - if info.proc_type == TcProcedureType.TREE_COMMANDING: - def_proc = info.to_tree_commanding_procedure() - assert def_proc.cmd_path is not None - pack_pus_telecommands(q, def_proc.cmd_path) - - def main(): add_colorlog_console_logger(_LOGGER) tmtccmd.init_printout(False) - hook_obj = SatRsConfigHook(json_cfg_path=default_json_path()) + hook_obj = SatrsConfigHook(json_cfg_path=default_json_path()) parser_wrapper = PreArgsParsingWrapper() parser_wrapper.create_default_parent_parser() parser_wrapper.create_default_parser() diff --git a/satrs-example/pytmtc/pytmtc/config.py b/satrs-example/pytmtc/pytmtc/config.py new file mode 100644 index 0000000..6647769 --- /dev/null +++ b/satrs-example/pytmtc/pytmtc/config.py @@ -0,0 +1,47 @@ +from typing import Optional +from prompt_toolkit.history import FileHistory, History +from spacepackets.ccsds import PacketId, PacketType +from tmtccmd import HookBase +from tmtccmd.com import ComInterface +from tmtccmd.config import CmdTreeNode +from tmtccmd.util.obj_id import ObjectIdDictT + +from pytmtc.common import Apid +from pytmtc.pus_tc import create_cmd_definition_tree + + +class SatrsConfigHook(HookBase): + def __init__(self, json_cfg_path: str): + super().__init__(json_cfg_path=json_cfg_path) + + def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: + from tmtccmd.config.com import ( + create_com_interface_default, + create_com_interface_cfg_default, + ) + + assert self.cfg_path is not None + packet_id_list = [] + for apid in Apid: + packet_id_list.append(PacketId(PacketType.TM, True, apid)) + cfg = create_com_interface_cfg_default( + com_if_key=com_if_key, + json_cfg_path=self.cfg_path, + space_packet_ids=packet_id_list, + ) + assert cfg is not None + return create_com_interface_default(cfg) + + def get_command_definitions(self) -> CmdTreeNode: + """This function should return the root node of the command definition tree.""" + return create_cmd_definition_tree() + + def get_cmd_history(self) -> Optional[History]: + """Optionlly return a history class for the past command paths which will be used + when prompting a command path from the user in CLI mode.""" + return FileHistory(".tmtc-history.txt") + + def get_object_ids(self) -> ObjectIdDictT: + from tmtccmd.config.objects import get_core_object_ids + + return get_core_object_ids() diff --git a/satrs-example/pytmtc/pytmtc/pus_tc.py b/satrs-example/pytmtc/pytmtc/pus_tc.py index d81dd28..486f4f1 100644 --- a/satrs-example/pytmtc/pytmtc/pus_tc.py +++ b/satrs-example/pytmtc/pytmtc/pus_tc.py @@ -4,9 +4,19 @@ import logging from spacepackets.ccsds import CdsShortTimestamp from spacepackets.ecss import PusTelecommand +from spacepackets.seqcount import FileSeqCountProvider +from tmtccmd import ProcedureWrapper, TcHandlerBase from tmtccmd.config import CmdTreeNode +from tmtccmd.pus import VerificationWrapper from tmtccmd.pus.tc.s200_fsfw_mode import Mode -from tmtccmd.tmtc import DefaultPusQueueHelper +from tmtccmd.tmtc import ( + DefaultPusQueueHelper, + FeedWrapper, + QueueWrapper, + SendCbParams, + TcProcedureType, + TcQueueEntryType, +) from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice @@ -15,6 +25,49 @@ from pytmtc.common import AcsId, Apid _LOGGER = logging.getLogger(__name__) +class TcHandler(TcHandlerBase): + def __init__( + self, + seq_count_provider: FileSeqCountProvider, + verif_wrapper: VerificationWrapper, + ): + super(TcHandler, self).__init__() + self.seq_count_provider = seq_count_provider + self.verif_wrapper = verif_wrapper + self.queue_helper = DefaultPusQueueHelper( + queue_wrapper=QueueWrapper.empty(), + tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, + seq_cnt_provider=seq_count_provider, + pus_verificator=self.verif_wrapper.pus_verificator, + default_pus_apid=None, + ) + + def send_cb(self, send_params: SendCbParams): + entry_helper = send_params.entry + if entry_helper.is_tc: + if entry_helper.entry_type == TcQueueEntryType.PUS_TC: + pus_tc_wrapper = entry_helper.to_pus_tc_entry() + raw_tc = pus_tc_wrapper.pus_tc.pack() + _LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}") + send_params.com_if.send(raw_tc) + elif entry_helper.entry_type == TcQueueEntryType.LOG: + log_entry = entry_helper.to_log_entry() + _LOGGER.info(log_entry.log_str) + + def queue_finished_cb(self, info: ProcedureWrapper): + if info.proc_type == TcProcedureType.TREE_COMMANDING: + def_proc = info.to_tree_commanding_procedure() + _LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}") + + def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): + q = self.queue_helper + q.queue_wrapper = wrapper.queue_wrapper + if info.proc_type == TcProcedureType.TREE_COMMANDING: + def_proc = info.to_tree_commanding_procedure() + assert def_proc.cmd_path is not None + pack_pus_telecommands(q, def_proc.cmd_path) + + def create_cmd_definition_tree() -> CmdTreeNode: root_node = CmdTreeNode.root_node() diff --git a/satrs-example/pytmtc/pytmtc/pus_tm.py b/satrs-example/pytmtc/pytmtc/pus_tm.py index e69de29..3d12a79 100644 --- a/satrs-example/pytmtc/pytmtc/pus_tm.py +++ b/satrs-example/pytmtc/pytmtc/pus_tm.py @@ -0,0 +1,98 @@ +import logging +from typing import Any +from spacepackets.ccsds.time import CdsShortTimestamp +from spacepackets.ecss import PusTm +from spacepackets.ecss.pus_17_test import Service17Tm +from spacepackets.ecss.pus_1_verification import Service1Tm, UnpackParams +from tmtccmd.logging.pus import RawTmtcTimedLogWrapper +from tmtccmd.pus import VerificationWrapper +from tmtccmd.tmtc import GenericApidHandlerBase + +from pytmtc.common import Apid, EventU32 + + +_LOGGER = logging.getLogger(__name__) + + +class PusHandler(GenericApidHandlerBase): + def __init__( + self, + file_logger: logging.Logger, + verif_wrapper: VerificationWrapper, + raw_logger: RawTmtcTimedLogWrapper, + ): + super().__init__(None) + self.file_logger = file_logger + self.raw_logger = raw_logger + self.verif_wrapper = verif_wrapper + + def handle_tm(self, apid: int, packet: bytes, _user_args: Any): + try: + pus_tm = PusTm.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) + except ValueError as e: + _LOGGER.warning("Could not generate PUS TM object from raw data") + _LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") + raise e + service = pus_tm.service + if service == 1: + tm_packet = Service1Tm.unpack( + data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2) + ) + res = self.verif_wrapper.add_tm(tm_packet) + if res is None: + _LOGGER.info( + f"Received Verification TM[{tm_packet.service}, {tm_packet.subservice}] " + f"with Request ID {tm_packet.tc_req_id.as_u32():#08x}" + ) + _LOGGER.warning( + f"No matching telecommand found for {tm_packet.tc_req_id}" + ) + else: + self.verif_wrapper.log_to_console(tm_packet, res) + self.verif_wrapper.log_to_file(tm_packet, res) + elif service == 3: + _LOGGER.info("No handling for HK packets implemented") + _LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") + pus_tm = PusTm.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) + if pus_tm.subservice == 25: + if len(pus_tm.source_data) < 8: + raise ValueError("No addressable ID in HK packet") + json_str = pus_tm.source_data[8:] + _LOGGER.info(json_str) + elif service == 5: + tm_packet = PusTm.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) + src_data = tm_packet.source_data + event_u32 = EventU32.unpack(src_data) + _LOGGER.info( + f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}" + ) + if event_u32.group_id == 0 and event_u32.unique_id == 0: + _LOGGER.info("Received test event") + elif service == 17: + tm_packet = Service17Tm.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) + if tm_packet.subservice == 2: + self.file_logger.info("Received Ping Reply TM[17,2]") + _LOGGER.info("Received Ping Reply TM[17,2]") + else: + self.file_logger.info( + f"Received Test Packet with unknown subservice {tm_packet.subservice}" + ) + _LOGGER.info( + f"Received Test Packet with unknown subservice {tm_packet.subservice}" + ) + else: + _LOGGER.info( + f"The service {service} is not implemented in Telemetry Factory" + ) + tm_packet = PusTm.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) + self.raw_logger.log_tm(pus_tm) diff --git a/satrs-example/pytmtc/tests/__init__.py b/satrs-example/pytmtc/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/satrs-example/pytmtc/tests/test_tc_mods.py b/satrs-example/pytmtc/tests/test_tc_mods.py new file mode 100644 index 0000000..0b56bde --- /dev/null +++ b/satrs-example/pytmtc/tests/test_tc_mods.py @@ -0,0 +1,48 @@ +from unittest import TestCase + +from spacepackets.ccsds import CdsShortTimestamp +from tmtccmd.tmtc import DefaultPusQueueHelper, QueueEntryHelper +from tmtccmd.tmtc.queue import QueueWrapper + +from pytmtc.config import SatrsConfigHook +from pytmtc.pus_tc import pack_pus_telecommands + + +class TestTcModules(TestCase): + def setUp(self): + self.hook = SatrsConfigHook(json_cfg_path="tmtc_conf.json") + self.queue_helper = DefaultPusQueueHelper( + queue_wrapper=QueueWrapper.empty(), + tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, + seq_cnt_provider=None, + pus_verificator=None, + default_pus_apid=None, + ) + + def test_cmd_tree_creation_works_without_errors(self): + cmd_defs = self.hook.get_command_definitions() + self.assertIsNotNone(cmd_defs) + + def test_ping_cmd_generation(self): + pack_pus_telecommands(self.queue_helper, "/test/ping") + queue_entry = self.queue_helper.queue_wrapper.queue.popleft() + entry_helper = QueueEntryHelper(queue_entry) + log_queue = entry_helper.to_log_entry() + self.assertEqual(log_queue.log_str, "Sending PUS ping telecommand") + queue_entry = self.queue_helper.queue_wrapper.queue.popleft() + entry_helper.entry = queue_entry + pus_tc_entry = entry_helper.to_pus_tc_entry() + self.assertEqual(pus_tc_entry.pus_tc.service, 17) + self.assertEqual(pus_tc_entry.pus_tc.subservice, 1) + + def test_event_trigger_generation(self): + pack_pus_telecommands(self.queue_helper, "/test/trigger_event") + queue_entry = self.queue_helper.queue_wrapper.queue.popleft() + entry_helper = QueueEntryHelper(queue_entry) + log_queue = entry_helper.to_log_entry() + self.assertEqual(log_queue.log_str, "Triggering test event") + queue_entry = self.queue_helper.queue_wrapper.queue.popleft() + entry_helper.entry = queue_entry + pus_tc_entry = entry_helper.to_pus_tc_entry() + self.assertEqual(pus_tc_entry.pus_tc.service, 17) + self.assertEqual(pus_tc_entry.pus_tc.subservice, 128)