added updates manually
This commit is contained in:
@ -15,5 +15,7 @@ def perform_mode_operation_user(tmtc_backend: TmTcHandler, mode: int):
|
||||
"""
|
||||
Custom modes can be implemented here
|
||||
"""
|
||||
if tmtc_backend:
|
||||
pass
|
||||
LOGGER.error(f"Unknown mode {mode}, Configuration error !")
|
||||
sys.exit()
|
||||
|
@ -21,9 +21,9 @@ class FsfwHookBase(TmTcHookBase):
|
||||
from tmtccmd.defaults.globals_setup import set_default_globals_pre_args_parsing
|
||||
set_default_globals_pre_args_parsing(gui=gui, apid=0xef)
|
||||
|
||||
def add_globals_post_args_parsing(self, args: argparse.Namespace):
|
||||
def add_globals_post_args_parsing(self, args: argparse.Namespace, json_cfg_path: str = ""):
|
||||
from tmtccmd.defaults.globals_setup import set_default_globals_post_args_parsing
|
||||
set_default_globals_post_args_parsing(args=args)
|
||||
set_default_globals_post_args_parsing(args=args, json_cfg_path=json_cfg_path)
|
||||
|
||||
def assign_communication_interface(self, com_if: int, tmtc_printer: TmTcPrinter) -> \
|
||||
Union[CommunicationInterface, None]:
|
||||
@ -45,7 +45,7 @@ class FsfwHookBase(TmTcHookBase):
|
||||
from pus_tm.factory_hook import tm_user_factory_hook
|
||||
return tm_user_factory_hook(raw_tm_packet=raw_tm_packet)
|
||||
|
||||
def set_object_ids(self) -> Dict[int, bytearray]:
|
||||
def set_object_ids(self) -> Dict[bytes, list]:
|
||||
from config.object_ids import set_object_ids
|
||||
return set_object_ids()
|
||||
|
||||
@ -60,7 +60,7 @@ class FsfwHookBase(TmTcHookBase):
|
||||
|
||||
@staticmethod
|
||||
def handle_service_3_housekeeping(
|
||||
object_id: int, set_id: int, hk_data: bytearray, service3_packet: Service3Base
|
||||
object_id: bytearray, set_id: int, hk_data: bytearray, service3_packet: Service3Base
|
||||
) -> Tuple[list, list, bytearray, int]:
|
||||
from pus_tm.service_3_hk_handling import service_3_hk_handling
|
||||
return service_3_hk_handling(
|
||||
@ -70,5 +70,8 @@ class FsfwHookBase(TmTcHookBase):
|
||||
def command_preparation_hook(self) -> Union[None, PusTelecommand]:
|
||||
pass
|
||||
|
||||
def set_json_config_file_path(self) -> str:
|
||||
return "config/tmtc_config.json"
|
||||
|
||||
|
||||
|
||||
|
@ -4,20 +4,17 @@
|
||||
it to your needs.
|
||||
"""
|
||||
|
||||
import enum
|
||||
from typing import Dict
|
||||
|
||||
|
||||
class ObjectIds(enum.IntEnum):
|
||||
PUS_SERVICE_17 = 1
|
||||
TEST_DEVICE_0 = 2
|
||||
TEST_DEVICE_1 = 3
|
||||
SERVICE_17_OBJ_ID = bytes([0x53, 0x00, 0x00, 0x17])
|
||||
TEST_DEVICE_0_OBJ_ID = bytes([0x44, 0x01, 0xAF, 0xFE])
|
||||
TEST_DEVICE_1_OBJ_ID = bytes([0x44, 0x02, 0xAF, 0xFE])
|
||||
|
||||
|
||||
def set_object_ids() -> Dict[int, bytearray]:
|
||||
def set_object_ids() -> Dict[bytes, list]:
|
||||
object_id_dict = {
|
||||
ObjectIds.PUS_SERVICE_17: bytearray([0x53, 0x00, 0x00, 0x17]),
|
||||
ObjectIds.TEST_DEVICE_0: bytearray([0x44, 0x01, 0xAF, 0xFE]),
|
||||
ObjectIds.TEST_DEVICE_1: bytearray([0x44, 0x02, 0xAF, 0xFE]),
|
||||
SERVICE_17_OBJ_ID: ["Service 17"],
|
||||
TEST_DEVICE_0_OBJ_ID: ["Test Device 0"],
|
||||
TEST_DEVICE_1_OBJ_ID: ["Test Device 1"],
|
||||
}
|
||||
return object_id_dict
|
||||
|
@ -5,22 +5,14 @@
|
||||
@author R. Mueller
|
||||
@date 02.05.2020
|
||||
"""
|
||||
from config.object_ids import ObjectIds
|
||||
from config.object_ids import TEST_DEVICE_0_OBJ_ID, TEST_DEVICE_1_OBJ_ID
|
||||
from tmtccmd.core.definitions import QueueCommands
|
||||
from tmtccmd.ecss.tc import PusTelecommand
|
||||
from tmtccmd.pus_tc.packer import TcQueueT
|
||||
from tmtccmd.pus_tc.service_200_mode import pack_mode_data
|
||||
|
||||
TEST_DEVICE_0_ID = bytearray()
|
||||
TEST_DEVICE_1_ID = bytearray()
|
||||
|
||||
|
||||
def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str):
|
||||
from tmtccmd.core.object_id_manager import get_object_id
|
||||
global TEST_DEVICE_1_ID, TEST_DEVICE_0_ID
|
||||
if TEST_DEVICE_0_ID == bytearray() or TEST_DEVICE_1_ID == bytearray:
|
||||
TEST_DEVICE_0_ID = get_object_id(ObjectIds.TEST_DEVICE_0)
|
||||
TEST_DEVICE_1_ID = get_object_id(ObjectIds.TEST_DEVICE_1)
|
||||
if op_code == "0":
|
||||
pack_service_200_test_into(tc_queue=tc_queue, init_ssc=2000)
|
||||
|
||||
@ -29,7 +21,7 @@ def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
|
||||
new_ssc = init_ssc
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200"))
|
||||
# Object ID: DUMMY Device
|
||||
object_id = TEST_DEVICE_0_ID
|
||||
object_id = TEST_DEVICE_0_OBJ_ID
|
||||
# Set On Mode
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
|
||||
mode_data = pack_mode_data(object_id, 1, 0)
|
||||
|
@ -1,7 +1,6 @@
|
||||
import struct
|
||||
|
||||
from tmtccmd.core.definitions import QueueCommands
|
||||
from tmtccmd.core.object_id_manager import get_object_id
|
||||
from tmtccmd.ecss.tc import PusTelecommand
|
||||
from tmtccmd.pus_tc.definitions import TcQueueT
|
||||
from tmtccmd.pus_tc.service_20_parameter import pack_type_and_matrix_data, \
|
||||
@ -9,16 +8,14 @@ from tmtccmd.pus_tc.service_20_parameter import pack_type_and_matrix_data, \
|
||||
from tmtccmd.pus_tc.service_200_mode import pack_mode_data
|
||||
from tmtccmd.utility.tmtcc_logger import get_logger
|
||||
|
||||
from config.object_ids import ObjectIds
|
||||
|
||||
from config.object_ids import TEST_DEVICE_0_OBJ_ID
|
||||
|
||||
LOGGER = get_logger()
|
||||
TEST_DEVICE_0_ID = bytearray()
|
||||
|
||||
|
||||
def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str):
|
||||
global TEST_DEVICE_0_ID
|
||||
if TEST_DEVICE_0_ID == bytearray():
|
||||
TEST_DEVICE_0_ID = get_object_id(ObjectIds.TEST_DEVICE_0)
|
||||
if op_code == "0":
|
||||
pack_service20_test_into(tc_queue=tc_queue)
|
||||
|
||||
@ -26,7 +23,7 @@ def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str):
|
||||
def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False):
|
||||
if called_externally is False:
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20"))
|
||||
object_id = TEST_DEVICE_0_ID
|
||||
object_id = TEST_DEVICE_0_OBJ_ID
|
||||
|
||||
# set mode normal
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode"))
|
||||
@ -43,7 +40,7 @@ def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False
|
||||
|
||||
|
||||
def load_param_0_simple_test_commands(tc_queue: TcQueueT):
|
||||
object_id = TEST_DEVICE_0_ID
|
||||
object_id = TEST_DEVICE_0_OBJ_ID
|
||||
parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0)
|
||||
# test checking Load for uint32_t
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load uint32_t"))
|
||||
|
@ -8,23 +8,15 @@
|
||||
import struct
|
||||
|
||||
from tmtccmd.core.definitions import QueueCommands
|
||||
from tmtccmd.core.object_id_manager import get_object_id
|
||||
from tmtccmd.ecss.tc import PusTelecommand
|
||||
from tmtccmd.pus_tc.definitions import TcQueueT
|
||||
from pus_tc.service_200_mode import pack_mode_data
|
||||
|
||||
import pus_tc.command_data as cmd_data
|
||||
from config.object_ids import ObjectIds
|
||||
|
||||
TEST_DEVICE_0_ID = bytearray()
|
||||
TEST_DEVICE_1_ID = bytearray()
|
||||
from config.object_ids import TEST_DEVICE_0_OBJ_ID, TEST_DEVICE_1_OBJ_ID
|
||||
|
||||
|
||||
def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str):
|
||||
global TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
|
||||
if TEST_DEVICE_0_ID == 0 or TEST_DEVICE_1_ID == 0:
|
||||
TEST_DEVICE_0_ID = get_object_id(ObjectIds.TEST_DEVICE_0)
|
||||
TEST_DEVICE_1_ID = get_object_id(ObjectIds.TEST_DEVICE_0)
|
||||
if op_code == "0":
|
||||
pack_generic_service_2_test_into(0, tc_queue)
|
||||
else:
|
||||
@ -33,7 +25,7 @@ def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str):
|
||||
|
||||
def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
|
||||
new_ssc = init_ssc
|
||||
object_id = TEST_DEVICE_0_ID # dummy device
|
||||
object_id = TEST_DEVICE_0_OBJ_ID # dummy device
|
||||
# Set Raw Mode
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode"))
|
||||
mode_data = pack_mode_data(object_id, 3, 0)
|
||||
|
@ -8,11 +8,8 @@ from tmtccmd.ecss.tc import PusTelecommand
|
||||
from tmtccmd.pus_tc.definitions import TcQueueT
|
||||
from tmtccmd.pus_tc.service_8_functional_cmd import generate_action_command
|
||||
|
||||
from config.object_ids import ObjectIds
|
||||
|
||||
# Object IDs
|
||||
TEST_DEVICE_0_ID = bytearray()
|
||||
TEST_DEVICE_1_ID = bytearray()
|
||||
from config.object_ids import TEST_DEVICE_1_OBJ_ID, TEST_DEVICE_0_OBJ_ID
|
||||
|
||||
# Set IDs
|
||||
TEST_SET_ID = 0
|
||||
@ -25,17 +22,13 @@ PARAM_ACTIVATE_CHANGING_DATASETS = 4
|
||||
|
||||
|
||||
def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
|
||||
global TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
|
||||
if TEST_DEVICE_0_ID == bytearray() or TEST_DEVICE_1_ID == bytearray():
|
||||
TEST_DEVICE_0_ID = get_object_id(ObjectIds.TEST_DEVICE_0)
|
||||
TEST_DEVICE_1_ID = get_object_id(ObjectIds.TEST_DEVICE_1)
|
||||
current_ssc = 3000
|
||||
# TODO: Import this from config instead
|
||||
device_idx = 0
|
||||
if device_idx == 0:
|
||||
object_id = TEST_DEVICE_0_ID
|
||||
object_id = TEST_DEVICE_0_OBJ_ID
|
||||
else:
|
||||
object_id = TEST_DEVICE_1_ID
|
||||
object_id = TEST_DEVICE_1_OBJ_ID
|
||||
|
||||
if op_code == "0":
|
||||
# This will pack all the tests
|
||||
|
@ -1,21 +1,13 @@
|
||||
from config.object_ids import ObjectIds
|
||||
import pus_tc.command_data as cmd_data
|
||||
from pus_tc.service_200_mode import pack_mode_data
|
||||
|
||||
from tmtccmd.core.definitions import QueueCommands
|
||||
from tmtccmd.core.object_id_manager import get_object_id
|
||||
from tmtccmd.ecss.tc import PusTelecommand
|
||||
from tmtccmd.pus_tc.definitions import TcQueueT
|
||||
|
||||
TEST_DEVICE_0_ID = bytearray()
|
||||
TEST_DEVICE_1_ID = bytearray()
|
||||
import pus_tc.command_data as cmd_data
|
||||
from pus_tc.service_200_mode import pack_mode_data
|
||||
from config.object_ids import TEST_DEVICE_1_OBJ_ID, TEST_DEVICE_0_OBJ_ID
|
||||
|
||||
|
||||
def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str):
|
||||
global TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
|
||||
if TEST_DEVICE_0_ID == bytearray() or TEST_DEVICE_1_ID == bytearray():
|
||||
TEST_DEVICE_0_ID = get_object_id(ObjectIds.TEST_DEVICE_0)
|
||||
TEST_DEVICE_1_ID = get_object_id(ObjectIds.TEST_DEVICE_1)
|
||||
if op_code == "0":
|
||||
pack_generic_service_8_test_into(tc_queue=tc_queue)
|
||||
else:
|
||||
@ -24,7 +16,7 @@ def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str):
|
||||
|
||||
def pack_generic_service_8_test_into(tc_queue: TcQueueT):
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8"))
|
||||
object_id = TEST_DEVICE_0_ID
|
||||
object_id = TEST_DEVICE_0_OBJ_ID
|
||||
|
||||
# set mode on
|
||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode"))
|
||||
|
@ -6,37 +6,34 @@
|
||||
"""
|
||||
import struct
|
||||
from typing import Tuple
|
||||
from config.object_ids import ObjectIds
|
||||
from tmtccmd.pus_tm.service_3_housekeeping import Service3Base
|
||||
from tmtccmd.utility.tmtcc_logger import get_logger
|
||||
|
||||
from config.object_ids import TEST_DEVICE_0_OBJ_ID, TEST_DEVICE_1_OBJ_ID
|
||||
LOGGER = get_logger()
|
||||
|
||||
|
||||
def service_3_hk_handling(
|
||||
object_id: int, set_id: int, hk_data: bytearray, service3_packet: Service3Base
|
||||
object_id: bytearray, set_id: int, hk_data: bytearray, service3_packet: Service3Base
|
||||
) -> Tuple[list, list, bytearray, int]:
|
||||
"""
|
||||
This function is called when a Service 3 Housekeeping packet is received.
|
||||
|
||||
Please note that the object IDs should be compared by value because direct comparison of
|
||||
enumerations does not work in Python. For example use:
|
||||
|
||||
if object_id.value == ObjectIds.TEST_OBJECT.value
|
||||
|
||||
to test equality based on the object ID list.
|
||||
|
||||
@param object_id:
|
||||
@param set_id:
|
||||
@param hk_data:
|
||||
@param service3_packet:
|
||||
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
|
||||
:param object_id:
|
||||
:param set_id:
|
||||
:param hk_data:
|
||||
:param service3_packet:
|
||||
:return: Expects a tuple, consisting of two lists, a bytearray and an integer
|
||||
The first list contains the header columns, the second list the list with
|
||||
the corresponding values. The bytearray is the validity buffer, which is usually appended
|
||||
at the end of the housekeeping packet. The last value is the number of parameters.
|
||||
"""
|
||||
if object_id == ObjectIds.TEST_DEVICE_0.value or \
|
||||
object_id == ObjectIds.TEST_DEVICE_1.value:
|
||||
if set_id:
|
||||
pass
|
||||
if service3_packet:
|
||||
pass
|
||||
if object_id == TEST_DEVICE_0_OBJ_ID or \
|
||||
object_id == TEST_DEVICE_1_OBJ_ID:
|
||||
return handle_test_set_deserialization(hk_data=hk_data)
|
||||
else:
|
||||
LOGGER.info("Service3TM: Parsing for this SID has not been implemented.")
|
||||
|
@ -14,6 +14,14 @@ def custom_service_8_handling(
|
||||
@param custom_data:
|
||||
@return:
|
||||
"""
|
||||
if object_id:
|
||||
pass
|
||||
|
||||
if action_id:
|
||||
pass
|
||||
|
||||
if custom_data:
|
||||
pass
|
||||
header_list = []
|
||||
content_list = []
|
||||
return header_list, content_list
|
||||
|
0
tmtc/tmtc_client_cli.py
Executable file → Normal file
0
tmtc/tmtc_client_cli.py
Executable file → Normal file
0
tmtc/tmtc_client_gui.py
Executable file → Normal file
0
tmtc/tmtc_client_gui.py
Executable file → Normal file
Reference in New Issue
Block a user