apply black formatting

This commit is contained in:
Robin Müller 2021-12-14 15:46:00 +01:00
parent 99ca187b50
commit 1b6799ae9b
No known key found for this signature in database
GPG Key ID: BE6480244DFE612C
12 changed files with 181 additions and 75 deletions

View File

@ -1 +1 @@
PUS_APID = 0xef PUS_APID = 0xEF

View File

@ -15,34 +15,43 @@ from common_tmtc.config.definitions import PUS_APID
class FsfwHookBase(TmTcHookBase): class FsfwHookBase(TmTcHookBase):
def get_json_config_file_path(self) -> str: def get_json_config_file_path(self) -> str:
return "config/tmtc_config.json" return "config/tmtc_config.json"
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import get_default_service_op_code_dict from tmtccmd.config.globals import get_default_service_op_code_dict
return get_default_service_op_code_dict()
def_dict = get_default_service_op_code_dict()
return def_dict
def add_globals_pre_args_parsing(self, gui: bool = False): def add_globals_pre_args_parsing(self, gui: bool = False):
from tmtccmd.config.globals import set_default_globals_pre_args_parsing from tmtccmd.config.globals import set_default_globals_pre_args_parsing
set_default_globals_pre_args_parsing( set_default_globals_pre_args_parsing(
gui=gui, pus_tm_version=PusVersion.PUS_C, pus_tc_version=PusVersion.PUS_C, gui=gui,
tc_apid=PUS_APID, tm_apid=PUS_APID pus_tm_version=PusVersion.PUS_C,
pus_tc_version=PusVersion.PUS_C,
tc_apid=PUS_APID,
tm_apid=PUS_APID,
) )
def add_globals_post_args_parsing(self, args: argparse.Namespace): def add_globals_post_args_parsing(self, args: argparse.Namespace):
from tmtccmd.config.globals import set_default_globals_post_args_parsing from tmtccmd.config.globals import set_default_globals_post_args_parsing
set_default_globals_post_args_parsing( set_default_globals_post_args_parsing(
args=args, json_cfg_path=self.get_json_config_file_path() args=args, json_cfg_path=self.get_json_config_file_path()
) )
def assign_communication_interface( def assign_communication_interface(
self, com_if_key: str, tmtc_printer: TmTcPrinter self, com_if_key: str, tmtc_printer: TmTcPrinter
) -> Optional[CommunicationInterface]: ) -> Optional[CommunicationInterface]:
from tmtccmd.config.com_if import create_communication_interface_default from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default( return create_communication_interface_default(
com_if_key=com_if_key, tmtc_printer=tmtc_printer, com_if_key=com_if_key,
json_cfg_path=self.get_json_config_file_path(), space_packet_ids=(0x08ef,) tmtc_printer=tmtc_printer,
json_cfg_path=self.get_json_config_file_path(),
space_packet_ids=(0x08EF,),
) )
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
@ -50,17 +59,22 @@ class FsfwHookBase(TmTcHookBase):
def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT): def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from common_tmtc.pus_tc.tc_packing import pack_service_queue_user from common_tmtc.pus_tc.tc_packing import pack_service_queue_user
pack_service_queue_user(service=service, op_code=op_code, tc_queue=service_queue)
pack_service_queue_user(
service=service, op_code=op_code, tc_queue=service_queue
)
def get_object_ids(self) -> Dict[bytes, list]: def get_object_ids(self) -> Dict[bytes, list]:
from common_tmtc.config.object_ids import get_object_ids from common_tmtc.config.object_ids import get_object_ids
return get_object_ids() return get_object_ids()
@staticmethod @staticmethod
def handle_service_8_telemetry( def handle_service_8_telemetry(
object_id: int, action_id: int, custom_data: bytearray object_id: int, action_id: int, custom_data: bytearray
) -> Tuple[list, list]: ) -> Tuple[list, list]:
from common_tmtc.pus_tm.service_8_handling import custom_service_8_handling from common_tmtc.pus_tm.service_8_handling import custom_service_8_handling
return custom_service_8_handling( return custom_service_8_handling(
object_id=object_id, action_id=action_id, custom_data=custom_data object_id=object_id, action_id=action_id, custom_data=custom_data
) )
@ -70,6 +84,10 @@ class FsfwHookBase(TmTcHookBase):
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]: ) -> Tuple[list, list, bytearray, int]:
from common_tmtc.pus_tm.service_3_hk_handling import service_3_hk_handling from common_tmtc.pus_tm.service_3_hk_handling import service_3_hk_handling
return service_3_hk_handling( return service_3_hk_handling(
object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet object_id=object_id,
set_id=set_id,
hk_data=hk_data,
service3_packet=service3_packet,
) )

