Compare commits

...

2 Commits

Author SHA1 Message Date
Robin Mueller
454ef636a1 changes for updated tmtccmd 2022-03-04 14:27:19 +01:00
Robin Mueller
2e0cd0fcf4 added event and obj ID parsing 2022-03-04 11:56:42 +01:00
9 changed files with 90 additions and 49 deletions

View File

@@ -8,6 +8,8 @@ from tmtccmd.config.definitions import (
)
from tmtccmd.tm.service_3_base import Service3Base
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.utility.retval import RetvalDictT
from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.config.hook import TmTcHookBase
@@ -15,6 +17,7 @@ from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.config.globals import OpCodeDictKeys
from config.definitions import CustomServiceList
from config.retvals import get_retval_dict
class EiveHookObject(TmTcHookBase):
@@ -70,9 +73,8 @@ class EiveHookObject(TmTcHookBase):
service=service, op_code=op_code, service_queue=service_queue
)
def get_object_ids(self) -> Dict[bytes, list]:
def get_object_ids(self) -> ObjectIdDictT:
from config.object_ids import get_object_ids
return get_object_ids()
@staticmethod
@@ -108,6 +110,9 @@ class EiveHookObject(TmTcHookBase):
object_id=object_id, event_id=event_id, param_1=param_1, param_2=param_2
)
def get_retval_dict(self) -> RetvalDictT:
return get_retval_dict()
def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
from pus_tc.cmd_definitions import (

View File

@@ -3,7 +3,15 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from typing import Dict
import os.path
from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.utility.fsfw import parse_fsfw_objects_csv
from tmtccmd.utility.logger import get_console_logger
LOGGER = get_console_logger()
DEFAULT_OBJECTS_CSV_PATH = "config/objects.csv"
__OBJECT_ID_DICT = None
# Core Object IDs
CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03])
@@ -59,31 +67,18 @@ PLOC_MPSOC_ID = bytes([0x44, 0x33, 0x00, 0x15])
PL_PCDU_ID = bytes([0x44, 0x30, 0x00, 0x00])
def get_object_ids() -> Dict[bytes, list]:
object_id_dict = {
PUS_SERVICE_17_ID: "PUS Service 17",
TEST_DEVICE_ID: "Test Device",
P60_DOCK_HANDLER: "P60",
PDU_1_HANDLER_ID: "PCDU PDU1 Handler",
PDU_2_HANDLER_ID: "PCDU PDU2 Handler",
ACU_HANDLER_ID: "ACU Handler",
TMP_1075_1_HANDLER_ID: "TMP 1075 Handler 1",
TMP_1075_2_HANDLER_ID: "TMP 1075 Handler 2",
HEATER_ID: "Heater",
PCDU_HANDLER_ID: "PCDU",
SOLAR_ARRAY_DEPLOYMENT_ID: "Solar Array Deployment",
RW1_ID: "Reaction Wheel 1",
RW2_ID: "Reaction Wheel 2",
RW3_ID: "Reaction Wheel 3",
RW4_ID: "Reaction Wheel 4",
GPS_HANDLER_0_ID: "GPS 0",
GPS_HANDLER_1_ID: "GPS 1",
RAD_SENSOR_ID: "Radiation Sensor",
PLOC_SUPV_ID: "PLOC Supervisor",
PLOC_MPSOC_ID: "PLOC MPSoC",
CORE_CONTROLLER_ID: "Core Controller",
CCSDS_HANDLER_ID: "CCSDS Handler",
PDEC_HANDLER_ID: "PDEC Handler",
STAR_TRACKER_ID: "Star Tracker Handler",
}
return object_id_dict
class ObjectInfo:
id: int = 0
name: str = ""
def get_object_ids() -> ObjectIdDictT:
global __OBJECT_ID_DICT
if not os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
LOGGER.warning(f"No Objects CSV file found at {DEFAULT_OBJECTS_CSV_PATH}")
if __OBJECT_ID_DICT is None:
if os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
__OBJECT_ID_DICT = parse_fsfw_objects_csv(csv_file=DEFAULT_OBJECTS_CSV_PATH)
else:
__OBJECT_ID_DICT = dict()
return __OBJECT_ID_DICT

22
config/retvals.py Normal file
View File

