import argparse from typing import Dict, Tuple, Optional from spacepackets.ecss.conf import PusVersion from tmtccmd.com_if.com_interface_base import CommunicationInterface from tmtccmd.config.definitions import ServiceOpCodeDictT from tmtccmd.config.hook import TmTcHookBase from tmtccmd.core.backend import TmTcHandler from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tm.service_3_base import Service3Base from tmtccmd.utility.tmtc_printer import TmTcPrinter from common_tmtc.config.definitions import PUS_APID class FsfwHookBase(TmTcHookBase): def get_json_config_file_path(self) -> str: return "config/tmtc_config.json" def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: from tmtccmd.config.globals import ( get_default_service_op_code_dict, add_service_op_code_entry, add_op_code_entry, OpCodeDictKeys, ) def_dict = get_default_service_op_code_dict() op_code = dict() add_op_code_entry(op_code_dict=op_code, keys="test", info="Mode CMD Test") add_op_code_entry( op_code_dict=op_code, keys=["0", "asm_to_normal"], info="Command test assembly to normal mode", options={OpCodeDictKeys.TIMEOUT: 6.0}, ) add_service_op_code_entry( srv_op_code_dict=def_dict, name="200", info="Mode MGMT", op_code_entry=op_code, ) return def_dict def add_globals_pre_args_parsing(self, gui: bool = False): from tmtccmd.config.globals import set_default_globals_pre_args_parsing set_default_globals_pre_args_parsing( gui=gui, pus_tm_version=PusVersion.PUS_C, pus_tc_version=PusVersion.PUS_C, tc_apid=PUS_APID, tm_apid=PUS_APID, ) def add_globals_post_args_parsing(self, args: argparse.Namespace): from tmtccmd.config.globals import set_default_globals_post_args_parsing set_default_globals_post_args_parsing( args=args, json_cfg_path=self.get_json_config_file_path() ) def assign_communication_interface( self, com_if_key: str, tmtc_printer: TmTcPrinter ) -> Optional[CommunicationInterface]: from tmtccmd.config.com_if import create_communication_interface_default return create_communication_interface_default( com_if_key=com_if_key, tmtc_printer=tmtc_printer, json_cfg_path=self.get_json_config_file_path(), space_packet_ids=(0x08EF,), ) def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): print("No custom mode operation implemented") def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT): from common_tmtc.pus_tc.tc_packing import pack_service_queue_user pack_service_queue_user( service=service, op_code=op_code, tc_queue=service_queue ) def get_object_ids(self) -> Dict[bytes, list]: from common_tmtc.config.object_ids import get_object_ids return get_object_ids() @staticmethod def handle_service_8_telemetry( object_id: int, action_id: int, custom_data: bytearray ) -> Tuple[list, list]: from common_tmtc.pus_tm.service_8_handling import custom_service_8_handling return custom_service_8_handling( object_id=object_id, action_id=action_id, custom_data=custom_data ) @staticmethod def handle_service_3_housekeeping( object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base ) -> Tuple[list, list, bytearray, int]: from common_tmtc.pus_tm.service_3_hk_handling import service_3_hk_handling return service_3_hk_handling( object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet, )