View File

@ -12,8 +12,8 @@ TEST_DEVICE_1_ID = bytes([0x44, 0x02, 0xAF, 0xFE])
def get_object_ids() -> Dict[bytes, list]: def get_object_ids() -> Dict[bytes, list]:
object_id_dict = { object_id_dict = {
PUS_SERVICE_17_ID: ["PUS Service 17"], PUS_SERVICE_17_ID: ["PUS Service 17"],
TEST_DEVICE_0_ID: ["Test Device 0"], TEST_DEVICE_0_ID: ["Test Device 0"],
TEST_DEVICE_1_ID: ["Test Device 1"] TEST_DEVICE_1_ID: ["Test Device 1"],
} }
return object_id_dict return object_id_dict

View File

@ -1,9 +1,14 @@
from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.pus.service_17_test import pack_service_17_ping_command, pack_generic_service17_test from tmtccmd.pus.service_17_test import (
pack_service_17_ping_command,
pack_generic_service17_test,
)
def pack_service_17_commands(op_code: str, init_ssc: int, tc_queue: TcQueueT): def pack_service_17_commands(op_code: str, init_ssc: int, tc_queue: TcQueueT):
if op_code == "0": if op_code == "0":
tc_queue.appendleft(pack_service_17_ping_command(ssc=init_ssc).pack_command_tuple()) tc_queue.appendleft(
pack_service_17_ping_command(ssc=init_ssc).pack_command_tuple()
)
else: else:
pack_generic_service17_test(tc_queue=tc_queue, init_ssc=init_ssc) pack_generic_service17_test(tc_queue=tc_queue, init_ssc=init_ssc)

View File

@ -15,7 +15,7 @@ from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str): def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "0": if op_code == "test":
pack_service_200_test_into(tc_queue=tc_queue, init_ssc=2000) pack_service_200_test_into(tc_queue=tc_queue, init_ssc=2000)
@ -50,4 +50,3 @@ def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
new_ssc += 1 new_ssc += 1
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt")) tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt"))
return new_ssc return new_ssc

View File

@ -90,4 +90,3 @@ def load_param_2_simple_test_commands(tc_queue: TcQueueT):
# payload = object_id + parameter_id_2 # payload = object_id + parameter_id_2
# command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload) # command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload)
# tc_queue.appendleft(command.pack_command_tuple()) # tc_queue.appendleft(command.pack_command_tuple())

View File

