"""Handler that decodes event telemetry packets and logs their content."""
from datetime import datetime

from eive_tmtc.config.events import get_event_dict
from eive_tmtc.config.object_ids import get_object_ids
from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.pus_tm.verification_handler import generic_retval_printout
from eive_tmtc.tmtc.acs.subsystem import AcsMode
from tmtccmd.tc.pus_200_fsfw_mode import Mode
from tmtccmd.tm import Service5Tm
from tmtccmd.logging import get_console_logger
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.fsfw import EventInfo
from spacepackets.ccsds.time import CdsShortTimestamp

LOGGER = get_console_logger()


def _resolve_object_name(reporter_id) -> str:
    """Return the configured name of the reporting object.

    Falls back to the comma-separated hex representation of ``reporter_id``
    (and logs a warning) when the ID is not in the object ID dictionary.
    """
    obj_id_obj = get_object_ids().get(reporter_id)
    if obj_id_obj is None:
        LOGGER.warning(f"Object ID 0x{reporter_id.hex(sep=',')} has no name")
        return reporter_id.hex(sep=",")
    return obj_id_obj.name


def _resolve_mode_name(obj_name: str, mode) -> str:
    """Map a mode number to a display name, or ``"Unknown"`` if unmapped.

    The ACS subsystem uses its own mode table; every other object uses the
    generic mode table.
    """
    if obj_name == "ACS_SUBSYSTEM":
        candidates = (
            (Mode.OFF, "Off"),
            (AcsMode.IDLE, "Idle"),
            (AcsMode.DETUMBLE, "Detumble"),
            (AcsMode.SAFE, "Safe"),
            (AcsMode.TARGET_PT, "Target Pointing"),
        )
    else:
        candidates = (
            (Mode.OFF, "Off"),
            (Mode.ON, "On"),
            (Mode.NORMAL, "Normal"),
            (Mode.RAW, "Raw"),
        )
    # First match wins, mirroring an if/elif comparison chain.
    for candidate, label in candidates:
        if mode == candidate:
            return label
    return "Unknown"


def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter):
    """Unpack an event telemetry packet and log a description of the event.

    A generic one-line summary is always written to the file logger (prefixed
    with the current local time) and to the console logger. Afterwards exactly
    one of the following detail printouts is made:

    * ``MODE_TRANSITION_FAILED``: the return value strings for parameter 1 and
      parameter 2 as a hex number.
    * ``SUPV_UPDATE_PROGRESS`` / ``WRITE_MEMORY_FAILED``: the event info text
      and the fields packed into the two parameters.
    * ``MODE_INFO``: mode number, resolved mode name and submode.
    * Any other event: the event info text together with both raw
      parameters, if the info text is not empty.

    :param raw_tm: Raw telemetry packet, passed to ``Service5Tm.unpack``.
    :param printer: Printer wrapped in a ``PrintWrapper`` for the output.
    """
    pw = PrintWrapper(printer)
    tm = Service5Tm.unpack(raw_telemetry=raw_tm, time_reader=CdsShortTimestamp.empty())
    event_def = tm.event_definition

    info = get_event_dict().get(event_def.event_id)
    if info is None:
        LOGGER.warning(f"Event ID {event_def.event_id} has no information")
        info = EventInfo()
        info.name = "Unknown event"

    obj_name = _resolve_object_name(event_def.reporter_id)

    generic_event_string = (
        f"Object {obj_name} generated Event {info.name} "
        f"(ID: {event_def.event_id:#04x}) at {tm.time_provider.as_date_time()}"
    )
    pw.printer.file_logger.info(
        f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
    )
    LOGGER.info(generic_event_string)

    # A single if/elif chain guarantees that an event with a dedicated handler
    # does not additionally fall through to the generic printout.
    if info.name == "MODE_TRANSITION_FAILED":
        for string in generic_retval_printout(event_def.param1):
            pw.dlog(f"Reason from event parameter 1: {string}")
        pw.dlog(f"Mode, sequence or table: {event_def.param2:#08x}")
    elif info.name in ("SUPV_UPDATE_PROGRESS", "WRITE_MEMORY_FAILED"):
        # Parameter 1 carries the progress in bits 24..31 and the sequence
        # count in its lower 16 bits.
        progress_percent = (event_def.param1 >> 24) & 0xFF
        sequence_count = event_def.param1 & 0xFFFF
        pw.dlog(f"Additional info: {info.info}")
        pw.dlog(
            f"Progress Percent: {progress_percent} | Sequence Count: {sequence_count} "
            f"| Bytes Written: {event_def.param2}"
        )
    elif info.name == "MODE_INFO":
        mode_name = _resolve_mode_name(obj_name, event_def.param1)
        pw.dlog(f"Mode Number {event_def.param1}, Mode Name {mode_name}")
        pw.dlog(f"Submode: {event_def.param2}")
    elif info.info != "":
        pw.dlog(
            f"Additional info: {info.info} | P1: {event_def.param1} | P2: {event_def.param2}"
        )