added event and obj ID parsing

This commit is contained in:
Robin Mueller 2022-03-04 11:56:42 +01:00
parent 47f982e4f7
commit 2e0cd0fcf4
No known key found for this signature in database
GPG Key ID: 11D4952C8CCEF814
7 changed files with 93 additions and 43 deletions

View File

@ -3,7 +3,14 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt @details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs. it to your needs.
""" """
from typing import Dict import csv
import copy
import os.path
from tmtccmd.pus.obj_id import ObjectId, ObjectIdDictT
DEFAULT_OBJECTS_CSV_PATH = "config/objects.csv"
__OBJECT_ID_DICT = None
# Core Object IDs # Core Object IDs
CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03]) CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03])
@ -59,31 +66,22 @@ PLOC_MPSOC_ID = bytes([0x44, 0x33, 0x00, 0x15])
PL_PCDU_ID = bytes([0x44, 0x30, 0x00, 0x00]) PL_PCDU_ID = bytes([0x44, 0x30, 0x00, 0x00])
def get_object_ids() -> Dict[bytes, list]: class ObjectInfo:
object_id_dict = { id: int = 0
PUS_SERVICE_17_ID: "PUS Service 17", name: str = ""
TEST_DEVICE_ID: "Test Device",
P60_DOCK_HANDLER: "P60",
PDU_1_HANDLER_ID: "PCDU PDU1 Handler", def get_object_ids() -> ObjectIdDictT:
PDU_2_HANDLER_ID: "PCDU PDU2 Handler", global __OBJECT_ID_DICT
ACU_HANDLER_ID: "ACU Handler", if __OBJECT_ID_DICT is None and os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
TMP_1075_1_HANDLER_ID: "TMP 1075 Handler 1", __OBJECT_ID_DICT = dict()
TMP_1075_2_HANDLER_ID: "TMP 1075 Handler 2", obj_id = ObjectId(object_id=0)
HEATER_ID: "Heater", with open(DEFAULT_OBJECTS_CSV_PATH) as csvfile:
PCDU_HANDLER_ID: "PCDU", csv_reader = csv.reader(csvfile, delimiter=";")
SOLAR_ARRAY_DEPLOYMENT_ID: "Solar Array Deployment", for row in csv_reader:
RW1_ID: "Reaction Wheel 1", obj_id.id = int(row[0], 16)
RW2_ID: "Reaction Wheel 2", obj_id.name = row[1]
RW3_ID: "Reaction Wheel 3", __OBJECT_ID_DICT.update(
RW4_ID: "Reaction Wheel 4", {obj_id.as_bytes: copy.copy(obj_id)}
GPS_HANDLER_0_ID: "GPS 0", )
GPS_HANDLER_1_ID: "GPS 1", return __OBJECT_ID_DICT
RAD_SENSOR_ID: "Radiation Sensor",
PLOC_SUPV_ID: "PLOC Supervisor",
PLOC_MPSOC_ID: "PLOC MPSoC",
CORE_CONTROLLER_ID: "Core Controller",
CCSDS_HANDLER_ID: "CCSDS Handler",
PDEC_HANDLER_ID: "PDEC Handler",
STAR_TRACKER_ID: "Star Tracker Handler",
}
return object_id_dict

View File

