import argparse from typing import Union, Dict, Tuple from tmtccmd.config.definitions import ServiceOpCodeDictT from tmtccmd.pus_tm.service_3_base import Service3Base from tmtccmd.ecss.tm import PusTelemetry from tmtccmd.pus_tc.definitions import TcQueueT from tmtccmd.com_if.com_interface_base import CommunicationInterface from tmtccmd.core.backend import TmTcHandler from tmtccmd.config.hook import TmTcHookBase from tmtccmd.utility.tmtc_printer import TmTcPrinter from tmtccmd.config.globals import OpCodeDictKeys from config.object_ids import RW1_ID from config.definitions import CustomServiceList class EiveHookObject(TmTcHookBase): def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: from tmtccmd.config.globals import get_default_service_op_code_dict service_op_code_dict = get_default_service_op_code_dict() op_code_dict_srv_acu = { "0": ("ACU Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_acu_tuple = ("ACU Devices", op_code_dict_srv_acu) op_code_dict_srv_tmp1075 = { "0": ("TMP1075 Tests", {OpCodeDictKeys.TIMEOUT: 2.2}), } service_tmp1075_1_tuple = ("TMP1075 1", op_code_dict_srv_tmp1075) service_tmp1075_2_tuple = ("TMP1075 2", op_code_dict_srv_tmp1075) op_code_dict_srv_p60 = { "0": ("P60 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_p60_tuple = ("P60 Device", op_code_dict_srv_p60) op_code_dict_srv_pdu1 = { "0": ("PDU1 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_pdu1_tuple = ("PDU1 Device", op_code_dict_srv_pdu1) op_code_dict_srv_pdu2 = { "0": ("PDU2 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_pdu2_tuple = ("PDU2 Device", op_code_dict_srv_pdu2) op_code_dict_srv_heater = { "0": ("Heater Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_heater_tuple = ("Heater Device", op_code_dict_srv_heater) op_code_dict_srv_imtq = { "0": ("IMTQ Tests All", {OpCodeDictKeys.TIMEOUT: 2.0}), "1": ("IMTQ perform pos X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), "2": ("IMTQ perform neg X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), "3": ("IMTQ perform pos Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), "4": ("IMTQ perform neg Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), "5": ("IMTQ perform pos Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), "6": ("IMTQ perform neg Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), "7": ("IMTQ command dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), "8": ("IMTQ get commanded dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_imtq_tuple = ("IMTQ Device", op_code_dict_srv_imtq) op_code_dict_srv_rw = { "0": ("Reaction Wheel: Run all commands", {OpCodeDictKeys.TIMEOUT: 2.0}), "1": ("Reaction Wheel: Set speed", {OpCodeDictKeys.TIMEOUT: 2.0}), "2": ("Reaction Wheel: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), "3": ("Reaction Wheel: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), "4": ("Reaction Wheel: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), "5": ("Reaction Wheel: Send get-telemetry-command", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_rw_tuple = ("Reaction Wheel", op_code_dict_srv_rw) op_code_dict_srv_rad_sensor = { "0": ("Radiation Sensor: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), "1": ("Radiation Sensor: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), "2": ("Radiation Sensor: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_rad_sensor_tuple = ("Radiation Sensor", op_code_dict_srv_rad_sensor) op_code_dict_srv_ploc_supv = { "0": ("PLOC Supervisor: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), "1": ("PLOC Supervisor: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), "2": ("PLOC Supervisor: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), "3": ("PLOC Supervisor: Get HK Report", {OpCodeDictKeys.TIMEOUT: 2.0}), "4": ("PLOC Supervisor: Restart MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}), "5": ("PLOC Supervisor: Start MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}), "6": ("PLOC Supervisor: Shutdown MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}), "7": ("PLOC Supervisor: Select MPSoC boot image", {OpCodeDictKeys.TIMEOUT: 2.0}), "8": ("PLOC Supervisor: Set max restart tries", {OpCodeDictKeys.TIMEOUT: 2.0}), "9": ("PLOC Supervisor: Reset MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}), "10": ("PLOC Supervisor: Set time reference", {OpCodeDictKeys.TIMEOUT: 2.0}), "11": ("PLOC Supervisor: Set boot timeout", {OpCodeDictKeys.TIMEOUT: 2.0}), "12": ("PLOC Supervisor: Disable Hk", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv) service_op_code_dict[CustomServiceList.ACU.value] = service_acu_tuple service_op_code_dict[CustomServiceList.TMP1075_1.value] = service_tmp1075_1_tuple service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tmp1075_2_tuple service_op_code_dict[CustomServiceList.P60DOCK.value] = service_p60_tuple service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu1_tuple service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu2_tuple service_op_code_dict[CustomServiceList.HEATER.value] = service_heater_tuple service_op_code_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple service_op_code_dict[CustomServiceList.REACTION_WHEEL_1.value] = service_rw_tuple service_op_code_dict[CustomServiceList.REACTION_WHEEL_2.value] = service_rw_tuple service_op_code_dict[CustomServiceList.REACTION_WHEEL_3.value] = service_rw_tuple service_op_code_dict[CustomServiceList.REACTION_WHEEL_4.value] = service_rw_tuple service_op_code_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple return service_op_code_dict def get_json_config_file_path(self) -> str: """ The user can specify a path and filename for the JSON configuration file by overriding this function. :return: """ return "config/tmtc_config.json" def get_version(self) -> str: from config.version import SW_NAME, VERSION_MAJOR, VERSION_MINOR, VERSION_SUBMINOR return f"{SW_NAME} {VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_SUBMINOR}" def add_globals_pre_args_parsing(self, gui: bool = False): from config.globals_config import set_globals_pre_args_parsing set_globals_pre_args_parsing(gui=gui) def add_globals_post_args_parsing(self, args: argparse.Namespace): from config.globals_config import add_globals_post_args_parsing add_globals_post_args_parsing(args=args, json_cfg_path=self.get_json_config_file_path()) def assign_communication_interface(self, com_if_key: str, tmtc_printer: TmTcPrinter) -> \ Union[CommunicationInterface, None]: from tmtccmd.config.com_if import create_communication_interface_default return create_communication_interface_default( com_if_key=com_if_key, tmtc_printer=tmtc_printer, json_cfg_path=self.get_json_config_file_path() ) def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): from config.custom_mode_op import custom_mode_operation custom_mode_operation(mode=mode, tmtc_backend=tmtc_backend) def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT): from pus_tc.tc_packer_hook import pack_service_queue_user pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue) def tm_user_factory_hook(self, raw_tm_packet: bytearray) -> PusTelemetry: from pus_tm.factory_hook import tm_user_factory_hook return tm_user_factory_hook(raw_tm_packet=raw_tm_packet) def get_object_ids(self) -> Dict[bytes, list]: from config.object_ids import get_object_ids return get_object_ids() @staticmethod def handle_service_8_telemetry( object_id: bytes, action_id: int, custom_data: bytearray ) -> Tuple[list, list]: from pus_tm.service_8_hook import user_analyze_service_8_data return user_analyze_service_8_data( object_id=object_id, action_id=action_id, custom_data=custom_data ) @staticmethod def handle_service_3_housekeeping( object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base ) -> Tuple[list, list, bytearray, int]: from pus_tm.hk_handling import handle_user_hk_packet return handle_user_hk_packet( object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet ) @staticmethod def handle_service_5_event( object_id: bytes, event_id: int, param_1: int, param_2: int ) -> str: if object_id == RW1_ID: if event_id == 1: return "" return ""