Compare commits

...

2 Commits

Author SHA1 Message Date
Robin Mueller
454ef636a1 changes for updated tmtccmd 2022-03-04 14:27:19 +01:00
Robin Mueller
2e0cd0fcf4 added event and obj ID parsing 2022-03-04 11:56:42 +01:00
9 changed files with 90 additions and 49 deletions

View File

@@ -8,6 +8,8 @@ from tmtccmd.config.definitions import (
) )
from tmtccmd.tm.service_3_base import Service3Base from tmtccmd.tm.service_3_base import Service3Base
from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.utility.retval import RetvalDictT
from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.com_if.com_interface_base import CommunicationInterface from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler from tmtccmd.core.backend import TmTcHandler
from tmtccmd.config.hook import TmTcHookBase from tmtccmd.config.hook import TmTcHookBase
@@ -15,6 +17,7 @@ from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.config.globals import OpCodeDictKeys from tmtccmd.config.globals import OpCodeDictKeys
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from config.retvals import get_retval_dict
class EiveHookObject(TmTcHookBase): class EiveHookObject(TmTcHookBase):
@@ -70,9 +73,8 @@ class EiveHookObject(TmTcHookBase):
service=service, op_code=op_code, service_queue=service_queue service=service, op_code=op_code, service_queue=service_queue
) )
def get_object_ids(self) -> Dict[bytes, list]: def get_object_ids(self) -> ObjectIdDictT:
from config.object_ids import get_object_ids from config.object_ids import get_object_ids
return get_object_ids() return get_object_ids()
@staticmethod @staticmethod
@@ -108,6 +110,9 @@ class EiveHookObject(TmTcHookBase):
object_id=object_id, event_id=event_id, param_1=param_1, param_2=param_2 object_id=object_id, event_id=event_id, param_1=param_1, param_2=param_2
) )
def get_retval_dict(self) -> RetvalDictT:
return get_retval_dict()
def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT): def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
from pus_tc.cmd_definitions import ( from pus_tc.cmd_definitions import (

View File

@@ -3,7 +3,15 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt @details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs. it to your needs.
""" """
from typing import Dict import os.path
from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.utility.fsfw import parse_fsfw_objects_csv
from tmtccmd.utility.logger import get_console_logger
LOGGER = get_console_logger()
DEFAULT_OBJECTS_CSV_PATH = "config/objects.csv"
__OBJECT_ID_DICT = None
# Core Object IDs # Core Object IDs
CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03]) CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03])
@@ -59,31 +67,18 @@ PLOC_MPSOC_ID = bytes([0x44, 0x33, 0x00, 0x15])
PL_PCDU_ID = bytes([0x44, 0x30, 0x00, 0x00]) PL_PCDU_ID = bytes([0x44, 0x30, 0x00, 0x00])
def get_object_ids() -> Dict[bytes, list]: class ObjectInfo:
object_id_dict = { id: int = 0
PUS_SERVICE_17_ID: "PUS Service 17", name: str = ""
TEST_DEVICE_ID: "Test Device",
P60_DOCK_HANDLER: "P60",
PDU_1_HANDLER_ID: "PCDU PDU1 Handler", def get_object_ids() -> ObjectIdDictT:
PDU_2_HANDLER_ID: "PCDU PDU2 Handler", global __OBJECT_ID_DICT
ACU_HANDLER_ID: "ACU Handler", if not os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
TMP_1075_1_HANDLER_ID: "TMP 1075 Handler 1", LOGGER.warning(f"No Objects CSV file found at {DEFAULT_OBJECTS_CSV_PATH}")
TMP_1075_2_HANDLER_ID: "TMP 1075 Handler 2", if __OBJECT_ID_DICT is None:
HEATER_ID: "Heater", if os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
PCDU_HANDLER_ID: "PCDU", __OBJECT_ID_DICT = parse_fsfw_objects_csv(csv_file=DEFAULT_OBJECTS_CSV_PATH)
SOLAR_ARRAY_DEPLOYMENT_ID: "Solar Array Deployment", else:
RW1_ID: "Reaction Wheel 1", __OBJECT_ID_DICT = dict()
RW2_ID: "Reaction Wheel 2", return __OBJECT_ID_DICT
RW3_ID: "Reaction Wheel 3",
RW4_ID: "Reaction Wheel 4",
GPS_HANDLER_0_ID: "GPS 0",
GPS_HANDLER_1_ID: "GPS 1",
RAD_SENSOR_ID: "Radiation Sensor",
PLOC_SUPV_ID: "PLOC Supervisor",
PLOC_MPSOC_ID: "PLOC MPSoC",
CORE_CONTROLLER_ID: "Core Controller",
CCSDS_HANDLER_ID: "CCSDS Handler",
PDEC_HANDLER_ID: "PDEC Handler",
STAR_TRACKER_ID: "Star Tracker Handler",
}
return object_id_dict

22
config/retvals.py Normal file
View File