@ -18,9 +18,7 @@ class ActionIds:
DUMP_MRAM = 1 DUMP_MRAM = 1
def pack_ploc_memory_dumper_cmd( def pack_ploc_memory_dumper_cmd(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id: bytearray, tc_queue: TcQueueT, op_code: str
):
tc_queue.appendleft( tc_queue.appendleft(
( (
QueueCommands.PRINT, QueueCommands.PRINT,

View File

@ -63,7 +63,7 @@ def pack_ploc_mpsoc_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQue
def generate_write_mem_command( def generate_write_mem_command(
object_id: bytearray, memory_address: bytearray, memory_data: int object_id: bytearray, memory_address: bytes, memory_data: int
) -> bytearray: ) -> bytearray:
"""This function generates the command to write to a memory address within the PLOC """This function generates the command to write to a memory address within the PLOC
@param object_id The object id of the PlocHandler @param object_id The object id of the PlocHandler

View File

@ -13,9 +13,7 @@ from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data from pus_tc.service_200_mode import pack_mode_data
def pack_rad_sensor_test_into( def pack_rad_sensor_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id: bytearray, tc_queue: TcQueueT, op_code: str
):
tc_queue.appendleft( tc_queue.appendleft(
( (
QueueCommands.PRINT, QueueCommands.PRINT,

View File

@ -1,18 +1,74 @@
import csv import csv
import enum
from typing import Optional
import os.path import os.path
import config.object_ids as obj_ids from config.object_ids import get_object_ids, PUS_SERVICE_17_ID
from tmtccmd.utility.logger import get_console_logger
LOGGER = get_console_logger()
DEFAULT_EVENTS_CSV_PATH = "config/events.csv" DEFAULT_EVENTS_CSV_PATH = "config/events.csv"
RETURNVALUE_DICT = None __RETURNVALUE_DICT = None
class Severity(enum.IntEnum):
INFO = (0,)
LOW = (1,)
MEDIUM = (2,)
HIGH = 3
def str_to_severity(string: str) -> Optional[Severity]:
if string == "INFO":
return Severity.INFO
elif string == "LOW":
return Severity.LOW
elif string == "MEDIUM":
return Severity.MEDIUM
elif string == "HIGH":
return Severity.HIGH
class ReturnValueInfo:
id: int = 0
name: str = ""
severity: str = ""
info: str = ""
file_location: str = ""
def handle_event_packet( def handle_event_packet(
object_id: bytes, event_id: int, param_1: int, param_2: int object_id: bytes, event_id: int, param_1: int, param_2: int
) -> str: ) -> str:
if os.path.exists(DEFAULT_EVENTS_CSV_PATH) and RETURNVALUE_DICT is None: object_id = PUS_SERVICE_17_ID
event_id = 2601
global __RETURNVALUE_DICT
if os.path.exists(DEFAULT_EVENTS_CSV_PATH) and __RETURNVALUE_DICT is None:
__RETURNVALUE_DICT = dict()
with open(DEFAULT_EVENTS_CSV_PATH) as csvfile: with open(DEFAULT_EVENTS_CSV_PATH) as csvfile:
csv_reader = csv.reader(csvfile, delimiter=";") csv_reader = csv.reader(csvfile, delimiter=";")
info = ReturnValueInfo()
for row in csv_reader: for row in csv_reader:
print(row) info.id = int(row[0])
return "" info.name = row[2]
info.severity = str_to_severity(row[3])
info.info = row[4]
info.file_location = row[5]
__RETURNVALUE_DICT.update(
{info.id: info}
)
generic_event_string = ""
additional_event_info = ""
if __RETURNVALUE_DICT is not None:
info = __RETURNVALUE_DICT.get(event_id)
obj_ids = get_object_ids()
obj_id_obj = obj_ids.get(object_id)
if obj_id_obj is None:
LOGGER.warning(f"Object ID 0x{object_id.hex()} has no name")
obj_name = object_id.hex()
else:
obj_name = obj_id_obj.name
generic_event_string = f"Object {obj_name} generated Event {event_id} | {info.name}"
if info.info != "":
additional_event_info = f" | Additional info: {info.info}"
return generic_event_string + additional_event_info

View File

@ -66,6 +66,7 @@ from pus_tm.factory_hook import ccsds_tm_handler
def main(): def main():
from pus_tm.event_handler import handle_event_packet from pus_tm.event_handler import handle_event_packet
hook_obj = EiveHookObject() hook_obj = EiveHookObject()
print(f"-- eive tmtc version {__version__} --") print(f"-- eive tmtc version {__version__} --")
print(f"-- spacepackets version {spacepackets.__version__} --") print(f"-- spacepackets version {spacepackets.__version__} --")
@ -75,7 +76,6 @@ def main():
ccsds_handler.add_tm_handler( ccsds_handler.add_tm_handler(
apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50 apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50
) )
handle_event_packet(object_id=bytes([0x00, 0x00, 0x00, 0x00]), event_id=0, param_1=0, param_2=0)
add_ccsds_handler(ccsds_handler) add_ccsds_handler(ccsds_handler)
run_tmtc_commander(False) run_tmtc_commander(False)

@ -1 +1 @@
Subproject commit c315efe165b5ef68e95fd78f8019360d23cac8de Subproject commit f1bdd4eb0fd829d49784cbffbf02c655fcf11744