update to new tmtccmd version

This commit is contained in:
Robin Müller 2021-05-17 15:23:54 +02:00
parent a4ea0680fe
commit bd35acead0
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
13 changed files with 75 additions and 88 deletions

View File

@ -15,7 +15,5 @@ def perform_mode_operation_user(tmtc_backend: TmTcHandler, mode: int):
""" """
Custom modes can be implemented here Custom modes can be implemented here
""" """
if tmtc_backend:
pass
LOGGER.error(f"Unknown mode {mode}, Configuration error !") LOGGER.error(f"Unknown mode {mode}, Configuration error !")
sys.exit() sys.exit()

View File

@ -1,8 +1,8 @@
import argparse import argparse
from typing import Dict, Union, Tuple from typing import Dict, Union, Tuple
from tmtccmd.core.hook_base import TmTcHookBase from tmtccmd.config.definitions import ServiceOpCodeDictT
from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.pus_tm.service_3_base import Service3Base from tmtccmd.pus_tm.service_3_base import Service3Base
@ -17,19 +17,26 @@ class FsfwHookBase(TmTcHookBase):
from config.version import SW_NAME, SW_VERSION, SW_SUBVERSION, SW_SUBSUBVERSION from config.version import SW_NAME, SW_VERSION, SW_SUBVERSION, SW_SUBSUBVERSION
return f"{SW_NAME} {SW_VERSION}.{SW_SUBVERSION}.{SW_SUBSUBVERSION}" return f"{SW_NAME} {SW_VERSION}.{SW_SUBVERSION}.{SW_SUBSUBVERSION}"
def get_json_config_file_path(self) -> str:
return "config/tmtc_config.json"
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import get_default_service_op_code_dict
return get_default_service_op_code_dict()
def add_globals_pre_args_parsing(self, gui: bool = False): def add_globals_pre_args_parsing(self, gui: bool = False):
from tmtccmd.defaults.globals_setup import set_default_globals_pre_args_parsing from tmtccmd.config.globals import set_default_globals_pre_args_parsing
set_default_globals_pre_args_parsing(gui=gui, apid=0xef) set_default_globals_pre_args_parsing(gui=gui, apid=0xef)
def add_globals_post_args_parsing(self, args: argparse.Namespace): def add_globals_post_args_parsing(self, args: argparse.Namespace):
from tmtccmd.defaults.globals_setup import set_default_globals_post_args_parsing from tmtccmd.config.globals import set_default_globals_post_args_parsing
set_default_globals_post_args_parsing(args=args, json_cfg_path=self.set_json_config_file_path()) set_default_globals_post_args_parsing(args=args, json_cfg_path=self.get_json_config_file_path())
def assign_communication_interface(self, com_if: int, tmtc_printer: TmTcPrinter) -> \ def assign_communication_interface(self, com_if_key: str, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]: Union[CommunicationInterface, None]:
from tmtccmd.defaults.com_setup import create_communication_interface_default from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default( return create_communication_interface_default(
com_if=com_if, tmtc_printer=tmtc_printer, json_cfg_path=self.set_json_config_file_path() com_if_key=com_if_key, tmtc_printer=tmtc_printer, json_cfg_path=self.get_json_config_file_path()
) )
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
@ -39,17 +46,13 @@ class FsfwHookBase(TmTcHookBase):
from pus_tc.tc_packing import pack_service_queue_user from pus_tc.tc_packing import pack_service_queue_user
pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue) pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue)
def pack_total_service_queue(self) -> Union[None, TcQueueT]:
from pus_tc.tc_packing import create_total_tc_queue_user
return create_total_tc_queue_user()
def tm_user_factory_hook(self, raw_tm_packet: bytearray) -> Union[None, PusTelemetry]: def tm_user_factory_hook(self, raw_tm_packet: bytearray) -> Union[None, PusTelemetry]:
from pus_tm.factory_hook import tm_user_factory_hook from pus_tm.factory_hook import tm_user_factory_hook
return tm_user_factory_hook(raw_tm_packet=raw_tm_packet) return tm_user_factory_hook(raw_tm_packet=raw_tm_packet)
def set_object_ids(self) -> Dict[bytes, list]: def get_object_ids(self) -> Dict[bytes, list]:
from config.object_ids import set_object_ids from config.object_ids import get_object_ids
return set_object_ids() return get_object_ids()
@staticmethod @staticmethod
def handle_service_8_telemetry( def handle_service_8_telemetry(
@ -62,18 +65,9 @@ class FsfwHookBase(TmTcHookBase):
@staticmethod @staticmethod
def handle_service_3_housekeeping( def handle_service_3_housekeeping(
object_id: bytearray, set_id: int, hk_data: bytearray, service3_packet: Service3Base object_id: int, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]: ) -> Tuple[list, list, bytearray, int]:
from pus_tm.service_3_hk_handling import service_3_hk_handling from pus_tm.service_3_hk_handling import service_3_hk_handling
return service_3_hk_handling( return service_3_hk_handling(
object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet
) )
def command_preparation_hook(self) -> Union[None, PusTelecommand]:
pass
def set_json_config_file_path(self) -> str:
return "config/tmtc_config.json"

View File

@ -3,18 +3,17 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt @details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs. it to your needs.
""" """
from typing import Dict from typing import Dict
SERVICE_17_OBJ_ID = bytes([0x53, 0x00, 0x00, 0x17]) PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17])
TEST_DEVICE_0_OBJ_ID = bytes([0x44, 0x01, 0xAF, 0xFE]) TEST_DEVICE_0_ID = bytes([0x44, 0x01, 0xAF, 0xFE])
TEST_DEVICE_1_OBJ_ID = bytes([0x44, 0x02, 0xAF, 0xFE]) TEST_DEVICE_1_ID = bytes([0x44, 0x02, 0xAF, 0xFE])
def set_object_ids() -> Dict[bytes, list]: def get_object_ids() -> Dict[bytes, list]:
object_id_dict = { object_id_dict = {
SERVICE_17_OBJ_ID: ["Service 17"], PUS_SERVICE_17_ID: ["PUS Service 17"],
TEST_DEVICE_0_OBJ_ID: ["Test Device 0"], TEST_DEVICE_0_ID: ["Test Device 0"],
TEST_DEVICE_1_OBJ_ID: ["Test Device 1"], TEST_DEVICE_1_ID: ["Test Device 1"]
} }
return object_id_dict return object_id_dict