@@ -0,0 +1,22 @@
import os
from tmtccmd.utility.fsfw import parse_fsfw_returnvalues_csv, RetvalDictT
from tmtccmd.utility.logger import get_console_logger
DEFAULT_RETVAL_CSV_NAME = "config/returnvalues.csv"
__RETVAL_DICT = None
LOGGER = get_console_logger()
def get_retval_dict() -> RetvalDictT:
global __RETVAL_DICT
if __RETVAL_DICT is None:
if os.path.exists(DEFAULT_RETVAL_CSV_NAME):
__RETVAL_DICT = parse_fsfw_returnvalues_csv(
csv_file=DEFAULT_RETVAL_CSV_NAME
)
else:
LOGGER.warning(
f"No Return Value CSV file found at {DEFAULT_RETVAL_CSV_NAME}"
)
__RETVAL_DICT = dict()
return __RETVAL_DICT

View File

@@ -18,9 +18,7 @@ class ActionIds:
DUMP_MRAM = 1 DUMP_MRAM = 1
def pack_ploc_memory_dumper_cmd( def pack_ploc_memory_dumper_cmd(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id: bytearray, tc_queue: TcQueueT, op_code: str
):
tc_queue.appendleft( tc_queue.appendleft(
( (
QueueCommands.PRINT, QueueCommands.PRINT,

View File

@@ -63,7 +63,7 @@ def pack_ploc_mpsoc_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQue
def generate_write_mem_command( def generate_write_mem_command(
object_id: bytearray, memory_address: bytearray, memory_data: int object_id: bytearray, memory_address: bytes, memory_data: int
) -> bytearray: ) -> bytearray:
"""This function generates the command to write to a memory address within the PLOC """This function generates the command to write to a memory address within the PLOC
@param object_id The object id of the PlocHandler @param object_id The object id of the PlocHandler

View File

@@ -13,9 +13,7 @@ from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data from pus_tc.service_200_mode import pack_mode_data
def pack_rad_sensor_test_into( def pack_rad_sensor_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id: bytearray, tc_queue: TcQueueT, op_code: str
):
tc_queue.appendleft( tc_queue.appendleft(
( (
QueueCommands.PRINT, QueueCommands.PRINT,

View File

@@ -1,18 +1,41 @@
import csv
import os.path import os.path
import config.object_ids as obj_ids from config.object_ids import get_object_ids
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.utility.fsfw import parse_fsfw_events_csv, EventDictT
LOGGER = get_console_logger()
DEFAULT_EVENTS_CSV_PATH = "config/events.csv" DEFAULT_EVENTS_CSV_PATH = "config/events.csv"
RETURNVALUE_DICT = None __EVENT_DICT = None
def get_event_dict() -> EventDictT:
global __EVENT_DICT
if __EVENT_DICT is None:
if os.path.exists(DEFAULT_EVENTS_CSV_PATH):
__EVENT_DICT = parse_fsfw_events_csv(DEFAULT_EVENTS_CSV_PATH)
else:
LOGGER.warning(f"No Event CSV file found at {DEFAULT_EVENTS_CSV_PATH}")
__EVENT_DICT = dict()
return __EVENT_DICT
def handle_event_packet( def handle_event_packet(
object_id: bytes, event_id: int, param_1: int, param_2: int object_id: bytes, event_id: int, param_1: int, param_2: int
) -> str: ) -> str:
if os.path.exists(DEFAULT_EVENTS_CSV_PATH) and RETURNVALUE_DICT is None: additional_event_info = ""
with open(DEFAULT_EVENTS_CSV_PATH) as csvfile: event_dict = get_event_dict()
csv_reader = csv.reader(csvfile, delimiter=";") info = event_dict.get(event_id)
for row in csv_reader: obj_ids = get_object_ids()
print(row) obj_id_obj = obj_ids.get(object_id)
return "" if obj_id_obj is None:
LOGGER.warning(f"Object ID 0x{object_id.hex()} has no name")
obj_name = object_id.hex()
else:
obj_name = obj_id_obj.name
generic_event_string = f"Object {obj_name} generated Event {event_id} | {info.name}"
if info.info != "":
additional_event_info = (
f" | Additional info: {info.info} | P1: {param_1} | P2: {param_2}"
)
return generic_event_string + additional_event_info

View File

@@ -66,6 +66,7 @@ from pus_tm.factory_hook import ccsds_tm_handler
def main(): def main():
from pus_tm.event_handler import handle_event_packet from pus_tm.event_handler import handle_event_packet
hook_obj = EiveHookObject() hook_obj = EiveHookObject()
print(f"-- eive tmtc version {__version__} --") print(f"-- eive tmtc version {__version__} --")
print(f"-- spacepackets version {spacepackets.__version__} --") print(f"-- spacepackets version {spacepackets.__version__} --")
@@ -75,7 +76,6 @@ def main():
ccsds_handler.add_tm_handler( ccsds_handler.add_tm_handler(
apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50 apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50
) )
handle_event_packet(object_id=bytes([0x00, 0x00, 0x00, 0x00]), event_id=0, param_1=0, param_2=0)
add_ccsds_handler(ccsds_handler) add_ccsds_handler(ccsds_handler)
run_tmtc_commander(False) run_tmtc_commander(False)

Submodule tmtccmd updated: c315efe165...f488c466c9