changes for updated tmtccmd

This commit is contained in:
Robin Mueller 2022-03-04 14:27:19 +01:00
parent 2e0cd0fcf4
commit 454ef636a1
No known key found for this signature in database
GPG Key ID: 11D4952C8CCEF814
5 changed files with 67 additions and 76 deletions

View File

@ -8,6 +8,8 @@ from tmtccmd.config.definitions import (
) )
from tmtccmd.tm.service_3_base import Service3Base from tmtccmd.tm.service_3_base import Service3Base
from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.utility.retval import RetvalDictT
from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.com_if.com_interface_base import CommunicationInterface from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler from tmtccmd.core.backend import TmTcHandler
from tmtccmd.config.hook import TmTcHookBase from tmtccmd.config.hook import TmTcHookBase
@ -15,6 +17,7 @@ from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.config.globals import OpCodeDictKeys from tmtccmd.config.globals import OpCodeDictKeys
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from config.retvals import get_retval_dict
class EiveHookObject(TmTcHookBase): class EiveHookObject(TmTcHookBase):
@ -70,9 +73,8 @@ class EiveHookObject(TmTcHookBase):
service=service, op_code=op_code, service_queue=service_queue service=service, op_code=op_code, service_queue=service_queue
) )
def get_object_ids(self) -> Dict[bytes, list]: def get_object_ids(self) -> ObjectIdDictT:
from config.object_ids import get_object_ids from config.object_ids import get_object_ids
return get_object_ids() return get_object_ids()
@staticmethod @staticmethod
@ -108,6 +110,9 @@ class EiveHookObject(TmTcHookBase):
object_id=object_id, event_id=event_id, param_1=param_1, param_2=param_2 object_id=object_id, event_id=event_id, param_1=param_1, param_2=param_2
) )
def get_retval_dict(self) -> RetvalDictT:
return get_retval_dict()
def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT): def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
from pus_tc.cmd_definitions import ( from pus_tc.cmd_definitions import (

View File

@ -3,11 +3,12 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt @details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs. it to your needs.
""" """
import csv
import copy
import os.path import os.path
from tmtccmd.pus.obj_id import ObjectId, ObjectIdDictT from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.utility.fsfw import parse_fsfw_objects_csv
from tmtccmd.utility.logger import get_console_logger
LOGGER = get_console_logger()
DEFAULT_OBJECTS_CSV_PATH = "config/objects.csv" DEFAULT_OBJECTS_CSV_PATH = "config/objects.csv"
__OBJECT_ID_DICT = None __OBJECT_ID_DICT = None
@ -73,15 +74,11 @@ class ObjectInfo:
def get_object_ids() -> ObjectIdDictT: def get_object_ids() -> ObjectIdDictT:
global __OBJECT_ID_DICT global __OBJECT_ID_DICT
if __OBJECT_ID_DICT is None and os.path.exists(DEFAULT_OBJECTS_CSV_PATH): if not os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
__OBJECT_ID_DICT = dict() LOGGER.warning(f"No Objects CSV file found at {DEFAULT_OBJECTS_CSV_PATH}")
obj_id = ObjectId(object_id=0) if __OBJECT_ID_DICT is None:
with open(DEFAULT_OBJECTS_CSV_PATH) as csvfile: if os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
csv_reader = csv.reader(csvfile, delimiter=";") __OBJECT_ID_DICT = parse_fsfw_objects_csv(csv_file=DEFAULT_OBJECTS_CSV_PATH)
for row in csv_reader: else:
obj_id.id = int(row[0], 16) __OBJECT_ID_DICT = dict()
obj_id.name = row[1]
__OBJECT_ID_DICT.update(
{obj_id.as_bytes: copy.copy(obj_id)}
)
return __OBJECT_ID_DICT return __OBJECT_ID_DICT

22
config/retvals.py Normal file
View File

@ -0,0 +1,22 @@
import os
from tmtccmd.utility.fsfw import parse_fsfw_returnvalues_csv, RetvalDictT
from tmtccmd.utility.logger import get_console_logger
DEFAULT_RETVAL_CSV_NAME = "config/returnvalues.csv"
__RETVAL_DICT = None
LOGGER = get_console_logger()
def get_retval_dict() -> RetvalDictT:
global __RETVAL_DICT
if __RETVAL_DICT is None:
if os.path.exists(DEFAULT_RETVAL_CSV_NAME):
__RETVAL_DICT = parse_fsfw_returnvalues_csv(
csv_file=DEFAULT_RETVAL_CSV_NAME
)
else:
LOGGER.warning(
f"No Return Value CSV file found at {DEFAULT_RETVAL_CSV_NAME}"
)
__RETVAL_DICT = dict()
return __RETVAL_DICT

View File

@ -1,74 +1,41 @@
import csv
import enum
from typing import Optional
import os.path import os.path
from config.object_ids import get_object_ids, PUS_SERVICE_17_ID from config.object_ids import get_object_ids
from tmtccmd.utility.logger import get_console_logger from tmtccmd.utility.logger import get_console_logger
from tmtccmd.utility.fsfw import parse_fsfw_events_csv, EventDictT
LOGGER = get_console_logger() LOGGER = get_console_logger()
DEFAULT_EVENTS_CSV_PATH = "config/events.csv" DEFAULT_EVENTS_CSV_PATH = "config/events.csv"
__RETURNVALUE_DICT = None __EVENT_DICT = None
class Severity(enum.IntEnum): def get_event_dict() -> EventDictT:
INFO = (0,) global __EVENT_DICT
LOW = (1,) if __EVENT_DICT is None:
MEDIUM = (2,) if os.path.exists(DEFAULT_EVENTS_CSV_PATH):
HIGH = 3 __EVENT_DICT = parse_fsfw_events_csv(DEFAULT_EVENTS_CSV_PATH)
else:
LOGGER.warning(f"No Event CSV file found at {DEFAULT_EVENTS_CSV_PATH}")
def str_to_severity(string: str) -> Optional[Severity]: __EVENT_DICT = dict()
if string == "INFO": return __EVENT_DICT
return Severity.INFO
elif string == "LOW":
return Severity.LOW
elif string == "MEDIUM":
return Severity.MEDIUM
elif string == "HIGH":
return Severity.HIGH
class ReturnValueInfo:
id: int = 0
name: str = ""
severity: str = ""
info: str = ""
file_location: str = ""
def handle_event_packet( def handle_event_packet(
object_id: bytes, event_id: int, param_1: int, param_2: int object_id: bytes, event_id: int, param_1: int, param_2: int
) -> str: ) -> str:
object_id = PUS_SERVICE_17_ID
event_id = 2601
global __RETURNVALUE_DICT
if os.path.exists(DEFAULT_EVENTS_CSV_PATH) and __RETURNVALUE_DICT is None:
__RETURNVALUE_DICT = dict()
with open(DEFAULT_EVENTS_CSV_PATH) as csvfile:
csv_reader = csv.reader(csvfile, delimiter=";")
info = ReturnValueInfo()
for row in csv_reader:
info.id = int(row[0])
info.name = row[2]
info.severity = str_to_severity(row[3])
info.info = row[4]
info.file_location = row[5]
__RETURNVALUE_DICT.update(
{info.id: info}
)
generic_event_string = ""
additional_event_info = "" additional_event_info = ""
if __RETURNVALUE_DICT is not None: event_dict = get_event_dict()
info = __RETURNVALUE_DICT.get(event_id) info = event_dict.get(event_id)
obj_ids = get_object_ids() obj_ids = get_object_ids()
obj_id_obj = obj_ids.get(object_id) obj_id_obj = obj_ids.get(object_id)
if obj_id_obj is None: if obj_id_obj is None:
LOGGER.warning(f"Object ID 0x{object_id.hex()} has no name") LOGGER.warning(f"Object ID 0x{object_id.hex()} has no name")
obj_name = object_id.hex() obj_name = object_id.hex()
else: else:
obj_name = obj_id_obj.name obj_name = obj_id_obj.name
generic_event_string = f"Object {obj_name} generated Event {event_id} | {info.name}" generic_event_string = f"Object {obj_name} generated Event {event_id} | {info.name}"
if info.info != "": if info.info != "":
additional_event_info = f" | Additional info: {info.info}" additional_event_info = (
f" | Additional info: {info.info} | P1: {param_1} | P2: {param_2}"
)
return generic_event_string + additional_event_info return generic_event_string + additional_event_info

@ -1 +1 @@
Subproject commit f1bdd4eb0fd829d49784cbffbf02c655fcf11744 Subproject commit f488c466c958812faab05d23029e09c71cc39f49