View File

@ -1,4 +1,4 @@
SW_NAME = "fsfw-tmtc" SW_NAME = "fsfw-tmtc"
SW_VERSION = 1 SW_VERSION = 1
SW_SUBVERSION = 1 SW_SUBVERSION = 2
SW_SUBSUBVERSION = 0 SW_SUBSUBVERSION = 0

View File

@ -5,12 +5,13 @@
@author R. Mueller @author R. Mueller
@date 02.05.2020 @date 02.05.2020
""" """
from config.object_ids import TEST_DEVICE_0_OBJ_ID, TEST_DEVICE_1_OBJ_ID from tmtccmd.config.definitions import QueueCommands
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.packer import TcQueueT from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.pus_tc.service_200_mode import pack_mode_data from tmtccmd.pus_tc.service_200_mode import pack_mode_data
from config.object_ids import TEST_DEVICE_0_ID
def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str): def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "0": if op_code == "0":
@ -21,7 +22,7 @@ def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
new_ssc = init_ssc new_ssc = init_ssc
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200")) tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200"))
# Object ID: DUMMY Device # Object ID: DUMMY Device
object_id = TEST_DEVICE_0_OBJ_ID object_id = TEST_DEVICE_0_ID
# Set On Mode # Set On Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On")) tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
mode_data = pack_mode_data(object_id, 1, 0) mode_data = pack_mode_data(object_id, 1, 0)

