From 2e0cd0fcf41726f00ec14fdbfc7c620e5a43ffe7 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 4 Mar 2022 11:56:42 +0100 Subject: [PATCH] added event and obj ID parsing --- config/object_ids.py | 56 +++++++++++++------------- pus_tc/devs/ploc_memory_dumper.py | 4 +- pus_tc/devs/ploc_mpsoc.py | 2 +- pus_tc/devs/rad_sensor.py | 4 +- pus_tm/event_handler.py | 66 ++++++++++++++++++++++++++++--- tmtccli.py | 2 +- tmtccmd | 2 +- 7 files changed, 93 insertions(+), 43 deletions(-) diff --git a/config/object_ids.py b/config/object_ids.py index 63e9732..87f248f 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -3,7 +3,14 @@ @details Template configuration file. Copy this folder to the TMTC commander root and adapt it to your needs. """ -from typing import Dict +import csv +import copy +import os.path +from tmtccmd.pus.obj_id import ObjectId, ObjectIdDictT + +DEFAULT_OBJECTS_CSV_PATH = "config/objects.csv" +__OBJECT_ID_DICT = None + # Core Object IDs CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03]) @@ -59,31 +66,22 @@ PLOC_MPSOC_ID = bytes([0x44, 0x33, 0x00, 0x15]) PL_PCDU_ID = bytes([0x44, 0x30, 0x00, 0x00]) -def get_object_ids() -> Dict[bytes, list]: - object_id_dict = { - PUS_SERVICE_17_ID: "PUS Service 17", - TEST_DEVICE_ID: "Test Device", - P60_DOCK_HANDLER: "P60", - PDU_1_HANDLER_ID: "PCDU PDU1 Handler", - PDU_2_HANDLER_ID: "PCDU PDU2 Handler", - ACU_HANDLER_ID: "ACU Handler", - TMP_1075_1_HANDLER_ID: "TMP 1075 Handler 1", - TMP_1075_2_HANDLER_ID: "TMP 1075 Handler 2", - HEATER_ID: "Heater", - PCDU_HANDLER_ID: "PCDU", - SOLAR_ARRAY_DEPLOYMENT_ID: "Solar Array Deployment", - RW1_ID: "Reaction Wheel 1", - RW2_ID: "Reaction Wheel 2", - RW3_ID: "Reaction Wheel 3", - RW4_ID: "Reaction Wheel 4", - GPS_HANDLER_0_ID: "GPS 0", - GPS_HANDLER_1_ID: "GPS 1", - RAD_SENSOR_ID: "Radiation Sensor", - PLOC_SUPV_ID: "PLOC Supervisor", - PLOC_MPSOC_ID: "PLOC MPSoC", - CORE_CONTROLLER_ID: "Core Controller", - CCSDS_HANDLER_ID: "CCSDS Handler", - PDEC_HANDLER_ID: "PDEC Handler", - STAR_TRACKER_ID: "Star Tracker Handler", - } - return object_id_dict +class ObjectInfo: + id: int = 0 + name: str = "" + + +def get_object_ids() -> ObjectIdDictT: + global __OBJECT_ID_DICT + if __OBJECT_ID_DICT is None and os.path.exists(DEFAULT_OBJECTS_CSV_PATH): + __OBJECT_ID_DICT = dict() + obj_id = ObjectId(object_id=0) + with open(DEFAULT_OBJECTS_CSV_PATH) as csvfile: + csv_reader = csv.reader(csvfile, delimiter=";") + for row in csv_reader: + obj_id.id = int(row[0], 16) + obj_id.name = row[1] + __OBJECT_ID_DICT.update( + {obj_id.as_bytes: copy.copy(obj_id)} + ) + return __OBJECT_ID_DICT diff --git a/pus_tc/devs/ploc_memory_dumper.py b/pus_tc/devs/ploc_memory_dumper.py index dae7a02..36edc8e 100644 --- a/pus_tc/devs/ploc_memory_dumper.py +++ b/pus_tc/devs/ploc_memory_dumper.py @@ -18,9 +18,7 @@ class ActionIds: DUMP_MRAM = 1 -def pack_ploc_memory_dumper_cmd( - object_id: bytearray, tc_queue: TcQueueT, op_code: str -): +def pack_ploc_memory_dumper_cmd(object_id: bytearray, tc_queue: TcQueueT, op_code: str): tc_queue.appendleft( ( QueueCommands.PRINT, diff --git a/pus_tc/devs/ploc_mpsoc.py b/pus_tc/devs/ploc_mpsoc.py index 5c6c09c..5fc2224 100644 --- a/pus_tc/devs/ploc_mpsoc.py +++ b/pus_tc/devs/ploc_mpsoc.py @@ -63,7 +63,7 @@ def pack_ploc_mpsoc_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQue def generate_write_mem_command( - object_id: bytearray, memory_address: bytearray, memory_data: int + object_id: bytearray, memory_address: bytes, memory_data: int ) -> bytearray: """This function generates the command to write to a memory address within the PLOC @param object_id The object id of the PlocHandler diff --git a/pus_tc/devs/rad_sensor.py b/pus_tc/devs/rad_sensor.py index 1b4ab69..8ea77d1 100644 --- a/pus_tc/devs/rad_sensor.py +++ b/pus_tc/devs/rad_sensor.py @@ -13,9 +13,7 @@ from spacepackets.ecss.tc import PusTelecommand from pus_tc.service_200_mode import pack_mode_data -def pack_rad_sensor_test_into( - object_id: bytearray, tc_queue: TcQueueT, op_code: str -): +def pack_rad_sensor_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str): tc_queue.appendleft( ( QueueCommands.PRINT, diff --git a/pus_tm/event_handler.py b/pus_tm/event_handler.py index 94ed680..9632ced 100644 --- a/pus_tm/event_handler.py +++ b/pus_tm/event_handler.py @@ -1,18 +1,74 @@ import csv +import enum +from typing import Optional import os.path -import config.object_ids as obj_ids +from config.object_ids import get_object_ids, PUS_SERVICE_17_ID +from tmtccmd.utility.logger import get_console_logger +LOGGER = get_console_logger() DEFAULT_EVENTS_CSV_PATH = "config/events.csv" -RETURNVALUE_DICT = None +__RETURNVALUE_DICT = None + + +class Severity(enum.IntEnum): + INFO = (0,) + LOW = (1,) + MEDIUM = (2,) + HIGH = 3 + + +def str_to_severity(string: str) -> Optional[Severity]: + if string == "INFO": + return Severity.INFO + elif string == "LOW": + return Severity.LOW + elif string == "MEDIUM": + return Severity.MEDIUM + elif string == "HIGH": + return Severity.HIGH + + +class ReturnValueInfo: + id: int = 0 + name: str = "" + severity: str = "" + info: str = "" + file_location: str = "" def handle_event_packet( object_id: bytes, event_id: int, param_1: int, param_2: int ) -> str: - if os.path.exists(DEFAULT_EVENTS_CSV_PATH) and RETURNVALUE_DICT is None: + object_id = PUS_SERVICE_17_ID + event_id = 2601 + global __RETURNVALUE_DICT + if os.path.exists(DEFAULT_EVENTS_CSV_PATH) and __RETURNVALUE_DICT is None: + __RETURNVALUE_DICT = dict() with open(DEFAULT_EVENTS_CSV_PATH) as csvfile: csv_reader = csv.reader(csvfile, delimiter=";") + info = ReturnValueInfo() for row in csv_reader: - print(row) - return "" + info.id = int(row[0]) + info.name = row[2] + info.severity = str_to_severity(row[3]) + info.info = row[4] + info.file_location = row[5] + __RETURNVALUE_DICT.update( + {info.id: info} + ) + generic_event_string = "" + additional_event_info = "" + if __RETURNVALUE_DICT is not None: + info = __RETURNVALUE_DICT.get(event_id) + obj_ids = get_object_ids() + obj_id_obj = obj_ids.get(object_id) + if obj_id_obj is None: + LOGGER.warning(f"Object ID 0x{object_id.hex()} has no name") + obj_name = object_id.hex() + else: + obj_name = obj_id_obj.name + generic_event_string = f"Object {obj_name} generated Event {event_id} | {info.name}" + if info.info != "": + additional_event_info = f" | Additional info: {info.info}" + return generic_event_string + additional_event_info diff --git a/tmtccli.py b/tmtccli.py index b60dc95..70ffefe 100755 --- a/tmtccli.py +++ b/tmtccli.py @@ -66,6 +66,7 @@ from pus_tm.factory_hook import ccsds_tm_handler def main(): from pus_tm.event_handler import handle_event_packet + hook_obj = EiveHookObject() print(f"-- eive tmtc version {__version__} --") print(f"-- spacepackets version {spacepackets.__version__} --") @@ -75,7 +76,6 @@ def main(): ccsds_handler.add_tm_handler( apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50 ) - handle_event_packet(object_id=bytes([0x00, 0x00, 0x00, 0x00]), event_id=0, param_1=0, param_2=0) add_ccsds_handler(ccsds_handler) run_tmtc_commander(False) diff --git a/tmtccmd b/tmtccmd index c315efe..f1bdd4e 160000 --- a/tmtccmd +++ b/tmtccmd @@ -1 +1 @@ -Subproject commit c315efe165b5ef68e95fd78f8019360d23cac8de +Subproject commit f1bdd4eb0fd829d49784cbffbf02c655fcf11744