From ea1e440407ed69a079153387e3ed7b0ba0a9abd2 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 23 May 2022 11:24:55 +0200 Subject: [PATCH 1/7] start acu hk handling --- gomspace/gomspace_common.py | 24 +-- pus_tc/cmd_definitions.py | 283 ---------------------------------- pus_tc/devs/acu.py | 210 +++++++++++++++---------- pus_tc/devs/p60dock.py | 37 ++--- pus_tc/devs/pcdu.py | 295 ++++++++++++++++++++++++++++++++++++ pus_tc/devs/pdu1.py | 52 +++---- pus_tc/devs/pdu2.py | 54 +++---- pus_tc/tc_packer_hook.py | 14 +- pus_tm/devs/pcdu.py | 10 ++ 9 files changed, 529 insertions(+), 450 deletions(-) create mode 100644 pus_tc/devs/pcdu.py diff --git a/gomspace/gomspace_common.py b/gomspace/gomspace_common.py index aff5d20..5fe1cb9 100644 --- a/gomspace/gomspace_common.py +++ b/gomspace/gomspace_common.py @@ -10,6 +10,7 @@ import enum from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.tc.definitions import PusTelecommand +from tmtccmd.utility import ObjectId class GomspaceDeviceActionIds(enum.IntEnum): @@ -43,7 +44,8 @@ class SetIds: PDU_2_AUX = 4 P60_CORE = 5 P60_AUX = 6 - ACU = 7 + ACU_CORE = 7 + ACU_AUX = 8 class TableIds: @@ -67,7 +69,7 @@ class Channel: def pack_get_param_command( - object_id: bytearray, table_id: int, memory_address: bytearray, parameter_size: int + object_id: bytes, table_id: int, memory_address: bytearray, parameter_size: int ) -> PusTelecommand: """Function to generate a command to retrieve parameters like the temperature from a gomspace device. @param object_id: The object id of the gomspace device handler. @@ -88,7 +90,7 @@ def pack_get_param_command( def pack_set_param_command( - object_id: bytearray, + object_id: bytes, memory_address: bytearray, parameter_size: int, parameter: int, @@ -128,7 +130,7 @@ def pack_set_param_command( ) -def pack_ping_command(object_id: bytearray, data: bytearray) -> PusTelecommand: +def pack_ping_command(object_id: ObjectId, data: bytearray) -> PusTelecommand: """ " Function to generate the command to ping a gomspace device @param object_id Object Id of the gomspace device handler. @param data Bytearray containing the bytes to send to the gomspace device. For now the on board software @@ -137,33 +139,33 @@ def pack_ping_command(object_id: bytearray, data: bytearray) -> PusTelecommand: data are simply copied by the device and then sent back. """ return generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PING, app_data=data + object_id=object_id.as_bytes, action_id=GomspaceDeviceActionIds.PING, app_data=data ) -def pack_gnd_wdt_reset_command(object_id: bytearray) -> PusTelecommand: +def pack_gnd_wdt_reset_command(object_id: ObjectId) -> PusTelecommand: """ " Function to generate the command to reset the watchdog of a gomspace device. @param object_id Object Id of the gomspace device handler. """ return generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.WDT_RESET + object_id=object_id.as_bytes, action_id=GomspaceDeviceActionIds.WDT_RESET ) -def pack_reboot_command(object_id: bytearray) -> PusTelecommand: +def pack_reboot_command(object_id: ObjectId) -> PusTelecommand: """Function to generate the command which triggers a reboot of a gomspace device @param object_id The object id of the gomspace device handler. """ return generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.REBOOT + object_id=object_id.as_bytes, action_id=GomspaceDeviceActionIds.REBOOT ) -def pack_request_full_hk_table_command(object_id: bytearray) -> PusTelecommand: +def pack_request_full_hk_table_command(object_id: ObjectId) -> PusTelecommand: """Function to generate the command to request the full housekeeping table from a gomspace device. @param object_id The object id of the gomspace device handler. """ return generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.REQUEST_HK_TABLE + object_id=object_id.as_bytes, action_id=GomspaceDeviceActionIds.REQUEST_HK_TABLE ) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 4d5920a..5aa0075 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -476,289 +476,6 @@ def add_time_cmds(cmd_dict: ServiceOpCodeDictT): ) -def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): - from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes, Info - from pus_tc.devs.pdu1 import Pdu1OpCodes - from pus_tc.devs.pdu2 import Pdu2OpCodes - from gomspace.gomspace_common import Info as GsInfo - - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_3V3_ON, - info=Info.STACK_3V3_ON, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_3V3_OFF, - info=Info.STACK_3V3_OFF, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_5V_ON, - info=Info.STACK_5V_ON, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_5V_OFF, - info=Info.STACK_5V_OFF, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, - info=GsInfo.REQUEST_CORE_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_SWITCH_V_I, - info="P60 Dock: Print Switches, Voltages, Currents", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_LATCHUPS, - info="P60 Dock: Print Latchups", - ) - add_op_code_entry(op_code_dict=op_code_dict, keys=P60OpCodes.TEST, info="P60 Tests") - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name=CustomServiceList.P60DOCK.value, - info="P60 Device", - op_code_entry=op_code_dict, - ) - - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.TCS_BOARD_ON.value, - info="PDU1: Turn TCS board on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.TCS_BOARD_OFF.value, - info="PDU1: Turn TCS board off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.STAR_TRACKER_ON.value, - info="PDU1: Turn star tracker on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.STAR_TRACKER_OFF.value, - info="PDU1: Turn star tracker off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SUS_NOMINAL_ON.value, - info="PDU1: Turn SUS nominal on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SUS_NOMINAL_OFF.value, - info="PDU1: Turn SUS nominal off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.ACS_A_SIDE_ON.value, - info="PDU1: Turn ACS A side on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.ACS_A_SIDE_OFF.value, - info="PDU1: Turn ACS A side off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SYRLINKS_ON.value, - info="PDU1: Turn Syrlinks on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SYRLINKS_OFF.value, - info="PDU1: Turn Syrlinks off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.MGT_ON.value, - info="PDU1: Turn MGT on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.MGT_OFF.value, - info="PDU1: Turn MGT off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.PLOC_ON.value, - info="PDU1: Turn PLOC on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.PLOC_OFF.value, - info="PDU1: Turn PLOC off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SCEX_ON.value, - info="PDU1: Turn Solar Cell Experiment on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SCEX_OFF.value, - info="PDU1: Turn Solar Cell Experiment off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, - info=GsInfo.REQUEST_CORE_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_SWITCH_V_I, - info="PDU1: Print Switches, Voltages, Currents", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_LATCHUPS, - info="PDU1: Print Latchups", - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=Pdu1OpCodes.TESTS.value, info="PDU1 Tests" - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name=CustomServiceList.PDU1.value, - info="PDU1 Device", - op_code_entry=op_code_dict, - ) - - op_code_dict = dict() - add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="PDU2 Tests") - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.ACS_SIDE_B_ON.value, - info="PDU2: Turn ACS Side B on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value, - info="PDU2: Turn ACS Side B off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value, - info="PDU2: Turn SUS redundant on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value, - info="PDU2: Turn SUS redundant off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.RW_ON.value, - info="PDU2: Turn reaction wheels on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.RW_OFF.value, - info="PDU2: Turn reaction wheels off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_ON.value, - info="PDU2: PL PCDU Switch Channel Nominal (1) on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_OFF.value, - info="PDU2: PL PCDU Switch Channel Nominal (1) off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_ON.value, - info="PDU2: PL PCDU Switch Channel Redundant (1) on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_OFF.value, - info="PDU2: PL PCDU Switch Channel Redundant (1) off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.TCS_HEATER_IN_ON.value, - info="PDU2: Switch TCS Heater Input on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.TCS_HEATER_IN_OFF.value, - info="PDU2: Switch TCS Heater Input off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_ON.value, - info="PDU2: Switch Solar Array Deployment On", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_OFF.value, - info="PDU2: Switch Solar Array Deployment Off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_CAMERA_ON.value, - info="PDU2: Turn payload camera on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.PL_CAMERA_OFF.value, - info="PDU2: Turn payload camera off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, - info=GsInfo.REQUEST_CORE_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_SWITCH_V_I, - info="PDU2: Print Switches, Voltages, Currents", - options={OpCodeDictKeys.TIMEOUT: 2.0}, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_LATCHUPS, - info="PDU2: Print Latchups", - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name="pdu2", - info="PDU2 Device", - op_code_entry=op_code_dict, - ) - op_code_dict = { - "0": ("ACU: Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), - "51": ("ACU: Print channel statistics", {OpCodeDictKeys.TIMEOUT: 2.0}), - } - service_tuple = ("ACU Devices", op_code_dict) - cmd_dict[CustomServiceList.ACU.value] = service_tuple - - def add_imtq_cmds(cmd_dict: ServiceOpCodeDictT): op_code_dict_srv_imtq = { "0": ("IMTQ Tests All", {OpCodeDictKeys.TIMEOUT: 2.0}), diff --git a/pus_tc/devs/acu.py b/pus_tc/devs/acu.py index f08494d..4379dca 100644 --- a/pus_tc/devs/acu.py +++ b/pus_tc/devs/acu.py @@ -1,15 +1,101 @@ # -*- coding: utf-8 -*- -""" -@file tmtcc_tc_acu.py -@brief ACU tests -@author J. Meier +"""ACU commands +@author J. Meier, R. Mueller @date 21.12.2020 """ -import struct +from config.definitions import CustomServiceList +from tmtccmd.config import add_op_code_entry, add_service_op_code_entry from tmtccmd.tc.packer import TcQueueT -from tmtccmd.config.definitions import QueueCommands -from gomspace.gomspace_common import * +from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT +from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_diag_command, generate_one_hk_command +import gomspace.gomspace_common as gs +from gomspace.gomspace_common import GomspaceOpCodes +from gomspace.gomspace_common import Info as GsInfo +from config.object_ids import ACU_HANDLER_ID from pus_tc.devs.p60dock import P60DockConfigTable +from tmtccmd.tc.pus_8_funccmd import generate_action_command +from tmtccmd.utility import ObjectId + + +class ACUConfigTable: + mppt_mode = gs.TableEntry(bytearray([0x00, 0x00]), gs.TableEntry.uint8_size) + mppt_d_mode = gs.TableEntry(bytearray([0x00, 0x01]), gs.TableEntry.uint8_size) + vboost = gs.TableEntry(bytearray([0x00, 0x02]), gs.TableEntry.uint16_size) + vbat_max_hi = gs.TableEntry(bytearray([0x00, 0x10]), gs.TableEntry.uint16_size) + vbat_max_lo = gs.TableEntry(bytearray([0x00, 0x12]), gs.TableEntry.uint16_size) + ov_mode = gs.TableEntry(bytearray([0x00, 0x1A]), gs.TableEntry.uint8_size) + + +class ACUHkTable: + temperature1 = gs.TableEntry(bytearray([0x00, 0x1C]), gs.TableEntry.uint16_size) + temperature2 = gs.TableEntry(bytearray([0x00, 0x1D]), gs.TableEntry.uint16_size) + temperature3 = gs.TableEntry(bytearray([0x00, 0x1E]), gs.TableEntry.uint16_size) + # Ground WDT value (remaining seconds until reboot) + wdt_gnd_left = gs.TableEntry(bytearray([0x00, 0x74]), gs.TableEntry.uint32_size) + + +class OpCodes: + TEST = ["0", "test"] + + +class Info: + TEST = "ACU Test" + + +def add_acu_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, + info=GsInfo.REQUEST_CORE_HK_ONCE + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, + info=GsInfo.REQUEST_AUX_HK_ONCE + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, + info=GsInfo.REQUEST_AUX_HK_ONCE + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.TEST, + info=Info.TEST + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + op_code_entry=op_code_dict, + name=CustomServiceList.ACU.value, + info="ACU Device" + ) + + +def pack_acu_commands( + object_id: ObjectId, tc_queue: TcQueueT, op_code: str +) -> TcQueueT: + tc_queue.appendleft((QueueCommands.PRINT, "Handling ACU command")) + if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I: + tc_queue.appendleft((QueueCommands.PRINT, "ACU: Print channel stats")) + command = generate_action_command( + object_id=object_id.as_bytes, + action_id=gs.GomspaceDeviceActionIds.PRINT_SWITCH_V_I + ) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE: + tc_queue.appendleft((QueueCommands.PRINT, f"PDU1: {GsInfo.REQUEST_CORE_HK_ONCE}")) + hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_CORE) + command = generate_one_diag_command(sid=hk_sid, ssc=0) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE: + tc_queue.appendleft((QueueCommands.PRINT, f"PDU1: {GsInfo.REQUEST_AUX_HK_ONCE}")) + hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_AUX) + command = generate_one_hk_command(sid=hk_sid, ssc=0) + tc_queue.appendleft(command.pack_command_tuple()) + pack_test_cmds(object_id=object_id, tc_queue=tc_queue) + + return tc_queue class ACUTestProcedure: @@ -26,59 +112,28 @@ class ACUTestProcedure: ping = False read_temperature1 = False read_temperature2 = False - read_temperature3 = True - read_mppt_mode = True - read_vboost = True - read_vbat_max_hi = True - read_vbat_max_lo = True - read_ov_mode = True + read_temperature3 = False + read_mppt_mode = False + read_vboost = False + read_vbat_max_hi = False + read_vbat_max_lo = False + read_ov_mode = False + off = False -class ACUConfigTable: - mppt_mode = TableEntry(bytearray([0x00, 0x00]), TableEntry.uint8_size) - mppt_d_mode = TableEntry(bytearray([0x00, 0x01]), TableEntry.uint8_size) - vboost = TableEntry(bytearray([0x00, 0x02]), TableEntry.uint16_size) - vbat_max_hi = TableEntry(bytearray([0x00, 0x10]), TableEntry.uint16_size) - vbat_max_lo = TableEntry(bytearray([0x00, 0x12]), TableEntry.uint16_size) - ov_mode = TableEntry(bytearray([0x00, 0x1A]), TableEntry.uint8_size) - - -class ACUHkTable: - temperature1 = TableEntry(bytearray([0x00, 0x1C]), TableEntry.uint16_size) - temperature2 = TableEntry(bytearray([0x00, 0x1D]), TableEntry.uint16_size) - temperature3 = TableEntry(bytearray([0x00, 0x1E]), TableEntry.uint16_size) - # Ground WDT value (remaining seconds until reboot) - wdt_gnd_left = TableEntry(bytearray([0x00, 0x74]), TableEntry.uint32_size) - - -class CommandId: - PRINT_CHANNEL_STATS = 51 - - -def pack_acu_test_into( - object_id: bytearray, tc_queue: TcQueueT, op_code: str -) -> TcQueueT: - tc_queue.appendleft((QueueCommands.PRINT, "Testing ACU")) - - if op_code == "51": - tc_queue.appendleft((QueueCommands.PRINT, "ACU: Print channel stats")) - command = object_id + struct.pack("!I", CommandId.PRINT_CHANNEL_STATS) - command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) - return - +def pack_test_cmds(object_id: ObjectId, tc_queue: TcQueueT): if ACUTestProcedure.all or ACUTestProcedure.reboot: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reboot")) - command = pack_reboot_command(object_id) + command = gs.pack_reboot_command(object_id) # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_gnd_wdt: tc_queue.appendleft( (QueueCommands.PRINT, "ACU: Reading ground watchdog timer value") ) - command = pack_get_param_command( - object_id, - TableIds.hk, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.hk, ACUHkTable.wdt_gnd_left.parameter_address, ACUHkTable.wdt_gnd_left.parameter_size, ) @@ -86,20 +141,20 @@ def pack_acu_test_into( tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.gnd_wdt_reset: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Testing ground watchdog reset")) - command = pack_gnd_wdt_reset_command(object_id) + command = gs.pack_gnd_wdt_reset_command(object_id) # command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.ping: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Ping Test")) ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) - command = pack_ping_command(object_id, ping_data) + command = gs.pack_ping_command(object_id, ping_data) # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_temperature3: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading temperature 3")) - command = pack_get_param_command( - object_id, - TableIds.hk, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.hk, ACUHkTable.temperature3.parameter_address, ACUHkTable.temperature3.parameter_size, ) @@ -107,9 +162,9 @@ def pack_acu_test_into( tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_vboost: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vboost value")) - command = pack_get_param_command( - object_id, - TableIds.config, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.config, ACUConfigTable.vboost.parameter_address, ACUConfigTable.vboost.parameter_size, ) @@ -117,9 +172,9 @@ def pack_acu_test_into( tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_hi: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_hi")) - command = pack_get_param_command( - object_id, - TableIds.config, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.config, ACUConfigTable.vbat_max_hi.parameter_address, ACUConfigTable.vbat_max_hi.parameter_size, ) @@ -127,9 +182,9 @@ def pack_acu_test_into( tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_lo: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_lo")) - command = pack_get_param_command( - object_id, - TableIds.config, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.config, ACUConfigTable.vbat_max_lo.parameter_address, ACUConfigTable.vbat_max_lo.parameter_size, ) @@ -137,23 +192,20 @@ def pack_acu_test_into( tc_queue.appendleft(command.pack_command_tuple()) if ACUTestProcedure.all or ACUTestProcedure.read_ov_mode: tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ov_mode")) - command = pack_get_param_command( - object_id, - TableIds.config, + command = gs.pack_get_param_command( + object_id.as_bytes, + gs.TableIds.config, ACUConfigTable.ov_mode.parameter_address, ACUConfigTable.ov_mode.parameter_size, ) # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) - - tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning off ACU")) - command = pack_set_param_command( - p60dock_object_id, - P60DockConfigTable.out_en_0.parameter_address, - P60DockConfigTable.out_en_0.parameter_size, - Channel.off, - ) - # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) - tc_queue.appendleft(command.pack_command_tuple()) - - return tc_queue + if ACUTestProcedure.all or ACUTestProcedure.off: + tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning off ACU")) + command = gs.pack_set_param_command( + ACU_HANDLER_ID, + P60DockConfigTable.out_en_0.parameter_address, + P60DockConfigTable.out_en_0.parameter_size, + gs.Channel.off, + ) + tc_queue.appendleft(command.pack_command_tuple()) \ No newline at end of file diff --git a/pus_tc/devs/p60dock.py b/pus_tc/devs/p60dock.py index be71cac..c0f395a 100644 --- a/pus_tc/devs/p60dock.py +++ b/pus_tc/devs/p60dock.py @@ -87,11 +87,12 @@ class P60DockHkTable: wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size) -def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): +def pack_p60dock_cmds(object_id: ObjectId, tc_queue: TcQueueT, op_code: str): + objb = object_id.as_bytes if op_code in P60OpCodes.STACK_3V3_ON: tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_3V3_ON)) command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_9.parameter_address, P60DockConfigTable.out_en_9.parameter_size, Channel.on, @@ -100,7 +101,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code in P60OpCodes.STACK_3V3_OFF: tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_3V3_OFF)) command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_9.parameter_address, P60DockConfigTable.out_en_9.parameter_size, Channel.off, @@ -109,7 +110,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code in P60OpCodes.STACK_5V_ON: tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_5V_ON)) command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_10.parameter_address, P60DockConfigTable.out_en_10.parameter_size, Channel.on, @@ -118,7 +119,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code in P60OpCodes.STACK_5V_OFF: tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_5V_OFF)) command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_10.parameter_address, P60DockConfigTable.out_en_10.parameter_size, Channel.off, @@ -143,13 +144,13 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "P60 Dock: Print Switches, Voltages, Currents") ) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I ) tc_queue.appendleft(command.pack_command_tuple()) if op_code in GomspaceOpCodes.PRINT_LATCHUPS: tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Print Latchups")) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS ) tc_queue.appendleft(command.pack_command_tuple()) if P60DockTestProcedure.all or P60DockTestProcedure.reboot: @@ -162,7 +163,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "P60 Dock: Reading ground watchdog timer value") ) command = pack_get_param_command( - object_id, + objb, TableIds.hk, P60DockHkTable.wdt_gnd_left.parameter_address, P60DockHkTable.wdt_gnd_left.parameter_size, @@ -188,7 +189,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) parameter = 0 # set channel off command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_3.parameter_address, P60DockConfigTable.out_en_3.parameter_size, parameter, @@ -200,7 +201,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "P60 Dock: Testing temperature reading") ) command = pack_get_param_command( - object_id, + objb, TableIds.hk, P60DockHkTable.temperature1.parameter_address, P60DockHkTable.temperature1.parameter_size, @@ -212,7 +213,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "P60 Dock: Testing Output Channel 3 state (PDU2)") ) command = pack_get_param_command( - object_id, + objb, TableIds.config, P60DockConfigTable.out_en_3.parameter_address, P60DockConfigTable.out_en_3.parameter_size, @@ -227,7 +228,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) ) command = pack_get_param_command( - object_id, + objb, TableIds.config, P60DockConfigTable.cur_lu_lim_0.parameter_address, P60DockConfigTable.cur_lu_lim_0.parameter_size, @@ -240,7 +241,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) parameter = 1 # set channel on command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_3.parameter_address, P60DockConfigTable.out_en_3.parameter_size, parameter, @@ -253,7 +254,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) table_id_invalid = 5 command = pack_get_param_command( - object_id, + objb, table_id_invalid, P60DockHkTable.temperature1.parameter_address, P60DockHkTable.temperature1.parameter_size, @@ -269,7 +270,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) invalid_address = bytearray([0x01, 0xF4]) command = pack_get_param_command( - object_id, + objb, TableIds.hk, invalid_address, P60DockHkTable.temperature1.parameter_size, @@ -286,7 +287,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): parameter_size = 2 parameter = 1 command = pack_set_param_command( - object_id, invalid_address, parameter_size, parameter + objb, invalid_address, parameter_size, parameter ) # command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) @@ -299,7 +300,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) invalid_size = 5 command = pack_get_param_command( - object_id, + objb, TableIds.hk, P60DockHkTable.temperature1.parameter_address, invalid_size, @@ -314,7 +315,7 @@ def pack_p60dock_cmds(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) parameter = 1 command = pack_set_param_command( - object_id, + objb, P60DockConfigTable.out_en_3.parameter_address, invalid_size, parameter, diff --git a/pus_tc/devs/pcdu.py b/pus_tc/devs/pcdu.py new file mode 100644 index 0000000..9cab064 --- /dev/null +++ b/pus_tc/devs/pcdu.py @@ -0,0 +1,295 @@ +from config.definitions import CustomServiceList +from tmtccmd.config import ServiceOpCodeDictT, add_op_code_entry, add_service_op_code_entry, \ + OpCodeDictKeys + +from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes, Info +from pus_tc.devs.pdu1 import Pdu1OpCodes +from pus_tc.devs.pdu2 import Pdu2OpCodes +from pus_tc.devs.acu import add_acu_cmds +from gomspace.gomspace_common import Info as GsInfo + + +def add_p60_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_3V3_ON, + info=Info.STACK_3V3_ON, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_3V3_OFF, + info=Info.STACK_3V3_OFF, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_5V_ON, + info=Info.STACK_5V_ON, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=P60OpCodes.STACK_5V_OFF, + info=Info.STACK_5V_OFF, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, + info=GsInfo.REQUEST_CORE_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, + info=GsInfo.REQUEST_AUX_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_SWITCH_V_I, + info="P60 Dock: Print Switches, Voltages, Currents", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_LATCHUPS, + info="P60 Dock: Print Latchups", + ) + add_op_code_entry(op_code_dict=op_code_dict, keys=P60OpCodes.TEST, info="P60 Tests") + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.P60DOCK.value, + info="P60 Device", + op_code_entry=op_code_dict, + ) + + +def add_pdu1_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.TCS_BOARD_OFF.value, + info="PDU1: Turn TCS board off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.STAR_TRACKER_ON.value, + info="PDU1: Turn star tracker on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.STAR_TRACKER_OFF.value, + info="PDU1: Turn star tracker off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SUS_NOMINAL_ON.value, + info="PDU1: Turn SUS nominal on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SUS_NOMINAL_OFF.value, + info="PDU1: Turn SUS nominal off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.ACS_A_SIDE_ON.value, + info="PDU1: Turn ACS A side on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.ACS_A_SIDE_OFF.value, + info="PDU1: Turn ACS A side off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SYRLINKS_ON.value, + info="PDU1: Turn Syrlinks on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SYRLINKS_OFF.value, + info="PDU1: Turn Syrlinks off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.MGT_ON.value, + info="PDU1: Turn MGT on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.MGT_OFF.value, + info="PDU1: Turn MGT off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.PLOC_ON.value, + info="PDU1: Turn PLOC on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.PLOC_OFF.value, + info="PDU1: Turn PLOC off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SCEX_ON.value, + info="PDU1: Turn Solar Cell Experiment on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.SCEX_OFF.value, + info="PDU1: Turn Solar Cell Experiment off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, + info=GsInfo.REQUEST_CORE_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, + info=GsInfo.REQUEST_AUX_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_SWITCH_V_I, + info="PDU1: Print Switches, Voltages, Currents", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu1OpCodes.TCS_BOARD_ON.value, + info="PDU1: Turn TCS board on", + ) + + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_LATCHUPS, + info="PDU1: Print Latchups", + ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=Pdu1OpCodes.TESTS.value, info="PDU1 Tests" + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.PDU1.value, + info="PDU1 Device", + op_code_entry=op_code_dict, + ) + + +def add_pdu2_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="PDU2 Tests") + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.ACS_SIDE_B_ON.value, + info="PDU2: Turn ACS Side B on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value, + info="PDU2: Turn ACS Side B off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value, + info="PDU2: Turn SUS redundant on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value, + info="PDU2: Turn SUS redundant off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.RW_ON.value, + info="PDU2: Turn reaction wheels on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.RW_OFF.value, + info="PDU2: Turn reaction wheels off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_ON.value, + info="PDU2: PL PCDU Switch Channel Nominal (1) on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_OFF.value, + info="PDU2: PL PCDU Switch Channel Nominal (1) off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_ON.value, + info="PDU2: PL PCDU Switch Channel Redundant (1) on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_OFF.value, + info="PDU2: PL PCDU Switch Channel Redundant (1) off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.TCS_HEATER_IN_ON.value, + info="PDU2: Switch TCS Heater Input on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.TCS_HEATER_IN_OFF.value, + info="PDU2: Switch TCS Heater Input off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_ON.value, + info="PDU2: Switch Solar Array Deployment On", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_OFF.value, + info="PDU2: Switch Solar Array Deployment Off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_CAMERA_ON.value, + info="PDU2: Turn payload camera on", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=Pdu2OpCodes.PL_CAMERA_OFF.value, + info="PDU2: Turn payload camera off", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, + info=GsInfo.REQUEST_CORE_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, + info=GsInfo.REQUEST_AUX_HK_ONCE, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_SWITCH_V_I, + info="PDU2: Print Switches, Voltages, Currents", + options={OpCodeDictKeys.TIMEOUT: 2.0}, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=GomspaceOpCodes.PRINT_LATCHUPS, + info="PDU2: Print Latchups", + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name="pdu2", + info="PDU2 Device", + op_code_entry=op_code_dict, + ) + + +def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): + add_p60_cmds(cmd_dict) + add_pdu1_cmds(cmd_dict) + add_pdu2_cmds(cmd_dict) + add_acu_cmds(cmd_dict) + + diff --git a/pus_tc/devs/pdu1.py b/pus_tc/devs/pdu1.py index 08ddbd9..04c042e 100644 --- a/pus_tc/devs/pdu1.py +++ b/pus_tc/devs/pdu1.py @@ -54,13 +54,13 @@ class PDU1TestProcedure: turn_channel_3_off = False -def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): +def pack_pdu1_commands(object_id: ObjectId, tc_queue: TcQueueT, op_code: str): tc_queue.appendleft((QueueCommands.PRINT, "Commanding PDU1")) - + objb = object_id.as_bytes if op_code == Pdu1OpCodes.TCS_BOARD_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn TCS board on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_0.parameter_address, PDUConfigTable.out_en_0.parameter_size, Channel.on, @@ -69,7 +69,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.TCS_BOARD_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn TCS board off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_0.parameter_address, PDUConfigTable.out_en_0.parameter_size, Channel.off, @@ -78,7 +78,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.STAR_TRACKER_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.on, @@ -87,7 +87,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.STAR_TRACKER_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.off, @@ -96,7 +96,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.SUS_NOMINAL_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_4.parameter_address, PDUConfigTable.out_en_4.parameter_size, Channel.on, @@ -105,7 +105,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.SUS_NOMINAL_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_4.parameter_address, PDUConfigTable.out_en_4.parameter_size, Channel.off, @@ -114,7 +114,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.ACS_A_SIDE_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_7.parameter_address, PDUConfigTable.out_en_7.parameter_size, Channel.on, @@ -123,7 +123,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.ACS_A_SIDE_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_7.parameter_address, PDUConfigTable.out_en_7.parameter_size, Channel.off, @@ -132,7 +132,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.SUS_NOMINAL_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_4.parameter_address, PDUConfigTable.out_en_4.parameter_size, Channel.off, @@ -143,7 +143,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU1: Turn Solar Cell Experiment on") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_5.parameter_address, PDUConfigTable.out_en_5.parameter_size, Channel.on, @@ -154,7 +154,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU1: Turn Solar Cell Experiment off") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_5.parameter_address, PDUConfigTable.out_en_5.parameter_size, Channel.off, @@ -163,7 +163,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.SYRLINKS_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn Syrlinks on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_1.parameter_address, PDUConfigTable.out_en_1.parameter_size, Channel.on, @@ -172,7 +172,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.SYRLINKS_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn Syrlinks off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_1.parameter_address, PDUConfigTable.out_en_1.parameter_size, Channel.off, @@ -181,7 +181,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.MGT_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn MGT on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.on, @@ -190,7 +190,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.MGT_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn MGT off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.off, @@ -199,7 +199,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.PLOC_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn PLOC on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_6.parameter_address, PDUConfigTable.out_en_6.parameter_size, Channel.on, @@ -208,7 +208,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu1OpCodes.PLOC_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn PLOC off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_6.parameter_address, PDUConfigTable.out_en_6.parameter_size, Channel.off, @@ -229,13 +229,13 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU1: Print Switches, Voltages, Currents") ) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I ) tc_queue.appendleft(command.pack_command_tuple()) if op_code in GomspaceOpCodes.PRINT_LATCHUPS: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Print Latchups")) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS ) tc_queue.appendleft(command.pack_command_tuple()) if PDU1TestProcedure.all or PDU1TestProcedure.ping: @@ -246,7 +246,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if PDU1TestProcedure.all or PDU1TestProcedure.read_temperature: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Testing temperature reading")) command = pack_get_param_command( - object_id, + objb, TableIds.hk, PDUHkTable.temperature.parameter_address, PDUHkTable.temperature.parameter_size, @@ -257,7 +257,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU1: Turn channel 2 on (Star Tracker)") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.on, @@ -268,7 +268,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU1: Turn channel 2 off (Star Tracker)") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.off, @@ -277,7 +277,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_on: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 on (MTQ)")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.on, @@ -286,7 +286,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_off: tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 off (MTQ)")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.off, diff --git a/pus_tc/devs/pdu2.py b/pus_tc/devs/pdu2.py index 12c1636..df4c883 100644 --- a/pus_tc/devs/pdu2.py +++ b/pus_tc/devs/pdu2.py @@ -66,13 +66,13 @@ class PDU2TestProcedure: request_hk_table = False -def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): +def pack_pdu2_commands(object_id: ObjectId, tc_queue: TcQueueT, op_code: str): tc_queue.appendleft((QueueCommands.PRINT, "Testing PDU2")) - + objb = object_id.as_bytes if op_code == Pdu2OpCodes.ACS_SIDE_B_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_7.parameter_address, PDUConfigTable.out_en_7.parameter_size, Channel.on, @@ -82,7 +82,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.ACS_SIDE_B_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_7.parameter_address, PDUConfigTable.out_en_7.parameter_size, Channel.off, @@ -92,7 +92,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.Q7S_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "Turning off Q7S OBC")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_0.parameter_address, PDUConfigTable.out_en_0.parameter_size, Channel.off, @@ -101,7 +101,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.SUS_REDUNDANT_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn SUS redundant on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_4.parameter_address, PDUConfigTable.out_en_4.parameter_size, Channel.on, @@ -110,7 +110,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.SUS_REDUNDANT_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn SUS redundant off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_4.parameter_address, PDUConfigTable.out_en_4.parameter_size, Channel.off, @@ -119,7 +119,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.RW_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn reaction wheels on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.on, @@ -128,7 +128,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.RW_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn reaction wheels off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.off, @@ -139,7 +139,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn PDU2 PL PCDU Channel 1 on") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_1.parameter_address, PDUConfigTable.out_en_1.parameter_size, Channel.on, @@ -150,7 +150,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn PDU2 PL PCDU Channel 1 off") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_1.parameter_address, PDUConfigTable.out_en_1.parameter_size, Channel.off, @@ -161,7 +161,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn PDU2 PL PCDU Channel 6 on") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_6.parameter_address, PDUConfigTable.out_en_6.parameter_size, Channel.off, @@ -172,7 +172,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn PDU2 PL PCDU Channel 6 off") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_6.parameter_address, PDUConfigTable.out_en_6.parameter_size, Channel.off, @@ -181,7 +181,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.TCS_HEATER_IN_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn TCS Heater Input on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.on, @@ -190,7 +190,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.TCS_HEATER_IN_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn TCS Heater Input off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_3.parameter_address, PDUConfigTable.out_en_3.parameter_size, Channel.off, @@ -201,7 +201,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn Solar Array Deployment On") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_5.parameter_address, PDUConfigTable.out_en_5.parameter_size, Channel.on, @@ -212,7 +212,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Turn Solar Array Deployment Off") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_5.parameter_address, PDUConfigTable.out_en_5.parameter_size, Channel.off, @@ -221,7 +221,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.PL_CAMERA_ON.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn payload camera on")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_8.parameter_address, PDUConfigTable.out_en_8.parameter_size, Channel.on, @@ -230,7 +230,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if op_code == Pdu2OpCodes.PL_CAMERA_OFF.value: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn payload camera off")) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_8.parameter_address, PDUConfigTable.out_en_8.parameter_size, Channel.off, @@ -251,13 +251,13 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Print Switches, Currents, Voltahes") ) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I ) tc_queue.appendleft(command.pack_command_tuple()) if op_code in GomspaceOpCodes.PRINT_LATCHUPS: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Print Latchups")) command = generate_action_command( - object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS + object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS ) tc_queue.appendleft(command.pack_command_tuple()) if PDU2TestProcedure.all or PDU2TestProcedure.reboot: @@ -269,7 +269,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Reading ground watchdog timer value") ) command = pack_get_param_command( - object_id, + objb, TableIds.hk, PDUHkTable.wdt_gnd_left.parameter_address, PDUHkTable.wdt_gnd_left.parameter_size, @@ -294,7 +294,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.on, @@ -303,7 +303,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): if PDU2TestProcedure.all or PDU2TestProcedure.read_temperature: tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing temperature reading")) command = pack_get_param_command( - object_id, + objb, TableIds.hk, PDUHkTable.temperature.parameter_address, PDUHkTable.temperature.parameter_size, @@ -314,7 +314,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Reading output channel 2 state (TCS Heater)") ) command = pack_get_param_command( - object_id, + objb, TableIds.config, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, @@ -328,7 +328,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): ) ) command = pack_get_param_command( - object_id, + objb, TableIds.config, PDUConfigTable.cur_lu_lim_0.parameter_address, PDUConfigTable.cur_lu_lim_0.parameter_size, @@ -339,7 +339,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str): (QueueCommands.PRINT, "PDU2: Testing setting output channel 2 off") ) command = pack_set_param_command( - object_id, + objb, PDUConfigTable.out_en_2.parameter_address, PDUConfigTable.out_en_2.parameter_size, Channel.off, diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index c2e3f60..22eb89a 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -20,7 +20,7 @@ from pus_tc.devs.p60dock import pack_p60dock_cmds from pus_tc.devs.pdu2 import pack_pdu2_commands from pus_tc.devs.pdu1 import pack_pdu1_commands from pus_tc.devs.bpx_batt import pack_bpx_commands -from pus_tc.devs.acu import pack_acu_test_into +from pus_tc.devs.acu import pack_acu_commands from pus_tc.devs.solar_array_deployment import pack_solar_array_deployment_test_into from pus_tc.devs.imtq import pack_imtq_test_into from pus_tc.devs.tmp1075 import pack_tmp1075_test_into @@ -44,6 +44,7 @@ from pus_tc.system.proc import pack_proc_commands from pus_tc.system.controllers import pack_controller_commands from config.definitions import CustomServiceList from config.object_ids import ( + get_object_ids, P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, @@ -98,6 +99,7 @@ def pre_tc_send_cb( def pack_service_queue_user( service: Union[str, int], op_code: str, service_queue: TcQueueT ): + obj_id_man = get_object_ids() if service == CoreServiceList.SERVICE_5.value: return pack_generic_service5_test_into(tc_queue=service_queue) if service == CoreServiceList.SERVICE_17.value: @@ -107,23 +109,23 @@ def pack_service_queue_user( if service == CoreServiceList.SERVICE_200.value: return pack_service200_test_into(tc_queue=service_queue) if service == CustomServiceList.P60DOCK.value: - object_id = P60_DOCK_HANDLER + object_id = obj_id_man.get(P60_DOCK_HANDLER) return pack_p60dock_cmds( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.PDU1.value: - object_id = PDU_1_HANDLER_ID + object_id = obj_id_man.get(PDU_1_HANDLER_ID) return pack_pdu1_commands( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.PDU2.value: - object_id = PDU_2_HANDLER_ID + object_id = obj_id_man.get(PDU_2_HANDLER_ID) return pack_pdu2_commands( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.ACU.value: - object_id = ACU_HANDLER_ID - return pack_acu_test_into( + object_id = obj_id_man.get(ACU_HANDLER_ID) + return pack_acu_commands( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.BPX_BATTERY.value: diff --git a/pus_tm/devs/pcdu.py b/pus_tm/devs/pcdu.py index d33ddc6..e0ac54e 100644 --- a/pus_tm/devs/pcdu.py +++ b/pus_tm/devs/pcdu.py @@ -292,3 +292,13 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): printer.print_validity_buffer( validity_buffer=hk_data[current_idx:], num_vars=27 ) + + +def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + pw = PrintWrapper(printer=printer) + if set_id == SetIds.ACU_CORE: + pw.dlog("Received ACU Core HK. Voltages in mV, currents in mA") + pass + if set_id == SetIds.ACU_AUX: + pw.dlog("Received ACU Aux HK. Voltages in mV, currents in mA") + pass -- 2.43.0 From c01f1d01916160888cf745ee3db6a2c46d795408 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 23 May 2022 11:25:33 +0200 Subject: [PATCH 2/7] run black --- gomspace/gomspace_common.py | 4 +++- pus_tc/devs/acu.py | 32 ++++++++++++++++++-------------- pus_tc/devs/pcdu.py | 10 ++++++---- tmtccmd | 2 +- 4 files changed, 28 insertions(+), 20 deletions(-) diff --git a/gomspace/gomspace_common.py b/gomspace/gomspace_common.py index 5fe1cb9..6e82950 100644 --- a/gomspace/gomspace_common.py +++ b/gomspace/gomspace_common.py @@ -139,7 +139,9 @@ def pack_ping_command(object_id: ObjectId, data: bytearray) -> PusTelecommand: data are simply copied by the device and then sent back. """ return generate_action_command( - object_id=object_id.as_bytes, action_id=GomspaceDeviceActionIds.PING, app_data=data + object_id=object_id.as_bytes, + action_id=GomspaceDeviceActionIds.PING, + app_data=data, ) diff --git a/pus_tc/devs/acu.py b/pus_tc/devs/acu.py index 4379dca..06d6533 100644 --- a/pus_tc/devs/acu.py +++ b/pus_tc/devs/acu.py @@ -7,7 +7,11 @@ from config.definitions import CustomServiceList from tmtccmd.config import add_op_code_entry, add_service_op_code_entry from tmtccmd.tc.packer import TcQueueT from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT -from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_diag_command, generate_one_hk_command +from tmtccmd.tc.pus_3_fsfw_hk import ( + make_sid, + generate_one_diag_command, + generate_one_hk_command, +) import gomspace.gomspace_common as gs from gomspace.gomspace_common import GomspaceOpCodes from gomspace.gomspace_common import Info as GsInfo @@ -47,28 +51,24 @@ def add_acu_cmds(cmd_dict: ServiceOpCodeDictT): add_op_code_entry( op_code_dict=op_code_dict, keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, - info=GsInfo.REQUEST_CORE_HK_ONCE + info=GsInfo.REQUEST_CORE_HK_ONCE, ) add_op_code_entry( op_code_dict=op_code_dict, keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE + info=GsInfo.REQUEST_AUX_HK_ONCE, ) add_op_code_entry( op_code_dict=op_code_dict, keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.TEST, - info=Info.TEST + info=GsInfo.REQUEST_AUX_HK_ONCE, ) + add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.TEST, info=Info.TEST) add_service_op_code_entry( srv_op_code_dict=cmd_dict, op_code_entry=op_code_dict, name=CustomServiceList.ACU.value, - info="ACU Device" + info="ACU Device", ) @@ -80,16 +80,20 @@ def pack_acu_commands( tc_queue.appendleft((QueueCommands.PRINT, "ACU: Print channel stats")) command = generate_action_command( object_id=object_id.as_bytes, - action_id=gs.GomspaceDeviceActionIds.PRINT_SWITCH_V_I + action_id=gs.GomspaceDeviceActionIds.PRINT_SWITCH_V_I, ) tc_queue.appendleft(command.pack_command_tuple()) if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE: - tc_queue.appendleft((QueueCommands.PRINT, f"PDU1: {GsInfo.REQUEST_CORE_HK_ONCE}")) + tc_queue.appendleft( + (QueueCommands.PRINT, f"PDU1: {GsInfo.REQUEST_CORE_HK_ONCE}") + ) hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_CORE) command = generate_one_diag_command(sid=hk_sid, ssc=0) tc_queue.appendleft(command.pack_command_tuple()) if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE: - tc_queue.appendleft((QueueCommands.PRINT, f"PDU1: {GsInfo.REQUEST_AUX_HK_ONCE}")) + tc_queue.appendleft( + (QueueCommands.PRINT, f"PDU1: {GsInfo.REQUEST_AUX_HK_ONCE}") + ) hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_AUX) command = generate_one_hk_command(sid=hk_sid, ssc=0) tc_queue.appendleft(command.pack_command_tuple()) @@ -208,4 +212,4 @@ def pack_test_cmds(object_id: ObjectId, tc_queue: TcQueueT): P60DockConfigTable.out_en_0.parameter_size, gs.Channel.off, ) - tc_queue.appendleft(command.pack_command_tuple()) \ No newline at end of file + tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/devs/pcdu.py b/pus_tc/devs/pcdu.py index 9cab064..bd95d3d 100644 --- a/pus_tc/devs/pcdu.py +++ b/pus_tc/devs/pcdu.py @@ -1,6 +1,10 @@ from config.definitions import CustomServiceList -from tmtccmd.config import ServiceOpCodeDictT, add_op_code_entry, add_service_op_code_entry, \ - OpCodeDictKeys +from tmtccmd.config import ( + ServiceOpCodeDictT, + add_op_code_entry, + add_service_op_code_entry, + OpCodeDictKeys, +) from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes, Info from pus_tc.devs.pdu1 import Pdu1OpCodes @@ -291,5 +295,3 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): add_pdu1_cmds(cmd_dict) add_pdu2_cmds(cmd_dict) add_acu_cmds(cmd_dict) - - diff --git a/tmtccmd b/tmtccmd index 4fbbf12..0e193f9 160000 --- a/tmtccmd +++ b/tmtccmd @@ -1 +1 @@ -Subproject commit 4fbbf129e140e593b1cc54a0361fa20cc8726789 +Subproject commit 0e193f9c76973a6105926ed133b179f8ea467981 -- 2.43.0 From 580b3818489a87f9ea1b31754c7b0285c2bb6a10 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 23 May 2022 12:38:50 +0200 Subject: [PATCH 3/7] completed ACU hk parsing --- pus_tc/cmd_definitions.py | 1 + pus_tm/devs/pcdu.py | 136 ++++++++++++++++++++++++++++++++------ 2 files changed, 115 insertions(+), 22 deletions(-) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 5aa0075..8f812c7 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -1,4 +1,5 @@ from pus_tc.devs.gps import GpsOpCodes +from pus_tc.devs.pcdu import add_pcdu_cmds from tmtccmd.config import ( add_op_code_entry, add_service_op_code_entry, diff --git a/pus_tm/devs/pcdu.py b/pus_tm/devs/pcdu.py index e0ac54e..2134352 100644 --- a/pus_tm/devs/pcdu.py +++ b/pus_tm/devs/pcdu.py @@ -1,4 +1,5 @@ import struct +from typing import List, Tuple from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter from pus_tm.defs import PrintWrapper @@ -87,6 +88,26 @@ class WdtInfo: return current_idx +class DevicesInfoParser: + def __int__(self): + self.dev_types = [] + self.dev_statuses = [] + + def parse(self, hk_data: bytes, current_idx: int) -> int: + for idx in range(8): + self.dev_types.append(hk_data[current_idx]) + current_idx += 1 + for idx in range(8): + self.dev_statuses.append(hk_data[current_idx]) + current_idx += 1 + return current_idx + + def print(self, pw: PrintWrapper): + pw.dlog(f"Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)") + for i in range(len(self.dev_types)): + pw.dlog(f"{self.dev_types} | {self.dev_statuses}") + + def handle_pdu_data( printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes ): @@ -123,17 +144,13 @@ def handle_pdu_data( ) pw.dlog(content_line) current_idx += 2 - device_types = [] - for idx in range(len(PDU1_CHANNELS_NAMES)): - device_types.append(hk_data[current_idx]) - current_idx += 1 - device_statuses = [] - for idx in range(len(PDU1_CHANNELS_NAMES)): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 + dev_parser = DevicesInfoParser() + current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) wdt = WdtInfo(pw=pw) current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) wdt.print() + pw.dlog(f"PDU Device Types: 0:FRAM|1:ADC|2:ADC|3:TempSens|4,5,6,7:Reserved") + dev_parser.print(pw=pw) if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE: pw.dlog(f"Received PDU HK from PDU {pdu_idx}") current_list = [] @@ -153,8 +170,7 @@ def handle_pdu_data( output_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) + pw.dlog(header_str) for idx in range(len(PDU1_CHANNELS_NAMES)): out_enb = f"{output_enb_list[idx]}".ljust(6) content_line = ( @@ -196,8 +212,7 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): out_enb_list.append(hk_data[current_idx]) current_idx += 1 header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) + pw.dlog(header_str) for idx in range(13): out_enb = f"{out_enb_list[idx]}".ljust(6) content_line = ( @@ -262,14 +277,8 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): ar6_depl, ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len - device_types = [] - device_statuses = [] - for idx in range(8): - device_types.append(hk_data[current_idx]) - current_idx += 1 - for idx in range(8): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 + dev_parser = DevicesInfoParser() + current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) util_info = ( f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" ) @@ -289,16 +298,99 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" ) pw.dlog(batt_info) + pw.dlog( + "P60 Dock Dev Types: 0:FRAM|1:ADC|2:ADC|3:ADC|4:TempSens|5:RTC|" + "6:TempSens(BatPack)|7:TempSens(BatPack)" + ) + dev_parser.print(pw=pw) printer.print_validity_buffer( validity_buffer=hk_data[current_idx:], num_vars=27 ) +def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[int]]: + u16_list = [] + for idx in range(6): + u16_list.append(hk_data[current_idx : current_idx + 2]) + current_idx += 2 + return current_idx, u16_list + + def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): pw = PrintWrapper(printer=printer) if set_id == SetIds.ACU_CORE: + current_idx = 0 + fmt_str = "!B" + inc_len = struct.calcsize(fmt_str) + mppt_mode = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + current_idx, currents = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + current_idx, voltages = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + vcc = hk_data[current_idx : current_idx + 2] + current_idx += 2 + vbat = hk_data[current_idx : current_idx + 2] + current_idx += 2 + current_idx, vboosts = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + current_idx, powers = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + fmt_str = "!HHHIIHH" + inc_len = struct.calcsize(fmt_str) + (tmp0, tmp1, tmp2, bootcnt, uptime, mppt_time, mppt_period) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + current_idx += inc_len pw.dlog("Received ACU Core HK. Voltages in mV, currents in mA") - pass + pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV | MPPT Mode {mppt_mode}") + header_str = f"Channel | Input U [mV] | Input I [mA] | U Boost [mV] | Power [?]" + pw.dlog(header_str) + for i in range(6): + pw.dlog(f"{i} | {voltages[i]} | {currents[i]} | {vboosts[i]} | {powers[i]}") + pw.dlog(f"Temperatures in C: Ch0 {tmp0} | Ch1 {tmp1} | Ch2 {tmp2}") + pw.dlog( + f"Boot Count {bootcnt} | Uptime {uptime} | " + f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec" + ) + printer.print_validity_buffer( + validity_buffer=hk_data[current_idx:], num_vars=12 + ) if set_id == SetIds.ACU_AUX: + current_idx = 0 + fmt_str = "!IBhHhh" + inc_len = struct.calcsize(fmt_str) + (dac_enb0, dac_enb1, dac_enb2) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + current_idx += inc_len + current_idx, dac_channels_raw = gen_six_entry_u16_list( + hk_data=hk_data, current_idx=current_idx + ) + fmt_str = "!IHII" + inc_len = struct.calcsize(fmt_str) + (boot_cause, reset_cause, wdt_cnt_gnd, wdt_gnd_time_left) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + current_idx += inc_len + dev_parser = DevicesInfoParser() + current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) pw.dlog("Received ACU Aux HK. Voltages in mV, currents in mA") - pass + pw.dlog( + f"DAC Enable States: DAC 0 {dac_enb0} | DAC 1 {dac_enb1} | DAC 2 {dac_enb2}" + ) + pw.dlog(f"Boot Cause {boot_cause} | Reset Cause {reset_cause}") + pw.dlog( + f"Ground WDT: Reboot Count {wdt_cnt_gnd} | Time Left {wdt_gnd_time_left} sec" + ) + + pw.dlog( + f"ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|" + f"5:DAC|6:TempSens|7:Reserved" + ) + dev_parser.print(pw=pw) + printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=8) -- 2.43.0 From e03360917784fdf98a753a2ec493b0041473ccef Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 23 May 2022 13:47:01 +0200 Subject: [PATCH 4/7] run black --- pus_tm/devs/pcdu.py | 24 ++++++++++++++++-------- pus_tm/hk_handling.py | 4 +++- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pus_tm/devs/pcdu.py b/pus_tm/devs/pcdu.py index 2134352..c97bf5a 100644 --- a/pus_tm/devs/pcdu.py +++ b/pus_tm/devs/pcdu.py @@ -311,7 +311,7 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[int]]: u16_list = [] for idx in range(6): - u16_list.append(hk_data[current_idx : current_idx + 2]) + u16_list.append(struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]) current_idx += 2 return current_idx, u16_list @@ -322,7 +322,9 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): current_idx = 0 fmt_str = "!B" inc_len = struct.calcsize(fmt_str) - mppt_mode = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + mppt_mode = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + )[0] current_idx += inc_len current_idx, currents = gen_six_entry_u16_list( hk_data=hk_data, current_idx=current_idx @@ -330,9 +332,9 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): current_idx, voltages = gen_six_entry_u16_list( hk_data=hk_data, current_idx=current_idx ) - vcc = hk_data[current_idx : current_idx + 2] + vcc = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] current_idx += 2 - vbat = hk_data[current_idx : current_idx + 2] + vbat = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] current_idx += 2 current_idx, vboosts = gen_six_entry_u16_list( hk_data=hk_data, current_idx=current_idx @@ -348,13 +350,19 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): current_idx += inc_len pw.dlog("Received ACU Core HK. Voltages in mV, currents in mA") pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV | MPPT Mode {mppt_mode}") - header_str = f"Channel | Input U [mV] | Input I [mA] | U Boost [mV] | Power [?]" + header_str = ( + f"Channel | Input U [mV] | Input I [mA] | U Boost [mV] | Power [mW]" + ) pw.dlog(header_str) for i in range(6): - pw.dlog(f"{i} | {voltages[i]} | {currents[i]} | {vboosts[i]} | {powers[i]}") - pw.dlog(f"Temperatures in C: Ch0 {tmp0} | Ch1 {tmp1} | Ch2 {tmp2}") + pw.dlog( + f"{i} | {str(voltages[i]).ljust(4)} | {str(currents[i]).ljust(4)} | {str(vboosts[i]).ljust(4)} | {str(powers[i]).ljust(2)}" + ) pw.dlog( - f"Boot Count {bootcnt} | Uptime {uptime} | " + f"Temperatures in C: Ch0 {tmp0/10.0} | Ch1 {tmp1/10.0} | Ch2 {tmp2/10.0}" + ) + pw.dlog( + f"Boot Count {bootcnt} | Uptime {uptime} sec | " f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec" ) printer.print_validity_buffer( diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index a7178b0..81166da 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -15,7 +15,7 @@ from pus_tm.devs.bpx_bat import handle_bpx_hk_data from pus_tm.devs.gps import handle_gps_data from pus_tm.devs.gyros import handle_gyros_hk_data from pus_tm.devs.imtq_mgt import handle_self_test_data -from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data +from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data, handle_acu_hk_data from pus_tm.devs.syrlinks import handle_syrlinks_hk_data from pus_tc.devs.imtq import ImtqSetIds from pus_tm.devs.reaction_wheels import handle_rw_hk_data @@ -92,6 +92,8 @@ def handle_regular_hk_print( return handle_pdu_data( printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data ) + if objb == obj_ids.ACU_HANDLER_ID: + return handle_acu_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: return handle_rw_hk_data( printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data -- 2.43.0 From b9de13ffe7a4f8112e2e7732049816833f89d17d Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 23 May 2022 14:04:34 +0200 Subject: [PATCH 5/7] something not working --- pus_tm/devs/pcdu.py | 13 ++++++++----- pus_tm/hk_handling.py | 17 +++++++++++------ 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pus_tm/devs/pcdu.py b/pus_tm/devs/pcdu.py index c97bf5a..de62cb3 100644 --- a/pus_tm/devs/pcdu.py +++ b/pus_tm/devs/pcdu.py @@ -89,11 +89,13 @@ class WdtInfo: class DevicesInfoParser: - def __int__(self): - self.dev_types = [] - self.dev_statuses = [] + def __init__(self): + self.dev_types = None + self.dev_statuses = None def parse(self, hk_data: bytes, current_idx: int) -> int: + self.dev_types = [] + self.dev_statuses = [] for idx in range(8): self.dev_types.append(hk_data[current_idx]) current_idx += 1 @@ -356,7 +358,8 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): pw.dlog(header_str) for i in range(6): pw.dlog( - f"{i} | {str(voltages[i]).ljust(4)} | {str(currents[i]).ljust(4)} | {str(vboosts[i]).ljust(4)} | {str(powers[i]).ljust(2)}" + f"{i} | {str(voltages[i]).ljust(4)} | {str(currents[i]).ljust(4)} | " + f"{str(vboosts[i]).ljust(4)} | {str(powers[i]).ljust(2)}" ) pw.dlog( f"Temperatures in C: Ch0 {tmp0/10.0} | Ch1 {tmp1/10.0} | Ch2 {tmp2/10.0}" @@ -370,7 +373,7 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): ) if set_id == SetIds.ACU_AUX: current_idx = 0 - fmt_str = "!IBhHhh" + fmt_str = "!HHH" inc_len = struct.calcsize(fmt_str) (dac_enb0, dac_enb1, dac_enb2) = struct.unpack( fmt_str, hk_data[current_idx : current_idx + inc_len] diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index 81166da..b67299f 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -48,12 +48,17 @@ def handle_hk_packet( set_id=tm_packet.set_id, hk_data=hk_data, ) - handle_regular_hk_print( - printer=printer, - object_id=named_obj_id, - hk_packet=tm_packet, - hk_data=hk_data, - ) + try: + handle_regular_hk_print( + printer=printer, + object_id=named_obj_id, + hk_packet=tm_packet, + hk_data=hk_data, + ) + except ValueError as e: + LOGGER.exception( + f"{e} error when parsing HK data coming from {named_obj_id}" + ) if tm_packet.subservice == 10 or tm_packet.subservice == 12: LOGGER.warning("HK definitions printout not implemented yet") -- 2.43.0 From 2698756b2946339432121f7ea089d23bef5808b1 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 23 May 2022 18:32:59 +0200 Subject: [PATCH 6/7] update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f351cc8..0a24f0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ list yields a list of all related PRs for each release. https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/74 https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/79 https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/73 +- Add ACU HK parsing + PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/80 # [v1.11.0] -- 2.43.0 From 7f03dcb3a613d26462ac2571140ed6ecb91dd9f0 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 23 May 2022 18:59:19 +0200 Subject: [PATCH 7/7] bugfix for acu hk parsing --- pus_tm/devs/pcdu.py | 43 ++++++++++++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/pus_tm/devs/pcdu.py b/pus_tm/devs/pcdu.py index de62cb3..ae24d7e 100644 --- a/pus_tm/devs/pcdu.py +++ b/pus_tm/devs/pcdu.py @@ -104,10 +104,31 @@ class DevicesInfoParser: current_idx += 1 return current_idx + def map_idx_to_type(self, devtype: int) -> str: + if devtype == 0: + return "Reserved" + if devtype == 1: + return "ADC" + if devtype == 2: + return "ADC" + if devtype == 3: + return "DAC" + if devtype == 4: + return "Temperature Sensor" + if devtype == 5: + return "Temperature Sensor (Bat Pack)" + if devtype == 6: + return "RTC" + if devtype == 7: + return "FRAM" + return "Unknown Type" + def print(self, pw: PrintWrapper): pw.dlog(f"Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)") for i in range(len(self.dev_types)): - pw.dlog(f"{self.dev_types} | {self.dev_statuses}") + pw.dlog( + f"{self.map_idx_to_type(self.dev_types[i])} | {self.dev_statuses[i]}" + ) def handle_pdu_data( @@ -321,13 +342,8 @@ def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): pw = PrintWrapper(printer=printer) if set_id == SetIds.ACU_CORE: - current_idx = 0 - fmt_str = "!B" - inc_len = struct.calcsize(fmt_str) - mppt_mode = struct.unpack( - fmt_str, hk_data[current_idx : current_idx + inc_len] - )[0] - current_idx += inc_len + mppt_mode = hk_data[0] + current_idx = 1 current_idx, currents = gen_six_entry_u16_list( hk_data=hk_data, current_idx=current_idx ) @@ -373,11 +389,11 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): ) if set_id == SetIds.ACU_AUX: current_idx = 0 - fmt_str = "!HHH" + fmt_str = "!BBB" inc_len = struct.calcsize(fmt_str) - (dac_enb0, dac_enb1, dac_enb2) = struct.unpack( - fmt_str, hk_data[current_idx : current_idx + inc_len] - ) + enb_tuple = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + (dac_enb0, dac_enb1, dac_enb2) = enb_tuple + dac_enb_str = ["on" if entry == 1 else "off" for entry in enb_tuple] current_idx += inc_len current_idx, dac_channels_raw = gen_six_entry_u16_list( hk_data=hk_data, current_idx=current_idx @@ -391,8 +407,9 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): dev_parser = DevicesInfoParser() current_idx = dev_parser.parse(hk_data=hk_data, current_idx=current_idx) pw.dlog("Received ACU Aux HK. Voltages in mV, currents in mA") + pw.dlog( - f"DAC Enable States: DAC 0 {dac_enb0} | DAC 1 {dac_enb1} | DAC 2 {dac_enb2}" + f"DAC Enable States: DAC 0 {dac_enb_str[0]} | DAC 1 {dac_enb_str[1]} | DAC 2 {dac_enb_str[2]}" ) pw.dlog(f"Boot Cause {boot_cause} | Reset Cause {reset_cause}") pw.dlog( -- 2.43.0