@@ -0,0 +1,22 @@
import os
from tmtccmd.utility.fsfw import parse_fsfw_returnvalues_csv, RetvalDictT
from tmtccmd.utility.logger import get_console_logger
DEFAULT_RETVAL_CSV_NAME = "config/returnvalues.csv"
__RETVAL_DICT = None
LOGGER = get_console_logger()
def get_retval_dict() -> RetvalDictT:
global __RETVAL_DICT
if __RETVAL_DICT is None:
if os.path.exists(DEFAULT_RETVAL_CSV_NAME):
__RETVAL_DICT = parse_fsfw_returnvalues_csv(
csv_file=DEFAULT_RETVAL_CSV_NAME
)
else:
LOGGER.warning(
f"No Return Value CSV file found at {DEFAULT_RETVAL_CSV_NAME}"
)
__RETVAL_DICT = dict()
return __RETVAL_DICT

View File

@@ -18,9 +18,7 @@ class ActionIds:
DUMP_MRAM = 1
def pack_ploc_memory_dumper_cmd(
object_id: bytearray, tc_queue: TcQueueT, op_code: str
):
def pack_ploc_memory_dumper_cmd(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
tc_queue.appendleft(
(
QueueCommands.PRINT,

View File

@@ -63,7 +63,7 @@ def pack_ploc_mpsoc_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQue
def generate_write_mem_command(
object_id: bytearray, memory_address: bytearray, memory_data: int
object_id: bytearray, memory_address: bytes, memory_data: int
) -> bytearray:
"""This function generates the command to write to a memory address within the PLOC
@param object_id The object id of the PlocHandler

View File

@@ -13,9 +13,7 @@ from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data
def pack_rad_sensor_test_into(
object_id: bytearray, tc_queue: TcQueueT, op_code: str
):
def pack_rad_sensor_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
tc_queue.appendleft(
(
QueueCommands.PRINT,

View File

@@ -1,18 +1,41 @@
import csv
import os.path
import config.object_ids as obj_ids
from config.object_ids import get_object_ids
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.utility.fsfw import parse_fsfw_events_csv, EventDictT
LOGGER = get_console_logger()
DEFAULT_EVENTS_CSV_PATH = "config/events.csv"
RETURNVALUE_DICT = None
__EVENT_DICT = None
def get_event_dict() -> EventDictT:
global __EVENT_DICT
if __EVENT_DICT is None:
if os.path.exists(DEFAULT_EVENTS_CSV_PATH):
__EVENT_DICT = parse_fsfw_events_csv(DEFAULT_EVENTS_CSV_PATH)
else:
LOGGER.warning(f"No Event CSV file found at {DEFAULT_EVENTS_CSV_PATH}")
__EVENT_DICT = dict()
return __EVENT_DICT
def handle_event_packet(
object_id: bytes, event_id: int, param_1: int, param_2: int
) -> str:
if os.path.exists(DEFAULT_EVENTS_CSV_PATH) and RETURNVALUE_DICT is None:
with open(DEFAULT_EVENTS_CSV_PATH) as csvfile:
csv_reader = csv.reader(csvfile, delimiter=";")
for row in csv_reader:
print(row)
return ""
additional_event_info = ""
event_dict = get_event_dict()
info = event_dict.get(event_id)
obj_ids = get_object_ids()
obj_id_obj = obj_ids.get(object_id)
if obj_id_obj is None:
LOGGER.warning(f"Object ID 0x{object_id.hex()} has no name")
obj_name = object_id.hex()
else:
obj_name = obj_id_obj.name
generic_event_string = f"Object {obj_name} generated Event {event_id} | {info.name}"
if info.info != "":
additional_event_info = (
f" | Additional info: {info.info} | P1: {param_1} | P2: {param_2}"
)
return generic_event_string + additional_event_info

View File

@@ -66,6 +66,7 @@ from pus_tm.factory_hook import ccsds_tm_handler
def main():
from pus_tm.event_handler import handle_event_packet
hook_obj = EiveHookObject()
print(f"-- eive tmtc version {__version__} --")
print(f"-- spacepackets version {spacepackets.__version__} --")
@@ -75,7 +76,6 @@ def main():
ccsds_handler.add_tm_handler(
apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50
)
handle_event_packet(object_id=bytes([0x00, 0x00, 0x00, 0x00]), event_id=0, param_1=0, param_2=0)
add_ccsds_handler(ccsds_handler)
run_tmtc_commander(False)

Submodule tmtccmd updated: c315efe165...f488c466c9