@ -35,7 +35,9 @@ def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1 new_ssc += 1
# toggle wiretapping raw # toggle wiretapping raw
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw")) tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw")
)
wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1) wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1)
toggle_wiretapping_on_command = PusTelecommand( toggle_wiretapping_on_command = PusTelecommand(
service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data
@ -46,18 +48,25 @@ def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Sending Raw Command")) tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Sending Raw Command"))
raw_command = cmd_data.TEST_COMMAND_0 raw_command = cmd_data.TEST_COMMAND_0
raw_data = object_id + raw_command raw_data = object_id + raw_command
raw_command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data) raw_command = PusTelecommand(
service=2, subservice=128, ssc=new_ssc, app_data=raw_data
)
tc_queue.appendleft(raw_command.pack_command_tuple()) tc_queue.appendleft(raw_command.pack_command_tuple())
new_ssc += 1 new_ssc += 1
# toggle wiretapping off # toggle wiretapping off
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off")) tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off")
)
wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0) wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0)
toggle_wiretapping_off_command = PusTelecommand(service=2, subservice=129, ssc=new_ssc, toggle_wiretapping_off_command = PusTelecommand(
app_data=wiretapping_toggle_data) service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data
)
tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple()) tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple())
new_ssc += 1 new_ssc += 1
# send raw command which should be returned via TM[2,130] # send raw command which should be returned via TM[2,130]
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Send second raw command")) tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Send second raw command")
)
command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data) command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1 new_ssc += 1
@ -74,6 +83,8 @@ def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW # wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW
def pack_wiretapping_mode(object_id, wiretapping_mode_): def pack_wiretapping_mode(object_id, wiretapping_mode_):
wiretapping_mode = struct.pack(">B", wiretapping_mode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 wiretapping_mode = struct.pack(
">B", wiretapping_mode_
) # MODE_OFF : 0x00, MODE_RAW: 0x01
wiretapping_toggle_data = object_id + wiretapping_mode wiretapping_toggle_data = object_id + wiretapping_mode
return wiretapping_toggle_data return wiretapping_toggle_data

View File

@ -3,8 +3,11 @@ from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.tc.service_20_parameter import pack_boolean_parameter_command from tmtccmd.tc.service_20_parameter import pack_boolean_parameter_command
from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command, \ from tmtccmd.tc.service_3_housekeeping import (
Srv3Subservice make_sid,
generate_one_hk_command,
Srv3Subservice,
)
from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_8_functional_cmd import generate_action_command from tmtccmd.tc.service_8_functional_cmd import generate_action_command
@ -31,43 +34,67 @@ def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "0": if op_code == "0":
# This will pack all the tests # This will pack all the tests
pack_service_3_test_info(tc_queue=tc_queue, init_ssc=current_ssc, object_id=object_id, pack_service_3_test_info(
device_idx=device_idx) tc_queue=tc_queue,
init_ssc=current_ssc,
object_id=object_id,
device_idx=device_idx,
)
elif op_code == "1": elif op_code == "1":
# Extremely simple, generate one HK packet # Extremely simple, generate one HK packet
pack_gen_one_hk_command(tc_queue=tc_queue, device_idx=device_idx, init_ssc=current_ssc, pack_gen_one_hk_command(
object_id=object_id) tc_queue=tc_queue,
device_idx=device_idx,
init_ssc=current_ssc,
object_id=object_id,
)
elif op_code == "2": elif op_code == "2":
# Housekeeping basic test # Housekeeping basic test
pack_housekeeping_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc) pack_housekeeping_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
elif op_code == "3": elif op_code == "3":
# Notification demo # Notification demo
pack_notification_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc) pack_notification_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
def pack_service_3_test_info(tc_queue: TcQueueT, device_idx: int, object_id: bytearray, def pack_service_3_test_info(
init_ssc: int): tc_queue: TcQueueT, device_idx: int, object_id: bytearray, init_ssc: int
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests")) ):
tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests")
)
current_ssc = init_ssc current_ssc = init_ssc
current_ssc += pack_gen_one_hk_command( current_ssc += pack_gen_one_hk_command(
tc_queue=tc_queue, device_idx=device_idx, object_id=object_id, init_ssc=current_ssc tc_queue=tc_queue,
device_idx=device_idx,
object_id=object_id,
init_ssc=current_ssc,
) )
current_ssc += pack_housekeeping_basic_test( current_ssc += pack_housekeeping_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
) )
current_ssc += pack_notification_basic_test( current_ssc += pack_notification_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc, enable_normal_mode=False tc_queue=tc_queue,
object_id=object_id,
init_ssc=current_ssc,
enable_normal_mode=False,
) )
def pack_gen_one_hk_command( def pack_gen_one_hk_command(
tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray
) -> int: ) -> int:
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
tc_queue.appendleft( tc_queue.appendleft(
(QueueCommands.PRINT, f"Service 3 Test: Generate one test set packet for " (
f"test device {device_idx}") QueueCommands.PRINT,
f"Service 3 Test: Generate one test set packet for "
f"test device {device_idx}",
)
) )
command = generate_one_hk_command(ssc=init_ssc, sid=test_sid) command = generate_one_hk_command(ssc=init_ssc, sid=test_sid)
init_ssc += 1 init_ssc += 1
@ -76,7 +103,10 @@ def pack_gen_one_hk_command(
def pack_housekeeping_basic_test( def pack_housekeeping_basic_test(
tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, enable_normal_mode: bool = True tc_queue: TcQueueT,
object_id: bytearray,
init_ssc: int,
enable_normal_mode: bool = True,
) -> int: ) -> int:
""" """
This basic test will request one HK packet, then it will enable periodic packets and listen This basic test will request one HK packet, then it will enable periodic packets and listen
@ -85,63 +115,91 @@ def pack_housekeeping_basic_test(
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
current_ssc = init_ssc current_ssc = init_ssc
# Enable changing datasets via parameter service (Service 20) # Enable changing datasets via parameter service (Service 20)
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests")) tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests")
)
if enable_normal_mode: if enable_normal_mode:
# Set mode normal so that sets are changed/read regularly # Set mode normal so that sets are changed/read regularly
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data) command = PusTelecommand(
service=200, subservice=1, ssc=current_ssc, app_data=mode_data
)
current_ssc += 1 current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "Enabling changing datasets")) tc_queue.appendleft((QueueCommands.PRINT, "Enabling changing datasets"))
command = pack_boolean_parameter_command( command = pack_boolean_parameter_command(
object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, object_id=object_id,
parameter=True, ssc=current_ssc domain_id=0,
unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=True,
ssc=current_ssc,
) )
current_ssc += 1 current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
# Enable periodic reporting # Enable periodic reporting
tc_queue.appendleft((QueueCommands.PRINT, tc_queue.appendleft(
"Enabling periodic thermal sensor packet generation: ")) (QueueCommands.PRINT, "Enabling periodic thermal sensor packet generation: ")
command = PusTelecommand(service=3, subservice=Srv3Subservice.ENABLE_PERIODIC_HK_GEN.value, )
ssc=current_ssc, app_data=test_sid) command = PusTelecommand(
service=3,
subservice=Srv3Subservice.ENABLE_PERIODIC_HK_GEN.value,
ssc=current_ssc,
app_data=test_sid,
)
current_ssc += 1 current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 2.0)) tc_queue.appendleft((QueueCommands.WAIT, 2.0))
# Disable periodic reporting # Disable periodic reporting
tc_queue.appendleft((QueueCommands.PRINT, tc_queue.appendleft(
"Disabling periodic thermal sensor packet generation: ")) (QueueCommands.PRINT, "Disabling periodic thermal sensor packet generation: ")
command = PusTelecommand(service=3, subservice=Srv3Subservice.DISABLE_PERIODIC_HK_GEN.value, )
ssc=current_ssc, app_data=test_sid) command = PusTelecommand(
service=3,
subservice=Srv3Subservice.DISABLE_PERIODIC_HK_GEN.value,
ssc=current_ssc,
app_data=test_sid,
)
current_ssc += 1 current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
# Disable changing datasets via parameter service (Service 20) # Disable changing datasets via parameter service (Service 20)
tc_queue.appendleft((QueueCommands.PRINT, "Disabling changing datasets")) tc_queue.appendleft((QueueCommands.PRINT, "Disabling changing datasets"))
command = pack_boolean_parameter_command( command = pack_boolean_parameter_command(
object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, object_id=object_id,
parameter=False, ssc=current_ssc domain_id=0,
unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=False,
ssc=current_ssc,
) )
current_ssc += 1 current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
return current_ssc return current_ssc
def pack_notification_basic_test(tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, def pack_notification_basic_test(
enable_normal_mode: bool = True) -> int: tc_queue: TcQueueT,
object_id: bytearray,
init_ssc: int,
enable_normal_mode: bool = True,
) -> int:
current_ssc = init_ssc current_ssc = init_ssc
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing notification tests")) tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 Test: Performing notification tests")
)
if enable_normal_mode: if enable_normal_mode:
# Set mode normal so that sets are changed/read regularly # Set mode normal so that sets are changed/read regularly
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data) command = PusTelecommand(
service=200, subservice=1, ssc=current_ssc, app_data=mode_data
)
current_ssc += 1 current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())

