diff --git a/common.py b/common.py index 54418d6..005d950 100644 --- a/common.py +++ b/common.py @@ -1,63 +1,191 @@ -import argparse +import logging import sys -import traceback -from typing import Optional + +from tmtccmd.com_if import ComInterface +from tmtccmd.logging import get_current_time_string try: import spacepackets except ImportError as error: print(error) - print("Python spacepackets module could not be imported") - print( - 'Install with "cd spacepackets && python3 -m pip intall -e ." for interative installation' - ) + print("Python tmtccmd module could not be imported. Make sure it is installed") sys.exit(1) try: - import tmtccmd.runner as tmtccmd - from tmtccmd.logging.pus import create_tmtc_logger - from tmtccmd.ccsds.handler import ApidHandler, CcsdsTmHandler - from tmtccmd.config import SetupArgs, default_json_path - from tmtccmd.config.args import ( - create_default_args_parser, - add_default_tmtccmd_args, - parse_default_input_arguments, - ) + import tmtccmd except ImportError as error: - run_tmtc_commander = None - initialize_tmtc_commander = None - tb = traceback.format_exc() - print(tb) - print("Python tmtccmd submodule could not be imported") + print(error) + print("Python tmtccmd module could not be imported. Make sure it is installed") sys.exit(1) +from spacepackets.ecss import PusVerificator + +from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands +from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands +from common_tmtc.pus_tc.pus_200_mode import pack_service_200_commands_into +from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into +from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into +from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into +from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into +from examples.tmtcc import EXAMPLE_APID +from tmtccmd import TcHandlerBase, get_console_logger, TmTcCfgHookBase, BackendBase +from tmtccmd.pus import VerificationWrapper, FileSeqCountProvider +from tmtccmd.tc import ( + ProcedureHelper, + FeedWrapper, + TcProcedureType, + QueueEntryHelper, + TcQueueEntryType, +) +from tmtccmd.tc.pus_5_event import pack_generic_service_5_test_into +from tmtccmd.tm import SpecificApidHandlerBase, CcsdsTmHandler +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter +from tmtccmd.logging.pus import RawTmtcTimedLogWrapper +from tmtccmd.config import CoreServiceList, SetupWrapper, SetupParams, ArgParserWrapper from common_tmtc.config import __version__ -from common_tmtc.config.definitions import PUS_APID -from common_tmtc.config.hook_implementation import CommonFsfwHookBase -from common_tmtc.pus_tm.factory_hook import ccsds_tm_handler -from common_tmtc.pus_tc.tc_packing import pre_tc_send_cb +from common_tmtc.pus_tm.factory_hook import pus_factory_hook -def tmtcc_pre_args(): - print(f"-- eive tmtc v{__version__} --") - print(f"-- spacepackets v{spacepackets.__version__} --") +LOGGER = get_console_logger() -def tmtcc_post_args( - hook_obj: CommonFsfwHookBase, use_gui: bool, args: Optional[argparse.Namespace] -): - setup_args = SetupArgs( - hook_obj=hook_obj, use_gui=use_gui, apid=PUS_APID, cli_args=args +class PusHandler(SpecificApidHandlerBase): + def __init__( + self, + wrapper: VerificationWrapper, + printer: FsfwTmTcPrinter, + raw_logger: RawTmtcTimedLogWrapper, + ): + super().__init__(EXAMPLE_APID, None) + self.printer = printer + self.verif_wrapper = wrapper + self.raw_logger = raw_logger + + def handle_tm(self, packet: bytes, _user_args: any): + pus_factory_hook( + packet=packet, + wrapper=self.verif_wrapper, + raw_logger=self.raw_logger, + printer=self.printer, + ) + + +class TcHandler(TcHandlerBase): + def __init__( + self, + seq_count_provider: FileSeqCountProvider, + pus_verificator: PusVerificator, + file_logger: logging.Logger, + raw_logger: RawTmtcTimedLogWrapper, + ): + super().__init__() + self.seq_count_provider = seq_count_provider + self.pus_verificator = pus_verificator + self.file_logger = file_logger + self.raw_logger = raw_logger + + def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper): + if info.proc_type == TcProcedureType.DEFAULT: + def_proc = info.to_def_procedure() + service = def_proc.service + op_code = def_proc.op_code + if service == CoreServiceList.SERVICE_2.value: + return pack_service_2_commands_into( + op_code=op_code, q=wrapper.queue_helper + ) + if service == CoreServiceList.SERVICE_3.value: + return pack_service_3_commands_into( + op_code=op_code, q=wrapper.queue_helper + ) + if service == CoreServiceList.SERVICE_5.value: + return pack_generic_service_5_test_into(q=wrapper.queue_helper) + if service == CoreServiceList.SERVICE_8.value: + return pack_service_8_commands_into( + op_code=op_code, q=wrapper.queue_helper + ) + if service == CoreServiceList.SERVICE_11.value: + return pack_service_11_commands(op_code=op_code, q=wrapper.queue_helper) + if service == CoreServiceList.SERVICE_17.value: + return pack_service_17_commands(op_code=op_code, q=wrapper.queue_helper) + if service == CoreServiceList.SERVICE_20.value: + return pack_service20_commands_into( + q=wrapper.queue_helper, op_code=op_code + ) + if service == CoreServiceList.SERVICE_200.value: + return pack_service_200_commands_into( + q=wrapper.queue_helper, op_code=op_code + ) + LOGGER.warning("Invalid Service !") + + def send_cb(self, entry_helper: QueueEntryHelper, com_if: ComInterface): + if entry_helper.is_tc: + if entry_helper.entry_type == TcQueueEntryType.PUS_TC: + pus_tc_wrapper = entry_helper.to_pus_tc_entry() + pus_tc_wrapper.pus_tc.seq_count = ( + self.seq_count_provider.next_seq_count() + ) + pus_tc_wrapper.pus_tc.apid = EXAMPLE_APID + # Add TC after Sequence Count stamping + self.pus_verificator.add_tc(pus_tc_wrapper.pus_tc) + raw_tc = pus_tc_wrapper.pus_tc.pack() + self.raw_logger.log_tc(pus_tc_wrapper.pus_tc) + tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}" + LOGGER.info(tc_info_string) + self.file_logger.info( + f"{get_current_time_string(True)}: {tc_info_string}" + ) + com_if.send(raw_tc) + + def queue_finished_cb(self, info: ProcedureHelper): + if info is not None and info.proc_type == TcQueueEntryType.PUS_TC: + def_proc = info.to_def_procedure() + LOGGER.info( + f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}" + ) + + +def setup_params(hook_obj: TmTcCfgHookBase) -> SetupWrapper: + print(f"-- eive TMTC Commander --") + print( + f"-- tmtccmd v{tmtccmd.__version__} spacepackets v{spacepackets.__version__} --" ) - tmtc_file_logger = create_tmtc_logger() - apid_handler = ApidHandler(cb=ccsds_tm_handler, queue_len=50, user_args=None) - ccsds_handler = CcsdsTmHandler() - ccsds_handler.add_tm_handler(apid=PUS_APID, handler=apid_handler) - tmtccmd.setup(setup_args=setup_args) - tmtccmd.add_ccsds_handler(ccsds_handler) + params = SetupParams() + parser_wrapper = ArgParserWrapper(hook_obj) + parser_wrapper.parse() + tmtccmd.init_printout(parser_wrapper.use_gui) + parser_wrapper.set_params(params) + params.apid = EXAMPLE_APID + setup_wrapper = SetupWrapper(hook_obj=hook_obj, setup_params=params) + return setup_wrapper + + +def setup_tmtc_handlers( + verif_wrapper: VerificationWrapper, + printer: FsfwTmTcPrinter, + raw_logger: RawTmtcTimedLogWrapper, +) -> (CcsdsTmHandler, TcHandler): + + pus_handler = PusHandler( + printer=printer, raw_logger=raw_logger, wrapper=verif_wrapper + ) + ccsds_handler = CcsdsTmHandler(None) + ccsds_handler.add_apid_handler(pus_handler) + tc_handler = TcHandler( + file_logger=printer.file_logger, + raw_logger=raw_logger, + pus_verificator=verif_wrapper.pus_verificator, + seq_count_provider=FileSeqCountProvider(), + ) + return ccsds_handler, tc_handler + + +def setup_backend( + setup_wrapper: SetupWrapper, + tc_handler: TcHandler, + ccsds_handler: CcsdsTmHandler, +) -> BackendBase: tmtc_backend = tmtccmd.create_default_tmtc_backend( - setup_args=setup_args, - tm_handler=ccsds_handler, + setup_wrapper=setup_wrapper, tm_handler=ccsds_handler, tc_handler=tc_handler ) - tmtc_backend.usr_send_wrapper = (pre_tc_send_cb, tmtc_file_logger) - tmtccmd.run(tmtc_backend=tmtc_backend) + tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=setup_wrapper.hook_obj) + return tmtc_backend diff --git a/config/hook_implementation.py b/config/hook_implementation.py index 1e687cd..7cec62d 100644 --- a/config/hook_implementation.py +++ b/config/hook_implementation.py @@ -1,46 +1,27 @@ -from typing import Optional, Union +from abc import abstractmethod +from typing import Optional -from tmtccmd.com_if.com_interface_base import CommunicationInterface -from tmtccmd.config.definitions import ServiceOpCodeDictT -from tmtccmd.config.hook import TmTcHookBase -from tmtccmd.core.backend import TmTcHandler -from tmtccmd.tc.definitions import TcQueueT -from tmtccmd.utility.obj_id import ObjectIdDictT - -from tmtccmd.utility.retval import RetvalDictT - -from common_tmtc.config.definitions import TM_SP_IDS -from common_tmtc.pus_tc.cmd_definitions import common_fsfw_service_op_code_dict +from tmtccmd import BackendBase +from tmtccmd.com_if import ComInterface +from tmtccmd.config import TmTcCfgHookBase, TmTcDefWrapper, default_json_path +from tmtccmd.utility import ObjectIdDictT, RetvalDictT -class CommonFsfwHookBase(TmTcHookBase): - def pack_service_queue( - self, service: Union[int, str], op_code: str, service_queue: TcQueueT - ): - from common_tmtc.pus_tc.tc_packing import common_service_queue_user - - common_service_queue_user( - service=service, op_code=op_code, tc_queue=service_queue - ) - - def __init__(self, json_cfg_path: str): +class CommonFsfwHookBase(TmTcCfgHookBase): + def __init__(self, json_cfg_path: Optional[str] = None): + if json_cfg_path is None: + json_cfg_path = default_json_path() super().__init__(json_cfg_path=json_cfg_path) - def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: - return common_fsfw_service_op_code_dict() + @abstractmethod + def get_tmtc_definitions(self) -> TmTcDefWrapper: + pass - def assign_communication_interface( - self, com_if_key: str - ) -> Optional[CommunicationInterface]: - from tmtccmd.config.com_if import create_communication_interface_default + @abstractmethod + def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: + pass - return create_communication_interface_default( - com_if_key=com_if_key, - json_cfg_path=self.json_cfg_path, - space_packet_ids=TM_SP_IDS, - ) - - def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): + def perform_mode_operation(self, tmtc_backend: BackendBase, mode: int): print("No custom mode operation implemented") def get_object_ids(self) -> ObjectIdDictT: diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 057b71d..5ecb282 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -1,32 +1,22 @@ from tmtccmd.config import ( - ServiceOpCodeDictT, - add_op_code_entry, - add_service_op_code_entry, - generate_op_code_options, - OpCodeDictKeys, + TmTcDefWrapper, ) -from tmtccmd.config.definitions import OpCodeOptionsT -from tmtccmd.config.globals import get_default_service_op_code_dict -from common_tmtc.pus_tc.pus_11_tc_sched import OpCodes, add_tc_sched_cmds +from common_tmtc.pus_tc.pus_11_tc_sched import add_tc_sched_cmds +from tmtccmd.config.globals import get_default_tmtc_defs +from tmtccmd.config.tmtc import OpCodeEntry -def common_fsfw_service_op_code_dict() -> ServiceOpCodeDictT: - service_op_code_dict = get_default_service_op_code_dict() - op_code = dict() - add_op_code_entry(op_code_dict=op_code, keys="test", info="Mode CMD Test") - add_op_code_entry( - op_code_dict=op_code, - keys=["0", "asm_to_normal"], - info="Command test assembly to normal mode", - options={OpCodeDictKeys.TIMEOUT: 6.0}, +def common_fsfw_service_op_code_dict() -> TmTcDefWrapper: + def_wrapper = get_default_tmtc_defs() + op_code_entry = OpCodeEntry() + op_code_entry.add(keys="test", info="Mode CMD Test") + op_code_entry.add( + keys=["0", "asm_to_normal"], info="Command test assembly to normal mode" ) - add_service_op_code_entry( - srv_op_code_dict=service_op_code_dict, - name="200", - info="PUS Service 200 Mode MGMT", - op_code_entry=op_code, + def_wrapper.add_service( + "200", info="PUS Service 200 Mode MGMT", op_code_entry=op_code_entry ) - add_tc_sched_cmds(service_op_code_dict) + add_tc_sched_cmds(def_wrapper) - return service_op_code_dict + return def_wrapper diff --git a/pus_tc/pus_11_tc_sched.py b/pus_tc/pus_11_tc_sched.py index 841d55d..3fa31ac 100644 --- a/pus_tc/pus_11_tc_sched.py +++ b/pus_tc/pus_11_tc_sched.py @@ -1,24 +1,20 @@ import struct import time +from datetime import timedelta from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.config import ( - QueueCommands, - ServiceOpCodeDictT, - add_op_code_entry, - generate_op_code_options, - add_service_op_code_entry, -) +from tmtccmd.config import TmTcDefWrapper +from tmtccmd.config.tmtc import OpCodeEntry from tmtccmd.pus.pus_17_test import pack_service_17_ping_command from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_11_tc_sched import ( generate_enable_tc_sched_cmd, generate_disable_tc_sched_cmd, generate_reset_tc_sched_cmd, pack_time_tagged_tc_app_data, ) -from tmtccmd.tc.definitions import TcQueueT class OpCodes: @@ -40,106 +36,80 @@ class Info: TEST_RESET = "Test TC scheduling reset command" -def add_tc_sched_cmds(cmd_dict: ServiceOpCodeDictT): - op_code = dict() - add_op_code_entry( - op_code_dict=op_code, - keys=OpCodes.ENABLE, - info=Info.ENABLE, - options=generate_op_code_options(custom_timeout=2.0), - ) - add_op_code_entry( - op_code_dict=op_code, - keys=OpCodes.DISABLE, - info=Info.DISABLE, - options=generate_op_code_options(custom_timeout=2.0), - ) - add_op_code_entry(op_code_dict=op_code, keys=OpCodes.RESET, info=Info.RESET) - add_op_code_entry( - op_code_dict=op_code, - keys=OpCodes.TEST_INSERT, - info=Info.TEST_INSERT, - options=generate_op_code_options(custom_timeout=0.2), - ) - add_op_code_entry( - op_code_dict=op_code, - keys=OpCodes.TEST_DELETE, - info=Info.TEST_DELETE, - options=generate_op_code_options(custom_timeout=0.2), - ) - add_op_code_entry( - op_code_dict=op_code, - keys=OpCodes.TEST_RESET, - info=Info.TEST_RESET, - options=generate_op_code_options(custom_timeout=0.2), - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name="11", - info="PUS Service 11 TC Scheduling", - op_code_entry=op_code, +def add_tc_sched_cmds(cmd_dict: TmTcDefWrapper): + op_code_entry = OpCodeEntry() + op_code_entry.add(keys=OpCodes.ENABLE, info=Info.ENABLE) + op_code_entry.add(keys=OpCodes.DISABLE, info=Info.DISABLE) + op_code_entry.add(keys=OpCodes.RESET, info=Info.RESET) + op_code_entry.add(keys=OpCodes.TEST_INSERT, info=Info.TEST_INSERT) + op_code_entry.add(keys=OpCodes.TEST_DELETE, info=Info.TEST_DELETE) + op_code_entry.add(keys=OpCodes.TEST_RESET, info=Info.TEST_RESET) + cmd_dict.add_service( + "11", info="PUS Service 11 TC Scheduling", op_code_entry=op_code_entry ) -def __generic_pack_three_time_tagged_cmds(tc_queue: TcQueueT): - tc_queue.appendleft((QueueCommands.PRINT, "Testing Time-Tagged Command insertion")) - tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple()) +def __generic_pack_three_time_tagged_cmds(q: QueueHelper): + q.add_log_cmd("Testing Time-Tagged Command insertion") + q.add_pus_tc(generate_enable_tc_sched_cmd()) current_time = int(round(time.time())) # these TC[17,1] (ping commands) shall be inserted ping_tcs = [ - pack_service_17_ping_command(1701), - pack_service_17_ping_command(1702), - pack_service_17_ping_command(1703), + pack_service_17_ping_command(), + pack_service_17_ping_command(), + pack_service_17_ping_command(), ] for idx, tc in enumerate(ping_tcs): release_time = current_time + (idx + 2) * 5 - time_tagged_tc = PusTelecommand( - service=11, - subservice=Subservices.TC_INSERT, - app_data=pack_time_tagged_tc_app_data(struct.pack("!I", release_time), tc), + q.add_pus_tc( + PusTelecommand( + service=11, + subservice=Subservices.TC_INSERT, + app_data=pack_time_tagged_tc_app_data( + struct.pack("!I", release_time), tc + ), + ) ) - tc_queue.appendleft(time_tagged_tc.pack_command_tuple()) - tc_queue.appendleft((QueueCommands.WAIT, 25)) + q.add_wait(timedelta(seconds=25.0)) -def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): +def pack_service_11_commands(op_code: str, q: QueueHelper): if op_code in OpCodes.TEST_INSERT: - tc_queue.appendleft( - (QueueCommands.PRINT, "Testing Time-Tagged Command deletion") - ) - __generic_pack_three_time_tagged_cmds(tc_queue=tc_queue) + q.add_log_cmd("Testing Time-Tagged Command deletion") + __generic_pack_three_time_tagged_cmds(q=q) if op_code in OpCodes.TEST_DELETE: current_time = int(round(time.time())) - tc_to_schedule = pack_service_17_ping_command(1703) - time_tagged_tc = PusTelecommand( - service=11, - subservice=Subservices.TC_INSERT, - app_data=pack_time_tagged_tc_app_data( - struct.pack("!I", current_time + 20), tc_to_schedule - ), + tc_to_schedule = pack_service_17_ping_command() + q.add_pus_tc( + PusTelecommand( + service=11, + subservice=Subservices.TC_INSERT, + app_data=pack_time_tagged_tc_app_data( + struct.pack("!I", current_time + 20), tc_to_schedule + ), + ) ) - tc_queue.appendleft(time_tagged_tc.pack_command_tuple()) - del_time_tagged_tcs = PusTelecommand( - service=11, - subservice=Subservices.TC_DELETE, - app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule), - ssc=1105, + q.add_pus_tc( + PusTelecommand( + service=11, + subservice=Subservices.TC_DELETE, + app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule), + ) ) - tc_queue.appendleft(del_time_tagged_tcs.pack_command_tuple()) if op_code in OpCodes.ENABLE: - tc_queue.appendleft((QueueCommands.PRINT, "Enabling TC scheduler")) - tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple()) + q.add_log_cmd("Enabling TC scheduler") + q.add_pus_tc(generate_enable_tc_sched_cmd()) if op_code in OpCodes.DISABLE: - tc_queue.appendleft((QueueCommands.PRINT, "Disabling TC scheduler")) - tc_queue.appendleft(generate_disable_tc_sched_cmd(ssc=0).pack_command_tuple()) + q.add_log_cmd("Disabling TC scheduler") + q.add_pus_tc(generate_disable_tc_sched_cmd()) if op_code in OpCodes.RESET: - tc_queue.appendleft((QueueCommands.PRINT, "Reset TC scheduler")) - tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple()) + q.add_log_cmd("Reset TC scheduler") + q.add_pus_tc(generate_reset_tc_sched_cmd()) if op_code in OpCodes.TEST_RESET: - tc_queue.appendleft((QueueCommands.PRINT, "Testing Reset command")) - __generic_pack_three_time_tagged_cmds(tc_queue=tc_queue) - tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple()) + q.add_log_cmd("Testing Reset command") + __generic_pack_three_time_tagged_cmds(q) + q.add_pus_tc(generate_reset_tc_sched_cmd()) # a TC[11,5] for 3rd inserted ping TC # TODO: This should be an independent test # a TC[11,6] for some other previously inserted TCs @@ -177,7 +147,7 @@ def build_filter_delete_tc( for timestamp in timestamps: app_data.extend(struct.pack("!I", timestamp)) - return PusTelecommand(service=11, subservice=6, app_data=app_data, ssc=1161) + return PusTelecommand(service=11, subservice=6, app_data=app_data) def pack_corresponding_timeshift_app_data( @@ -201,7 +171,7 @@ def build_filter_timeshift_tc( for timestamp in timestamps: app_data.extend(struct.pack("!I", timestamp)) - return PusTelecommand(service=11, subservice=8, app_data=app_data, ssc=1181) + return PusTelecommand(service=11, subservice=8, app_data=app_data) # waits for a specified amount of seconds and prints ". . ." for each second diff --git a/pus_tc/pus_17_test.py b/pus_tc/pus_17_test.py index be190c7..2ea63ae 100644 --- a/pus_tc/pus_17_test.py +++ b/pus_tc/pus_17_test.py @@ -1,14 +1,12 @@ -from tmtccmd.tc.definitions import TcQueueT from tmtccmd.pus.pus_17_test import ( pack_service_17_ping_command, pack_generic_service17_test, ) +from tmtccmd.tc import QueueHelper -def pack_service_17_commands(op_code: str, init_ssc: int, tc_queue: TcQueueT): +def pack_service_17_commands(op_code: str, q: QueueHelper): if op_code == "0": - tc_queue.appendleft( - pack_service_17_ping_command(ssc=init_ssc).pack_command_tuple() - ) + q.add_pus_tc(pack_service_17_ping_command()) else: - pack_generic_service17_test(tc_queue=tc_queue, init_ssc=init_ssc) + pack_generic_service17_test(q=q) diff --git a/pus_tc/pus_200_mode.py b/pus_tc/pus_200_mode.py index 2b3e9f5..89e243e 100644 --- a/pus_tc/pus_200_mode.py +++ b/pus_tc/pus_200_mode.py @@ -1,54 +1,40 @@ # -*- coding: utf-8 -*- from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.tc.packer import TcQueueT +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, ASSEMBLY_ID -def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str): +def pack_service_200_commands_into(q: QueueHelper, op_code: str): if op_code == "test": - pack_service_200_test_into(tc_queue=tc_queue, init_ssc=2000) + pack_service_200_test_into(q) elif op_code == "asm_to_normal" or op_code == "0": # Set Normal mode - tc_queue.appendleft( - (QueueCommands.PRINT, "Testing Service 200: Set Mode Normal") - ) + q.add_log_cmd("Testing Service 200: Set Mode Normal") # Command assembly to normal, submode 1 for dual mode, mode_data = pack_mode_data(ASSEMBLY_ID, Modes.NORMAL, 1) - command = PusTelecommand(service=200, subservice=1, ssc=0, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) -def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: - new_ssc = init_ssc - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200")) +def pack_service_200_test_into(q: QueueHelper): + q.add_log_cmd("Testing Service 200") # Object ID: DUMMY Device object_id = TEST_DEVICE_0_ID # Set On Mode - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On")) + q.add_log_cmd("Testing Service 200: Set Mode On") mode_data = pack_mode_data(object_id, Modes.ON, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) # Set Normal mode - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal")) + q.add_log_cmd("Testing Service 200: Set Mode Normal") mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) # Set Raw Mode - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw")) + q.add_log_cmd("Testing Service 200: Set Mode Raw") mode_data = pack_mode_data(object_id, Modes.RAW, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) # Set Off Mode - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off")) + q.add_log_cmd("Testing Service 200: Set Mode Off") mode_data = pack_mode_data(object_id, Modes.OFF, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - return new_ssc + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) diff --git a/pus_tc/service_20_parameters.py b/pus_tc/service_20_parameters.py index 1e2faf1..9f3dac8 100644 --- a/pus_tc/service_20_parameters.py +++ b/pus_tc/service_20_parameters.py @@ -2,8 +2,7 @@ import struct from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.tc.definitions import TcQueueT +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_20_params import pack_type_and_matrix_data, pack_parameter_id from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.logging import get_console_logger @@ -13,73 +12,67 @@ from common_tmtc.config.object_ids import TEST_DEVICE_0_ID LOGGER = get_console_logger() -def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str): +def pack_service20_commands_into(q: QueueHelper, op_code: str): if op_code == "0": - pack_service20_test_into(tc_queue=tc_queue) + pack_service20_test_into(q=q) -def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False): +def pack_service20_test_into(q: QueueHelper, called_externally: bool = False): if called_externally is False: - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20")) + q.add_log_cmd("Testing Service 20") object_id = TEST_DEVICE_0_ID # set mode normal - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode")) + q.add_log_cmd("Testing Service 20: Set Normal Mode") mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) - command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) - load_param_0_simple_test_commands(tc_queue=tc_queue) - load_param_1_simple_test_commands(tc_queue=tc_queue) - load_param_2_simple_test_commands(tc_queue=tc_queue) + load_param_0_simple_test_commands(q) + load_param_1_simple_test_commands(q) + load_param_2_simple_test_commands(q) -def load_param_0_simple_test_commands(tc_queue: TcQueueT): +def load_param_0_simple_test_commands(q: QueueHelper): object_id = TEST_DEVICE_0_ID parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0) # test checking Load for uint32_t - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load uint32_t")) + q.add_log_cmd("Testing Service 20: Load uint32_t") type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1) parameter_data = struct.pack("!I", 42) payload = object_id + parameter_id_0 + type_and_matrix_data + parameter_data - command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=20, subservice=128, app_data=payload)) # test checking Dump for uint32_t - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump uint32_t")) + q.add_log_cmd("Testing Service 20: Dump uint32_t") payload = object_id + parameter_id_0 - command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload)) -def load_param_1_simple_test_commands(tc_queue: TcQueueT): +def load_param_1_simple_test_commands(q: QueueHelper): object_id = TEST_DEVICE_0_ID parameter_id_1 = pack_parameter_id(domain_id=0, unique_id=1, linear_index=0) # test checking Load for int32_t - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load int32_t")) + q.add_log_cmd("Testing Service 20: Load int32_t") type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1) parameter_data = struct.pack("!i", -42) payload = object_id + parameter_id_1 + type_and_matrix_data + parameter_data - command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=20, subservice=128, app_data=payload)) # test checking Dump for int32_t - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump int32_t")) + q.add_log_cmd("Testing Service 20: Dump int32_t") payload = object_id + parameter_id_1 - command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload)) -def load_param_2_simple_test_commands(tc_queue: TcQueueT): +def load_param_2_simple_test_commands(q: QueueHelper): object_id = TEST_DEVICE_0_ID parameter_id_2 = pack_parameter_id(domain_id=0, unique_id=2, linear_index=0) # test checking Load for float - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load float")) + q.add_log_cmd("Testing Service 20: Load float") type_and_matrix_data = pack_type_and_matrix_data(ptc=5, pfc=1, rows=1, columns=3) parameter_data = struct.pack("!fff", 4.2, -4.2, 49) payload = object_id + parameter_id_2 + type_and_matrix_data + parameter_data - command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=20, subservice=128, app_data=payload)) # test checking Dump for float # Skip dump for now, still not properly implemented diff --git a/pus_tc/service_2_raw_cmd.py b/pus_tc/service_2_raw_cmd.py index 9e3745f..d779291 100644 --- a/pus_tc/service_2_raw_cmd.py +++ b/pus_tc/service_2_raw_cmd.py @@ -9,73 +9,51 @@ import struct from spacepackets.ecss.tc import PusTelecommand +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.tc.definitions import TcQueueT from common_tmtc.pus_tc import command_data as cmd_data from common_tmtc.config.object_ids import TEST_DEVICE_0_ID -def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str): +def pack_service_2_commands_into(q: QueueHelper, op_code: str): if op_code == "0": - pack_generic_service_2_test_into(0, tc_queue) + pack_generic_service_2_test_into(0, q) else: print(f"pack_service_2_test: Operation code {op_code} unknown!") -def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: +def pack_generic_service_2_test_into(init_ssc: int, q: QueueHelper) -> int: new_ssc = init_ssc object_id = TEST_DEVICE_0_ID # dummy device # Set Raw Mode - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode")) + q.add_log_cmd("Testing Service 2: Setting Raw Mode") mode_data = pack_mode_data(object_id, Modes.RAW, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) # toggle wiretapping raw - tc_queue.appendleft( - (QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw") - ) + q.add_log_cmd("Testing Service 2: Toggling Wiretapping Raw") wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1) - toggle_wiretapping_on_command = PusTelecommand( - service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data + q.add_pus_tc( + PusTelecommand(service=2, subservice=129, app_data=wiretapping_toggle_data) ) - tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple()) - new_ssc += 1 # send raw command, wiretapping should be returned via TM[2,130] and TC[2,131] - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Sending Raw Command")) + q.add_log_cmd("Testing Service 2: Sending Raw Command") raw_command = cmd_data.TEST_COMMAND_0 raw_data = object_id + raw_command - raw_command = PusTelecommand( - service=2, subservice=128, ssc=new_ssc, app_data=raw_data - ) - tc_queue.appendleft(raw_command.pack_command_tuple()) - new_ssc += 1 + q.add_pus_tc(PusTelecommand(service=2, subservice=128, app_data=raw_data)) # toggle wiretapping off - tc_queue.appendleft( - (QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off") - ) + q.add_log_cmd("Testing Service 2: Toggle Wiretapping Off") wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0) - toggle_wiretapping_off_command = PusTelecommand( - service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data + q.add_pus_tc( + PusTelecommand(service=2, subservice=129, app_data=wiretapping_toggle_data) ) - tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple()) - new_ssc += 1 # send raw command which should be returned via TM[2,130] - tc_queue.appendleft( - (QueueCommands.PRINT, "Testing Service 2: Send second raw command") - ) - command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - + q.add_log_cmd("Testing Service 2: Send second raw command") + q.add_pus_tc(PusTelecommand(service=2, subservice=128, app_data=raw_data)) # Set mode off - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Off Mode")) + q.add_log_cmd("Testing Service 2: Setting Off Mode") mode_data = pack_mode_data(object_id, Modes.OFF, 0) - command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) return new_ssc diff --git a/pus_tc/service_3_housekeeping.py b/pus_tc/service_3_housekeeping.py index c752cd6..4de6af8 100644 --- a/pus_tc/service_3_housekeeping.py +++ b/pus_tc/service_3_housekeeping.py @@ -1,6 +1,8 @@ +from datetime import timedelta + from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.config.definitions import QueueCommands +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_20_params import ( pack_boolean_parameter_app_data, @@ -8,7 +10,6 @@ from tmtccmd.tc.pus_20_params import ( ) from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command import tmtccmd.tc.pus_3_fsfw_hk as srv3 -from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.pus_8_funccmd import generate_action_command from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID @@ -24,7 +25,7 @@ TEST_NOTIFICATION_ACTION_ID = 3 PARAM_ACTIVATE_CHANGING_DATASETS = 4 -def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str): +def pack_service_3_commands_into(q: QueueHelper, op_code: str): current_ssc = 3000 device_idx = 0 if device_idx == 0: @@ -35,179 +36,120 @@ def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str): if op_code == "0": # This will pack all the tests pack_service_3_test_info( - tc_queue=tc_queue, - init_ssc=current_ssc, + q=q, object_id=object_id, device_idx=device_idx, ) elif op_code == "1": # Extremely simple, generate one HK packet pack_gen_one_hk_command( - tc_queue=tc_queue, + q=q, device_idx=device_idx, - init_ssc=current_ssc, object_id=object_id, ) elif op_code == "2": # Housekeeping basic test - pack_housekeeping_basic_test( - tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc - ) + pack_housekeeping_basic_test(q=q, object_id=object_id) elif op_code == "3": # Notification demo - pack_notification_basic_test( - tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc - ) + pack_notification_basic_test(q=q, object_id=object_id) -def pack_service_3_test_info( - tc_queue: TcQueueT, device_idx: int, object_id: bytearray, init_ssc: int -): - tc_queue.appendleft( - (QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests") - ) - current_ssc = init_ssc - - current_ssc += pack_gen_one_hk_command( - tc_queue=tc_queue, - device_idx=device_idx, +def pack_service_3_test_info(q: QueueHelper, device_idx: int, object_id: bytearray): + q.add_log_cmd("Service 3 (Housekeeping Service): All tests") + pack_gen_one_hk_command(q=q, device_idx=device_idx, object_id=object_id) + pack_housekeeping_basic_test(q=q, object_id=object_id) + pack_notification_basic_test( + q=q, object_id=object_id, - init_ssc=current_ssc, - ) - current_ssc += pack_housekeeping_basic_test( - tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc - ) - current_ssc += pack_notification_basic_test( - tc_queue=tc_queue, - object_id=object_id, - init_ssc=current_ssc, enable_normal_mode=False, ) -def pack_gen_one_hk_command( - tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray -) -> int: +def pack_gen_one_hk_command(q: QueueHelper, device_idx: int, object_id: bytearray): test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) - tc_queue.appendleft( - ( - QueueCommands.PRINT, - f"Service 3 Test: Generate one test set packet for " - f"test device {device_idx}", - ) + q.add_log_cmd( + f"Service 3 Test: Generate one test set packet for test device {device_idx}" ) - command = generate_one_hk_command(ssc=init_ssc, sid=test_sid) - init_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) - return init_ssc + q.add_pus_tc(generate_one_hk_command(sid=test_sid)) def pack_housekeeping_basic_test( - tc_queue: TcQueueT, + q: QueueHelper, object_id: bytearray, - init_ssc: int, enable_normal_mode: bool = True, -) -> int: +): """ This basic test will request one HK packet, then it will enable periodic packets and listen to the periodic packets for a few seconds. After that, HK packets will be disabled again. """ test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) - current_ssc = init_ssc # Enable changing datasets via parameter service (Service 20) - tc_queue.appendleft( - (QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests") - ) + q.add_log_cmd("Service 3 Test: Performing basic HK tests") if enable_normal_mode: # Set mode normal so that sets are changed/read regularly - tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) + q.add_log_cmd("Service 3 Test: Set Normal Mode") mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) - command = PusTelecommand( - service=200, subservice=1, ssc=current_ssc, app_data=mode_data - ) - current_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) - tc_queue.appendleft((QueueCommands.PRINT, "Enabling changing datasets")) + q.add_log_cmd("Enabling changing datasets") app_data = pack_boolean_parameter_app_data( object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, parameter=True, ) - cmd = pack_fsfw_load_param_cmd(app_data=app_data, ssc=0) - - current_ssc += 1 - tc_queue.appendleft(cmd.pack_command_tuple()) + q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=app_data, ssc=0)) # Enable periodic reporting - tc_queue.appendleft( - (QueueCommands.PRINT, "Enabling periodic thermal sensor packet generation: ") + q.add_log_cmd("Enabling periodic thermal sensor packet generation: ") + q.add_pus_tc( + PusTelecommand( + service=3, + subservice=srv3.Subservices.TC_ENABLE_PERIODIC_HK_GEN, + app_data=test_sid, + ) ) - command = PusTelecommand( - service=3, - subservice=srv3.Subservices.TC_ENABLE_PERIODIC_HK_GEN, - ssc=current_ssc, - app_data=test_sid, - ) - current_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) - - tc_queue.appendleft((QueueCommands.WAIT, 2.0)) + q.add_wait(timedelta(seconds=2.0)) # Disable periodic reporting - tc_queue.appendleft( - (QueueCommands.PRINT, "Disabling periodic thermal sensor packet generation: ") + q.add_log_cmd("Disabling periodic thermal sensor packet generation: ") + q.add_pus_tc( + PusTelecommand( + service=3, + subservice=srv3.Subservices.TC_DISABLE_PERIODIC_HK_GEN, + app_data=test_sid, + ) ) - command = PusTelecommand( - service=3, - subservice=srv3.Subservices.TC_DISABLE_PERIODIC_HK_GEN, - ssc=current_ssc, - app_data=test_sid, - ) - current_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) # Disable changing datasets via parameter service (Service 20) - tc_queue.appendleft((QueueCommands.PRINT, "Disabling changing datasets")) + q.add_log_cmd("Disabling changing datasets") app_data = pack_boolean_parameter_app_data( object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, parameter=False, ) - current_ssc += 1 - cmd = pack_fsfw_load_param_cmd(app_data=app_data, ssc=0) - tc_queue.appendleft(cmd.pack_command_tuple()) - return current_ssc + q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=app_data, ssc=0)) def pack_notification_basic_test( - tc_queue: TcQueueT, + q: QueueHelper, object_id: bytearray, - init_ssc: int, enable_normal_mode: bool = True, -) -> int: - current_ssc = init_ssc - tc_queue.appendleft( - (QueueCommands.PRINT, "Service 3 Test: Performing notification tests") - ) +): + q.add_log_cmd("Service 3 Test: Performing notification tests") if enable_normal_mode: # Set mode normal so that sets are changed/read regularly - tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) + q.add_log_cmd("Service 3 Test: Set Normal Mode") mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) - command = PusTelecommand( - service=200, subservice=1, ssc=current_ssc, app_data=mode_data - ) - current_ssc += 1 - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) - tc_queue.appendleft((QueueCommands.PRINT, "Triggering notification")) - command = generate_action_command( - object_id=object_id, action_id=TEST_NOTIFICATION_ACTION_ID, ssc=current_ssc + q.add_log_cmd("Triggering notification") + q.add_pus_tc( + generate_action_command( + object_id=object_id, action_id=TEST_NOTIFICATION_ACTION_ID + ) ) - tc_queue.appendleft(command.pack_command_tuple()) - current_ssc += 1 - return current_ssc diff --git a/pus_tc/service_8_func_cmd.py b/pus_tc/service_8_func_cmd.py index 101dad1..1a33699 100644 --- a/pus_tc/service_8_func_cmd.py +++ b/pus_tc/service_8_func_cmd.py @@ -1,7 +1,5 @@ from spacepackets.ecss.tc import PusTelecommand - -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.tc.definitions import TcQueueT +from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes import common_tmtc.pus_tc.command_data as cmd_data @@ -10,53 +8,42 @@ import common_tmtc.pus_tc.command_data as cmd_data from common_tmtc.config.object_ids import TEST_DEVICE_0_ID -def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str): +def pack_service_8_commands_into(q: QueueHelper, op_code: str): if op_code == "0": - pack_generic_service_8_test_into(tc_queue=tc_queue) + pack_generic_service_8_test_into(q=q) else: print(f"pack_service_8_test: Operation code {op_code} unknown!") -def pack_generic_service_8_test_into(tc_queue: TcQueueT): - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8")) +def pack_generic_service_8_test_into(q: QueueHelper): + q.add_log_cmd("Testing Service 8") object_id = TEST_DEVICE_0_ID # set mode on - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode")) + q.add_log_cmd("Testing Service 8: Set On Mode") mode_data = pack_mode_data(object_id, Modes.ON, 0) - command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) # set mode normal - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Normal Mode")) + q.add_log_cmd("Testing Service 8: Set Normal Mode") mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) - command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) # Direct command which triggers completion reply - tc_queue.appendleft( - (QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply") - ) + q.add_log_cmd("Testing Service 8: Trigger Step and Completion Reply") action_id = cmd_data.TEST_COMMAND_0 direct_command = object_id + action_id - command = PusTelecommand( - service=8, subservice=128, ssc=820, app_data=direct_command - ) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=direct_command)) # Direct command which triggers _tm_data reply - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Data Reply")) + q.add_log_cmd("Testing Service 8: Trigger Data Reply") action_id = cmd_data.TEST_COMMAND_1 command_param1 = cmd_data.TEST_COMMAND_1_PARAM_1 command_param2 = cmd_data.TEST_COMMAND_1_PARAM_2 direct_command = object_id + action_id + command_param1 + command_param2 - command = PusTelecommand( - service=8, subservice=128, ssc=830, app_data=direct_command - ) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=direct_command)) # Set mode off - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Off Mode")) + q.add_log_cmd("Testing Service 8: Set Off Mode") mode_data = pack_mode_data(object_id, Modes.OFF, 0) - command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) + q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) diff --git a/pus_tc/tc_packing.py b/pus_tc/tc_packing.py index 5ff2cb4..a7589ce 100644 --- a/pus_tc/tc_packing.py +++ b/pus_tc/tc_packing.py @@ -16,7 +16,7 @@ from tmtccmd.logging import get_console_logger, get_current_time_string from tmtccmd.logging.pus import log_raw_pus_tc from tmtccmd.tc.definitions import TcQueueT from tmtccmd.config.definitions import CoreServiceList, QueueCommands -from tmtccmd.tc.pus_5_event import pack_generic_service5_test_into +from tmtccmd.tc.pus_5_event import pack_generic_service_5_test_into from tmtccmd.pus.pus_17_test import pack_generic_service17_test from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into @@ -44,31 +44,6 @@ def pre_tc_send_cb( LOGGER.info(tc_info_string) file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}") com_if.send(data=queue_entry) - elif isinstance(queue_entry, QueueCommands): - if queue_entry == QueueCommands.PRINT: - file_logger.info(queue_info) - - -def common_service_queue_user( - service: Union[str, int], op_code: str, tc_queue: TcQueueT -): - if service == CoreServiceList.SERVICE_2.value: - return pack_service_2_commands_into(op_code=op_code, tc_queue=tc_queue) - if service == CoreServiceList.SERVICE_3.value: - return pack_service_3_commands_into(op_code=op_code, tc_queue=tc_queue) - if service == CoreServiceList.SERVICE_5.value: - return pack_generic_service5_test_into(tc_queue=tc_queue) - if service == CoreServiceList.SERVICE_8.value: - return pack_service_8_commands_into(op_code=op_code, tc_queue=tc_queue) - if service == CoreServiceList.SERVICE_11.value: - return pack_service_11_commands(op_code=op_code, tc_queue=tc_queue) - if service == CoreServiceList.SERVICE_17.value: - return pack_service_17_commands(op_code=op_code, tc_queue=tc_queue, init_ssc=0) - if service == CoreServiceList.SERVICE_20.value: - return pack_service20_commands_into(tc_queue=tc_queue, op_code=op_code) - if service == CoreServiceList.SERVICE_200.value: - return pack_service_200_commands_into(tc_queue=tc_queue, op_code=op_code) - LOGGER.warning("Invalid Service !") def create_total_tc_queue_user() -> TcQueueT: diff --git a/pus_tm/factory_hook.py b/pus_tm/factory_hook.py index 632ef33..9ab1cb8 100644 --- a/pus_tm/factory_hook.py +++ b/pus_tm/factory_hook.py @@ -7,86 +7,68 @@ from spacepackets.ecss.tm import PusTelemetry from spacepackets.util import PrintFormats from spacepackets.ccsds.spacepacket import PacketTypes +from tmtccmd.logging.pus import RawTmtcTimedLogWrapper +from tmtccmd.pus import VerificationWrapper +from tmtccmd.tm.pus_17_test import Service17TmExtended from tmtccmd.tm.pus_2_rawcmd import Service2Tm -from tmtccmd.tm.pus_17_test import Service17TMExtended from tmtccmd.tm.pus_20_fsfw_parameters import Service20FsfwTm from tmtccmd.tm.pus_200_fsfw_modes import Service200FsfwTm from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter from tmtccmd.logging import get_console_logger -from tmtccmd.logging.pus import ( - create_tmtc_logger, - log_raw_pus_tm, - log_raw_unknown_packet, -) from common_tmtc.config.object_ids import get_object_ids from common_tmtc.pus_tm.action_reply_handling import handle_action_reply from common_tmtc.pus_tm.event_handler import handle_event_packet from common_tmtc.pus_tm.verification_handler import handle_service_1_packet from common_tmtc.pus_tm.hk_handling import handle_hk_packet -from common_tmtc.config.definitions import PUS_APID - LOGGER = get_console_logger() -FSFW_PRINTER = FsfwTmTcPrinter(file_logger=create_tmtc_logger()) - - -def ccsds_tm_handler(apid: int, raw_tm_packet: bytes, _user_args: any) -> None: - if apid == PUS_APID: - pus_factory_hook(raw_tm_packet=raw_tm_packet) - - -def pus_factory_hook(raw_tm_packet: bytes): - if len(raw_tm_packet) < 8: +def pus_factory_hook( + wrapper: VerificationWrapper, + packet: bytes, + printer: FsfwTmTcPrinter, + raw_logger: RawTmtcTimedLogWrapper, +): + if len(packet) < 8: LOGGER.warning("Detected packet shorter than 8 bytes!") return - service_type = raw_tm_packet[7] - subservice_type = raw_tm_packet[8] - file_logger = FSFW_PRINTER.file_logger + try: + tm_packet = PusTelemetry.unpack(packet) + except ValueError: + LOGGER.warning("Could not generate PUS TM object from raw data") + LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") + return + service = tm_packet.service + file_logger = printer.file_logger obj_id_dict = get_object_ids() dedicated_handler = True - try: - tm_packet = None - if service_type == 1: - handle_service_1_packet(printer=FSFW_PRINTER, raw_tm=raw_tm_packet) - elif service_type == 2: - tm_packet = Service2Tm.unpack(raw_tm_packet) - dedicated_handler = False - elif service_type == 3: - handle_hk_packet( - printer=FSFW_PRINTER, raw_tm=raw_tm_packet, obj_id_dict=obj_id_dict - ) - elif service_type == 8: - handle_action_reply( - raw_tm=raw_tm_packet, printer=FSFW_PRINTER, obj_id_dict=obj_id_dict - ) - elif service_type == 5: - handle_event_packet( - raw_tm=raw_tm_packet, printer=FSFW_PRINTER, file_logger=file_logger - ) - elif service_type == 17: - tm_packet = Service17TMExtended.unpack(raw_telemetry=raw_tm_packet) - dedicated_handler = False - elif service_type == 20: - tm_packet = Service20FsfwTm.unpack(raw_telemetry=raw_tm_packet) - dedicated_handler = False - elif service_type == 200: - tm_packet = Service200FsfwTm.unpack(raw_telemetry=raw_tm_packet) - dedicated_handler = False - else: - LOGGER.info( - f"The service {service_type} is not implemented in Telemetry Factory" - ) - tm_packet = PusTelemetry.unpack(raw_telemetry=raw_tm_packet) - tm_packet.print_source_data(PrintFormats.HEX) - dedicated_handler = True - if not dedicated_handler and tm_packet is not None: - FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet) - log_raw_pus_tm( - packet=raw_tm_packet, srv_subservice=(service_type, subservice_type) - ) - except ValueError: - LOGGER.warning("Invalid packet format detected") - log_raw_unknown_packet(packet=raw_tm_packet, packet_type=PacketTypes.TM) + if service == 1: + handle_service_1_packet(wrapper=wrapper, raw_tm=packet) + elif service == 2: + tm_packet = Service2Tm.unpack(packet) + dedicated_handler = False + elif service == 3: + handle_hk_packet(printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict) + elif service == 8: + handle_action_reply(raw_tm=packet, printer=printer, obj_id_dict=obj_id_dict) + elif service == 5: + handle_event_packet(raw_tm=packet, printer=printer, file_logger=file_logger) + elif service == 17: + tm_packet = Service17TmExtended.unpack(raw_telemetry=packet) + dedicated_handler = False + elif service == 20: + tm_packet = Service20FsfwTm.unpack(raw_telemetry=packet) + dedicated_handler = False + elif service == 200: + tm_packet = Service200FsfwTm.unpack(raw_telemetry=packet) + dedicated_handler = False + else: + LOGGER.info(f"The service {service} is not implemented in Telemetry Factory") + tm_packet = PusTelemetry.unpack(raw_telemetry=packet) + tm_packet.print_source_data(PrintFormats.HEX) + dedicated_handler = True + if not dedicated_handler and tm_packet is not None: + printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet) + raw_logger.log_tm(tm_packet) diff --git a/pus_tm/verification_handler.py b/pus_tm/verification_handler.py index aacc573..0f31cb4 100644 --- a/pus_tm/verification_handler.py +++ b/pus_tm/verification_handler.py @@ -1,28 +1,40 @@ -from typing import cast +from spacepackets.ecss.pus_1_verification import UnpackParams -from tmtccmd.tm.pus_1_verification import Service1TMExtended +from tmtccmd.pus import VerificationWrapper +from tmtccmd.tm.pus_1_verification import Service1TmExtended from tmtccmd.logging import get_console_logger -from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter from common_tmtc.config.retvals import get_retval_dict LOGGER = get_console_logger() -def handle_service_1_packet(printer: FsfwTmTcPrinter, raw_tm: bytes): - tm_packet = Service1TMExtended.unpack(raw_telemetry=raw_tm) - printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet) - srv1_packet = cast(Service1TMExtended, tm_packet) +def handle_service_1_packet(wrapper: VerificationWrapper, raw_tm: bytes): + if wrapper.console_logger is None or wrapper.file_logger is None: + raise ValueError( + "Console logger or file logger not valid. Please set a valid one" + ) + tm_packet = Service1TmExtended.unpack(data=raw_tm, params=UnpackParams(1, 1)) + res = wrapper.verificator.add_tm(tm_packet) + if res is None: + LOGGER.info( + f"Received Verification TM[{tm_packet.service}, {tm_packet.subservice}] " + f"with Request ID {tm_packet.tc_req_id.as_u32():#08x}" + ) + LOGGER.warning(f"No matching telecommand found for {tm_packet.tc_req_id}") + else: + wrapper.log_to_console(tm_packet, res) + wrapper.log_to_file(tm_packet, res) retval_dict = get_retval_dict() - if srv1_packet.has_tc_error_code: - retval_info = retval_dict.get(srv1_packet.error_code) + if tm_packet.has_failure_notice: + retval_info = retval_dict.get(tm_packet.error_code.val) if retval_info is None: LOGGER.info( - f"No returnvalue information found for error code {srv1_packet.error_code}" + f"No returnvalue information found for error code {tm_packet.error_code}" ) else: retval_string = ( - f"Error Code information for code {srv1_packet.error_code} | " + f"Error Code information for code {tm_packet.error_code} | " f"Name: {retval_info.name} | Info: {retval_info.info}" ) - LOGGER.info(retval_string) - printer.file_logger.info(retval_string) + wrapper.console_logger.info(retval_string) + wrapper.file_logger.info(retval_string)