"""Handling of event telemetry packets (unpacked as ``Service5Tm``)."""
import logging
import os.path
from datetime import datetime

from config.object_ids import get_object_ids
from pus_tm.defs import PrintWrapper
from pus_tm.verification_handler import generic_retval_printout
from tmtc.acs_subsystem import AcsModes
from tmtccmd.tc.pus_200_fsfw_modes import Modes
from tmtccmd.tm import Service5Tm
from tmtccmd.logging import get_console_logger
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.fsfw import parse_fsfw_events_csv, EventDictT, EventInfo


LOGGER = get_console_logger()

DEFAULT_EVENTS_CSV_PATH = "config/events.csv"
# Lazily filled cache for the parsed event CSV, see get_event_dict().
__EVENT_DICT = None

# Ordered (mode value, display name) pairs. They are scanned front to back and
# the first pair comparing equal wins, exactly like an if/elif ladder would.
_ACS_SUBSYSTEM_MODE_NAMES = (
    (Modes.OFF, "Off"),
    (AcsModes.IDLE, "Idle"),
    (AcsModes.DETUMBLE, "Detumble"),
    (AcsModes.SAFE, "Safe"),
    (AcsModes.TARGET_PT, "Target Pointing"),
)
_GENERIC_MODE_NAMES = (
    (Modes.OFF, "Off"),
    (Modes.ON, "On"),
    (Modes.NORMAL, "Normal"),
    (Modes.RAW, "Raw"),
)


def get_event_dict() -> EventDictT:
    """Return the event dictionary, parsing the events CSV on first use.

    The CSV at ``DEFAULT_EVENTS_CSV_PATH`` is parsed once and the result is
    cached in a module-level variable. If the file does not exist, a warning
    is logged and an empty dictionary is cached and returned instead.
    """
    global __EVENT_DICT
    if __EVENT_DICT is None:
        if os.path.exists(DEFAULT_EVENTS_CSV_PATH):
            __EVENT_DICT = parse_fsfw_events_csv(DEFAULT_EVENTS_CSV_PATH)
        else:
            LOGGER.warning(f"No Event CSV file found at {DEFAULT_EVENTS_CSV_PATH}")
            __EVENT_DICT = dict()
    return __EVENT_DICT


def _mode_name(obj_name: str, mode) -> str:
    """Map a mode number to a display name, or ``"Unknown"`` if none matches.

    Objects named ``ACS_SUBSYSTEM`` use the ACS specific mode table, every
    other object uses the generic one.
    """
    if obj_name == "ACS_SUBSYSTEM":
        table = _ACS_SUBSYSTEM_MODE_NAMES
    else:
        table = _GENERIC_MODE_NAMES
    # Plain == comparison against each entry (instead of a dict lookup) keeps
    # the comparison semantics independent of how the mode enums hash.
    return next((name for value, name in table if mode == value), "Unknown")


def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter):
    """Unpack an event telemetry packet and log a description of it.

    The event ID is looked up in the event dictionary and the reporter ID in
    the object ID dictionary. Missing entries produce a warning and fall back
    to the name ``"Unknown event"`` respectively the reporter ID hex string.
    A generic one-line summary is always written to the file logger (with a
    timestamp) and to the console logger.

    Afterwards exactly one of the following happens, selected by event name:

    * ``MODE_TRANSITION_FAILED``: log the reason strings derived from
      parameter 1 and parameter 2 as hex.
    * ``SUPV_UPDATE_PROGRESS`` / ``WRITE_MEMORY_FAILED``: log the additional
      info and the progress fields decoded from the parameters.
    * ``MODE_INFO``: log mode number, mode name and submode.
    * any other event: log additional info (if present) with both raw
      parameters and let the printer do its long TM print.

    :param raw_tm: Raw telemetry bytes handed to ``Service5Tm.unpack``.
    :param printer: Printer used for the log wrapper and the long TM print.
    """
    pw = PrintWrapper(printer)
    tm = Service5Tm.unpack(raw_telemetry=raw_tm)

    info = get_event_dict().get(tm.event_id)
    if info is None:
        LOGGER.warning(f"Event ID {tm.event_id} has no information")
        info = EventInfo()
        info.name = "Unknown event"

    obj_id_obj = get_object_ids().get(tm.reporter_id.as_bytes)
    if obj_id_obj is None:
        LOGGER.warning(f"Object ID 0x{tm.reporter_id.as_hex_string} has no name")
        obj_name = tm.reporter_id.as_hex_string
    else:
        obj_name = obj_id_obj.name

    generic_event_string = (
        f"Object {obj_name} generated Event {tm.event_id} | {info.name}"
    )
    pw.printer.file_logger.info(
        f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
    )
    LOGGER.info(generic_event_string)

    # The handlers form ONE if/elif chain. With independent `if` statements the
    # trailing `else` would also run for the first two handlers and emit the
    # generic output on top of the specific one.
    specific_handler = True
    if info.name == "MODE_TRANSITION_FAILED":
        for string in generic_retval_printout(tm.param_1):
            pw.dlog(f"Reason from event parameter 1: {string}")
        pw.dlog(f"Mode, sequence or table: {tm.param_2:#08x}")
    elif info.name in ("SUPV_UPDATE_PROGRESS", "WRITE_MEMORY_FAILED"):
        # Parameter 1 carries the progress in its top byte and the sequence
        # count in its low 16 bits.
        progress_percent = tm.param_1 >> 24 & 0xFF
        sequence_count = tm.param_1 & 0xFFFF
        pw.dlog(f"Additional info: {info.info}")
        pw.dlog(
            f"Progress Percent: {progress_percent} | Sequence Count: {sequence_count} "
            f"| Bytes Written: {tm.param_2}"
        )
    elif info.name == "MODE_INFO":
        mode_name = _mode_name(obj_name, tm.param_1)
        pw.dlog(f"Mode Number {tm.param_1}, Mode Name {mode_name}")
        pw.dlog(f"Submode: {tm.param_2}")
    else:
        specific_handler = False
        if info.info != "":
            pw.dlog(
                f"Additional info: {info.info} | P1: {tm.param_1} | P2: {tm.param_2}"
            )

    if not specific_handler:
        printer.handle_long_tm_print(packet_if=tm, info_if=tm)