View File

@ -1,18 +1,16 @@
import struct import struct
from tmtccmd.core.definitions import QueueCommands from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.definitions import TcQueueT from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.pus_tc.service_20_parameter import pack_type_and_matrix_data, \ from tmtccmd.pus_tc.service_20_parameter import pack_type_and_matrix_data, \
pack_parameter_id pack_parameter_id
from tmtccmd.pus_tc.service_200_mode import pack_mode_data from tmtccmd.pus_tc.service_200_mode import pack_mode_data
from tmtccmd.utility.tmtcc_logger import get_logger from tmtccmd.utility.logger import get_logger
from config.object_ids import TEST_DEVICE_0_ID
from config.object_ids import TEST_DEVICE_0_OBJ_ID
LOGGER = get_logger() LOGGER = get_logger()
TEST_DEVICE_0_ID = bytearray()
def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str): def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str):
@ -23,7 +21,7 @@ def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str):
def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False): def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False):
if called_externally is False: if called_externally is False:
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20")) tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20"))
object_id = TEST_DEVICE_0_OBJ_ID object_id = TEST_DEVICE_0_ID
# set mode normal # set mode normal
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode")) tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode"))
@ -40,7 +38,7 @@ def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False
def load_param_0_simple_test_commands(tc_queue: TcQueueT): def load_param_0_simple_test_commands(tc_queue: TcQueueT):
object_id = TEST_DEVICE_0_OBJ_ID object_id = TEST_DEVICE_0_ID
parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0) parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0)
# test checking Load for uint32_t # test checking Load for uint32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load uint32_t")) tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load uint32_t"))

View File

@ -7,13 +7,13 @@
""" """
import struct import struct
from tmtccmd.core.definitions import QueueCommands from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.definitions import TcQueueT from tmtccmd.pus_tc.definitions import TcQueueT
from pus_tc.service_200_mode import pack_mode_data from pus_tc.service_200_mode import pack_mode_data
import pus_tc.command_data as cmd_data import pus_tc.command_data as cmd_data
from config.object_ids import TEST_DEVICE_0_OBJ_ID, TEST_DEVICE_1_OBJ_ID from config.object_ids import TEST_DEVICE_0_ID
def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str): def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str):
@ -25,7 +25,7 @@ def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str):
def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
new_ssc = init_ssc new_ssc = init_ssc
object_id = TEST_DEVICE_0_OBJ_ID # dummy device object_id = TEST_DEVICE_0_ID # dummy device
# Set Raw Mode # Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode")) tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode"))
mode_data = pack_mode_data(object_id, 3, 0) mode_data = pack_mode_data(object_id, 3, 0)

View File

@ -1,4 +1,4 @@
from tmtccmd.core.definitions import QueueCommands from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.service_200_mode import pack_mode_data from tmtccmd.pus_tc.service_200_mode import pack_mode_data
from tmtccmd.pus_tc.service_20_parameter import pack_boolean_parameter_command from tmtccmd.pus_tc.service_20_parameter import pack_boolean_parameter_command
from tmtccmd.pus_tc.service_3_housekeeping import make_sid, generate_one_hk_command, \ from tmtccmd.pus_tc.service_3_housekeeping import make_sid, generate_one_hk_command, \
@ -7,8 +7,8 @@ from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.definitions import TcQueueT from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.pus_tc.service_8_functional_cmd import generate_action_command from tmtccmd.pus_tc.service_8_functional_cmd import generate_action_command
from config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
from config.object_ids import TEST_DEVICE_1_OBJ_ID, TEST_DEVICE_0_OBJ_ID
# Set IDs # Set IDs
TEST_SET_ID = 0 TEST_SET_ID = 0
@ -25,9 +25,9 @@ def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
# TODO: Import this from config instead # TODO: Import this from config instead
device_idx = 0 device_idx = 0
if device_idx == 0: if device_idx == 0:
object_id = TEST_DEVICE_0_OBJ_ID object_id = TEST_DEVICE_0_ID
else: else:
object_id = TEST_DEVICE_1_OBJ_ID object_id = TEST_DEVICE_1_ID
if op_code == "0": if op_code == "0":
# This will pack all the tests # This will pack all the tests

