diff --git a/config/definitions.py b/config/definitions.py index 50f4577..70765d5 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -30,6 +30,8 @@ class CustomServiceList(enum.Enum): RAD_SENSOR = "rad_sensor" PLOC_SUPV = "ploc_supv" PLOC_UPDATER = "ploc_updater" + GPS_0 = "gps0" + GPS_1 = "gps1" PLOC_MEMORY_DUMPER = "ploc_memory_dumper" CORE = 'core' STAR_TRACKER = 'star_tracker' diff --git a/pus_tc/acu.py b/pus_tc/acu.py index d31cba6..7ffe438 100644 --- a/pus_tc/acu.py +++ b/pus_tc/acu.py @@ -5,9 +5,7 @@ @author J. Meier @date 21.12.2020 """ - from tmtccmd.tc.packer import TcQueueT -from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.config.definitions import QueueCommands from gomspace.gomspace_common import * from pus_tc.p60dock import P60DockConfigTable @@ -64,13 +62,13 @@ def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT): p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address, P60DockConfigTable.out_en_0.parameter_size, Channel.on ) - command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.reboot: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reboot")) command = pack_reboot_command(object_id) - command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_gnd_wdt: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ground watchdog timer value")) @@ -78,54 +76,54 @@ def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT): object_id, TableIds.hk, ACUHkTable.wdt_gnd_left.parameter_address, ACUHkTable.wdt_gnd_left.parameter_size ) - command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.gnd_wdt_reset: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Testing ground watchdog reset")) command = pack_gnd_wdt_reset_command(object_id) - command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.ping: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Ping Test")) ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) command = pack_ping_command(object_id, ping_data) - command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_temperature3: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading temperature 3")) command = pack_get_param_command(object_id, TableIds.hk, ACUHkTable.temperature3.parameter_address, ACUHkTable.temperature3.parameter_size) - command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_vboost: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vboost value")) command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vboost.parameter_address, ACUConfigTable.vboost.parameter_size) - command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_hi: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_hi")) command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_hi.parameter_address, ACUConfigTable.vbat_max_hi.parameter_size) - command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_lo: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_lo")) command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_lo.parameter_address, ACUConfigTable.vbat_max_lo.parameter_size) - command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_ov_mode: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ov_mode")) command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.ov_mode.parameter_address, ACUConfigTable.ov_mode.parameter_size) - command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning off ACU")) command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address, P60DockConfigTable.out_en_0.parameter_size, Channel.off) - command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) return tc_queue diff --git a/pus_tc/gps.py b/pus_tc/gps.py new file mode 100644 index 0000000..42b136d --- /dev/null +++ b/pus_tc/gps.py @@ -0,0 +1,22 @@ +import enum + +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.tc.definitions import TcQueueT +from tmtccmd.tc.service_8_functional_cmd import generate_action_command + + +from config.object_ids import GPS_HANDLER_1_ID, GPS_HANDLER_0_ID + + +class GpsOpCodes(enum.Enum): + RESET_GNSS = "5" + + +def pack_gps_command(object_id: bytearray, tc_queue: TcQueueT, op_code: str): + if op_code == GpsOpCodes.RESET_GNSS.value: + if object_id == GPS_HANDLER_0_ID: + tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 0")) + elif object_id == GPS_HANDLER_1_ID: + tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 1")) + cmd = generate_action_command(object_id=object_id, action_id=int(op_code)) + tc_queue.appendleft(cmd.pack_command_tuple()) diff --git a/pus_tc/p60dock.py b/pus_tc/p60dock.py index 6c9ed46..60ebe42 100644 --- a/pus_tc/p60dock.py +++ b/pus_tc/p60dock.py @@ -62,24 +62,28 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st if op_code == "1": tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 on")) parameter = 0 # set channel off - command = pack_set_param_command(object_id, P60DockConfigTable.out_en_9.parameter_address, - P60DockConfigTable.out_en_9.parameter_size, Channel.on) - command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) + command = pack_set_param_command( + object_id, P60DockConfigTable.out_en_9.parameter_address, + P60DockConfigTable.out_en_9.parameter_size, Channel.on + ) + # command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) return tc_queue if op_code == "2": tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 off")) parameter = 0 # set channel off - command = pack_set_param_command(object_id, P60DockConfigTable.out_en_9.parameter_address, - P60DockConfigTable.out_en_9.parameter_size, Channel.off) - command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) + command = pack_set_param_command( + object_id, P60DockConfigTable.out_en_9.parameter_address, + P60DockConfigTable.out_en_9.parameter_size, Channel.off + ) + # command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) return tc_queue if P60DockTestProcedure.all or P60DockTestProcedure.reboot: tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Reboot")) command = pack_reboot_command(object_id) - command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.read_gnd_wdt: tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Reading ground watchdog timer value")) @@ -87,25 +91,25 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st object_id, TableIds.hk, P60DockHkTable.wdt_gnd_left.parameter_address, P60DockHkTable.wdt_gnd_left.parameter_size ) - command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.gnd_wdt_reset: tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing ground watchdog reset")) command = pack_gnd_wdt_reset_command(object_id) - command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.ping: tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Ping")) ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) command = pack_ping_command(object_id, ping_data) - command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_off: tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing setting output channel 3 off")) parameter = 0 # set channel off command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address, P60DockConfigTable.out_en_3.parameter_size, parameter) - command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.read_temperature1: tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing temperature reading")) @@ -113,14 +117,14 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address, P60DockHkTable.temperature1.parameter_size ) - command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on: tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing Output Channel 3 state (PDU2)")) command = pack_get_param_command(object_id, TableIds.config, P60DockConfigTable.out_en_3.parameter_address, P60DockConfigTable.out_en_3.parameter_size) - command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.read_cur_lu_lim_0: tc_queue.appendleft((QueueCommands.PRINT, @@ -129,14 +133,14 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st object_id, TableIds.config, P60DockConfigTable.cur_lu_lim_0.parameter_address, P60DockConfigTable.cur_lu_lim_0.parameter_size ) - command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on: tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing setting output channel 3 on")) parameter = 1 # set channel on command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address, P60DockConfigTable.out_en_3.parameter_size, parameter) - command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.invalid_table_id_test: tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing invalid table id handling")) @@ -145,7 +149,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st object_id, table_id_invalid, P60DockHkTable.temperature1.parameter_address, P60DockHkTable.temperature1.parameter_size ) - command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.invalid_address_test: tc_queue.appendleft((QueueCommands.PRINT, @@ -153,7 +157,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st invalid_address = bytearray([0x01, 0xF4]) command = pack_get_param_command(object_id, TableIds.hk, invalid_address, P60DockHkTable.temperature1.parameter_size) - command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing invalid address handling in set param command")) @@ -161,7 +165,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st parameter_size = 2 parameter = 1 command = pack_set_param_command(object_id, invalid_address, parameter_size, parameter) - command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.invalid_parameter_size_test: tc_queue.appendleft( @@ -172,7 +176,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st command = pack_get_param_command( object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address, invalid_size ) - command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft( (QueueCommands.PRINT, @@ -182,7 +186,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st command = pack_set_param_command( object_id, P60DockConfigTable.out_en_3.parameter_address, invalid_size, parameter ) - command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command) + # command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) return tc_queue diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index d88c966..1717d79 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -30,10 +30,12 @@ from pus_tc.ploc_upater import pack_ploc_updater_test_into from pus_tc.ploc_memory_dumper import pack_ploc_memory_dumper_cmd from pus_tc.core import pack_core_commands from pus_tc.star_tracker import pack_star_tracker_commands_into +from pus_tc.gps import pack_gps_command from config.definitions import CustomServiceList -from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \ - TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \ - RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID, STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID +from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, \ + ACU_HANDLER_ID, TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, \ + PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID, \ + STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID LOGGER = get_console_logger() @@ -87,21 +89,35 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu return pack_single_rw_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.RAD_SENSOR.value: object_id = RAD_SENSOR_ID - return pack_rad_sensor_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) + return pack_rad_sensor_test_into( + object_id=object_id, tc_queue=service_queue, op_code=op_code + ) if service == CustomServiceList.PLOC_SUPV.value: object_id = PLOC_SUPV_ID - return pack_ploc_supv_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) + return pack_ploc_supv_test_into( + object_id=object_id, tc_queue=service_queue, op_code=op_code + ) if service == CustomServiceList.PLOC_UPDATER.value: object_id = PLOC_UPDATER_ID - return pack_ploc_updater_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) + return pack_ploc_updater_test_into( + object_id=object_id, tc_queue=service_queue, op_code=op_code + ) if service == CustomServiceList.STAR_TRACKER.value: object_id = STAR_TRACKER_ID - return pack_star_tracker_commands_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) + return pack_star_tracker_commands_into( + object_id=object_id, tc_queue=service_queue, op_code=op_code + ) if service == CustomServiceList.CORE.value: return pack_core_commands(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.PLOC_MEMORY_DUMPER.value: object_id = PLOC_MEMORY_DUMPER_ID - return pack_ploc_memory_dumper_cmd(object_id=object_id, tc_queue=service_queue, op_code=op_code) + return pack_ploc_memory_dumper_cmd( + object_id=object_id, tc_queue=service_queue, op_code=op_code + ) + if service == CustomServiceList.GPS_0.value: + return pack_gps_command(object_id=GPS_HANDLER_0_ID, tc_queue=service_queue, op_code=op_code) + if service == CustomServiceList.GPS_1.value: + return pack_gps_command(object_id=GPS_HANDLER_1_ID, tc_queue=service_queue, op_code=op_code) LOGGER.warning("Invalid Service !")