diff --git a/CHANGELOG.md b/CHANGELOG.md index f351cc8..0a24f0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ list yields a list of all related PRs for each release. https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/74 https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/79 https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/73 +- Add ACU HK parsing + PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/80 # [v1.11.0] diff --git a/gomspace/gomspace_common.py b/gomspace/gomspace_common.py index aff5d20..6e82950 100644 --- a/gomspace/gomspace_common.py +++ b/gomspace/gomspace_common.py @@ -10,6 +10,7 @@ import enum from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.tc.definitions import PusTelecommand +from tmtccmd.utility import ObjectId class GomspaceDeviceActionIds(enum.IntEnum): @@ -43,7 +44,8 @@ class SetIds: PDU_2_AUX = 4 P60_CORE = 5 P60_AUX = 6 - ACU = 7 + ACU_CORE = 7 + ACU_AUX = 8 class TableIds: @@ -67,7 +69,7 @@ class Channel: def pack_get_param_command( - object_id: bytearray, table_id: int, memory_address: bytearray, parameter_size: int + object_id: bytes, table_id: int, memory_address: bytearray, parameter_size: int ) -> PusTelecommand: """Function to generate a command to retrieve parameters like the temperature from a gomspace device. @param object_id: The object id of the gomspace device handler. @@ -88,7 +90,7 @@ def pack_get_param_command( def pack_set_param_command( - object_id: bytearray, + object_id: bytes, memory_address: bytearray, parameter_size: int, parameter: int, @@ -128,7 +130,7 @@ def pack_set_param_command( ) -def pack_ping_command(object_id: bytearray, data: bytearray) -> PusTelecommand: +def pack_ping_command(object_id: ObjectId, data: bytearray) -> PusTelecommand: """ " Function to generate the command to ping a gomspace device @param object_id Object Id of the gomspace device handler. @param data Bytearray containing the bytes to send to the gomspace device. For now the on board software @@ -137,33 +139,35 @@ def pack_ping_command(object_id: bytearray, data: bytearray) -> PusTelecommand: data are simply copied by the device and then sent back. """ return generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PING, app_data=data + object_id=object_id.as_bytes, + action_id=GomspaceDeviceActionIds.PING, + app_data=data, ) -def pack_gnd_wdt_reset_command(object_id: bytearray) -> PusTelecommand: +def pack_gnd_wdt_reset_command(object_id: ObjectId) -> PusTelecommand: """ " Function to generate the command to reset the watchdog of a gomspace device. @param object_id Object Id of the gomspace device handler. """ return generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.WDT_RESET + object_id=object_id.as_bytes, action_id=GomspaceDeviceActionIds.WDT_RESET ) -def pack_reboot_command(object_id: bytearray) -> PusTelecommand: +def pack_reboot_command(object_id: ObjectId) -> PusTelecommand: """Function to generate the command which triggers a reboot of a gomspace device @param object_id The object id of the gomspace device handler. """ return generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.REBOOT + object_id=object_id.as_bytes, action_id=GomspaceDeviceActionIds.REBOOT ) -def pack_request_full_hk_table_command(object_id: bytearray) -> PusTelecommand: +def pack_request_full_hk_table_command(object_id: ObjectId) -> PusTelecommand: """Function to generate the command to request the full housekeeping table from a gomspace device. @param object_id The object id of the gomspace device handler. """ return generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.REQUEST_HK_TABLE + object_id=object_id.as_bytes, action_id=GomspaceDeviceActionIds.REQUEST_HK_TABLE ) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index a0d96fe..6307702 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -1,4 +1,5 @@ from pus_tc.devs.gps import GpsOpCodes +from pus_tc.devs.pcdu import add_pcdu_cmds from pus_tc.devs.rad_sensor import add_rad_sens_cmds from tmtccmd.config import ( add_op_code_entry, @@ -479,289 +480,6 @@ def add_time_cmds(cmd_dict: ServiceOpCodeDictT): ) -def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): - from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes, Info - from pus_tc.devs.pdu1 import Pdu1OpCodes - from pus_tc.devs.pdu2 import Pdu2OpCodes - from gomspace.gomspace_common import Info as GsInfo - - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_3V3_ON, - info=Info.STACK_3V3_ON, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_3V3_OFF, - info=Info.STACK_3V3_OFF, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_5V_ON, - info=Info.STACK_5V_ON, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_5V_OFF, - info=Info.STACK_5V_OFF, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, - info=GsInfo.REQUEST_CORE_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_SWITCH_V_I, - info="P60 Dock: Print Switches, Voltages, Currents", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_LATCHUPS, - info="P60 Dock: Print Latchups", - ) - add_op_code_entry(op_code_dict=op_code_dict, keys=P60OpCodes.TEST, info="P60 Tests") - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name=CustomServiceList.P60DOCK.value, - info="P60 Device", - op_code_entry=op_code_dict, - ) - - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.TCS_BOARD_ON.value, - info="PDU1: Turn TCS board on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.TCS_BOARD_OFF.value, - info="PDU1: Turn TCS board off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.STAR_TRACKER_ON.value, - info="PDU1: Turn star tracker on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.STAR_TRACKER_OFF.value, - info="PDU1: Turn star tracker off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SUS_NOMINAL_ON.value, - info="PDU1: Turn SUS nominal on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SUS_NOMINAL_OFF.value, - info="PDU1: Turn SUS nominal off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.ACS_A_SIDE_ON.value, - info="PDU1: Turn ACS A side on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.ACS_A_SIDE_OFF.value, - info="PDU1: Turn ACS A side off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SYRLINKS_ON.value, - info="PDU1: Turn Syrlinks on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SYRLINKS_OFF.value, - info="PDU1: Turn Syrlinks off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.MGT_ON.value, - info="PDU1: Turn MGT on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.MGT_OFF.value, - info="PDU1: Turn MGT off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.PLOC_ON.value, - info="PDU1: Turn PLOC on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.PLOC_OFF.value, - info="PDU1: Turn PLOC off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SCEX_ON.value, - info="PDU1: Turn Solar Cell Experiment on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SCEX_OFF.value, - info="PDU1: Turn Solar Cell Experiment off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, - info=GsInfo.REQUEST_CORE_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_SWITCH_V_I, - info="PDU1: Print Switches, Voltages, Currents", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_LATCHUPS, - info="PDU1: Print Latchups", - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=Pdu1OpCodes.TESTS.value, info="PDU1 Tests" - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name=CustomServiceList.PDU1.value, - info="PDU1 Device", - op_code_entry=op_code_dict, - ) - - op_code_dict = dict() - add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="PDU2 Tests") - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.ACS_SIDE_B_ON.value, - info="PDU2: Turn ACS Side B on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value, - info="PDU2: Turn ACS Side B off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value, - info="PDU2: Turn SUS redundant on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value, - info="PDU2: Turn SUS redundant off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.RW_ON.value, - info="PDU2: Turn reaction wheels on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.RW_OFF.value, - info="PDU2: Turn reaction wheels off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_ON.value, - info="PDU2: PL PCDU Switch Channel Nominal (1) on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_OFF.value, - info="PDU2: PL PCDU Switch Channel Nominal (1) off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_ON.value, - info="PDU2: PL PCDU Switch Channel Redundant (1) on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_OFF.value, - info="PDU2: PL PCDU Switch Channel Redundant (1) off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.TCS_HEATER_IN_ON.value, - info="PDU2: Switch TCS Heater Input on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.TCS_HEATER_IN_OFF.value, - info="PDU2: Switch TCS Heater Input off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_ON.value, - info="PDU2: Switch Solar Array Deployment On", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_OFF.value, - info="PDU2: Switch Solar Array Deployment Off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_CAMERA_ON.value, - info="PDU2: Turn payload camera on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_CAMERA_OFF.value, - info="PDU2: Turn payload camera off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, - info=GsInfo.REQUEST_CORE_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_SWITCH_V_I, - info="PDU2: Print Switches, Voltages, Currents", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_LATCHUPS, - info="PDU2: Print Latchups", - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name="pdu2", - info="PDU2 Device", - op_code_entry=op_code_dict, - ) - op_code_dict = { - "0": ("ACU: Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), - "51": ("ACU: Print channel statistics", {OpCodeDictKeys.TIMEOUT: 2.0}), - } - service_tuple = ("ACU Devices", op_code_dict) - cmd_dict[CustomServiceList.ACU.value] = service_tuple - - def add_imtq_cmds(cmd_dict: ServiceOpCodeDictT): op_code_dict_srv_imtq = { "0": ("IMTQ Tests All", {OpCodeDictKeys.TIMEOUT: 2.0}), diff --git a/pus_tc/devs/acu.py b/pus_tc/devs/acu.py index f08494d..06d6533 100644 --- a/pus_tc/devs/acu.py +++ b/pus_tc/devs/acu.py @@ -1,15 +1,105 @@ # -*- coding: utf-8 -*- -""" -@file tmtcc_tc_acu.py -@brief ACU tests -@author J. Meier +"""ACU commands +@author J. Meier, R. Mueller @date 21.12.2020 """ -import struct +from config.definitions import CustomServiceList +from tmtccmd.config import add_op_code_entry, add_service_op_code_entry from tmtccmd.tc.packer import TcQueueT -from tmtccmd.config.definitions import QueueCommands -from gomspace.gomspace_common import * +from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT +from tmtccmd.tc.pus_3_fsfw_hk import ( + make_sid, + generate_one_diag_command, + generate_one_hk_command, +) +import gomspace.gomspace_common as gs +from gomspace.gomspace_common import GomspaceOpCodes +from gomspace.gomspace_common import Info as GsInfo +from config.object_ids import ACU_HANDLER_ID from pus_tc.devs.p60dock import P60DockConfigTable +from tmtccmd.tc.pus_8_funccmd import generate_action_command +from tmtccmd.utility import ObjectId + + +class ACUConfigTable: + mppt_mode = gs.TableEntry(bytearray([0x00, 0x00]), gs.TableEntry.uint8_size) + mppt_d_mode = gs.TableEntry(bytearray([0x00, 0x01]), gs.TableEntry.uint8_size) + vboost = gs.TableEntry(bytearray([0x00, 0x02]), gs.TableEntry.uint16_size) + vbat_max_hi = gs.TableEntry(bytearray([0x00, 0x10]), gs.TableEntry.uint16_size) + vbat_max_lo = gs.TableEntry(bytearray([0x00, 0x12]), gs.TableEntry.uint16_size) + ov_mode = gs.TableEntry(bytearray([0x00, 0x1A]), gs.TableEntry.uint8_size) + + +class ACUHkTable: + temperature1 = gs.TableEntry(bytearray([0x00, 0x1C]), gs.TableEntry.uint16_size) + temperature2 = gs.TableEntry(bytearray([0x00, 0x1D]), gs.TableEntry.uint16_size) + temperature3 = gs.TableEntry(bytearray([0x00, 0x1E]), gs.TableEntry.uint16_size) + # Ground WDT value (remaining seconds until reboot) + wdt_gnd_left = gs.TableEntry(bytearray([0x00, 0x74]), gs.TableEntry.uint32_size) + + +class OpCodes: + TEST = ["0", "test"] + + +class Info: + TEST = "ACU Test" + + +def add_acu_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, + info=GsInfo.REQUEST_CORE_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, + info=GsInfo.REQUEST_AUX_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, + info=GsInfo.REQUEST_AUX_HK_ONCE, + ) + add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.TEST, info=Info.TEST) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + op_code_entry=op_code_dict, + name=CustomServiceList.ACU.value, + info="ACU Device", + ) + + +def pack_acu_commands( + object_id: ObjectId, tc_queue: TcQueueT, op_code: str +) -> TcQueueT: + tc_queue.appendleft((QueueCommands.PRINT, "Handling ACU command")) + if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I: + tc_queue.appendleft((QueueCommands.PRINT, "ACU: Print channel stats")) + command = generate_action_command( + object_id=object_id.as_bytes, + action_id=gs.GomspaceDeviceActionIds.PRINT_SWITCH_V_I, + ) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE: + tc_queue.appendleft( + (QueueCommands.PRINT, f"PDU1: {GsInfo.REQUEST_CORE_HK_ONCE}") + ) + hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_CORE) + command = generate_one_diag_command(sid=hk_sid, ssc=0) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE: + tc_queue.appendleft( + (QueueCommands.PRINT, f"PDU1: {GsInfo.REQUEST_AUX_HK_ONCE}") + ) + hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_AUX) + command = generate_one_hk_command(sid=hk_sid, ssc=0) + tc_queue.appendleft(command.pack_command_tuple()) + pack_test_cmds(object_id=object_id, tc_queue=tc_queue) + + return tc_queue class ACUTestProcedure: @@ -26,59 +116,28 @@ class ACUTestProcedure: ping = False read_temperature1 = False read_temperature2 = False - read_temperature3 = True - read_mppt_mode = True - read_vboost = True - read_vbat_max_hi = True - read_vbat_max_lo = True - read_ov_mode = True + read_temperature3 = False + read_mppt_mode = False + read_vboost = False + read_vbat_max_hi = False + read_vbat_max_lo = False + read_ov_mode = False + off = False -class ACUConfigTable: - mppt_mode = TableEntry(bytearray([0x00, 0x00]), TableEntry.uint8_size) - mppt_d_mode = TableEntry(bytearray([0x00, 0x01]), TableEntry.uint8_size) - vboost = TableEntry(bytearray([0x00, 0x02]), TableEntry.uint16_size) - vbat_max_hi = TableEntry(bytearray([0x00, 0x10]), TableEntry.uint16_size) - vbat_max_lo = TableEntry(bytearray([0x00, 0x12]), TableEntry.uint16_size) - ov_mode = TableEntry(bytearray([0x00, 0x1A]), TableEntry.uint8_size) - - -class ACUHkTable: - temperature1 = TableEntry(bytearray([0x00, 0x1C]), TableEntry.uint16_size) - temperature2 = TableEntry(bytearray([0x00, 0x1D]), TableEntry.uint16_size) - temperature3 = TableEntry(bytearray([0x00, 0x1E]), TableEntry.uint16_size) - # Ground WDT value (remaining seconds until reboot) - wdt_gnd_left = TableEntry(bytearray([0x00, 0x74]), TableEntry.uint32_size) - - -class CommandId: - PRINT_CHANNEL_STATS = 51 - - -def pack_acu_test_into( - object_id: bytearray, tc_queue: TcQueueT, op_code: str -) -> TcQueueT: - tc_queue.appendleft((QueueCommands.PRINT, "Testing ACU")) - - if op_code == "51": - tc_queue.appendleft((QueueCommands.PRINT, "ACU: Print channel stats")) - command = object_id + struct.pack("!I", CommandId.PRINT_CHANNEL_STATS) - command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) - return - +def pack_test_cmds(object_id: ObjectId, tc_queue: TcQueueT): if ACUTestProcedure.all or ACUTestProcedure.reboot: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reboot")) - command = pack_reboot_command(object_id) + command = gs.pack_reboot_command(object_id) # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_gnd_wdt: tc_queue.appendleft( (QueueCommands.PRINT, "ACU: Reading ground watchdog timer value") ) - command = pack_get_param_command( - object_id, - TableIds.hk, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.hk, ACUHkTable.wdt_gnd_left.parameter_address, ACUHkTable.wdt_gnd_left.parameter_size, ) @@ -86,20 +145,20 @@ def pack_acu_test_into( tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.gnd_wdt_reset: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Testing ground watchdog reset")) - command = pack_gnd_wdt_reset_command(object_id) + command = gs.pack_gnd_wdt_reset_command(object_id) # command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.ping: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Ping Test")) ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) - command = pack_ping_command(object_id, ping_data) + command = gs.pack_ping_command(object_id, ping_data) # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_temperature3: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading temperature 3")) - command = pack_get_param_command( - object_id, - TableIds.hk, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.hk, ACUHkTable.temperature3.parameter_address, ACUHkTable.temperature3.parameter_size, ) @@ -107,9 +166,9 @@ def pack_acu_test_into( tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_vboost: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vboost value")) - command = pack_get_param_command( - object_id, - TableIds.config, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.config, ACUConfigTable.vboost.parameter_address, ACUConfigTable.vboost.parameter_size, ) @@ -117,9 +176,9 @@ def pack_acu_test_into( tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_hi: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_hi")) - command = pack_get_param_command( - object_id, - TableIds.config, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.config, ACUConfigTable.vbat_max_hi.parameter_address, ACUConfigTable.vbat_max_hi.parameter_size, ) @@ -127,9 +186,9 @@ def pack_acu_test_into( tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_lo: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_lo")) - command = pack_get_param_command( - object_id, - TableIds.config, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.config, ACUConfigTable.vbat_max_lo.parameter_address, ACUConfigTable.vbat_max_lo.parameter_size, ) @@ -137,23 +196,20 @@ def pack_acu_test_into( tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_ov_mode: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ov_mode")) - command = pack_get_param_command( - object_id, - TableIds.config, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.config, ACUConfigTable.ov_mode.parameter_address, ACUConfigTable.ov_mode.parameter_size, ) # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) - - tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning off ACU")) - command = pack_set_param_command( - p60dock_object_id, - P60DockConfigTable.out_en_0.parameter_address, - P60DockConfigTable.out_en_0.parameter_size, - Channel.off, - ) - # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) - - return tc_queue + if ACUTestProcedure.all or ACUTestProcedure.off: + tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning off ACU")) + command = gs.pack_set_param_command( + ACU_HANDLER_ID, + P60DockConfigTable.out_en_0.parameter_address, + P60DockConfigTable.out_en_0.parameter_size, + gs.Channel.off, + ) + tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/devs/p60dock.py b/pus_tc/devs/p60dock.py index be71cac..c0f395a 100644 --- a/pus_tc/devs/p60dock.py +++ b/pus_tc/devs/p60dock.py @@ -87,11 +87,12 @@ class P60DockHkTable: wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size) -def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): +def pack_p60dock_cmds(object_id: ObjectId, tc_queue: TcQueueT, op_code: str): + objb = object_id.as_bytes if op_code in P60OpCodes.STACK_3V3_ON: tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_3V3_ON)) command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_9.parameter_address, P60DockConfigTable.out_en_9.parameter_size, Channel.on, @@ -100,7 +101,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code in P60OpCodes.STACK_3V3_OFF: tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_3V3_OFF)) command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_9.parameter_address, P60DockConfigTable.out_en_9.parameter_size, Channel.off, @@ -109,7 +110,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code in P60OpCodes.STACK_5V_ON: tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_5V_ON)) command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_10.parameter_address, P60DockConfigTable.out_en_10.parameter_size, Channel.on, @@ -118,7 +119,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code in P60OpCodes.STACK_5V_OFF: tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_5V_OFF)) command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_10.parameter_address, P60DockConfigTable.out_en_10.parameter_size, Channel.off, @@ -143,13 +144,13 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "P60 Dock: Print Switches, Voltages, Currents") ) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I ) tc_queue.appendleft(command.pack_command_tuple()) if op_code in GomspaceOpCodes.PRINT_LATCHUPS: tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Print Latchups")) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS ) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.reboot: @@ -162,7 +163,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "P60 Dock: Reading ground watchdog timer value") ) command = pack_get_param_command( - object_id, + objb, TableIds.hk, P60DockHkTable.wdt_gnd_left.parameter_address, P60DockHkTable.wdt_gnd_left.parameter_size, @@ -188,7 +189,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) parameter = 0 # set channel off command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_3.parameter_address, P60DockConfigTable.out_en_3.parameter_size, parameter, @@ -200,7 +201,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "P60 Dock: Testing temperature reading") ) command = pack_get_param_command( - object_id, + objb, TableIds.hk, P60DockHkTable.temperature1.parameter_address, P60DockHkTable.temperature1.parameter_size, @@ -212,7 +213,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "P60 Dock: Testing Output Channel 3 state (PDU2)") ) command = pack_get_param_command( - object_id, + objb, TableIds.config, P60DockConfigTable.out_en_3.parameter_address, P60DockConfigTable.out_en_3.parameter_size, @@ -227,7 +228,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) ) command = pack_get_param_command( - object_id, + objb, TableIds.config, P60DockConfigTable.cur_lu_lim_0.parameter_address, P60DockConfigTable.cur_lu_lim_0.parameter_size, @@ -240,7 +241,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) parameter = 1 # set channel on command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_3.parameter_address, P60DockConfigTable.out_en_3.parameter_size, parameter, @@ -253,7 +254,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) table_id_invalid = 5 command = pack_get_param_command( - object_id, + objb, table_id_invalid, P60DockHkTable.temperature1.parameter_address, P60DockHkTable.temperature1.parameter_size, @@ -269,7 +270,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) invalid_address = bytearray([0x01, 0xF4]) command = pack_get_param_command( - object_id, + objb, TableIds.hk, invalid_address, P60DockHkTable.temperature1.parameter_size, @@ -286,7 +287,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): parameter_size = 2 parameter = 1 command = pack_set_param_command( - object_id, invalid_address, parameter_size, parameter + objb, invalid_address, parameter_size, parameter ) # command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) @@ -299,7 +300,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) invalid_size = 5 command = pack_get_param_command( - object_id, + objb, TableIds.hk, P60DockHkTable.temperature1.parameter_address, invalid_size, @@ -314,7 +315,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) parameter = 1 command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_3.parameter_address, invalid_size, parameter, diff --git a/pus_tc/devs/pcdu.py b/pus_tc/devs/pcdu.py new file mode 100644 index 0000000..bd95d3d --- /dev/null +++ b/pus_tc/devs/pcdu.py @@ -0,0 +1,297 @@ +from config.definitions import CustomServiceList +from tmtccmd.config import ( + ServiceOpCodeDictT, + add_op_code_entry, + add_service_op_code_entry, + OpCodeDictKeys, +) + +from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes, Info +from pus_tc.devs.pdu1 import Pdu1OpCodes +from pus_tc.devs.pdu2 import Pdu2OpCodes +from pus_tc.devs.acu import add_acu_cmds +from gomspace.gomspace_common import Info as GsInfo + + +def add_p60_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_3V3_ON, + info=Info.STACK_3V3_ON, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_3V3_OFF, + info=Info.STACK_3V3_OFF, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_5V_ON, + info=Info.STACK_5V_ON, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_5V_OFF, + info=Info.STACK_5V_OFF, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, + info=GsInfo.REQUEST_CORE_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, + info=GsInfo.REQUEST_AUX_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_SWITCH_V_I, + info="P60 Dock: Print Switches, Voltages, Currents", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_LATCHUPS, + info="P60 Dock: Print Latchups", + ) + add_op_code_entry(op_code_dict=op_code_dict, keys=P60OpCodes.TEST, info="P60 Tests") + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.P60DOCK.value, + info="P60 Device", + op_code_entry=op_code_dict, + ) + + +def add_pdu1_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.TCS_BOARD_OFF.value, + info="PDU1: Turn TCS board off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.STAR_TRACKER_ON.value, + info="PDU1: Turn star tracker on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.STAR_TRACKER_OFF.value, + info="PDU1: Turn star tracker off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SUS_NOMINAL_ON.value, + info="PDU1: Turn SUS nominal on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SUS_NOMINAL_OFF.value, + info="PDU1: Turn SUS nominal off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.ACS_A_SIDE_ON.value, + info="PDU1: Turn ACS A side on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.ACS_A_SIDE_OFF.value, + info="PDU1: Turn ACS A side off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SYRLINKS_ON.value, + info="PDU1: Turn Syrlinks on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SYRLINKS_OFF.value, + info="PDU1: Turn Syrlinks off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.MGT_ON.value, + info="PDU1: Turn MGT on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.MGT_OFF.value, + info="PDU1: Turn MGT off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.PLOC_ON.value, + info="PDU1: Turn PLOC on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.PLOC_OFF.value, + info="PDU1: Turn PLOC off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SCEX_ON.value, + info="PDU1: Turn Solar Cell Experiment on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SCEX_OFF.value, + info="PDU1: Turn Solar Cell Experiment off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, + info=GsInfo.REQUEST_CORE_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, + info=GsInfo.REQUEST_AUX_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_SWITCH_V_I, + info="PDU1: Print Switches, Voltages, Currents", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.TCS_BOARD_ON.value, + info="PDU1: Turn TCS board on", + ) + + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_LATCHUPS, + info="PDU1: Print Latchups", + ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=Pdu1OpCodes.TESTS.value, info="PDU1 Tests" + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.PDU1.value, + info="PDU1 Device", + op_code_entry=op_code_dict, + ) + + +def add_pdu2_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="PDU2 Tests") + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.ACS_SIDE_B_ON.value, + info="PDU2: Turn ACS Side B on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value, + info="PDU2: Turn ACS Side B off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value, + info="PDU2: Turn SUS redundant on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value, + info="PDU2: Turn SUS redundant off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.RW_ON.value, + info="PDU2: Turn reaction wheels on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.RW_OFF.value, + info="PDU2: Turn reaction wheels off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_ON.value, + info="PDU2: PL PCDU Switch Channel Nominal (1) on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_OFF.value, + info="PDU2: PL PCDU Switch Channel Nominal (1) off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_ON.value, + info="PDU2: PL PCDU Switch Channel Redundant (1) on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_OFF.value, + info="PDU2: PL PCDU Switch Channel Redundant (1) off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.TCS_HEATER_IN_ON.value, + info="PDU2: Switch TCS Heater Input on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.TCS_HEATER_IN_OFF.value, + info="PDU2: Switch TCS Heater Input off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_ON.value, + info="PDU2: Switch Solar Array Deployment On", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_OFF.value, + info="PDU2: Switch Solar Array Deployment Off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_CAMERA_ON.value, + info="PDU2: Turn payload camera on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_CAMERA_OFF.value, + info="PDU2: Turn payload camera off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, + info=GsInfo.REQUEST_CORE_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, + info=GsInfo.REQUEST_AUX_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_SWITCH_V_I, + info="PDU2: Print Switches, Voltages, Currents", + options={OpCodeDictKeys.TIMEOUT: 2.0}, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_LATCHUPS, + info="PDU2: Print Latchups", + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name="pdu2", + info="PDU2 Device", + op_code_entry=op_code_dict, + ) + + +def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): + add_p60_cmds(cmd_dict) + add_pdu1_cmds(cmd_dict) + add_pdu2_cmds(cmd_dict) + add_acu_cmds(cmd_dict) diff --git a/pus_tc/devs/pdu1.py b/pus_tc/devs/pdu1.py index 08ddbd9..04c042e 100644 --- a/pus_tc/devs/pdu1.py +++ b/pus_tc/devs/pdu1.py @@ -54,13 +54,13 @@ class PDU1TestProcedure: turn_channel_3_off = False -def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): +def pack_pdu1_commands(object_id: ObjectId, tc_queue: TcQueueT, op_code: str): tc_queue.appendleft((QueueCommands.PRINT, "Commanding PDU1")) - + objb = object_id.as_bytes if op_code == Pdu1OpCodes.TCS_BOARD_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn TCS board on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_0.parameter_address, PDUConfigTable.out_en_0.parameter_size, Channel.on, @@ -69,7 +69,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.TCS_BOARD_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn TCS board off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_0.parameter_address, PDUConfigTable.out_en_0.parameter_size, Channel.off, @@ -78,7 +78,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.STAR_TRACKER_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.on, @@ -87,7 +87,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.STAR_TRACKER_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.off, @@ -96,7 +96,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.SUS_NOMINAL_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_4.parameter_address, PDUConfigTable.out_en_4.parameter_size, Channel.on, @@ -105,7 +105,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.SUS_NOMINAL_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_4.parameter_address, PDUConfigTable.out_en_4.parameter_size, Channel.off, @@ -114,7 +114,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.ACS_A_SIDE_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_7.parameter_address, PDUConfigTable.out_en_7.parameter_size, Channel.on, @@ -123,7 +123,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.ACS_A_SIDE_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_7.parameter_address, PDUConfigTable.out_en_7.parameter_size, Channel.off, @@ -132,7 +132,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.SUS_NOMINAL_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_4.parameter_address, PDUConfigTable.out_en_4.parameter_size, Channel.off, @@ -143,7 +143,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU1: Turn Solar Cell Experiment on") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_5.parameter_address, PDUConfigTable.out_en_5.parameter_size, Channel.on, @@ -154,7 +154,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU1: Turn Solar Cell Experiment off") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_5.parameter_address, PDUConfigTable.out_en_5.parameter_size, Channel.off, @@ -163,7 +163,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.SYRLINKS_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn Syrlinks on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_1.parameter_address, PDUConfigTable.out_en_1.parameter_size, Channel.on, @@ -172,7 +172,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.SYRLINKS_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn Syrlinks off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_1.parameter_address, PDUConfigTable.out_en_1.parameter_size, Channel.off, @@ -181,7 +181,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.MGT_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn MGT on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.on, @@ -190,7 +190,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.MGT_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn MGT off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.off, @@ -199,7 +199,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.PLOC_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn PLOC on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_6.parameter_address, PDUConfigTable.out_en_6.parameter_size, Channel.on, @@ -208,7 +208,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.PLOC_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn PLOC off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_6.parameter_address, PDUConfigTable.out_en_6.parameter_size, Channel.off, @@ -229,13 +229,13 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU1: Print Switches, Voltages, Currents") ) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I ) tc_queue.appendleft(command.pack_command_tuple()) if op_code in GomspaceOpCodes.PRINT_LATCHUPS: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Print Latchups")) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS ) tc_queue.appendleft(command.pack_command_tuple()) if PDU1TestProcedure.all or PDU1TestProcedure.ping: @@ -246,7 +246,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if PDU1TestProcedure.all or PDU1TestProcedure.read_temperature: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Testing temperature reading")) command = pack_get_param_command( - object_id, + objb, TableIds.hk, PDUHkTable.temperature.parameter_address, PDUHkTable.temperature.parameter_size, @@ -257,7 +257,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU1: Turn channel 2 on (Star Tracker)") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.on, @@ -268,7 +268,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU1: Turn channel 2 off (Star Tracker)") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.off, @@ -277,7 +277,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_on: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 on (MTQ)")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.on, @@ -286,7 +286,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_off: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 off (MTQ)")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.off, diff --git a/pus_tc/devs/pdu2.py b/pus_tc/devs/pdu2.py index 12c1636..df4c883 100644 --- a/pus_tc/devs/pdu2.py +++ b/pus_tc/devs/pdu2.py @@ -66,13 +66,13 @@ class PDU2TestProcedure: request_hk_table = False -def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): +def pack_pdu2_commands(object_id: ObjectId, tc_queue: TcQueueT, op_code: str): tc_queue.appendleft((QueueCommands.PRINT, "Testing PDU2")) - + objb = object_id.as_bytes if op_code == Pdu2OpCodes.ACS_SIDE_B_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_7.parameter_address, PDUConfigTable.out_en_7.parameter_size, Channel.on, @@ -82,7 +82,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.ACS_SIDE_B_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_7.parameter_address, PDUConfigTable.out_en_7.parameter_size, Channel.off, @@ -92,7 +92,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.Q7S_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "Turning off Q7S OBC")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_0.parameter_address, PDUConfigTable.out_en_0.parameter_size, Channel.off, @@ -101,7 +101,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.SUS_REDUNDANT_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn SUS redundant on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_4.parameter_address, PDUConfigTable.out_en_4.parameter_size, Channel.on, @@ -110,7 +110,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.SUS_REDUNDANT_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn SUS redundant off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_4.parameter_address, PDUConfigTable.out_en_4.parameter_size, Channel.off, @@ -119,7 +119,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.RW_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn reaction wheels on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.on, @@ -128,7 +128,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.RW_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn reaction wheels off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.off, @@ -139,7 +139,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn PDU2 PL PCDU Channel 1 on") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_1.parameter_address, PDUConfigTable.out_en_1.parameter_size, Channel.on, @@ -150,7 +150,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn PDU2 PL PCDU Channel 1 off") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_1.parameter_address, PDUConfigTable.out_en_1.parameter_size, Channel.off, @@ -161,7 +161,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn PDU2 PL PCDU Channel 6 on") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_6.parameter_address, PDUConfigTable.out_en_6.parameter_size, Channel.off, @@ -172,7 +172,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn PDU2 PL PCDU Channel 6 off") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_6.parameter_address, PDUConfigTable.out_en_6.parameter_size, Channel.off, @@ -181,7 +181,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.TCS_HEATER_IN_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn TCS Heater Input on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.on, @@ -190,7 +190,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.TCS_HEATER_IN_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn TCS Heater Input off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.off, @@ -201,7 +201,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn Solar Array Deployment On") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_5.parameter_address, PDUConfigTable.out_en_5.parameter_size, Channel.on, @@ -212,7 +212,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn Solar Array Deployment Off") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_5.parameter_address, PDUConfigTable.out_en_5.parameter_size, Channel.off, @@ -221,7 +221,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.PL_CAMERA_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn payload camera on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_8.parameter_address, PDUConfigTable.out_en_8.parameter_size, Channel.on, @@ -230,7 +230,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.PL_CAMERA_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn payload camera off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_8.parameter_address, PDUConfigTable.out_en_8.parameter_size, Channel.off, @@ -251,13 +251,13 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Print Switches, Currents, Voltahes") ) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I ) tc_queue.appendleft(command.pack_command_tuple()) if op_code in GomspaceOpCodes.PRINT_LATCHUPS: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Print Latchups")) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS ) tc_queue.appendleft(command.pack_command_tuple()) if PDU2TestProcedure.all or PDU2TestProcedure.reboot: @@ -269,7 +269,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Reading ground watchdog timer value") ) command = pack_get_param_command( - object_id, + objb, TableIds.hk, PDUHkTable.wdt_gnd_left.parameter_address, PDUHkTable.wdt_gnd_left.parameter_size, @@ -294,7 +294,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.on, @@ -303,7 +303,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if PDU2TestProcedure.all or PDU2TestProcedure.read_temperature: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing temperature reading")) command = pack_get_param_command( - object_id, + objb, TableIds.hk, PDUHkTable.temperature.parameter_address, PDUHkTable.temperature.parameter_size, @@ -314,7 +314,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Reading output channel 2 state (TCS Heater)") ) command = pack_get_param_command( - object_id, + objb, TableIds.config, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, @@ -328,7 +328,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) ) command = pack_get_param_command( - object_id, + objb, TableIds.config, PDUConfigTable.cur_lu_lim_0.parameter_address, PDUConfigTable.cur_lu_lim_0.parameter_size, @@ -339,7 +339,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Testing setting output channel 2 off") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.off, diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index 049667e..137877b 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -21,7 +21,7 @@ from pus_tc.devs.p60dock import pack_p60dock_cmds from pus_tc.devs.pdu2 import pack_pdu2_commands from pus_tc.devs.pdu1 import pack_pdu1_commands from pus_tc.devs.bpx_batt import pack_bpx_commands -from pus_tc.devs.acu import pack_acu_test_into +from pus_tc.devs.acu import pack_acu_commands from pus_tc.devs.solar_array_deployment import pack_solar_array_deployment_test_into from pus_tc.devs.imtq import pack_imtq_test_into from pus_tc.devs.tmp1075 import pack_tmp1075_test_into @@ -45,6 +45,7 @@ from pus_tc.system.proc import pack_proc_commands from pus_tc.system.controllers import pack_controller_commands from config.definitions import CustomServiceList from config.object_ids import ( + get_object_ids, P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, @@ -110,7 +111,7 @@ def pack_service_queue_user( if service == CoreServiceList.SERVICE_200.value: return pack_service200_test_into(tc_queue=service_queue) if service == CustomServiceList.P60DOCK.value: - object_id = P60_DOCK_HANDLER + object_id = obj_id_man.get(P60_DOCK_HANDLER) return pack_p60dock_cmds( object_id=object_id, tc_queue=service_queue, op_code=op_code ) @@ -119,18 +120,18 @@ def pack_service_queue_user( object_id=None, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.PDU1.value: - object_id = PDU_1_HANDLER_ID + object_id = obj_id_man.get(PDU_1_HANDLER_ID) return pack_pdu1_commands( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.PDU2.value: - object_id = PDU_2_HANDLER_ID + object_id = obj_id_man.get(PDU_2_HANDLER_ID) return pack_pdu2_commands( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.ACU.value: - object_id = ACU_HANDLER_ID - return pack_acu_test_into( + object_id = obj_id_man.get(ACU_HANDLER_ID) + return pack_acu_commands( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.BPX_BATTERY.value: diff --git a/pus_tm/devs/pcdu.py b/pus_tm/devs/pcdu.py index d33ddc6..ae24d7e 100644 --- a/pus_tm/devs/pcdu.py +++ b/pus_tm/devs/pcdu.py @@ -1,4 +1,5 @@ import struct +from typing import List, Tuple from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter from pus_tm.defs import PrintWrapper @@ -87,6 +88,49 @@ class WdtInfo: return current_idx +class DevicesInfoParser: + def __init__(self): + self.dev_types = None + self.dev_statuses = None + + def parse(self, hk_data: bytes, current_idx: int) -> int: + self.dev_types = [] + self.dev_statuses = [] + for idx in range(8): + self.dev_types.append(hk_data[current_idx]) + current_idx += 1 + for idx in range(8): + self.dev_statuses.append(hk_data[current_idx]) + current_idx += 1 + return current_idx + + def map_idx_to_type(self, devtype: int) -> str: + if devtype == 0: + return "Reserved" + if devtype == 1: + return "ADC" + if devtype == 2: + return "ADC" + if devtype == 3: + return "DAC" + if devtype == 4: + return "Temperature Sensor" + if devtype == 5: + return "Temperature Sensor (Bat Pack)" + if devtype == 6: + return "RTC" + if devtype == 7: + return "FRAM" + return "Unknown Type" + + def print(self, pw: PrintWrapper): + pw.dlog(f"Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)") + for i in range(len(self.dev_types)): + pw.dlog( + f"{self.map_idx_to_type(self.dev_types[i])} | {self.dev_statuses[i]}" + ) + + def handle_pdu_data( printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes ): @@ -123,17 +167,13 @@ def handle_pdu_data( ) pw.dlog(content_line) current_idx += 2 - device_types = [] - for idx in range(len(PDU1_CHANNELS_NAMES)): - device_types.append(hk_data[current_idx]) - current_idx += 1 - device_statuses = [] - for idx in range(len(PDU1_CHANNELS_NAMES)): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 + dev_parser = DevicesInfoParser() + current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) wdt = WdtInfo(pw=pw) current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) wdt.print() + pw.dlog(f"PDU Device Types: 0:FRAM|1:ADC|2:ADC|3:TempSens|4,5,6,7:Reserved") + dev_parser.print(pw=pw) if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE: pw.dlog(f"Received PDU HK from PDU {pdu_idx}") current_list = [] @@ -153,8 +193,7 @@ def handle_pdu_data( output_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) + pw.dlog(header_str) for idx in range(len(PDU1_CHANNELS_NAMES)): out_enb = f"{output_enb_list[idx]}".ljust(6) content_line = ( @@ -196,8 +235,7 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): out_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) + pw.dlog(header_str) for idx in range(13): out_enb = f"{out_enb_list[idx]}".ljust(6) content_line = ( @@ -262,14 +300,8 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): ar6_depl, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len - device_types = [] - device_statuses = [] - for idx in range(8): - device_types.append(hk_data[current_idx]) - current_idx += 1 - for idx in range(8): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 + dev_parser = DevicesInfoParser() + current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) util_info = ( f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" ) @@ -289,6 +321,104 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" ) pw.dlog(batt_info) + pw.dlog( + "P60 Dock Dev Types: 0:FRAM|1:ADC|2:ADC|3:ADC|4:TempSens|5:RTC|" + "6:TempSens(BatPack)|7:TempSens(BatPack)" + ) + dev_parser.print(pw=pw) printer.print_validity_buffer( validity_buffer=hk_data[current_idx:], num_vars=27 ) + + +def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[int]]: + u16_list = [] + for idx in range(6): + u16_list.append(struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]) + current_idx += 2 + return current_idx, u16_list + + +def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + pw = PrintWrapper(printer=printer) + if set_id == SetIds.ACU_CORE: + mppt_mode = hk_data[0] + current_idx = 1 + current_idx, currents = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + current_idx, voltages = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + vcc = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + current_idx += 2 + vbat = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + current_idx += 2 + current_idx, vboosts = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + current_idx, powers = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + fmt_str = "!HHHIIHH" + inc_len = struct.calcsize(fmt_str) + (tmp0, tmp1, tmp2, bootcnt, uptime, mppt_time, mppt_period) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + current_idx += inc_len + pw.dlog("Received ACU Core HK. Voltages in mV, currents in mA") + pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV | MPPT Mode {mppt_mode}") + header_str = ( + f"Channel | Input U [mV] | Input I [mA] | U Boost [mV] | Power [mW]" + ) + pw.dlog(header_str) + for i in range(6): + pw.dlog( + f"{i} | {str(voltages[i]).ljust(4)} | {str(currents[i]).ljust(4)} | " + f"{str(vboosts[i]).ljust(4)} | {str(powers[i]).ljust(2)}" + ) + pw.dlog( + f"Temperatures in C: Ch0 {tmp0/10.0} | Ch1 {tmp1/10.0} | Ch2 {tmp2/10.0}" + ) + pw.dlog( + f"Boot Count {bootcnt} | Uptime {uptime} sec | " + f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec" + ) + printer.print_validity_buffer( + validity_buffer=hk_data[current_idx:], num_vars=12 + ) + if set_id == SetIds.ACU_AUX: + current_idx = 0 + fmt_str = "!BBB" + inc_len = struct.calcsize(fmt_str) + enb_tuple = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + (dac_enb0, dac_enb1, dac_enb2) = enb_tuple + dac_enb_str = ["on" if entry == 1 else "off" for entry in enb_tuple] + current_idx += inc_len + current_idx, dac_channels_raw = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + fmt_str = "!IHII" + inc_len = struct.calcsize(fmt_str) + (boot_cause, reset_cause, wdt_cnt_gnd, wdt_gnd_time_left) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + current_idx += inc_len + dev_parser = DevicesInfoParser() + current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) + pw.dlog("Received ACU Aux HK. Voltages in mV, currents in mA") + + pw.dlog( + f"DAC Enable States: DAC 0 {dac_enb_str[0]} | DAC 1 {dac_enb_str[1]} | DAC 2 {dac_enb_str[2]}" + ) + pw.dlog(f"Boot Cause {boot_cause} | Reset Cause {reset_cause}") + pw.dlog( + f"Ground WDT: Reboot Count {wdt_cnt_gnd} | Time Left {wdt_gnd_time_left} sec" + ) + + pw.dlog( + f"ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|" + f"5:DAC|6:TempSens|7:Reserved" + ) + dev_parser.print(pw=pw) + printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=8) diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index 02f3866..427017c 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -16,7 +16,7 @@ from pus_tm.devs.bpx_bat import handle_bpx_hk_data from pus_tm.devs.gps import handle_gps_data from pus_tm.devs.gyros import handle_gyros_hk_data from pus_tm.devs.imtq_mgt import handle_self_test_data -from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data +from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data, handle_acu_hk_data from pus_tm.devs.syrlinks import handle_syrlinks_hk_data from pus_tc.devs.imtq import ImtqSetIds from pus_tm.devs.reaction_wheels import handle_rw_hk_data @@ -49,12 +49,17 @@ def handle_hk_packet( set_id=tm_packet.set_id, hk_data=hk_data, ) - handle_regular_hk_print( - printer=printer, - object_id=named_obj_id, - hk_packet=tm_packet, - hk_data=hk_data, - ) + try: + handle_regular_hk_print( + printer=printer, + object_id=named_obj_id, + hk_packet=tm_packet, + hk_data=hk_data, + ) + except ValueError as e: + LOGGER.exception( + f"{e} error when parsing HK data coming from {named_obj_id}" + ) if tm_packet.subservice == 10 or tm_packet.subservice == 12: LOGGER.warning("HK definitions printout not implemented yet") @@ -93,6 +98,8 @@ def handle_regular_hk_print( return handle_pdu_data( printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data ) + if objb == obj_ids.ACU_HANDLER_ID: + return handle_acu_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) if objb == obj_ids.RAD_SENSOR_ID: return handle_rad_sensor_data(printer=printer, hk_data=hk_data, set_id=set_id) if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: