diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 42daa1c..31ab58d 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -345,15 +345,20 @@ def add_time_cmds(cmd_dict: ServiceOpCodeDictT): def add_imtq_cmds(cmd_dict: ServiceOpCodeDictT): op_code_dict_srv_imtq = { - "0": ("IMTQ Tests All", {OpCodeDictKeys.TIMEOUT: 2.0}), - "1": ("IMTQ perform pos X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "2": ("IMTQ perform neg X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "3": ("IMTQ perform pos Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "4": ("IMTQ perform neg Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "5": ("IMTQ perform pos Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "6": ("IMTQ perform neg Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "7": ("IMTQ command dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), - "8": ("IMTQ get commanded dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), + "0": ("Mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), + "1": ("Mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), + "2": ("Mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), + "3": ("IMTQ perform pos X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "4": ("IMTQ perform neg X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "5": ("IMTQ perform pos Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "6": ("IMTQ perform neg Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "7": ("IMTQ perform pos Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "8": ("IMTQ perform neg Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), + "9": ("IMTQ command dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), + "10": ("IMTQ get commanded dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), + "11": ("IMTQ get engineering hk set", {OpCodeDictKeys.TIMEOUT: 2.0}), + "12": ("IMTQ get calibrated MTM measurement one shot", {OpCodeDictKeys.TIMEOUT: 2.0}), + "13": ("IMTQ get raw MTM measurement one shot", {OpCodeDictKeys.TIMEOUT: 2.0}), } service_imtq_tuple = ("IMTQ Device", op_code_dict_srv_imtq) cmd_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple diff --git a/pus_tc/devs/imtq.py b/pus_tc/devs/imtq.py index 3784167..754c8a6 100644 --- a/pus_tc/devs/imtq.py +++ b/pus_tc/devs/imtq.py @@ -9,7 +9,8 @@ from tmtccmd.config.definitions import QueueCommands from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command +from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_diag_command, generate_one_hk_command +from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes class ImtqSetIds: @@ -48,7 +49,22 @@ def pack_imtq_test_into( ) ) - if op_code == "0" or op_code == "1": + if op_code == "0": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Set mode off")) + command = pack_mode_data(object_id, Modes.OFF, 0) + command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code == "1": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Set mode on")) + command = pack_mode_data(object_id, Modes.ON, 0) + command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code == "2": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Mode Normal")) + command = pack_mode_data(object_id, Modes.NORMAL, 0) + command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=command) + tc_queue.appendleft(command.pack_command_tuple()) + if op_code == "3": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive x self test")) command = object_id + ImtqActionIds.perform_positive_x_test command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) @@ -74,7 +90,7 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 24) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "2": + if op_code == "4": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative x self test")) command = object_id + ImtqActionIds.perform_negative_x_test command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) @@ -100,7 +116,7 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 27) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "3": + if op_code == "5": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive y self test")) command = object_id + ImtqActionIds.perform_positive_y_test command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) @@ -126,7 +142,7 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 30) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "4": + if op_code == "6": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative y self test")) command = object_id + ImtqActionIds.perform_negative_y_test command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) @@ -152,7 +168,7 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 33) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "5": + if op_code == "7": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive z self test")) command = object_id + ImtqActionIds.perform_positive_z_test command = PusTelecommand(service=8, subservice=128, ssc=34, app_data=command) @@ -178,7 +194,7 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 36) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "6": + if op_code == "8": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative z self test")) command = object_id + ImtqActionIds.perform_negative_z_test command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command) @@ -204,7 +220,7 @@ def pack_imtq_test_into( command = generate_one_hk_command(sid, 37) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "7": + if op_code == "9": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Commanding dipole")) x_dipole = 0 y_dipole = 0 @@ -213,12 +229,33 @@ def pack_imtq_test_into( command = pack_dipole_command(object_id, x_dipole, y_dipole, z_dipole, duration) tc_queue.appendleft(command.pack_command_tuple()) - if op_code == "0" or op_code == "8": + if op_code == "10": tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Get commanded dipole")) command = object_id + ImtqActionIds.get_commanded_dipole command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) + if op_code == "11": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Get engineering hk set")) + command = generate_one_diag_command( + sid=make_sid(object_id=object_id, set_id=ImtqSetIds.ENG_HK_SET), ssc=0 + ) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "12": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Get calibrated MTM hk set")) + command = generate_one_diag_command( + sid=make_sid(object_id=object_id, set_id=ImtqSetIds.CAL_MTM_SET), ssc=0 + ) + tc_queue.appendleft(command.pack_command_tuple()) + + if op_code == "13": + tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Get raw MTM hk set")) + command = generate_one_diag_command( + sid=make_sid(object_id=object_id, set_id=ImtqSetIds.RAW_MTM_SET), ssc=0 + ) + tc_queue.appendleft(command.pack_command_tuple()) + return tc_queue diff --git a/pus_tc/devs/ploc_supervisor.py b/pus_tc/devs/ploc_supervisor.py index 8222cc7..45810d3 100644 --- a/pus_tc/devs/ploc_supervisor.py +++ b/pus_tc/devs/ploc_supervisor.py @@ -126,7 +126,7 @@ def pack_ploc_supv_commands( command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "2": - tc_queue.appendleft((QueueCommands.PRINT, "Star tracker: Mode Normal")) + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Mode Normal")) command = pack_mode_data(object_id, Modes.NORMAL, 0) command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tm/devs/imtq_mgt.py b/pus_tm/devs/imtq_mgt.py index 8ed68c6..6277f2d 100644 --- a/pus_tm/devs/imtq_mgt.py +++ b/pus_tm/devs/imtq_mgt.py @@ -4,6 +4,104 @@ from pus_tm.defs import PrintWrapper from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter +def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + header_list = [ + "Digital Voltage [mV]", + "Analog Voltage [mV]", + "Digital Current [mA]", + "Analog Current [mA]", + "Coil Current X [mA]", + "Coil Current Y [mA]", + "Coil Current Z [mA]", + "Coil X Temperature [°C]", + "Coil Y Temperature [°C]", + "Coil Z Temperature [°C]", + "Coil Z Temperature [°C]", + "MCU Temperature [°C]", + ] + digital_voltage = struct.unpack("!H", hk_data[0:2])[0] + analog_voltage = struct.unpack("!H", hk_data[2:4])[0] + digital_current = struct.unpack("!f", hk_data[4:8])[0] + analog_current = struct.unpack("!f", hk_data[8:12])[0] + coil_x_current = struct.unpack("!f", hk_data[12:16])[0] + coil_y_current = struct.unpack("!f", hk_data[16:20])[0] + coil_z_current = struct.unpack("!f", hk_data[20:24])[0] + coil_x_temperature = struct.unpack("!H", hk_data[24:26])[0] + coil_y_temperature = struct.unpack("!H", hk_data[26:28])[0] + coil_z_temperature = struct.unpack("!H", hk_data[30:32])[0] + mcu_temperature = struct.unpack("!H", hk_data[32:34])[0] + + validity_buffer = hk_data[42:] + content_list = [ + digital_voltage, + analog_voltage, + digital_current, + analog_current, + coil_x_current, + coil_y_current, + coil_z_current, + coil_x_temperature, + coil_y_temperature, + coil_z_temperature, + mcu_temperature + ] + num_of_vars = len(header_list) + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) + + +def handle_calibrated_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + header_list = [ + "Calibrated MTM X [nT]", + "Calibrated MTM Y [nT]", + "Calibrated MTM Z [nT]", + "Coild actuation status" + ] + mtm_x = struct.unpack("!I", hk_data[0:4])[0] + mtm_y = struct.unpack("!I", hk_data[4:8])[0] + mtm_z = struct.unpack("!I", hk_data[8:12])[0] + coil_actuation_status = hk_data[12] + validity_buffer = hk_data[12:] + content_list = [ + mtm_x, + mtm_y, + mtm_z, + coil_actuation_status + ] + num_of_vars = len(header_list) + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) + + +def handle_raw_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + header_list = [ + "Raw MTM X [nT]", + "Raw MTM Y [nT]", + "Raw MTM Z [nT]", + "Coild actuation status" + ] + mtm_x = struct.unpack("!f", hk_data[0:4])[0] + mtm_y = struct.unpack("!f", hk_data[4:8])[0] + mtm_z = struct.unpack("!f", hk_data[8:12])[0] + coil_actuation_status = hk_data[12] + validity_buffer = hk_data[12:] + content_list = [ + mtm_x, + mtm_y, + mtm_z, + coil_actuation_status + ] + num_of_vars = len(header_list) + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) + + def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes): pw = PrintWrapper(printer) header_list = [ diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index 427017c..09ab28e 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -15,7 +15,8 @@ from tmtccmd.logging import get_console_logger from pus_tm.devs.bpx_bat import handle_bpx_hk_data from pus_tm.devs.gps import handle_gps_data from pus_tm.devs.gyros import handle_gyros_hk_data -from pus_tm.devs.imtq_mgt import handle_self_test_data +from pus_tm.devs.imtq_mgt import handle_self_test_data, handle_eng_set, handle_calibrated_mtm_measurement, \ + handle_raw_mtm_measurement from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data, handle_acu_hk_data from pus_tm.devs.syrlinks import handle_syrlinks_hk_data from pus_tc.devs.imtq import ImtqSetIds @@ -82,8 +83,14 @@ def handle_regular_hk_print( set_id <= ImtqSetIds.NEGATIVE_Z_TEST ): return handle_self_test_data(printer, hk_data) + elif set_id == ImtqSetIds.ENG_HK_SET: + return handle_eng_set(printer, hk_data) + elif set_id == ImtqSetIds.CAL_MTM_SET: + return handle_calibrated_mtm_measurement(printer, hk_data) + elif set_id == ImtqSetIds.RAW_MTM_SET: + return handle_raw_mtm_measurement(printer, hk_data) else: - LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") + LOGGER.info("Service 3 TM: IMTQ handler reply with unknown set id") if objb == obj_ids.GPS_HANDLER_0_ID or object_id == obj_ids.GPS_HANDLER_1_ID: handle_gps_data(printer=printer, hk_data=hk_data) if objb == obj_ids.BPX_HANDLER_ID: