import logging
import datetime

from eive_tmtc.config.events import get_event_dict
from eive_tmtc.config.object_ids import get_object_ids
from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.pus_tm.verification_handler import generic_retval_printout
from eive_tmtc.tmtc.acs.subsystem import AcsMode
from tmtccmd.tc.pus_200_fsfw_mode import Mode
from tmtccmd.tc.pus_201_fsfw_health import FsfwHealth

from tmtccmd.tm import Service5Tm
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.fsfw import EventInfo
from spacepackets.ccsds.time import CdsShortTimestamp

_LOGGER = logging.getLogger(__name__)


def _mode_name_for(obj_name: str, mode) -> str:
    """Return the display name for the mode number of a MODE_INFO event.

    The reporter named ``ACS_SUBSYSTEM`` is matched against the ACS mode
    constants, every other reporter against the generic ``Mode`` constants.
    ``"Unknown"`` is returned when no constant compares equal to ``mode``.
    """
    if obj_name == "ACS_SUBSYSTEM":
        candidates = (
            (Mode.OFF, "Off"),
            (AcsMode.IDLE, "Idle"),
            (AcsMode.DETUMBLE, "Detumble"),
            (AcsMode.SAFE, "Safe"),
            (AcsMode.TARGET_PT, "Target Pointing"),
        )
    else:
        candidates = (
            (Mode.OFF, "Off"),
            (Mode.ON, "On"),
            (Mode.NORMAL, "Normal"),
            (Mode.RAW, "Raw"),
        )
    # Pairs are scanned in order and the first equal constant wins, which is
    # the same precedence an if/elif chain over these constants would have.
    for candidate, display_name in candidates:
        if mode == candidate:
            return display_name
    return "Unknown"


def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter) -> None:
    """Unpack a PUS service 5 event packet and log a readable description.

    A generic line (reporting object, event name, event ID, packet time) is
    always written to the module logger and to the printer's file logger.
    Events with a dedicated handler additionally log their decoded
    parameters; every other event logs its additional info text together
    with the raw parameters, provided that info text is not empty.

    :param raw_tm: Raw telemetry packet, handed to ``Service5Tm.unpack``.
    :param printer: Printer wrapped in a ``PrintWrapper`` for the detail
        output; its ``file_logger`` receives the generic event line.
    """
    pw = PrintWrapper(printer)
    tm = Service5Tm.unpack(raw_telemetry=raw_tm, time_reader=CdsShortTimestamp.empty())
    event_def = tm.event_definition

    info = get_event_dict().get(event_def.event_id)
    if info is None:
        _LOGGER.warning(f"Event ID {event_def.event_id} has no information")
        info = EventInfo()
        info.name = "Unknown event"

    obj_id_obj = get_object_ids().get(event_def.reporter_id)
    if obj_id_obj is None:
        # Fall back to the hex representation of the ID when it has no name.
        _LOGGER.warning(f"Object ID 0x{event_def.reporter_id.hex(sep=',')} has no name")
        obj_name = event_def.reporter_id.hex(sep=",")
    else:
        obj_name = obj_id_obj.name

    generic_event_string = (
        f"Object {obj_name} generated Event {info.name} "
        f"(ID: {event_def.event_id:#04x}) at {tm.time_provider.as_date_time()}"
    )
    _LOGGER.info(generic_event_string)
    pw.printer.file_logger.info(
        f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
    )

    # Single if/elif chain: an event that has a dedicated handler must not
    # also reach the generic fallback at the end, otherwise its additional
    # info would be logged a second time.
    event_name = info.name
    if event_name == "MODE_TRANSITION_FAILED":
        for string in generic_retval_printout(event_def.param1):
            pw.dlog(f"Reason from event parameter 1: {string}")
        pw.dlog(f"Mode, sequence or table: {event_def.param2:#08x}")
    elif event_name in ("SUPV_UPDATE_PROGRESS", "WRITE_MEMORY_FAILED"):
        # Parameter 1 packs the progress percentage into its top byte and the
        # sequence count into its low 16 bits.
        progress_percent = (event_def.param1 >> 24) & 0xff
        sequence_count = event_def.param1 & 0xffff
        pw.dlog(f"Additional info: {info.info}")
        pw.dlog(
            f"Progress Percent: {progress_percent} | Sequence Count: {sequence_count} "
            f"| Bytes Written: {event_def.param2}"
        )
    elif event_name == "MODE_INFO":
        mode_name = _mode_name_for(obj_name, event_def.param1)
        pw.dlog(f"Mode Number {event_def.param1}, Mode Name {mode_name}")
        pw.dlog(f"Submode: {event_def.param2}")
    elif event_name == "CLOCK_SET":
        old_time_dt = datetime.datetime.fromtimestamp(
            event_def.param1, datetime.timezone.utc
        )
        new_time_dt = datetime.datetime.fromtimestamp(
            event_def.param2, datetime.timezone.utc
        )
        pw.dlog(f"Old time (UTC): {old_time_dt}")
        pw.dlog(f"New time (UTC): {new_time_dt}")
    elif event_name == "CLOCK_DUMP":
        # param 1 is timeval seconds, param 2 is timeval subsecond milliseconds
        time = event_def.param1 + event_def.param2 / 1000.0
        time_dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc)
        pw.dlog(f"Current time: {time_dt}")
    elif event_name == "HEALTH_INFO":
        try:
            health = FsfwHealth(event_def.param1)
        except ValueError:
            # NOTE(review): assumes FsfwHealth is an enum, whose constructor
            # raises ValueError for a value without a member. Report the raw
            # value instead of aborting the handling of this packet.
            pw.dlog(f"{obj_name}: Unknown health value {event_def.param1}")
        else:
            pw.dlog(f"{obj_name}: {health!r}")
    elif info.info != "":
        # Generic fallback for events without a dedicated handler.
        pw.dlog(
            f"Additional info: {info.info} | P1: {event_def.param1} | P2: {event_def.param2}"
        )