View File

@ -1,10 +1,11 @@
from tmtccmd.core.definitions import QueueCommands from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.definitions import TcQueueT from tmtccmd.pus_tc.definitions import TcQueueT
import pus_tc.command_data as cmd_data import pus_tc.command_data as cmd_data
from pus_tc.service_200_mode import pack_mode_data from pus_tc.service_200_mode import pack_mode_data
from config.object_ids import TEST_DEVICE_1_OBJ_ID, TEST_DEVICE_0_OBJ_ID
from config.object_ids import TEST_DEVICE_0_ID
def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str): def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str):
@ -16,7 +17,7 @@ def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str):
def pack_generic_service_8_test_into(tc_queue: TcQueueT): def pack_generic_service_8_test_into(tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8")) tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8"))
object_id = TEST_DEVICE_0_OBJ_ID object_id = TEST_DEVICE_0_ID
# set mode on # set mode on
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode")) tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode"))

View File

@ -6,14 +6,15 @@
import os import os
from collections import deque from collections import deque
from typing import Union
from pus_tc.service_20_parameters import pack_service20_commands_into from pus_tc.service_20_parameters import pack_service20_commands_into
from pus_tc.service_2_raw_cmd import pack_service_2_commands_into from pus_tc.service_2_raw_cmd import pack_service_2_commands_into
from pus_tc.service_3_housekeeping import pack_service_3_commands_into from pus_tc.service_3_housekeeping import pack_service_3_commands_into
from pus_tc.service_8_func_cmd import pack_service_8_commands_into from pus_tc.service_8_func_cmd import pack_service_8_commands_into
from tmtccmd.utility.tmtcc_logger import get_logger from tmtccmd.utility.logger import get_logger
from tmtccmd.pus_tc.definitions import TcQueueT from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.core.definitions import CoreServiceList from tmtccmd.config.definitions import CoreServiceList
from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus_tc.service_17_test import pack_generic_service17_test from tmtccmd.pus_tc.service_17_test import pack_generic_service17_test
from pus_tc.service_200_mode import pack_service_200_commands_into from pus_tc.service_200_mode import pack_service_200_commands_into
@ -21,20 +22,20 @@ from pus_tc.service_200_mode import pack_service_200_commands_into
LOGGER = get_logger() LOGGER = get_logger()
def pack_service_queue_user(service: int, op_code: str, service_queue: TcQueueT): def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT):
if service == CoreServiceList.SERVICE_2: if service == CoreServiceList.SERVICE_2.value:
return pack_service_2_commands_into(op_code=op_code, tc_queue=service_queue) return pack_service_2_commands_into(op_code=op_code, tc_queue=service_queue)
if service == CoreServiceList.SERVICE_3: if service == CoreServiceList.SERVICE_3.value:
return pack_service_3_commands_into(op_code=op_code, tc_queue=service_queue) return pack_service_3_commands_into(op_code=op_code, tc_queue=service_queue)
if service == CoreServiceList.SERVICE_5: if service == CoreServiceList.SERVICE_5.value:
return pack_generic_service5_test_into(tc_queue=service_queue) return pack_generic_service5_test_into(tc_queue=service_queue)
if service == CoreServiceList.SERVICE_8: if service == CoreServiceList.SERVICE_8.value:
return pack_service_8_commands_into(op_code=op_code, tc_queue=service_queue) return pack_service_8_commands_into(op_code=op_code, tc_queue=service_queue)
if service == CoreServiceList.SERVICE_17: if service == CoreServiceList.SERVICE_17.value:
return pack_generic_service17_test(init_ssc=1700, tc_queue=service_queue) return pack_generic_service17_test(init_ssc=1700, tc_queue=service_queue)
if service == CoreServiceList.SERVICE_20: if service == CoreServiceList.SERVICE_20.value:
return pack_service20_commands_into(tc_queue=service_queue, op_code=op_code) return pack_service20_commands_into(tc_queue=service_queue, op_code=op_code)
if service == CoreServiceList.SERVICE_200: if service == CoreServiceList.SERVICE_200.value:
return pack_service_200_commands_into(tc_queue=service_queue, op_code=op_code) return pack_service_200_commands_into(tc_queue=service_queue, op_code=op_code)
LOGGER.warning("Invalid Service !") LOGGER.warning("Invalid Service !")