View File

@ -34,10 +34,14 @@ def pack_generic_service_8_test_into(tc_queue: TcQueueT):
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
# Direct command which triggers completion reply # Direct command which triggers completion reply
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply")) tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply")
)
action_id = cmd_data.TEST_COMMAND_0 action_id = cmd_data.TEST_COMMAND_0
direct_command = object_id + action_id direct_command = object_id + action_id
command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command) command = PusTelecommand(
service=8, subservice=128, ssc=820, app_data=direct_command
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
# Direct command which triggers _tm_data reply # Direct command which triggers _tm_data reply
@ -46,7 +50,9 @@ def pack_generic_service_8_test_into(tc_queue: TcQueueT):
command_param1 = cmd_data.TEST_COMMAND_1_PARAM_1 command_param1 = cmd_data.TEST_COMMAND_1_PARAM_1
command_param2 = cmd_data.TEST_COMMAND_1_PARAM_2 command_param2 = cmd_data.TEST_COMMAND_1_PARAM_2
direct_command = object_id + action_id + command_param1 + command_param2 direct_command = object_id + action_id + command_param1 + command_param2
command = PusTelecommand(service=8, subservice=128, ssc=830, app_data=direct_command) command = PusTelecommand(
service=8, subservice=128, ssc=830, app_data=direct_command
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
# Set mode off # Set mode off

View File

@ -22,7 +22,9 @@ from common_tmtc.config.definitions import PUS_APID
LOGGER = get_console_logger() LOGGER = get_console_logger()
def ccsds_tm_handler(apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter) -> None: def ccsds_tm_handler(
apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter
) -> None:
if apid == PUS_APID: if apid == PUS_APID:
pus_packet_factory(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer) pus_packet_factory(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer)
@ -47,6 +49,10 @@ def pus_packet_factory(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter):
if service_type == 200: if service_type == 200:
tm_packet = Service200TM.unpack(raw_tm_packet) tm_packet = Service200TM.unpack(raw_tm_packet)
if tm_packet is None: if tm_packet is None:
LOGGER.info("The service " + str(service_type) + " is not implemented in Telemetry Factory") LOGGER.info(
"The service "
+ str(service_type)
+ " is not implemented in Telemetry Factory"
)
tm_packet = PusTelemetry.unpack(raw_tm_packet) tm_packet = PusTelemetry.unpack(raw_tm_packet)
tmtc_printer.print_telemetry(packet_if=tm_packet, info_if=tm_packet) tmtc_printer.print_telemetry(packet_if=tm_packet, info_if=tm_packet)

View File

@ -6,11 +6,12 @@ from tmtccmd.tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_console_logger from tmtccmd.utility.logger import get_console_logger
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
LOGGER = get_console_logger() LOGGER = get_console_logger()
def service_3_hk_handling( def service_3_hk_handling(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]: ) -> Tuple[list, list, bytearray, int]:
"""This function is called when a Service 3 Housekeeping packet is received. """This function is called when a Service 3 Housekeeping packet is received.
@ -37,7 +38,9 @@ def service_3_hk_handling(
return [], [], bytearray(), 0 return [], [], bytearray(), 0
def handle_test_set_deserialization(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: def handle_test_set_deserialization(
hk_data: bytearray,
) -> Tuple[list, list, bytearray, int]:
header_list = [] header_list = []
content_list = [] content_list = []
validity_buffer = bytearray() validity_buffer = bytearray()
@ -45,11 +48,11 @@ def handle_test_set_deserialization(hk_data: bytearray) -> Tuple[list, list, byt
if len(hk_data) < 18: if len(hk_data) < 18:
LOGGER.warning("Invalid HK data format for test set reply!") LOGGER.warning("Invalid HK data format for test set reply!")
return header_list, content_list, validity_buffer, 0 return header_list, content_list, validity_buffer, 0
uint8_value = struct.unpack('!B', hk_data[0:1])[0] uint8_value = struct.unpack("!B", hk_data[0:1])[0]
uint32_value = struct.unpack('!I', hk_data[1:5])[0] uint32_value = struct.unpack("!I", hk_data[1:5])[0]
float_value_1 = struct.unpack('!f', hk_data[5:9])[0] float_value_1 = struct.unpack("!f", hk_data[5:9])[0]
float_value_2 = struct.unpack('!f', hk_data[9:13])[0] float_value_2 = struct.unpack("!f", hk_data[9:13])[0]
float_value_3 = struct.unpack('!f', hk_data[13:17])[0] float_value_3 = struct.unpack("!f", hk_data[13:17])[0]
validity_buffer.append(hk_data[17]) validity_buffer.append(hk_data[17])
header_list.append("uint8 value") header_list.append("uint8 value")
header_list.append("uint32 value") header_list.append("uint32 value")

View File

@ -2,7 +2,8 @@ from typing import Tuple
def custom_service_8_handling( def custom_service_8_handling(
object_id: int, action_id: int, custom_data: bytearray) -> Tuple[list, list]: object_id: int, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
""" """
This function is called by the TMTC core if a Service 8 data reply (subservice 130) This function is called by the TMTC core if a Service 8 data reply (subservice 130)
is received. The user can return a tuple of two lists, where the first list is received. The user can return a tuple of two lists, where the first list