diff --git a/config/definitions.py b/config/definitions.py index c517c70..7a8527c 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -1 +1 @@ -PUS_APID = 0xef \ No newline at end of file +PUS_APID = 0xEF diff --git a/config/hook_implementation.py b/config/hook_implementation.py index e9c4869..0243b35 100644 --- a/config/hook_implementation.py +++ b/config/hook_implementation.py @@ -15,34 +15,43 @@ from common_tmtc.config.definitions import PUS_APID class FsfwHookBase(TmTcHookBase): - def get_json_config_file_path(self) -> str: return "config/tmtc_config.json" def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: from tmtccmd.config.globals import get_default_service_op_code_dict - return get_default_service_op_code_dict() + + def_dict = get_default_service_op_code_dict() + return def_dict def add_globals_pre_args_parsing(self, gui: bool = False): from tmtccmd.config.globals import set_default_globals_pre_args_parsing + set_default_globals_pre_args_parsing( - gui=gui, pus_tm_version=PusVersion.PUS_C, pus_tc_version=PusVersion.PUS_C, - tc_apid=PUS_APID, tm_apid=PUS_APID + gui=gui, + pus_tm_version=PusVersion.PUS_C, + pus_tc_version=PusVersion.PUS_C, + tc_apid=PUS_APID, + tm_apid=PUS_APID, ) def add_globals_post_args_parsing(self, args: argparse.Namespace): from tmtccmd.config.globals import set_default_globals_post_args_parsing + set_default_globals_post_args_parsing( args=args, json_cfg_path=self.get_json_config_file_path() ) def assign_communication_interface( - self, com_if_key: str, tmtc_printer: TmTcPrinter + self, com_if_key: str, tmtc_printer: TmTcPrinter ) -> Optional[CommunicationInterface]: from tmtccmd.config.com_if import create_communication_interface_default + return create_communication_interface_default( - com_if_key=com_if_key, tmtc_printer=tmtc_printer, - json_cfg_path=self.get_json_config_file_path(), space_packet_ids=(0x08ef,) + com_if_key=com_if_key, + tmtc_printer=tmtc_printer, + json_cfg_path=self.get_json_config_file_path(), + space_packet_ids=(0x08EF,), ) def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): @@ -50,17 +59,22 @@ class FsfwHookBase(TmTcHookBase): def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT): from common_tmtc.pus_tc.tc_packing import pack_service_queue_user - pack_service_queue_user(service=service, op_code=op_code, tc_queue=service_queue) + + pack_service_queue_user( + service=service, op_code=op_code, tc_queue=service_queue + ) def get_object_ids(self) -> Dict[bytes, list]: from common_tmtc.config.object_ids import get_object_ids + return get_object_ids() @staticmethod def handle_service_8_telemetry( - object_id: int, action_id: int, custom_data: bytearray + object_id: int, action_id: int, custom_data: bytearray ) -> Tuple[list, list]: from common_tmtc.pus_tm.service_8_handling import custom_service_8_handling + return custom_service_8_handling( object_id=object_id, action_id=action_id, custom_data=custom_data ) @@ -70,6 +84,10 @@ class FsfwHookBase(TmTcHookBase): object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base ) -> Tuple[list, list, bytearray, int]: from common_tmtc.pus_tm.service_3_hk_handling import service_3_hk_handling + return service_3_hk_handling( - object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet + object_id=object_id, + set_id=set_id, + hk_data=hk_data, + service3_packet=service3_packet, ) diff --git a/config/object_ids.py b/config/object_ids.py index cbea7e3..2b2eff3 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -12,8 +12,8 @@ TEST_DEVICE_1_ID = bytes([0x44, 0x02, 0xAF, 0xFE]) def get_object_ids() -> Dict[bytes, list]: object_id_dict = { - PUS_SERVICE_17_ID: ["PUS Service 17"], - TEST_DEVICE_0_ID: ["Test Device 0"], - TEST_DEVICE_1_ID: ["Test Device 1"] + PUS_SERVICE_17_ID: ["PUS Service 17"], + TEST_DEVICE_0_ID: ["Test Device 0"], + TEST_DEVICE_1_ID: ["Test Device 1"], } return object_id_dict diff --git a/pus_tc/service_17_test.py b/pus_tc/service_17_test.py index a0b8f3b..8ebbecc 100644 --- a/pus_tc/service_17_test.py +++ b/pus_tc/service_17_test.py @@ -1,9 +1,14 @@ from tmtccmd.tc.definitions import TcQueueT -from tmtccmd.pus.service_17_test import pack_service_17_ping_command, pack_generic_service17_test +from tmtccmd.pus.service_17_test import ( + pack_service_17_ping_command, + pack_generic_service17_test, +) def pack_service_17_commands(op_code: str, init_ssc: int, tc_queue: TcQueueT): if op_code == "0": - tc_queue.appendleft(pack_service_17_ping_command(ssc=init_ssc).pack_command_tuple()) + tc_queue.appendleft( + pack_service_17_ping_command(ssc=init_ssc).pack_command_tuple() + ) else: pack_generic_service17_test(tc_queue=tc_queue, init_ssc=init_ssc) diff --git a/pus_tc/service_200_mode.py b/pus_tc/service_200_mode.py index 3f8735c..9010727 100644 --- a/pus_tc/service_200_mode.py +++ b/pus_tc/service_200_mode.py @@ -15,7 +15,7 @@ from common_tmtc.config.object_ids import TEST_DEVICE_0_ID def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str): - if op_code == "0": + if op_code == "test": pack_service_200_test_into(tc_queue=tc_queue, init_ssc=2000) @@ -50,4 +50,3 @@ def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: new_ssc += 1 tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt")) return new_ssc - diff --git a/pus_tc/service_20_parameters.py b/pus_tc/service_20_parameters.py index 9752499..da539ba 100644 --- a/pus_tc/service_20_parameters.py +++ b/pus_tc/service_20_parameters.py @@ -90,4 +90,3 @@ def load_param_2_simple_test_commands(tc_queue: TcQueueT): # payload = object_id + parameter_id_2 # command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload) # tc_queue.appendleft(command.pack_command_tuple()) - diff --git a/pus_tc/service_2_raw_cmd.py b/pus_tc/service_2_raw_cmd.py index 53159e5..d0e7ad0 100644 --- a/pus_tc/service_2_raw_cmd.py +++ b/pus_tc/service_2_raw_cmd.py @@ -35,7 +35,9 @@ def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: tc_queue.appendleft(command.pack_command_tuple()) new_ssc += 1 # toggle wiretapping raw - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw")) + tc_queue.appendleft( + (QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw") + ) wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1) toggle_wiretapping_on_command = PusTelecommand( service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data @@ -46,18 +48,25 @@ def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Sending Raw Command")) raw_command = cmd_data.TEST_COMMAND_0 raw_data = object_id + raw_command - raw_command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data) + raw_command = PusTelecommand( + service=2, subservice=128, ssc=new_ssc, app_data=raw_data + ) tc_queue.appendleft(raw_command.pack_command_tuple()) new_ssc += 1 # toggle wiretapping off - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off")) + tc_queue.appendleft( + (QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off") + ) wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0) - toggle_wiretapping_off_command = PusTelecommand(service=2, subservice=129, ssc=new_ssc, - app_data=wiretapping_toggle_data) + toggle_wiretapping_off_command = PusTelecommand( + service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data + ) tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple()) new_ssc += 1 # send raw command which should be returned via TM[2,130] - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Send second raw command")) + tc_queue.appendleft( + (QueueCommands.PRINT, "Testing Service 2: Send second raw command") + ) command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data) tc_queue.appendleft(command.pack_command_tuple()) new_ssc += 1 @@ -74,6 +83,8 @@ def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int: # wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW def pack_wiretapping_mode(object_id, wiretapping_mode_): - wiretapping_mode = struct.pack(">B", wiretapping_mode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 + wiretapping_mode = struct.pack( + ">B", wiretapping_mode_ + ) # MODE_OFF : 0x00, MODE_RAW: 0x01 wiretapping_toggle_data = object_id + wiretapping_mode return wiretapping_toggle_data diff --git a/pus_tc/service_3_housekeeping.py b/pus_tc/service_3_housekeeping.py index a8b17ab..db4d440 100644 --- a/pus_tc/service_3_housekeeping.py +++ b/pus_tc/service_3_housekeeping.py @@ -3,8 +3,11 @@ from spacepackets.ecss.tc import PusTelecommand from tmtccmd.config.definitions import QueueCommands from tmtccmd.tc.service_200_mode import pack_mode_data, Modes from tmtccmd.tc.service_20_parameter import pack_boolean_parameter_command -from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command, \ - Srv3Subservice +from tmtccmd.tc.service_3_housekeeping import ( + make_sid, + generate_one_hk_command, + Srv3Subservice, +) from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.service_8_functional_cmd import generate_action_command @@ -31,43 +34,67 @@ def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str): if op_code == "0": # This will pack all the tests - pack_service_3_test_info(tc_queue=tc_queue, init_ssc=current_ssc, object_id=object_id, - device_idx=device_idx) + pack_service_3_test_info( + tc_queue=tc_queue, + init_ssc=current_ssc, + object_id=object_id, + device_idx=device_idx, + ) elif op_code == "1": # Extremely simple, generate one HK packet - pack_gen_one_hk_command(tc_queue=tc_queue, device_idx=device_idx, init_ssc=current_ssc, - object_id=object_id) + pack_gen_one_hk_command( + tc_queue=tc_queue, + device_idx=device_idx, + init_ssc=current_ssc, + object_id=object_id, + ) elif op_code == "2": # Housekeeping basic test - pack_housekeeping_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc) + pack_housekeeping_basic_test( + tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc + ) elif op_code == "3": # Notification demo - pack_notification_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc) + pack_notification_basic_test( + tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc + ) -def pack_service_3_test_info(tc_queue: TcQueueT, device_idx: int, object_id: bytearray, - init_ssc: int): - tc_queue.appendleft((QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests")) +def pack_service_3_test_info( + tc_queue: TcQueueT, device_idx: int, object_id: bytearray, init_ssc: int +): + tc_queue.appendleft( + (QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests") + ) current_ssc = init_ssc current_ssc += pack_gen_one_hk_command( - tc_queue=tc_queue, device_idx=device_idx, object_id=object_id, init_ssc=current_ssc + tc_queue=tc_queue, + device_idx=device_idx, + object_id=object_id, + init_ssc=current_ssc, ) current_ssc += pack_housekeeping_basic_test( - tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc + tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc ) current_ssc += pack_notification_basic_test( - tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc, enable_normal_mode=False + tc_queue=tc_queue, + object_id=object_id, + init_ssc=current_ssc, + enable_normal_mode=False, ) def pack_gen_one_hk_command( - tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray + tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray ) -> int: test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) tc_queue.appendleft( - (QueueCommands.PRINT, f"Service 3 Test: Generate one test set packet for " - f"test device {device_idx}") + ( + QueueCommands.PRINT, + f"Service 3 Test: Generate one test set packet for " + f"test device {device_idx}", + ) ) command = generate_one_hk_command(ssc=init_ssc, sid=test_sid) init_ssc += 1 @@ -76,7 +103,10 @@ def pack_gen_one_hk_command( def pack_housekeeping_basic_test( - tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, enable_normal_mode: bool = True + tc_queue: TcQueueT, + object_id: bytearray, + init_ssc: int, + enable_normal_mode: bool = True, ) -> int: """ This basic test will request one HK packet, then it will enable periodic packets and listen @@ -85,63 +115,91 @@ def pack_housekeeping_basic_test( test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID) current_ssc = init_ssc # Enable changing datasets via parameter service (Service 20) - tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests")) + tc_queue.appendleft( + (QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests") + ) if enable_normal_mode: # Set mode normal so that sets are changed/read regularly tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) - command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data) + command = PusTelecommand( + service=200, subservice=1, ssc=current_ssc, app_data=mode_data + ) current_ssc += 1 tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft((QueueCommands.PRINT, "Enabling changing datasets")) command = pack_boolean_parameter_command( - object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, - parameter=True, ssc=current_ssc + object_id=object_id, + domain_id=0, + unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, + parameter=True, + ssc=current_ssc, ) current_ssc += 1 tc_queue.appendleft(command.pack_command_tuple()) # Enable periodic reporting - tc_queue.appendleft((QueueCommands.PRINT, - "Enabling periodic thermal sensor packet generation: ")) - command = PusTelecommand(service=3, subservice=Srv3Subservice.ENABLE_PERIODIC_HK_GEN.value, - ssc=current_ssc, app_data=test_sid) + tc_queue.appendleft( + (QueueCommands.PRINT, "Enabling periodic thermal sensor packet generation: ") + ) + command = PusTelecommand( + service=3, + subservice=Srv3Subservice.ENABLE_PERIODIC_HK_GEN.value, + ssc=current_ssc, + app_data=test_sid, + ) current_ssc += 1 tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft((QueueCommands.WAIT, 2.0)) # Disable periodic reporting - tc_queue.appendleft((QueueCommands.PRINT, - "Disabling periodic thermal sensor packet generation: ")) - command = PusTelecommand(service=3, subservice=Srv3Subservice.DISABLE_PERIODIC_HK_GEN.value, - ssc=current_ssc, app_data=test_sid) + tc_queue.appendleft( + (QueueCommands.PRINT, "Disabling periodic thermal sensor packet generation: ") + ) + command = PusTelecommand( + service=3, + subservice=Srv3Subservice.DISABLE_PERIODIC_HK_GEN.value, + ssc=current_ssc, + app_data=test_sid, + ) current_ssc += 1 tc_queue.appendleft(command.pack_command_tuple()) # Disable changing datasets via parameter service (Service 20) tc_queue.appendleft((QueueCommands.PRINT, "Disabling changing datasets")) command = pack_boolean_parameter_command( - object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, - parameter=False, ssc=current_ssc + object_id=object_id, + domain_id=0, + unique_id=PARAM_ACTIVATE_CHANGING_DATASETS, + parameter=False, + ssc=current_ssc, ) current_ssc += 1 tc_queue.appendleft(command.pack_command_tuple()) return current_ssc -def pack_notification_basic_test(tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, - enable_normal_mode: bool = True) -> int: +def pack_notification_basic_test( + tc_queue: TcQueueT, + object_id: bytearray, + init_ssc: int, + enable_normal_mode: bool = True, +) -> int: current_ssc = init_ssc - tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing notification tests")) + tc_queue.appendleft( + (QueueCommands.PRINT, "Service 3 Test: Performing notification tests") + ) if enable_normal_mode: # Set mode normal so that sets are changed/read regularly tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode")) mode_data = pack_mode_data(object_id, Modes.NORMAL, 0) - command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data) + command = PusTelecommand( + service=200, subservice=1, ssc=current_ssc, app_data=mode_data + ) current_ssc += 1 tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/service_8_func_cmd.py b/pus_tc/service_8_func_cmd.py index aab3eae..64a2eb8 100644 --- a/pus_tc/service_8_func_cmd.py +++ b/pus_tc/service_8_func_cmd.py @@ -34,10 +34,14 @@ def pack_generic_service_8_test_into(tc_queue: TcQueueT): tc_queue.appendleft(command.pack_command_tuple()) # Direct command which triggers completion reply - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply")) + tc_queue.appendleft( + (QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply") + ) action_id = cmd_data.TEST_COMMAND_0 direct_command = object_id + action_id - command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command) + command = PusTelecommand( + service=8, subservice=128, ssc=820, app_data=direct_command + ) tc_queue.appendleft(command.pack_command_tuple()) # Direct command which triggers _tm_data reply @@ -46,7 +50,9 @@ def pack_generic_service_8_test_into(tc_queue: TcQueueT): command_param1 = cmd_data.TEST_COMMAND_1_PARAM_1 command_param2 = cmd_data.TEST_COMMAND_1_PARAM_2 direct_command = object_id + action_id + command_param1 + command_param2 - command = PusTelecommand(service=8, subservice=128, ssc=830, app_data=direct_command) + command = PusTelecommand( + service=8, subservice=128, ssc=830, app_data=direct_command + ) tc_queue.appendleft(command.pack_command_tuple()) # Set mode off diff --git a/pus_tm/factory_hook.py b/pus_tm/factory_hook.py index 2722009..e4aac68 100644 --- a/pus_tm/factory_hook.py +++ b/pus_tm/factory_hook.py @@ -22,7 +22,9 @@ from common_tmtc.config.definitions import PUS_APID LOGGER = get_console_logger() -def ccsds_tm_handler(apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter) -> None: +def ccsds_tm_handler( + apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter +) -> None: if apid == PUS_APID: pus_packet_factory(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer) @@ -47,6 +49,10 @@ def pus_packet_factory(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter): if service_type == 200: tm_packet = Service200TM.unpack(raw_tm_packet) if tm_packet is None: - LOGGER.info("The service " + str(service_type) + " is not implemented in Telemetry Factory") + LOGGER.info( + "The service " + + str(service_type) + + " is not implemented in Telemetry Factory" + ) tm_packet = PusTelemetry.unpack(raw_tm_packet) tmtc_printer.print_telemetry(packet_if=tm_packet, info_if=tm_packet) diff --git a/pus_tm/service_3_hk_handling.py b/pus_tm/service_3_hk_handling.py index 074a10d..18ec370 100644 --- a/pus_tm/service_3_hk_handling.py +++ b/pus_tm/service_3_hk_handling.py @@ -6,11 +6,12 @@ from tmtccmd.tm.service_3_housekeeping import Service3Base from tmtccmd.utility.logger import get_console_logger from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID + LOGGER = get_console_logger() def service_3_hk_handling( - object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base + object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base ) -> Tuple[list, list, bytearray, int]: """This function is called when a Service 3 Housekeeping packet is received. @@ -37,7 +38,9 @@ def service_3_hk_handling( return [], [], bytearray(), 0 -def handle_test_set_deserialization(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: +def handle_test_set_deserialization( + hk_data: bytearray, +) -> Tuple[list, list, bytearray, int]: header_list = [] content_list = [] validity_buffer = bytearray() @@ -45,11 +48,11 @@ def handle_test_set_deserialization(hk_data: bytearray) -> Tuple[list, list, byt if len(hk_data) < 18: LOGGER.warning("Invalid HK data format for test set reply!") return header_list, content_list, validity_buffer, 0 - uint8_value = struct.unpack('!B', hk_data[0:1])[0] - uint32_value = struct.unpack('!I', hk_data[1:5])[0] - float_value_1 = struct.unpack('!f', hk_data[5:9])[0] - float_value_2 = struct.unpack('!f', hk_data[9:13])[0] - float_value_3 = struct.unpack('!f', hk_data[13:17])[0] + uint8_value = struct.unpack("!B", hk_data[0:1])[0] + uint32_value = struct.unpack("!I", hk_data[1:5])[0] + float_value_1 = struct.unpack("!f", hk_data[5:9])[0] + float_value_2 = struct.unpack("!f", hk_data[9:13])[0] + float_value_3 = struct.unpack("!f", hk_data[13:17])[0] validity_buffer.append(hk_data[17]) header_list.append("uint8 value") header_list.append("uint32 value") diff --git a/pus_tm/service_8_handling.py b/pus_tm/service_8_handling.py index 259e7e4..766c1ed 100644 --- a/pus_tm/service_8_handling.py +++ b/pus_tm/service_8_handling.py @@ -2,7 +2,8 @@ from typing import Tuple def custom_service_8_handling( - object_id: int, action_id: int, custom_data: bytearray) -> Tuple[list, list]: + object_id: int, action_id: int, custom_data: bytearray +) -> Tuple[list, list]: """ This function is called by the TMTC core if a Service 8 data reply (subservice 130) is received. The user can return a tuple of two lists, where the first list