View File

@ -5,7 +5,7 @@
""" """
from tmtccmd.ecss.tm import PusTelemetry from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.utility.tmtcc_logger import get_logger from tmtccmd.utility.logger import get_logger
from tmtccmd.pus_tm.service_1_verification import Service1TM from tmtccmd.pus_tm.service_1_verification import Service1TM
from tmtccmd.pus_tm.service_2_raw_cmd import Service2TM from tmtccmd.pus_tm.service_2_raw_cmd import Service2TM
from tmtccmd.pus_tm.service_3_housekeeping import Service3TM from tmtccmd.pus_tm.service_3_housekeeping import Service3TM

View File

@ -6,34 +6,37 @@
""" """
import struct import struct
from typing import Tuple from typing import Tuple
from config.object_ids import ObjectIds
from tmtccmd.pus_tm.service_3_housekeeping import Service3Base from tmtccmd.pus_tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.tmtcc_logger import get_logger from tmtccmd.utility.tmtcc_logger import get_logger
from config.object_ids import TEST_DEVICE_0_OBJ_ID, TEST_DEVICE_1_OBJ_ID
LOGGER = get_logger() LOGGER = get_logger()
def service_3_hk_handling( def service_3_hk_handling(
object_id: bytearray, set_id: int, hk_data: bytearray, service3_packet: Service3Base object_id: int, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]: ) -> Tuple[list, list, bytearray, int]:
""" """
This function is called when a Service 3 Housekeeping packet is received. This function is called when a Service 3 Housekeeping packet is received.
:param object_id: Please note that the object IDs should be compared by value because direct comparison of
:param set_id: enumerations does not work in Python. For example use:
:param hk_data:
:param service3_packet: if object_id.value == ObjectIds.TEST_OBJECT.value
:return: Expects a tuple, consisting of two lists, a bytearray and an integer
to test equality based on the object ID list.
@param object_id:
@param set_id:
@param hk_data:
@param service3_packet:
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
The first list contains the header columns, the second list the list with The first list contains the header columns, the second list the list with
the corresponding values. The bytearray is the validity buffer, which is usually appended the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters. at the end of the housekeeping packet. The last value is the number of parameters.
""" """
if set_id: if object_id == ObjectIds.TEST_DEVICE_0.value or \
pass object_id == ObjectIds.TEST_DEVICE_1.value:
if service3_packet:
pass
if object_id == TEST_DEVICE_0_OBJ_ID or \
object_id == TEST_DEVICE_1_OBJ_ID:
return handle_test_set_deserialization(hk_data=hk_data) return handle_test_set_deserialization(hk_data=hk_data)
else: else:
LOGGER.info("Service3TM: Parsing for this SID has not been implemented.") LOGGER.info("Service3TM: Parsing for this SID has not been implemented.")

View File

@ -14,14 +14,6 @@ def custom_service_8_handling(
@param custom_data: @param custom_data:
@return: @return:
""" """
if object_id:
pass
if action_id:
pass
if custom_data:
pass
header_list = [] header_list = []
content_list = [] content_list = []
return header_list, content_list return header_list, content_list