"""Core EIVE TM handler module
"""
from eive_tmtc.config.object_ids import get_object_ids
from spacepackets.ecss import PusTelemetry
from spacepackets.ecss.pus_17_test import Service17Tm
from spacepackets.util import PrintFormats
from spacepackets.ccsds.time import CdsShortTimestamp

from tmtccmd import get_console_logger
from tmtccmd.logging.pus import RawTmtcTimedLogWrapper
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tm import Service20FsfwTm, Service200FsfwTm
from tmtccmd.tm.pus_20_fsfw_params import Service20ParamDumpWrapper
from tmtccmd.pus.s20_fsfw_params_defs import CustomSubservice as ParamSubservice
from tmtccmd.tm.pus_200_fsfw_modes import Subservice as ModeSubservice
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter

from .defs import PrintWrapper
from .event_handler import handle_event_packet
from .verification_handler import handle_service_1_fsfw_packet, generic_retval_printout
from .hk_handling import handle_hk_packet
from .action_reply_handler import handle_action_reply


LOGGER = get_console_logger()


def _print_param_dump(
    param_packet: Service20FsfwTm, obj_id_dict, pw: PrintWrapper
) -> None:
    """Print the content of a service 20 parameter dump reply via ``pw``.

    :param param_packet: Unpacked service 20 telemetry holding the dump.
    :param obj_id_dict: Mapping queried with the object ID found in the dump
        to obtain the object shown in the printout.
    :param pw: Print wrapper receiving all output.

    ``ValueError`` and ``NotImplementedError`` raised while extracting or
    parsing the parameter are reported through ``pw`` instead of propagating.
    """
    param_wrapper = Service20ParamDumpWrapper(param_tm=param_packet)
    try:
        param = param_wrapper.get_param()
        obj = obj_id_dict.get(param_wrapper.param_tm.object_id)
        pw.dlog(f"Received parameter dump TM from {obj}")
        pw.dlog(f"Parameter: {param}")
        if param.row == 1 and param.column == 1:
            try:
                scalar_param = param.parse_scalar_param()
                if isinstance(scalar_param, int):
                    pw.dlog(f"Scalar integer parameter: {scalar_param}")
                elif isinstance(scalar_param, float):
                    pw.dlog(f"Scalar floating point parameter: {scalar_param}")
            except ValueError as e:
                # This message used to lack the f-prefix and therefore printed
                # the literal text "{e}" instead of the actual error.
                pw.dlog(f"received {e} trying to parse scalar parameter")
        else:
            # TODO: Could improve display further by actually displaying a matrix as a
            #       matrix using row and column information
            pw.dlog(
                f"Received vector or matrix data: {param.param_data.hex(sep=',')}"
            )
    except (ValueError, NotImplementedError) as e:
        pw.dlog(f"received {e} when trying to parse parameters")


def pus_factory_hook(
    packet: bytes,
    verif_wrapper: VerificationWrapper,
    printer: FsfwTmTcPrinter,
    raw_logger: RawTmtcTimedLogWrapper,
) -> None:
    """Unpack a raw PUS telemetry packet and dispatch it by service number.

    Dispatch performed on the ``service`` field of the unpacked packet:

    * 1: :func:`handle_service_1_fsfw_packet`
    * 3: :func:`handle_hk_packet`
    * 5: :func:`handle_event_packet`
    * 8: :func:`handle_action_reply`
    * 17: logs a ping reply for subservice 2
    * 20: prints parameter dump replies, reports other subservices as unknown
    * 200: prints "can't reach mode" replies; any other subservice is passed to
      the generic long TM printout of ``printer``
    * anything else: logged as not implemented, source data printed as hex

    Every packet that could be unpacked is finally handed to ``raw_logger``.

    :param packet: Raw telemetry bytes.
    :param verif_wrapper: Verification wrapper forwarded to the service 1
        handler and used to log ping replies.
    :param printer: Printer forwarded to the dedicated handlers and used for
        the generic printout.
    :param raw_logger: Logger receiving the unpacked telemetry object.
    """
    # Anything shorter than 8 bytes is rejected before an unpack is attempted.
    if len(packet) < 8:
        LOGGER.warning("Detected packet shorter than 8 bytes!")
        return
    try:
        tm_packet = PusTelemetry.unpack(packet, CdsShortTimestamp.empty())
    except ValueError:
        LOGGER.warning("Could not generate PUS TM object from raw data")
        LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
        return
    service = tm_packet.service
    obj_id_dict = get_object_ids()
    pw = PrintWrapper(printer)
    # Only cleared for service 200 packets without a dedicated printout, which
    # then fall back to the generic long TM printout below.
    dedicated_handler = True
    if service == 1:
        handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet)
    elif service == 3:
        handle_hk_packet(printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict)
    elif service == 5:
        handle_event_packet(raw_tm=packet, printer=printer)
    elif service == 8:
        handle_action_reply(raw_tm=packet, printer=printer, obj_id_dict=obj_id_dict)
    elif service == 17:
        # Re-unpack as the service specific type; this is also the object
        # which ends up in the raw log.
        tm_packet = Service17Tm.unpack(
            raw_telemetry=packet, time_reader=CdsShortTimestamp.empty()
        )
        if tm_packet.subservice == 2:
            verif_wrapper.dlog("Received Ping Reply TM[17,2]")
    elif service == 20:
        param_packet = Service20FsfwTm.unpack(
            raw_telemetry=packet, time_reader=CdsShortTimestamp.empty()
        )
        if tm_packet.subservice == ParamSubservice.TM_DUMP_REPLY:
            _print_param_dump(param_packet, obj_id_dict, pw)
        else:
            pw.dlog(f"unknown subservice {tm_packet.subservice} for parameter service")
    elif service == 200:
        tm_packet = Service200FsfwTm.unpack(
            raw_telemetry=packet, time_reader=CdsShortTimestamp.empty()
        )
        if tm_packet.subservice == ModeSubservice.TM_CANT_REACH_MODE:
            obj_id = tm_packet.object_id
            obj_id_obj = obj_id_dict.get(obj_id)
            retval = tm_packet.return_value
            string_list = generic_retval_printout(retval)
            pw.dlog(f"Received Mode Reply from {obj_id_obj}: Can't reach mode.")
            for string in string_list:
                pw.dlog(f"Reason: {string}")
        else:
            dedicated_handler = False
    else:
        LOGGER.info(f"The service {service} is not implemented in Telemetry Factory")
        tm_packet.print_source_data(PrintFormats.HEX)
    if not dedicated_handler and tm_packet is not None:
        printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
    raw_logger.log_tm(tm_packet)