From 904dc74075f7688df4d53bee0195fbf5571dc483 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Wed, 24 Nov 2021 15:56:25 +0100 Subject: [PATCH] commands for ccsds and pdec handler --- config/definitions.py | 2 ++ config/hook_implementations.py | 18 ++++++++++-- config/object_ids.py | 3 ++ pus_tc/ccsds_handler.py | 54 ++++++++++++++++++++++++++++++++++ pus_tc/pdec_handler.py | 28 ++++++++++++++++++ pus_tc/tc_packer_hook.py | 8 ++++- 6 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 pus_tc/ccsds_handler.py create mode 100644 pus_tc/pdec_handler.py diff --git a/config/definitions.py b/config/definitions.py index 65d130f..e37f26d 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -36,3 +36,5 @@ class CustomServiceList(enum.Enum): PLOC_MEMORY_DUMPER = "ploc_memory_dumper" CORE = 'core' STAR_TRACKER = 'star_tracker' + CCSDS_HANDLER = 'ccsds_handler' + PDEC_HANDLER = 'pdec_handler' diff --git a/config/hook_implementations.py b/config/hook_implementations.py index ab3c1c0..99e21c1 100644 --- a/config/hook_implementations.py +++ b/config/hook_implementations.py @@ -282,6 +282,19 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT): } service_star_tracker_tuple = ("Star tracker", op_code_dict_srv_star_tracker) + op_code_dict_srv_ccsds_handler = { + "0": ("CCSDS Handler: Set low rate", {OpCodeDictKeys.TIMEOUT: 2.0}), + "1": ("CCSDS Handler: Set high rate", {OpCodeDictKeys.TIMEOUT: 2.0}), + "2": ("CCSDS Handler: Enable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}), + "3": ("CCSDS Handler: Disable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}), + } + service_ccsds_handler_tuple = ("CCSDS Handler", op_code_dict_srv_ccsds_handler) + + op_code_dict_srv_pdec_handler = { + "0": ("PDEC Handler: Print CLCW", {OpCodeDictKeys.TIMEOUT: 2.0}), + } + service_pdec_handler_tuple = ("PDEC Handler", op_code_dict_srv_pdec_handler) + service_op_code_dict[CustomServiceList.P60DOCK.value] = service_p60_tuple service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu1_tuple service_op_code_dict[CustomServiceList.PDU2.value] = service_pdu2_tuple @@ -295,5 +308,6 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT): service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple service_op_code_dict[CustomServiceList.PLOC_UPDATER.value] = service_ploc_updater_tuple service_op_code_dict[CustomServiceList.STAR_TRACKER.value] = service_star_tracker_tuple - service_op_code_dict[CustomServiceList.PLOC_MEMORY_DUMPER.value] = \ - service_ploc_memory_dumper_tuple \ No newline at end of file + service_op_code_dict[CustomServiceList.PLOC_MEMORY_DUMPER.value] = service_ploc_memory_dumper_tuple + service_op_code_dict[CustomServiceList.CCSDS_HANDLER.value] = service_ccsds_handler_tuple + service_op_code_dict[CustomServiceList.PDEC_HANDLER.value] = service_pdec_handler_tuple diff --git a/config/object_ids.py b/config/object_ids.py index 95b3f64..1b91269 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -44,6 +44,8 @@ IMTQ_HANDLER_ID = bytes([0x44, 0x14, 0x00, 0x14]) # Misc Object IDs PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17]) TEST_DEVICE_ID = bytes([0x54, 0x00, 0xAF, 0xFE]) +CCSDS_HANDLER_ID = bytes([0x50, 0x00, 0x08, 0x00]) +PDEC_HANDLER_ID = bytes([0x50, 0x00, 0x07, 0x04]) # Payload Object IDs STAR_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1]) @@ -76,5 +78,6 @@ def get_object_ids() -> Dict[bytes, list]: RAD_SENSOR_ID: "Radiation Sensor", PLOC_SUPV_ID: "PLOC Supervisor", CORE_CONTROLLER_ID: "Core Controller", + CORE_CONTROLLER_ID: "CCSDS Handler", }) return object_id_dict diff --git a/pus_tc/ccsds_handler.py b/pus_tc/ccsds_handler.py new file mode 100644 index 0000000..c74be2b --- /dev/null +++ b/pus_tc/ccsds_handler.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +""" +@file ccsds_handler.py +@brief Test commanding of CCSDS Handler +@author J. Meier +@date 20.11.2021 +""" +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.tc.packer import TcQueueT +from spacepackets.ecss.tc import PusTelecommand + + +class CommandIds: + # Configures input rate of syrlinks to 400 Khz (results in downlink rate of 200 kbps) + SET_LOW_RATE = bytearray([0x0, 0x0, 0x0, 0x0]) + # Configures input rate of syrlinks to 2000 Khz (results in downlink rate of 1000 kbps) + SET_HIGH_RATE = bytearray([0x0, 0x0, 0x0, 0x1]) + # Enables the syrlinks transmitter (by using RS485 enables lines) + EN_TRANSMITTER = bytearray([0x0, 0x0, 0x0, 0x2]) + # Disables the syrlinks transmitter (by using RS485 enables lines) + DIS_TRANSMITTER = bytearray([0x0, 0x0, 0x0, 0x3]) + + +def pack_ccsds_handler_test(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT: + tc_queue.appendleft( + (QueueCommands.PRINT, + "Testing ccsds handler with object id: 0x" + object_id.hex()) + ) + + if op_code == "0": + tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set low rate")) + command = object_id + CommandIds.SET_LOW_RATE + command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "1": + tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Set high rate")) + command = object_id + CommandIds.SET_HIGH_RATE + command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "2": + tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Enables the transmitter")) + command = object_id + CommandIds.EN_TRANSMITTER + command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "3": + tc_queue.appendleft((QueueCommands.PRINT, "CCSDS Handler: Disables the transmitter")) + command = object_id + CommandIds.DIS_TRANSMITTER + command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + + return tc_queue diff --git a/pus_tc/pdec_handler.py b/pus_tc/pdec_handler.py new file mode 100644 index 0000000..73becf8 --- /dev/null +++ b/pus_tc/pdec_handler.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +""" +@file pdec_handler.py +@brief Test commanding of PDEC Handler +@author J. Meier +@date 22.11.2021 +""" +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.tc.packer import TcQueueT +from spacepackets.ecss.tc import PusTelecommand + + +class CommandIds: + # prints the clcw to the console. Useful for debugging + PRINT_CLCW = bytearray([0x0, 0x0, 0x0, 0x0]) + + +def pack_pdec_handler_test(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT: + tc_queue.appendleft( + (QueueCommands.PRINT, + "Testing PDEC handler with object id: 0x" + object_id.hex()) + ) + + if op_code == "0": + tc_queue.appendleft((QueueCommands.PRINT, "PDEC Handler: Print CLCW")) + command = object_id + CommandIds.PRINT_CLCW + command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) \ No newline at end of file diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index 3c11be7..ff12bce 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -24,6 +24,7 @@ from pus_tc.reaction_wheels import pack_single_rw_test_into from pus_tc.rad_sensor import pack_rad_sensor_test_into from pus_tc.ploc_upater import pack_ploc_updater_test_into from pus_tc.ploc_memory_dumper import pack_ploc_memory_dumper_cmd +from pus_tc.ccsds_handler import pack_ccsds_handler_test from pus_tc.core import pack_core_commands from pus_tc.star_tracker import pack_star_tracker_commands_into from pus_tc.gps import pack_gps_command @@ -32,7 +33,8 @@ from config.definitions import CustomServiceList from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, \ ACU_HANDLER_ID, TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, \ PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID, \ - STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID + STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID, CCSDS_HANDLER_ID, \ + PDEC_HANDLER_ID LOGGER = get_console_logger() @@ -117,6 +119,10 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu return pack_gps_command(object_id=GPS_HANDLER_0_ID, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.GPS_1.value: return pack_gps_command(object_id=GPS_HANDLER_1_ID, tc_queue=service_queue, op_code=op_code) + if service == CustomServiceList.CCSDS_HANDLER.value: + return pack_ccsds_handler_test(object_id=CCSDS_HANDLER_ID, tc_queue=service_queue, op_code=op_code) + if service == CustomServiceList.PDEC_HANDLER.value: + return pack_ccsds_handler_test(object_id=PDEC_HANDLER_ID, tc_queue=service_queue, op_code=op_code) LOGGER.warning("Invalid Service !")