102 lines
3.8 KiB
Python
102 lines
3.8 KiB
Python
import logging
|
|
import os.path
|
|
from datetime import datetime
|
|
from config.object_ids import get_object_ids
|
|
from pus_tm.defs import PrintWrapper
|
|
from pus_tm.verification_handler import generic_retval_printout
|
|
from tmtc.acs.acs_subsystem import AcsModes
|
|
from tmtccmd.tc.pus_200_fsfw_modes import Modes
|
|
|
|
from tmtccmd.tm import Service5Tm
|
|
from tmtccmd.logging import get_console_logger
|
|
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
|
from tmtccmd.fsfw import parse_fsfw_events_csv, EventDictT, EventInfo
|
|
|
|
|
|
LOGGER = get_console_logger()
|
|
DEFAULT_EVENTS_CSV_PATH = "config/events.csv"
|
|
__EVENT_DICT = None
|
|
|
|
|
|
def get_event_dict() -> EventDictT:
|
|
global __EVENT_DICT
|
|
if __EVENT_DICT is None:
|
|
if os.path.exists(DEFAULT_EVENTS_CSV_PATH):
|
|
__EVENT_DICT = parse_fsfw_events_csv(DEFAULT_EVENTS_CSV_PATH)
|
|
else:
|
|
LOGGER.warning(f"No Event CSV file found at {DEFAULT_EVENTS_CSV_PATH}")
|
|
__EVENT_DICT = dict()
|
|
return __EVENT_DICT
|
|
|
|
|
|
def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter):
|
|
pw = PrintWrapper(printer)
|
|
tm = Service5Tm.unpack(raw_telemetry=raw_tm)
|
|
event_dict = get_event_dict()
|
|
info = event_dict.get(tm.event_id)
|
|
if info is None:
|
|
LOGGER.warning(f"Event ID {tm.event_id} has no information")
|
|
info = EventInfo()
|
|
info.name = "Unknown event"
|
|
obj_ids = get_object_ids()
|
|
obj_id_obj = obj_ids.get(tm.reporter_id.as_bytes)
|
|
if obj_id_obj is None:
|
|
LOGGER.warning(f"Object ID 0x{tm.reporter_id.as_hex_string} has no name")
|
|
obj_name = tm.reporter_id.as_hex_string
|
|
else:
|
|
obj_name = obj_id_obj.name
|
|
generic_event_string = (
|
|
f"Object {obj_name} generated Event {tm.event_id} | {info.name}"
|
|
)
|
|
pw.printer.file_logger.info(
|
|
f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
|
|
)
|
|
LOGGER.info(generic_event_string)
|
|
specific_handler = True
|
|
if info.name == "MODE_TRANSITION_FAILED":
|
|
reason = generic_retval_printout(tm.param_1)
|
|
for string in reason:
|
|
pw.dlog(f"Reason from event parameter 1: {string}")
|
|
pw.dlog(f"Mode, sequence or table: {tm.param_2:#08x}")
|
|
if info.name == "SUPV_UPDATE_PROGRESS" or info.name == "WRITE_MEMORY_FAILED":
|
|
additional_event_info = f"Additional info: {info.info}"
|
|
context = (
|
|
f"Progress Percent: {tm.param_1 >> 24 & 0xff} | Sequence Count: {tm.param_1 & 0xffff} "
|
|
f"| Bytes Written: {tm.param_2}"
|
|
)
|
|
pw.dlog(additional_event_info)
|
|
pw.dlog(context)
|
|
if info.name == "MODE_INFO":
|
|
mode_name = "Unknown"
|
|
if obj_name == "ACS_SUBSYSTEM":
|
|
if tm.param_1 == Modes.OFF:
|
|
mode_name = "Off"
|
|
elif tm.param_1 == AcsModes.IDLE:
|
|
mode_name = "Idle"
|
|
elif tm.param_1 == AcsModes.DETUMBLE:
|
|
mode_name = "Detumble"
|
|
elif tm.param_1 == AcsModes.SAFE:
|
|
mode_name = "Safe"
|
|
elif tm.param_1 == AcsModes.TARGET_PT:
|
|
mode_name = "Target Pointing"
|
|
else:
|
|
if tm.param_1 == Modes.OFF:
|
|
mode_name = "Off"
|
|
elif tm.param_1 == Modes.ON:
|
|
mode_name = "On"
|
|
elif tm.param_1 == Modes.NORMAL:
|
|
mode_name = "Normal"
|
|
elif tm.param_1 == Modes.RAW:
|
|
mode_name = "Raw"
|
|
pw.dlog(f"Mode Number {tm.param_1}, Mode Name {mode_name}")
|
|
pw.dlog(f"Submode: {tm.param_2}")
|
|
else:
|
|
specific_handler = False
|
|
if info.info != "":
|
|
additional_event_info = (
|
|
f"Additional info: {info.info} | P1: {tm.param_1} | P2: {tm.param_2}"
|
|
)
|
|
pw.dlog(additional_event_info)
|
|
if not specific_handler:
|
|
printer.handle_long_tm_print(packet_if=tm, info_if=tm)
|