diff --git a/config/object_ids.py b/config/object_ids.py index b0e450f..82089ba 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -94,6 +94,33 @@ RTD_13_PLPCDU_HSPD = bytes([0x44, 0x42, 0x00, 0x29]) RTD_14_TCS_BRD = bytes([0x44, 0x42, 0x00, 0x30]) RTD_15_IMTQ = bytes([0x44, 0x42, 0x00, 0x31]) +# SUS +""" +Name convention for SUS devices +SUS___LOC_XYZ_PT_ +LOC: Location +PT: Pointing +N/R: Nominal/Redundant +F/M/B: Forward/Middle/Backwards +""" +SUS_0_N_LOC_XFYFZM_PT_XF = bytes([0x44, 0x12, 0x00, 0x32]) +SUS_6_R_LOC_XFYBZM_PT_XF = bytes([0x44, 0x12, 0x00, 0x38]) + +SUS_1_N_LOC_XBYFZM_PT_XB = bytes([0x44, 0x12, 0x00, 0x33]) +SUS_7_R_LOC_XBYBZM_PT_XB = bytes([0x44, 0x12, 0x00, 0x39]) + +SUS_2_N_LOC_XFYBZB_PT_YB = bytes([0x44, 0x12, 0x00, 0x34]) +SUS_8_R_LOC_XBYBZB_PT_YB = bytes([0x44, 0x12, 0x00, 0x40]) + +SUS_3_N_LOC_XFYBZF_PT_YF = bytes([0x44, 0x12, 0x00, 0x35]) +SUS_9_R_LOC_XBYBZB_PT_YF = bytes([0x44, 0x12, 0x00, 0x41]) + +SUS_4_N_LOC_XMYFZF_PT_ZF = bytes([0x44, 0x12, 0x00, 0x36]) +SUS_10_N_LOC_XMYBZF_PT_ZF = bytes([0x44, 0x12, 0x00, 0x42]) + +SUS_5_N_LOC_XFYMZB_PT_ZB = bytes([0x44, 0x12, 0x00, 0x37]) +SUS_11_R_LOC_XBYMZB_PT_ZB = bytes([0x44, 0x12, 0x00, 0x43]) + # System and Assembly Objects ACS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x01]) SUS_BOARD_ASS_ID = bytes([0x73, 0x00, 0x00, 0x02]) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 22baa12..49253b3 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -1,7 +1,3 @@ -from pus_tc.devs.gps import add_gps_cmds -from pus_tc.devs.pcdu import add_pcdu_cmds -from pus_tc.devs.rad_sensor import add_rad_sens_cmds -from pus_tc.system.core import add_core_controller_definitions from tmtccmd.config import ( add_op_code_entry, add_service_op_code_entry, @@ -9,12 +5,20 @@ from tmtccmd.config import ( ServiceOpCodeDictT, OpCodeDictKeys, ) -from config.definitions import CustomServiceList +from tmtccmd.config.globals import get_default_service_op_code_dict + +from pus_tc.devs.gps import add_gps_cmds +from pus_tc.devs.gps import GpsOpCodes +from pus_tc.devs.pcdu import add_pcdu_cmds +from pus_tc.devs.plpcdu import add_pl_pcdu_cmds +from pus_tc.devs.rad_sensor import add_rad_sens_cmds +from pus_tc.system.core import add_core_controller_definitions from pus_tc.devs.heater import add_heater_cmds from pus_tc.devs.rtd import specify_rtd_cmds from pus_tc.devs.reaction_wheels import add_rw_cmds from pus_tc.devs.bpx_batt import BpxOpCodes -from tmtccmd.config.globals import get_default_service_op_code_dict + +from config.definitions import CustomServiceList def get_eive_service_op_code_dict() -> ServiceOpCodeDictT: @@ -313,88 +317,6 @@ def add_bpx_cmd_definitions(cmd_dict: ServiceOpCodeDictT): ) -def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): - from pus_tc.devs.plpcdu import OpCodes, Info - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_SSR, - info=Info.NORMAL_SSR, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_DRO, - info=Info.NORMAL_DRO, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_X8, - info=Info.NORMAL_X8, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_TX, - info=Info.NORMAL_TX, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_MPA, - info=Info.NORMAL_MPA, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_HPA, - info=Info.NORMAL_HPA, - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.UPDATE_DRO_TO_X8_WAIT, - info="Update DRO to X8 wait time", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_SSR_TO_DRO_FAILURE, - info="Inject failure SSR to DRO transition", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_DRO_TO_X8_FAILURE, - info="Inject failure in DRO to X8 transition", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_X8_TO_TX_FAILURE, - info="Inject failure in X8 to TX transition", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_TX_TO_MPA_FAILURE, - info="Inject failure in TX to MPA transition", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_MPA_TO_HPA_FAILURE, - info="Inject failure in MPA to HPA transition", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_ALL_ON_FAILURE, - info="Inject failure in all on mode", - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name=CustomServiceList.PL_PCDU.value, - info="PL PCDU", - op_code_entry=op_code_dict, - ) - - def add_time_cmds(cmd_dict: ServiceOpCodeDictT): from pus_tc.system.time import OpCodes, Info @@ -414,15 +336,20 @@ def add_time_cmds(cmd_dict: ServiceOpCodeDictT): def add_imtq_cmds(cmd_dict: ServiceOpCodeDictT): op_code_dict_srv_imtq = { - "0": ("IMTQ Tests All", {OpCodeDictKeys.TIMEOUT: 2.0}), - "1": ("IMTQ perform pos X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "2": ("IMTQ perform neg X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "3": ("IMTQ perform pos Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "4": ("IMTQ perform neg Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "5": ("IMTQ perform pos Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "6": ("IMTQ perform neg Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "7": ("IMTQ command dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), - "8": ("IMTQ get commanded dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), + "0": ("Mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), + "1": ("Mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), + "2": ("Mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), + "3": ("IMTQ perform pos X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "4": ("IMTQ perform neg X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "5": ("IMTQ perform pos Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "6": ("IMTQ perform neg Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "7": ("IMTQ perform pos Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "8": ("IMTQ perform neg Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "9": ("IMTQ command dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), + "10": ("IMTQ get commanded dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), + "11": ("IMTQ get engineering hk set", {OpCodeDictKeys.TIMEOUT: 2.0}), + "12": ("IMTQ get calibrated MTM measurement one shot", {OpCodeDictKeys.TIMEOUT: 2.0}), + "13": ("IMTQ get raw MTM measurement one shot", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_imtq_tuple = ("IMTQ Device", op_code_dict_srv_imtq) cmd_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple diff --git a/pus_tc/devs/imtq.py b/pus_tc/devs/imtq.py index 7c1f136..754c8a6 100644 --- a/pus_tc/devs/imtq.py +++ b/pus_tc/devs/imtq.py @@ -9,7 +9,8 @@ from tmtccmd.config.definitions import QueueCommands from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command +from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_diag_command, generate_one_hk_command +from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes class ImtqSetIds: @@ -48,7 +49,22 @@ def pack_imtq_test_into( ) ) - if op_code == "0" or op_code == "1": + if op_code == "0": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Set mode off")) + command = pack_mode_data(object_id, Modes.OFF, 0) + command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code == "1": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Set mode on")) + command = pack_mode_data(object_id, Modes.ON, 0) + command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code == "2": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Mode Normal")) + command = pack_mode_data(object_id, Modes.NORMAL, 0) + command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code == "3": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive x self test")) command = object_id + ImtqActionIds.perform_positive_x_test command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) @@ -74,7 +90,7 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 24) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "2": + if op_code == "4": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative x self test")) command = object_id + ImtqActionIds.perform_negative_x_test command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) @@ -100,7 +116,7 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 27) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "3": + if op_code == "5": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive y self test")) command = object_id + ImtqActionIds.perform_positive_y_test command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) @@ -126,7 +142,7 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 30) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "4": + if op_code == "6": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative y self test")) command = object_id + ImtqActionIds.perform_negative_y_test command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) @@ -152,7 +168,7 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 33) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "5": + if op_code == "7": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive z self test")) command = object_id + ImtqActionIds.perform_positive_z_test command = PusTelecommand(service=8, subservice=128, ssc=34, app_data=command) @@ -178,7 +194,7 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 36) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "6": + if op_code == "8": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative z self test")) command = object_id + ImtqActionIds.perform_negative_z_test command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command) @@ -204,28 +220,48 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 37) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "7": + if op_code == "9": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Commanding dipole")) x_dipole = 0 y_dipole = 0 z_dipole = 0 duration = 0 # ms command = pack_dipole_command(object_id, x_dipole, y_dipole, z_dipole, duration) - command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "8": + if op_code == "10": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Get commanded dipole")) command = object_id + ImtqActionIds.get_commanded_dipole command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) + if op_code == "11": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Get engineering hk set")) + command = generate_one_diag_command( + sid=make_sid(object_id=object_id, set_id=ImtqSetIds.ENG_HK_SET), ssc=0 + ) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "12": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Get calibrated MTM hk set")) + command = generate_one_diag_command( + sid=make_sid(object_id=object_id, set_id=ImtqSetIds.CAL_MTM_SET), ssc=0 + ) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "13": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Get raw MTM hk set")) + command = generate_one_diag_command( + sid=make_sid(object_id=object_id, set_id=ImtqSetIds.RAW_MTM_SET), ssc=0 + ) + tc_queue.appendleft(command.pack_command_tuple()) + return tc_queue def pack_dipole_command( object_id: bytearray, x_dipole: int, y_dipole: int, z_dipole: int, duration: int -) -> bytearray: +) -> PusTelecommand: """This function packs the command causing the ISIS IMTQ to generate a dipole. @param object_id The object id of the IMTQ handler. @param x_dipole The dipole of the x coil in 10^-4*Am^2 (max. 2000) @@ -242,4 +278,5 @@ def pack_dipole_command( command.extend(y_dipole.to_bytes(length=2, byteorder="big")) command.extend(z_dipole.to_bytes(length=2, byteorder="big")) command.extend(duration.to_bytes(length=2, byteorder="big")) + command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) return command diff --git a/pus_tc/devs/ploc_supervisor.py b/pus_tc/devs/ploc_supervisor.py index 8222cc7..45810d3 100644 --- a/pus_tc/devs/ploc_supervisor.py +++ b/pus_tc/devs/ploc_supervisor.py @@ -126,7 +126,7 @@ def pack_ploc_supv_commands( command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "2": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Mode Normal")) + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Mode Normal")) command = pack_mode_data(object_id, Modes.NORMAL, 0) command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/devs/plpcdu.py b/pus_tc/devs/plpcdu.py index 28775d0..1728753 100644 --- a/pus_tc/devs/plpcdu.py +++ b/pus_tc/devs/plpcdu.py @@ -1,8 +1,20 @@ import enum +import struct +import time from typing import Optional -from tmtccmd.config import QueueCommands +from config.definitions import CustomServiceList +from tmtccmd.config import ( + QueueCommands, + ServiceOpCodeDictT, + add_op_code_entry, + add_service_op_code_entry, +) from tmtccmd.tc.definitions import TcQueueT +from tmtccmd.tc.pus_11_tc_sched import ( + generate_enable_tc_sched_cmd, + generate_time_tagged_cmd, +) from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices from tmtccmd.tc.pus_20_params import ( pack_scalar_double_param_app_data, @@ -17,27 +29,22 @@ LOGGER = get_console_logger() class OpCodes: - SWITCH_ON = ["0", "on"] - SWITCH_OFF = ["1", "off"] - NORMAL_SSR = ["2", "nml-ssr"] - NORMAL_DRO = ["3", "nml-dro"] - NORMAL_X8 = ["4", "nml-x8"] - NORMAL_TX = ["5", "nml-tx"] - NORMAL_MPA = ["6", "nml-mpa"] - NORMAL_HPA = ["7", "nml-hpa"] + SWITCH_HPA_ON_PROC = ["0", "proc-hpa"] + SWITCH_ON = ["2", "on"] + SWITCH_OFF = ["3", "off"] + NORMAL_SSR = ["4", "nml-ssr"] + NORMAL_DRO = ["5", "nml-dro"] + NORMAL_X8 = ["6", "nml-x8"] + NORMAL_TX = ["7", "nml-tx"] + NORMAL_MPA = ["8", "nml-mpa"] + NORMAL_HPA = ["9", "nml-hpa"] - INJECT_SSR_TO_DRO_FAILURE = ["10", "inject-ssr-dro-fault"] - INJECT_DRO_TO_X8_FAILURE = ["11", "inject-dro-x8-fault"] - INJECT_X8_TO_TX_FAILURE = ["12", "inject-x8-tx-fault"] - INJECT_TX_TO_MPA_FAILURE = ["13", "inject-tx-mpa-fault"] - INJECT_MPA_TO_HPA_FAILURE = ["14", "inject-mpa-hpa-fault"] - INJECT_ALL_ON_FAILURE = ["15", "inject-all-on-fault"] - - # The following commands might become deprecated in the future - UPDATE_DRO_TO_X8_WAIT = ["128", "dro-to-x8-wait"] - UPDATE_X8_TO_TX_WAIT_TIME = ["129", "x8-to-tx-wait"] - UPDATE_TX_TO_MPA_WAIT_TIME = ["130", "tx-to-mpa-wait"] - UPDATE_MPA_TO_HPA_WAIT_TIME = ["131", "mpa-to-hpa-wait"] + INJECT_SSR_TO_DRO_FAILURE = ["15", "inject-ssr-dro-fault"] + INJECT_DRO_TO_X8_FAILURE = ["16", "inject-dro-x8-fault"] + INJECT_X8_TO_TX_FAILURE = ["17", "inject-x8-tx-fault"] + INJECT_TX_TO_MPA_FAILURE = ["18", "inject-tx-mpa-fault"] + INJECT_MPA_TO_HPA_FAILURE = ["19", "inject-mpa-hpa-fault"] + INJECT_ALL_ON_FAILURE = ["20", "inject-all-on-fault"] class Info: @@ -51,6 +58,8 @@ class Info: NORMAL_MPA = f"{NORMAL}, MPA on" NORMAL_HPA = f"{NORMAL}, HPA on" + SWITCH_HPA_ON_PROC = "Full Procedure to switch HPA on" + class NormalSubmodesMask(enum.IntEnum): SOLID_STATE_RELAYS_ADC_ON = 0 @@ -99,6 +108,87 @@ class ParamIds(enum.IntEnum): INJECT_ALL_ON_FAILURE = 35 +def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.SWITCH_HPA_ON_PROC, + info=Info.SWITCH_HPA_ON_PROC, + ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON + ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_SSR, + info=Info.NORMAL_SSR, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_DRO, + info=Info.NORMAL_DRO, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_X8, + info=Info.NORMAL_X8, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_TX, + info=Info.NORMAL_TX, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_MPA, + info=Info.NORMAL_MPA, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_HPA, + info=Info.NORMAL_HPA, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_SSR_TO_DRO_FAILURE, + info="Inject failure SSR to DRO transition", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_DRO_TO_X8_FAILURE, + info="Inject failure in DRO to X8 transition", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_X8_TO_TX_FAILURE, + info="Inject failure in X8 to TX transition", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_TX_TO_MPA_FAILURE, + info="Inject failure in TX to MPA transition", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_MPA_TO_HPA_FAILURE, + info="Inject failure in MPA to HPA transition", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_ALL_ON_FAILURE, + info="Inject failure in all on mode", + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.PL_PCDU.value, + info="PL PCDU", + op_code_entry=op_code_dict, + ) + + def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): if op_code in OpCodes.SWITCH_ON: pack_pl_pcdu_mode_cmd( @@ -113,68 +203,47 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): tc_queue=tc_queue, info=Info.NORMAL_SSR, mode=Modes.NORMAL, - submode=(1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON), + submode=submode_mask_to_submode( + NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + ), ) if op_code in OpCodes.NORMAL_DRO: pack_pl_pcdu_mode_cmd( tc_queue=tc_queue, info=Info.NORMAL_DRO, mode=Modes.NORMAL, - submode=( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - ), + submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON), ) if op_code in OpCodes.NORMAL_X8: pack_pl_pcdu_mode_cmd( tc_queue=tc_queue, info=Info.NORMAL_X8, mode=Modes.NORMAL, - submode=( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - ), + submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON), ) if op_code in OpCodes.NORMAL_TX: pack_pl_pcdu_mode_cmd( tc_queue=tc_queue, info=Info.NORMAL_TX, mode=Modes.NORMAL, - submode=( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - ), + submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON), ) if op_code in OpCodes.NORMAL_MPA: pack_pl_pcdu_mode_cmd( tc_queue=tc_queue, info=Info.NORMAL_MPA, mode=Modes.NORMAL, - submode=( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - | (1 << NormalSubmodesMask.MPA_ON) - ), + submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON), ) if op_code in OpCodes.NORMAL_HPA: pack_pl_pcdu_mode_cmd( tc_queue=tc_queue, info=Info.NORMAL_HPA, mode=Modes.NORMAL, - submode=( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - | (1 << NormalSubmodesMask.MPA_ON) - | (1 << NormalSubmodesMask.HPA_ON) - ), + submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON), ) + if op_code in OpCodes.SWITCH_HPA_ON_PROC: + hpa_on_procedure(tc_queue) if op_code in OpCodes.INJECT_ALL_ON_FAILURE: pack_failure_injection_cmd( tc_queue=tc_queue, @@ -183,6 +252,129 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): ) +def hpa_on_procedure(tc_queue: TcQueueT): + delay_dro_to_x8 = request_wait_time() + if delay_dro_to_x8 is None: + delay_dro_to_x8 = 900 + tc_queue.appendleft( + ( + QueueCommands.PRINT, + f"Starting procedure to switch on PL PCDU HPA with DRO to X8 delay of {delay_dro_to_x8} seconds", + ) + ) + pl_pcdu_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.ON, submode=0), + ) + ssr_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode( + NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + ), + ), + ) + dro_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON), + ), + ) + x8_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON), + ), + ) + tx_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON), + ), + ) + mpa_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON), + ), + ) + hpa_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON), + ), + ) + current_time = time.time() + + enb_sched = generate_enable_tc_sched_cmd(ssc=0) + + sched_time = current_time + 10 + tc_queue.appendleft(enb_sched.pack_command_tuple()) + tagged_on_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), + tc_to_insert=pl_pcdu_on, + ssc=1, + ) + tc_queue.appendleft(tagged_on_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_ssr_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), + tc_to_insert=ssr_on, + ssc=2, + ) + tc_queue.appendleft(tagged_ssr_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_dro_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=dro_on, ssc=3 + ) + tc_queue.appendleft(tagged_dro_cmd.pack_command_tuple()) + + sched_time += delay_dro_to_x8 + tagged_x8_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=x8_on, ssc=4 + ) + tc_queue.appendleft(tagged_x8_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_tx_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=tx_on, ssc=5 + ) + tc_queue.appendleft(tagged_tx_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_mpa_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=mpa_on, ssc=6 + ) + tc_queue.appendleft(tagged_mpa_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_hpa_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=hpa_on, ssc=7 + ) + tc_queue.appendleft(tagged_hpa_cmd.pack_command_tuple()) + + def request_wait_time() -> Optional[float]: while True: wait_time = input("Please enter DRO to X8 wait time in seconds, x to cancel: ") @@ -199,6 +391,45 @@ def request_wait_time() -> Optional[float]: return wait_time +def submode_mask_to_submode(on_tgt: NormalSubmodesMask) -> int: + if on_tgt == NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON: + return 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + if on_tgt == NormalSubmodesMask.DRO_ON: + return 1 << NormalSubmodesMask.DRO_ON | ( + 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + ) + if on_tgt == NormalSubmodesMask.X8_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + ) + if on_tgt == NormalSubmodesMask.TX_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + ) + if on_tgt == NormalSubmodesMask.MPA_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + | (1 << NormalSubmodesMask.MPA_ON) + ) + if on_tgt == NormalSubmodesMask.HPA_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + | (1 << NormalSubmodesMask.MPA_ON) + | (1 << NormalSubmodesMask.HPA_ON) + ) + + def pack_wait_time_cmd(tc_queue: TcQueueT, param_id: int, print_str: str): wait_time = request_wait_time() tc_queue.appendleft( diff --git a/pus_tc/system/core.py b/pus_tc/system/core.py index ece17c2..f44ad87 100644 --- a/pus_tc/system/core.py +++ b/pus_tc/system/core.py @@ -133,7 +133,9 @@ def pack_core_commands(tc_queue: TcQueueT, op_code: str): ) if op_code in OpCodes.REBOOT_FULL: tc_queue.appendleft((QueueCommands.PRINT, f"Core Command: {Info.REBOOT_FULL}")) - cmd = generate_action_command(object_id=CORE_CONTROLLER_ID, action_id=ActionIds.FULL_REBOOT) + cmd = generate_action_command( + object_id=CORE_CONTROLLER_ID, action_id=ActionIds.FULL_REBOOT + ) tc_queue.appendleft(cmd.pack_command_tuple()) if op_code in OpCodes.XSC_REBOOT_SELF: perform_reboot_cmd(tc_queue=tc_queue, reboot_self=True) diff --git a/pus_tc/system/proc.py b/pus_tc/system/proc.py index edd2c29..c2cb707 100644 --- a/pus_tc/system/proc.py +++ b/pus_tc/system/proc.py @@ -1,6 +1,8 @@ from tmtccmd.config import QueueCommands from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.pus_3_fsfw_hk import * + +""" from config.object_ids import ( BPX_HANDLER_ID, P60_DOCK_HANDLER, @@ -9,10 +11,25 @@ from config.object_ids import ( ACU_HANDLER_ID, CORE_CONTROLLER_ID, ) +""" import config.object_ids as oids from pus_tc.devs.bpx_batt import BpxSetIds from pus_tc.system.core import SetIds as CoreSetIds from gomspace.gomspace_common import SetIds as GsSetIds +from pus_tc.devs.rad_sensor import SetIds as RadSetIds +from pus_tc.devs.mgms import MgmLis3SetIds as MgmSetIds_0_2 +from pus_tc.devs.mgms import MgmRm3100SetIds as MgmSetIds_1_3 +from pus_tc.devs.gyros import AdisGyroSetIds as GyroSetIds_0_2 +from pus_tc.devs.gyros import L3gGyroSetIds as GyroSetIds_1_3 +from pus_tc.devs.gps import Gnss0SetIds as GnssSetIds_0 +from pus_tc.devs.gps import Gnss1SetIds as GnssSetIds_1 +from pus_tc.devs.imtq import ImtqSetIds + + +from pus_tc.system.tcs import pack_tcs_sys_commands +from pus_tc.system.controllers import pack_controller_commands +from pus_tc.system.acs import pack_acs_command +from pus_tc.devs.imtq import pack_dipole_command class OpCodes: @@ -20,68 +37,235 @@ class OpCodes: BAT_FT = ["bat-ft"] CORE_FT = ["core-ft"] PCDU_FT = ["pcdu-ft"] + RAD_SEN_FT = ["rad-sen-ft"] + TCS_FT_ON = ["tcs-ft-on"] + TCS_FT_OFF = ["tcs-ft-off"] + ACS_FT = ["acs-ft"] + MGT_FT = ["mgt-ft"] + MGT_FT_DP = ["mgt-ft-dp"] class KeyAndInfo: HEATER = ["Heater", "heater procedure"] BAT_FT = ["BPX Battery", "battery functional test"] - PCDU_FT = ["PCDU", "PCDU functional test"] CORE_FT = ["OBC", "OBC functional test"] + PCDU_FT = ["PCDU", "PCDU functional test"] + RAD_SEN_FT = ["Radiation Sensor", "Radiation Sensor functional test"] + TCS_FT_ON = ["TCS Act.", "TCS functional test activation"] + TCS_FT_OFF = ["TCS Deact.", "TCS functional test deactivation"] + ACS_FT = ["ACS", "ACS functional test"] + MGT_FT = ["MGT", "MGT functional test"] + MGT_FT_DP = ["MGT dipole", "MGT functional test with dipole"] KAI = KeyAndInfo PROC_INFO_DICT = { KAI.BAT_FT[0]: [OpCodes.BAT_FT, KAI.BAT_FT[1], 120.0, 10.0], - KAI.PCDU_FT[0]: [OpCodes.PCDU_FT, KeyAndInfo.PCDU_FT[1], 120.0, 10.0], - KAI.CORE_FT[0]: [OpCodes.CORE_FT, KeyAndInfo.CORE_FT[1], 120.0, 10.0], + KAI.CORE_FT[0]: [OpCodes.CORE_FT, KAI.CORE_FT[1], 120.0, 10.0], + KAI.PCDU_FT[0]: [OpCodes.PCDU_FT, KAI.PCDU_FT[1], 120.0, 10.0], + KAI.RAD_SEN_FT[0]: [OpCodes.RAD_SEN_FT, KAI.RAD_SEN_FT[1], 120.0, 10.0], + KAI.TCS_FT_ON[0]: [OpCodes.TCS_FT_ON, KAI.TCS_FT_ON[1], 120.0, 10.0], + KAI.TCS_FT_OFF[0]: [OpCodes.TCS_FT_OFF, KAI.TCS_FT_OFF[1], 120.0, 10.0], + KAI.ACS_FT[0]: [OpCodes.ACS_FT, KAI.ACS_FT[1], 120.0, 10.0], + KAI.MGT_FT[0]: [OpCodes.MGT_FT, KAI.MGT_FT[1], 120.0, 10.0], + KAI.MGT_FT_DP[0]: [OpCodes.MGT_FT_DP, KAI.MGT_FT_DP[1], 120.0, 10.0], + # collection_time for KAI.MGT_FT_DP maybe be reduced as a full 120seconds is not needed after MGTs are tested } -def generic_print(tc_queue: TcQueueT, key: str): - info = PROC_INFO_DICT[key] +def generic_print(tc_queue: TcQueueT, info: dict): tc_queue.appendleft( - (QueueCommands.PRINT, f"Executing {info[0]} Procedure (OpCodes: {info[1]})") + (QueueCommands.PRINT, f"Executing {info[1]} Procedure (OpCodes: {info[0]})") ) def pack_generic_hk_listening_cmds( - tc_queue: TcQueueT, proc_key: str, sid: bytes, diag: bool + tc_queue: TcQueueT, + proc_key: str, + sid_list: list[bytearray], + diag: bool, + mgt: bool, ): - generic_print(tc_queue=tc_queue, key=proc_key) - listen_to_hk_for_x_seconds( - diag=diag, - tc_queue=tc_queue, - device=proc_key, - sid=sid, - interval_seconds=10.0, - collection_time=120.0, - ) + info = PROC_INFO_DICT[proc_key] + collection_time = info[2] + generic_print(tc_queue=tc_queue, info=info) + + for sid in sid_list: + enable_listen_to_hk_for_x_seconds( + diag=diag, + tc_queue=tc_queue, + device=proc_key, + sid=sid, + interval_seconds=info[3], + ) + + if mgt is True: + activate_mgts_alternately( + tc_queue=tc_queue, + ) + + else: + pass + + tc_queue.appendleft((QueueCommands.WAIT, collection_time)) + + for sid in sid_list: + disable_listen_to_hk_for_x_seconds( + diag=diag, + tc_queue=tc_queue, + device=proc_key, + sid=sid, + ) + + sid_list.clear() def pack_proc_commands(tc_queue: TcQueueT, op_code: str): + sid_list = [] if op_code in OpCodes.BAT_FT: key = KAI.BAT_FT[0] - sid = make_sid(BPX_HANDLER_ID, BpxSetIds.GET_HK_SET) + sid_list.append(make_sid(oids.BPX_HANDLER_ID, BpxSetIds.GET_HK_SET)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, proc_key=key, sid=sid, diag=False - ) - - if op_code in OpCodes.PCDU_FT: - key = KAI.PCDU_FT[0] - sid = make_sid(P60_DOCK_HANDLER, GsSetIds.P60_CORE) - pack_generic_hk_listening_cmds( - tc_queue=tc_queue, proc_key=key, sid=sid, diag=False + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False ) if op_code in OpCodes.CORE_FT: key = KAI.CORE_FT[0] - sid = make_sid(oids.CORE_CONTROLLER_ID, CoreSetIds.HK) + sid_list.append(make_sid(oids.CORE_CONTROLLER_ID, CoreSetIds.HK)) pack_generic_hk_listening_cmds( - tc_queue=tc_queue, proc_key=key, sid=sid, diag=False + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False ) + if op_code in OpCodes.PCDU_FT: + key = KAI.PCDU_FT[0] + sid_list.append(make_sid(oids.P60_DOCK_HANDLER, GsSetIds.P60_CORE)) + sid_list.append(make_sid(oids.PDU_1_HANDLER_ID, GsSetIds.PDU_1_CORE)) + sid_list.append(make_sid(oids.PDU_2_HANDLER_ID, GsSetIds.PDU_2_CORE)) + sid_list.append(make_sid(oids.ACU_HANDLER_ID, GsSetIds.ACU_CORE)) + sid_list.append(make_sid(oids.P60_DOCK_HANDLER, GsSetIds.P60_AUX)) + sid_list.append(make_sid(oids.PDU_1_HANDLER_ID, GsSetIds.PDU_1_AUX)) + sid_list.append(make_sid(oids.PDU_2_HANDLER_ID, GsSetIds.PDU_2_AUX)) + sid_list.append(make_sid(oids.ACU_HANDLER_ID, GsSetIds.ACU_AUX)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + if op_code in OpCodes.RAD_SEN_FT: + key = KAI.RAD_SEN_FT[0] + sid_list.append(make_sid(oids.RAD_SENSOR_ID, RadSetIds.RAD_SEN_CORE)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + + if op_code in OpCodes.TCS_FT_ON: + # check whether tcs_assembly also has to be commanded to NORMAL Mode + # pack_tcs_sys_commands(tc_queue=tc_queue, op_code="tcs-normal") + pack_controller_commands(tc_queue=tc_queue, op_code="thermal_controller") + + if op_code in OpCodes.TCS_FT_OFF: + # check whether tcs_assembly also has to be commanded to OFF Mode + # pack_tcs_sys_commands(tc_queue=tc_queue, op_code="tcs-off") + pack_controller_commands(tc_queue=tc_queue, op_code="thermal_controller") + + if op_code in OpCodes.ACS_FT: + key = KAI.ACS_FT[0] + + pack_acs_command(tc_queue=tc_queue, op_code="acs-a") + + # MGMs + sid_list.append(make_sid(oids.MGM_0_LIS3_HANDLER_ID, MgmSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.MGM_1_RM3100_HANDLER_ID, MgmSetIds_1_3.CORE_HK)) + # Gyros + sid_list.append(make_sid(oids.GYRO_0_ADIS_HANDLER_ID, GyroSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.GYRO_0_ADIS_HANDLER_ID, GyroSetIds_0_2.CFG_HK)) + sid_list.append(make_sid(oids.GYRO_1_L3G_HANDLER_ID, GyroSetIds_1_3.CORE_HK)) + # GNSS0 + sid_list.append(make_sid(oids.GPS_HANDLER_0_ID, GnssSetIds_0.CORE_HK)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + + pack_acs_command(tc_queue=tc_queue, op_code="acs-off") + tc_queue.appendleft((QueueCommands.WAIT, 5.0)) + pack_acs_command(tc_queue=tc_queue, op_code="acs-b") + + # MGMs + sid_list.append(make_sid(oids.MGM_2_LIS3_HANDLER_ID, MgmSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.MGM_3_RM3100_HANDLER_ID, MgmSetIds_1_3.CORE_HK)) + # Gyros + sid_list.append(make_sid(oids.GYRO_2_ADIS_HANDLER_ID, GyroSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.GYRO_2_ADIS_HANDLER_ID, GyroSetIds_0_2.CFG_HK)) + sid_list.append(make_sid(oids.GYRO_3_L3G_HANDLER_ID, GyroSetIds_1_3.CORE_HK)) + # GNSS1 + sid_list.append(make_sid(oids.GPS_HANDLER_1_ID, GnssSetIds_1.CORE_HK)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + + pack_acs_command(tc_queue=tc_queue, op_code="acs-off") + tc_queue.appendleft((QueueCommands.WAIT, 5.0)) + pack_acs_command(tc_queue=tc_queue, op_code="acs-d") + + # MGMs + sid_list.append(make_sid(oids.MGM_0_LIS3_HANDLER_ID, MgmSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.MGM_1_RM3100_HANDLER_ID, MgmSetIds_1_3.CORE_HK)) + sid_list.append(make_sid(oids.MGM_2_LIS3_HANDLER_ID, MgmSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.MGM_3_RM3100_HANDLER_ID, MgmSetIds_1_3.CORE_HK)) + # Gyros + sid_list.append(make_sid(oids.GYRO_0_ADIS_HANDLER_ID, GyroSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.GYRO_0_ADIS_HANDLER_ID, GyroSetIds_0_2.CFG_HK)) + sid_list.append(make_sid(oids.GYRO_1_L3G_HANDLER_ID, GyroSetIds_1_3.CORE_HK)) + sid_list.append(make_sid(oids.GYRO_2_ADIS_HANDLER_ID, GyroSetIds_0_2.CORE_HK)) + sid_list.append(make_sid(oids.GYRO_2_ADIS_HANDLER_ID, GyroSetIds_0_2.CFG_HK)) + sid_list.append(make_sid(oids.GYRO_3_L3G_HANDLER_ID, GyroSetIds_1_3.CORE_HK)) + # GNSS0+1 + sid_list.append(make_sid(oids.GPS_HANDLER_0_ID, GnssSetIds_0.CORE_HK)) + sid_list.append(make_sid(oids.GPS_HANDLER_1_ID, GnssSetIds_1.CORE_HK)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + + pack_acs_command(tc_queue=tc_queue, op_code="acs-off") + + if op_code in OpCodes.MGT_FT: + key = KAI.MGT_FT[0] + + # missing imtq to ON + # mgt 1: imtq und hk + + # MISSING: imtq board activation to ON (implementation pending by Jakob) + + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.ENG_HK_SET)) + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.CAL_MTM_SET)) + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.RAW_MTM_SET)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=False + ) + + # MISSING: imtq board deactivation to OFF (implementation pending by Jakob) + + # missing imtq to ON + # mgt 2.: imtq + dual side + dipole + + if op_code in OpCodes.MGT_FT_DP: + key = KAI.MGT_FT_DP[0] + pack_acs_command(tc_queue=tc_queue, op_code="acs-d") + + # MISSING: imtq board activation to ON (implementation pending by Jakob) + + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.ENG_HK_SET)) + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.CAL_MTM_SET)) + sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.RAW_MTM_SET)) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, mgt=True + ) + + # MISSING: imtq board deactivation to OFF (implementation pending by Jakob) + pack_acs_command(tc_queue=tc_queue, op_code="acs-off") + + +""" def listen_to_hk_for_x_seconds( tc_queue: TcQueueT, diag: bool, @@ -90,20 +274,6 @@ def listen_to_hk_for_x_seconds( interval_seconds: float, collection_time: float, ): - """ - enable_periodic_hk_command_with_interval( - diag: bool, sid: bytes, interval_seconds: float, ssc: int - """ - """ - function with periodic HK generation - interval_seconds = at which rate HK is saved - collection_time = how long the HK is saved for - device = for which device the HK is saved - functional_test = - diagnostic Hk = yes diagnostic or no diagnostic - sid = structural ID for specific device - device Hk set ID - """ tc_queue.appendleft((QueueCommands.PRINT, f"Enabling periodic HK for {device}")) cmd_tuple = enable_periodic_hk_command_with_interval( @@ -118,3 +288,98 @@ def listen_to_hk_for_x_seconds( tc_queue.appendleft( disable_periodic_hk_command(diag=diag, sid=sid, ssc=0).pack_command_tuple() ) +""" + + +def enable_listen_to_hk_for_x_seconds( + tc_queue: TcQueueT, + diag: bool, + device: str, + sid: bytes, + interval_seconds: float, +): + tc_queue.appendleft((QueueCommands.PRINT, f"Enabling periodic HK for {device}")) + cmd_tuple = enable_periodic_hk_command_with_interval( + diag=diag, sid=sid, interval_seconds=interval_seconds, ssc=0 + ) + for cmd in cmd_tuple: + tc_queue.appendleft(cmd.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.WAIT, 2.0)) + + +def disable_listen_to_hk_for_x_seconds( + tc_queue: TcQueueT, + diag: bool, + device: str, + sid: bytes, +): + tc_queue.appendleft((QueueCommands.PRINT, f"Disabling periodic HK for {device}")) + tc_queue.appendleft( + disable_periodic_hk_command(diag=diag, sid=sid, ssc=0).pack_command_tuple() + ) + + +def activate_mgts_alternately( + tc_queue: TcQueueT, +): + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=2000, + y_dipole=0, + z_dipole=0, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0)) + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=-2000, + y_dipole=0, + z_dipole=0, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0)) + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=2000, + z_dipole=0, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0)) + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=-2000, + z_dipole=0, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0)) + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=0, + z_dipole=2000, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0)) + + command = pack_dipole_command( + object_id=oids.IMTQ_HANDLER_ID, + x_dipole=0, + y_dipole=0, + z_dipole=-2000, + duration=30000, + ) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 10.0)) diff --git a/pus_tm/devs/imtq_mgt.py b/pus_tm/devs/imtq_mgt.py index 8ed68c6..6277f2d 100644 --- a/pus_tm/devs/imtq_mgt.py +++ b/pus_tm/devs/imtq_mgt.py @@ -4,6 +4,104 @@ from pus_tm.defs import PrintWrapper from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter +def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + header_list = [ + "Digital Voltage [mV]", + "Analog Voltage [mV]", + "Digital Current [mA]", + "Analog Current [mA]", + "Coil Current X [mA]", + "Coil Current Y [mA]", + "Coil Current Z [mA]", + "Coil X Temperature [°C]", + "Coil Y Temperature [°C]", + "Coil Z Temperature [°C]", + "Coil Z Temperature [°C]", + "MCU Temperature [°C]", + ] + digital_voltage = struct.unpack("!H", hk_data[0:2])[0] + analog_voltage = struct.unpack("!H", hk_data[2:4])[0] + digital_current = struct.unpack("!f", hk_data[4:8])[0] + analog_current = struct.unpack("!f", hk_data[8:12])[0] + coil_x_current = struct.unpack("!f", hk_data[12:16])[0] + coil_y_current = struct.unpack("!f", hk_data[16:20])[0] + coil_z_current = struct.unpack("!f", hk_data[20:24])[0] + coil_x_temperature = struct.unpack("!H", hk_data[24:26])[0] + coil_y_temperature = struct.unpack("!H", hk_data[26:28])[0] + coil_z_temperature = struct.unpack("!H", hk_data[30:32])[0] + mcu_temperature = struct.unpack("!H", hk_data[32:34])[0] + + validity_buffer = hk_data[42:] + content_list = [ + digital_voltage, + analog_voltage, + digital_current, + analog_current, + coil_x_current, + coil_y_current, + coil_z_current, + coil_x_temperature, + coil_y_temperature, + coil_z_temperature, + mcu_temperature + ] + num_of_vars = len(header_list) + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) + + +def handle_calibrated_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + header_list = [ + "Calibrated MTM X [nT]", + "Calibrated MTM Y [nT]", + "Calibrated MTM Z [nT]", + "Coild actuation status" + ] + mtm_x = struct.unpack("!I", hk_data[0:4])[0] + mtm_y = struct.unpack("!I", hk_data[4:8])[0] + mtm_z = struct.unpack("!I", hk_data[8:12])[0] + coil_actuation_status = hk_data[12] + validity_buffer = hk_data[12:] + content_list = [ + mtm_x, + mtm_y, + mtm_z, + coil_actuation_status + ] + num_of_vars = len(header_list) + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) + + +def handle_raw_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + header_list = [ + "Raw MTM X [nT]", + "Raw MTM Y [nT]", + "Raw MTM Z [nT]", + "Coild actuation status" + ] + mtm_x = struct.unpack("!f", hk_data[0:4])[0] + mtm_y = struct.unpack("!f", hk_data[4:8])[0] + mtm_z = struct.unpack("!f", hk_data[8:12])[0] + coil_actuation_status = hk_data[12] + validity_buffer = hk_data[12:] + content_list = [ + mtm_x, + mtm_y, + mtm_z, + coil_actuation_status + ] + num_of_vars = len(header_list) + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) + + def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes): pw = PrintWrapper(printer) header_list = [ diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index a468aaf..a6b9ebc 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -15,7 +15,8 @@ from tmtccmd.logging import get_console_logger from pus_tm.devs.bpx_bat import handle_bpx_hk_data from pus_tm.devs.gps import handle_gps_data from pus_tm.devs.gyros import handle_gyros_hk_data -from pus_tm.devs.imtq_mgt import handle_self_test_data +from pus_tm.devs.imtq_mgt import handle_self_test_data, handle_eng_set, handle_calibrated_mtm_measurement, \ + handle_raw_mtm_measurement from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data, handle_acu_hk_data from pus_tm.devs.syrlinks import handle_syrlinks_hk_data from pus_tc.devs.imtq import ImtqSetIds @@ -82,13 +83,19 @@ def handle_regular_hk_print( set_id <= ImtqSetIds.NEGATIVE_Z_TEST ): return handle_self_test_data(printer, hk_data) + elif set_id == ImtqSetIds.ENG_HK_SET: + return handle_eng_set(printer, hk_data) + elif set_id == ImtqSetIds.CAL_MTM_SET: + return handle_calibrated_mtm_measurement(printer, hk_data) + elif set_id == ImtqSetIds.RAW_MTM_SET: + return handle_raw_mtm_measurement(printer, hk_data) else: - LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") + LOGGER.info("Service 3 TM: IMTQ handler reply with unknown set id") elif objb == obj_ids.GPS_CONTROLLER: return handle_gps_data(printer=printer, hk_data=hk_data) - elif objb == obj_ids.BPX_HANDLER_ID: - return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) - elif objb == obj_ids.CORE_CONTROLLER_ID: + if objb == obj_ids.BPX_HANDLER_ID: + handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) + if objb == obj_ids.CORE_CONTROLLER_ID: return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) elif objb == obj_ids.PDU_1_HANDLER_ID: return handle_pdu_data( diff --git a/spacepackets b/spacepackets index d0c3f4a..9ee7922 160000 --- a/spacepackets +++ b/spacepackets @@ -1 +1 @@ -Subproject commit d0c3f4a802c3cddc5be2919af763b08fe6a6b05c +Subproject commit 9ee7922bf7b7a678f8e5ebd5926001defac9a3d4