diff --git a/.run/tmtcloop.run.xml b/.run/tmtcloop.run.xml new file mode 100644 index 0000000..ee5f5f7 --- /dev/null +++ b/.run/tmtcloop.run.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/config/definitions.py b/config/definitions.py index 7d88116..708701f 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -47,4 +47,6 @@ class CustomServiceList(enum.Enum): SUS_ASS = "sus-ass" TCS_ASS = "tcs-ass" TIME = "time" + PROCEDURE = "proc" + TVTTESTPROCEDURE = "tvtestproc" CONTROLLERS = "controllers" diff --git a/config/object_ids.py b/config/object_ids.py index b7c605c..b2b3aaf 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -36,14 +36,14 @@ TMP_1075_2_HANDLER_ID = bytes([0x44, 0x42, 0x00, 0x05]) SYRLINKS_HANDLER_ID = bytes([0x44, 0x53, 0x00, 0xA3]) # ACS Object IDs -MGM_0_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x06]) -MGM_1_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x07]) -MGM_2_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x08]) -MGM_3_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x09]) -GYRO_0_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x10]) -GYRO_1_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x11]) -GYRO_2_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x12]) -GYRO_3_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x13]) +MGM_0_LIS3_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x06]) +MGM_1_RM3100_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x07]) +MGM_2_LIS3_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x08]) +MGM_3_RM3100_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x09]) +GYRO_0_ADIS_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x10]) +GYRO_1_L3G_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x11]) +GYRO_2_ADIS_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x12]) +GYRO_3_L3G_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x13]) GPS_HANDLER_0_ID = bytes([0x44, 0x13, 0x00, 0x45]) GPS_HANDLER_1_ID = bytes([0x44, 0x13, 0x01, 0x46]) RW1_ID = bytes([0x44, 0x12, 0x00, 0x47]) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index c3dd7e8..4d5920a 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -34,6 +34,7 @@ def get_eive_service_op_code_dict() -> ServiceOpCodeDictT: add_pdec_cmds(cmd_dict=service_op_code_dict) add_heater_cmds(cmd_dict=service_op_code_dict) add_tmp_sens_cmds(cmd_dict=service_op_code_dict) + add_proc_cmds(cmd_dict=service_op_code_dict) return service_op_code_dict @@ -946,6 +947,37 @@ def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT): ] = service_ploc_memory_dumper_tuple +def add_proc_cmds(cmd_dict: ServiceOpCodeDictT): + from pus_tc.system.proc import OpCodes, KAI + + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.HEATER, info=KAI.HEATER[1] + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.PROCEDURE.value, + info="Procedures", + op_code_entry=op_code_dict, + ) + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.BAT_FT, info=KAI.BAT_FT[1] + ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.PCDU_FT, info=KAI.PCDU_FT[1] + ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.CORE_FT, info=KAI.CORE_FT[1], options={} + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.PROCEDURE.value, + info="TV Test Procedures", + op_code_entry=op_code_dict, + ) + + def add_system_cmds(cmd_dict: ServiceOpCodeDictT): from pus_tc.system.acs import AcsOpCodes, SusOpCodes import pus_tc.system.tcs as tcs diff --git a/pus_tc/devs/gyros.py b/pus_tc/devs/gyros.py new file mode 100644 index 0000000..6cdc775 --- /dev/null +++ b/pus_tc/devs/gyros.py @@ -0,0 +1,10 @@ +import enum + + +class AdisGyroSetIds(enum.IntEnum): + CORE_HK = 0 + CFG_HK = 1 + + +class L3gGyroSetIds(enum.IntEnum): + CORE_HK = 0 diff --git a/pus_tc/devs/mgms.py b/pus_tc/devs/mgms.py new file mode 100644 index 0000000..9bd6199 --- /dev/null +++ b/pus_tc/devs/mgms.py @@ -0,0 +1,9 @@ +import enum + + +class MgmLis3SetIds(enum.IntEnum): + CORE_HK = 0 + + +class MgmRm3100SetIds(enum.IntEnum): + CORE_HK = 0 diff --git a/pus_tc/devs/p60dock.py b/pus_tc/devs/p60dock.py index 03bbd5b..be71cac 100644 --- a/pus_tc/devs/p60dock.py +++ b/pus_tc/devs/p60dock.py @@ -16,9 +16,6 @@ from gomspace.gomspace_common import * from config.object_ids import P60_DOCK_HANDLER -HK_SET_ID = 0x3 - - class P60OpCodes: STACK_3V3_ON = ["stack-3v3-on", "1"] STACK_3V3_OFF = ["stack-3v3-off", "2"] diff --git a/pus_tc/system/acs.py b/pus_tc/system/acs.py index 7d3eb48..76b3dcc 100644 --- a/pus_tc/system/acs.py +++ b/pus_tc/system/acs.py @@ -5,6 +5,7 @@ from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID from .common import command_mode + class AcsOpCodes: ACS_ASS_A_SIDE = ["0", "acs-a"] ACS_ASS_B_SIDE = ["1", "acs-b"] diff --git a/pus_tc/system/controllers.py b/pus_tc/system/controllers.py index 16f3252..50db0de 100644 --- a/pus_tc/system/controllers.py +++ b/pus_tc/system/controllers.py @@ -18,10 +18,12 @@ class Info: def pack_controller_commands(tc_queue: TcQueueT, op_code: str): - parameters = prompt_parameters([{"name": "Mode", "defaultValue": "2"}, { "name": "Submode", "defaultValue": "0"}]) mode = int(parameters["Mode"]) + if mode < 0 or mode > 2: + print("Invalid Mode, defaulting to OFF") + mode = 0 submode = int(parameters["Submode"]) command_mode( object_id=get_object_from_op_code(op_code), diff --git a/pus_tc/system/proc.py b/pus_tc/system/proc.py new file mode 100644 index 0000000..edd2c29 --- /dev/null +++ b/pus_tc/system/proc.py @@ -0,0 +1,120 @@ +from tmtccmd.config import QueueCommands +from tmtccmd.tc.definitions import TcQueueT +from tmtccmd.tc.pus_3_fsfw_hk import * +from config.object_ids import ( + BPX_HANDLER_ID, + P60_DOCK_HANDLER, + PDU_1_HANDLER_ID, + PDU_2_HANDLER_ID, + ACU_HANDLER_ID, + CORE_CONTROLLER_ID, +) +import config.object_ids as oids +from pus_tc.devs.bpx_batt import BpxSetIds +from pus_tc.system.core import SetIds as CoreSetIds +from gomspace.gomspace_common import SetIds as GsSetIds + + +class OpCodes: + HEATER = ["0", "heater"] + BAT_FT = ["bat-ft"] + CORE_FT = ["core-ft"] + PCDU_FT = ["pcdu-ft"] + + +class KeyAndInfo: + HEATER = ["Heater", "heater procedure"] + BAT_FT = ["BPX Battery", "battery functional test"] + PCDU_FT = ["PCDU", "PCDU functional test"] + CORE_FT = ["OBC", "OBC functional test"] + + +KAI = KeyAndInfo + +PROC_INFO_DICT = { + KAI.BAT_FT[0]: [OpCodes.BAT_FT, KAI.BAT_FT[1], 120.0, 10.0], + KAI.PCDU_FT[0]: [OpCodes.PCDU_FT, KeyAndInfo.PCDU_FT[1], 120.0, 10.0], + KAI.CORE_FT[0]: [OpCodes.CORE_FT, KeyAndInfo.CORE_FT[1], 120.0, 10.0], +} + + +def generic_print(tc_queue: TcQueueT, key: str): + info = PROC_INFO_DICT[key] + tc_queue.appendleft( + (QueueCommands.PRINT, f"Executing {info[0]} Procedure (OpCodes: {info[1]})") + ) + + +def pack_generic_hk_listening_cmds( + tc_queue: TcQueueT, proc_key: str, sid: bytes, diag: bool +): + generic_print(tc_queue=tc_queue, key=proc_key) + listen_to_hk_for_x_seconds( + diag=diag, + tc_queue=tc_queue, + device=proc_key, + sid=sid, + interval_seconds=10.0, + collection_time=120.0, + ) + + +def pack_proc_commands(tc_queue: TcQueueT, op_code: str): + if op_code in OpCodes.BAT_FT: + key = KAI.BAT_FT[0] + sid = make_sid(BPX_HANDLER_ID, BpxSetIds.GET_HK_SET) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid=sid, diag=False + ) + + if op_code in OpCodes.PCDU_FT: + key = KAI.PCDU_FT[0] + sid = make_sid(P60_DOCK_HANDLER, GsSetIds.P60_CORE) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid=sid, diag=False + ) + + if op_code in OpCodes.CORE_FT: + key = KAI.CORE_FT[0] + sid = make_sid(oids.CORE_CONTROLLER_ID, CoreSetIds.HK) + pack_generic_hk_listening_cmds( + tc_queue=tc_queue, proc_key=key, sid=sid, diag=False + ) + + +def listen_to_hk_for_x_seconds( + tc_queue: TcQueueT, + diag: bool, + device: str, + sid: bytes, + interval_seconds: float, + collection_time: float, +): + """ + enable_periodic_hk_command_with_interval( + diag: bool, sid: bytes, interval_seconds: float, ssc: int + """ + """ + function with periodic HK generation + interval_seconds = at which rate HK is saved + collection_time = how long the HK is saved for + device = for which device the HK is saved + functional_test = + diagnostic Hk = yes diagnostic or no diagnostic + sid = structural ID for specific device + device Hk set ID + """ + + tc_queue.appendleft((QueueCommands.PRINT, f"Enabling periodic HK for {device}")) + cmd_tuple = enable_periodic_hk_command_with_interval( + diag=diag, sid=sid, interval_seconds=interval_seconds, ssc=0 + ) + for cmd in cmd_tuple: + tc_queue.appendleft(cmd.pack_command_tuple()) + + tc_queue.appendleft((QueueCommands.WAIT, collection_time)) + + tc_queue.appendleft((QueueCommands.PRINT, f"Disabling periodic HK for {device}")) + tc_queue.appendleft( + disable_periodic_hk_command(diag=diag, sid=sid, ssc=0).pack_command_tuple() + ) diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index 90340f4..c2e3f60 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -40,6 +40,7 @@ from pus_tc.system.acs import pack_acs_command, pack_sus_cmds from pus_tc.devs.plpcdu import pack_pl_pcdu_commands from pus_tc.devs.str_img_helper import pack_str_img_helper_command from pus_tc.system.tcs import pack_tcs_sys_commands +from pus_tc.system.proc import pack_proc_commands from pus_tc.system.controllers import pack_controller_commands from config.definitions import CustomServiceList from config.object_ids import ( @@ -221,6 +222,8 @@ def pack_service_queue_user( return pack_solar_array_deployment_test_into( object_id=SOLAR_ARRAY_DEPLOYMENT_ID, tc_queue=service_queue ) + if service == CustomServiceList.PROCEDURE.value: + return pack_proc_commands(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.SUS_ASS.value: return pack_sus_cmds(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.PL_PCDU.value: diff --git a/pus_tm/devs/bpx_bat.py b/pus_tm/devs/bpx_bat.py new file mode 100644 index 0000000..eb27286 --- /dev/null +++ b/pus_tm/devs/bpx_bat.py @@ -0,0 +1,66 @@ +import struct + +from pus_tc.devs.bpx_batt import BpxSetIds +from pus_tm.defs import PrintWrapper +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + pw = PrintWrapper(printer) + if set_id == BpxSetIds.GET_HK_SET: + fmt_str = "!HHHHhhhhIB" + inc_len = struct.calcsize(fmt_str) + ( + charge_current, + discharge_current, + heater_current, + batt_voltage, + batt_temp_1, + batt_temp_2, + batt_temp_3, + batt_temp_4, + reboot_cntr, + boot_cause, + ) = struct.unpack(fmt_str, hk_data[0:inc_len]) + header_list = [ + "Charge Current", + "Discharge Current", + "Heater Current", + "Battery Voltage", + "Batt Temp 1", + "Batt Temp 2", + "Batt Temp 3", + "Batt Temp 4", + "Reboot Counter", + "Boot Cause", + ] + content_list = [ + charge_current, + discharge_current, + heater_current, + batt_voltage, + batt_temp_1, + batt_temp_2, + batt_temp_3, + batt_temp_4, + reboot_cntr, + boot_cause, + ] + validity_buffer = hk_data[inc_len:] + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) + elif set_id == BpxSetIds.GET_CFG_SET: + battheat_mode = hk_data[0] + battheat_low = struct.unpack("!b", hk_data[1:2])[0] + battheat_high = struct.unpack("!b", hk_data[2:3])[0] + header_list = [ + "Battery Heater Mode", + "Battery Heater Low Limit", + "Battery Heater High Limit", + ] + content_list = [battheat_mode, battheat_low, battheat_high] + validity_buffer = hk_data[3:] + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) diff --git a/pus_tm/devs/gps.py b/pus_tm/devs/gps.py new file mode 100644 index 0000000..d60828d --- /dev/null +++ b/pus_tm/devs/gps.py @@ -0,0 +1,59 @@ +import os +import struct +from datetime import datetime + +from pus_tm.defs import PrintWrapper +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + pw.dlog(f"Received GPS data, HK data length {len(hk_data)}") + var_index = 0 + header_list = [ + "Latitude", + "Longitude", + "Altitude", + "Fix Mode", + "Sats in Use", + "Date", + "Unix Seconds", + ] + latitude = struct.unpack("!d", hk_data[0:8])[0] + longitude = struct.unpack("!d", hk_data[8:16])[0] + altitude = struct.unpack("!d", hk_data[16:24])[0] + fix_mode = hk_data[24] + sat_in_use = hk_data[25] + year = struct.unpack("!H", hk_data[26:28])[0] + month = hk_data[28] + day = hk_data[29] + hours = hk_data[30] + minutes = hk_data[31] + seconds = hk_data[32] + date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}" + unix_seconds = struct.unpack("!I", hk_data[33:37])[0] + content_list = [ + latitude, + longitude, + altitude, + fix_mode, + sat_in_use, + date_string, + unix_seconds, + ] + var_index += 13 + if not os.path.isfile("gps_log.txt"): + with open("gps_log.txt", "w") as gps_file: + gps_file.write( + "Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, " + "Date, Unix Seconds\n" + ) + with open("gps_log.txt", "a") as gps_file: + gps_file.write( + f"{datetime.now()}, {latitude}, {longitude}, {altitude}, " + f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n" + ) + validity_buffer = hk_data[37:39] + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) diff --git a/pus_tm/devs/gyros.py b/pus_tm/devs/gyros.py new file mode 100644 index 0000000..ff56abf --- /dev/null +++ b/pus_tm/devs/gyros.py @@ -0,0 +1,75 @@ +import struct + +from pus_tm.defs import PrintWrapper +from tmtccmd.utility import ObjectId +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + +from pus_tc.devs.gyros import L3gGyroSetIds, AdisGyroSetIds +import config.object_ids as obj_ids + + +def handle_gyros_hk_data( + object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes +): + if object_id.as_bytes in [ + obj_ids.GYRO_0_ADIS_HANDLER_ID, + obj_ids.GYRO_2_ADIS_HANDLER_ID, + ]: + handle_adis_gyro_hk( + object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data + ) + elif object_id.as_bytes in [ + obj_ids.GYRO_1_L3G_HANDLER_ID, + obj_ids.GYRO_3_L3G_HANDLER_ID, + ]: + handle_l3g_gyro_hk( + object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data + ) + + +def handle_adis_gyro_hk( + object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes +): + if set_id == AdisGyroSetIds.CORE_HK: + pw = PrintWrapper(printer) + fmt_str = "!ddddddf" + inc_len = struct.calcsize(fmt_str) + (angVelocX, angVelocY, angVelocZ, accelX, accelY, accelZ, temp) = struct.unpack( + fmt_str, hk_data[0 : 0 + inc_len] + ) + pw.dlog(f"Received ADIS1650X Gyro HK data from object {object_id}") + pw.dlog( + f"Angular Velocities (degrees per second): X {angVelocX} | " + f"Y {angVelocY} | Z {angVelocZ}" + ) + pw.dlog(f"Acceleration (m/s^2): X {accelX} | Y {accelY} | Z {accelZ}") + pw.dlog(f"Temperature {temp} C") + if set_id == AdisGyroSetIds.CFG_HK: + pw = PrintWrapper(printer) + fmt_str = "!HBHH" + inc_len = struct.calcsize(fmt_str) + (diag_stat_reg, filter_setting, msc_ctrl_reg, dec_rate_reg) = struct.unpack( + fmt_str, hk_data[0 : 0 + inc_len] + ) + pw.dlog(f"Diagnostic Status Register {diag_stat_reg:#018b}") + pw.dlog(f"Filter Settings {filter_setting:#010b}") + pw.dlog(f"Miscellaneous Control Register {msc_ctrl_reg:#018b}") + pw.dlog(f"Decimation Rate {dec_rate_reg:#06x}") + + +def handle_l3g_gyro_hk( + object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes +): + if set_id == L3gGyroSetIds: + pw = PrintWrapper(printer) + fmt_str = "!ffff" + inc_len = struct.calcsize(fmt_str) + (angVelocX, angVelocY, angVelocZ, temp) = struct.unpack( + fmt_str, hk_data[0 : 0 + inc_len] + ) + pw.dlog(f"Received L3GD20H Gyro HK data from object {object_id}") + pw.dlog( + f"Angular Velocities (degrees per second): X {angVelocX} | " + f"Y {angVelocY} | Z {angVelocZ}" + ) + pw.dlog(f"Temperature {temp} C") diff --git a/pus_tm/devs/imtq_mgt.py b/pus_tm/devs/imtq_mgt.py new file mode 100644 index 0000000..8ed68c6 --- /dev/null +++ b/pus_tm/devs/imtq_mgt.py @@ -0,0 +1,140 @@ +import struct + +from pus_tm.defs import PrintWrapper +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + header_list = [ + "Init Err", + "Init Raw Mag X [nT]", + "Init Raw Mag Y [nT]", + "Init Raw Mag Z [nT]", + "Init Cal Mag X [nT]", + "Init Cal Mag Y [nT]", + "Init Cal Mag Z [nT]", + "Init Coil X Current [mA]", + "Init Coil Y Current [mA]", + "Init Coil Z Current [mA]", + "Init Coil X Temperature [°C]", + "Init Coil Y Temperature [°C]", + "Init Coil Z Temperature [°C]", + "Err", + "Raw Mag X [nT]", + "Raw Mag Y [nT]", + "Raw Mag Z [nT]", + "Cal Mag X [nT]", + "Cal Mag Y [nT]", + "Cal Mag Z [nT]", + "Coil X Current [mA]", + "Coil Y Current [mA]", + "Coil Z Current [mA]", + "Coil X Temperature [°C]", + "Coil Y Temperature [°C]", + "Coil Z Temperature [°C]", + "Fina Err", + "Fina Raw Mag X [nT]", + "Fina Raw Mag Y [nT]", + "Fina Raw Mag Z [nT]", + "Fina Cal Mag X [nT]", + "Fina Cal Mag Y [nT]", + "Fina Cal Mag Z [nT]", + "Fina Coil X Current [mA]", + "Fina Coil Y Current [mA]", + "Fina Coil Z Current [mA]", + "Fina Coil X Temperature [°C]", + "Fina Coil Y Temperature [°C]", + "Fina Coil Z Temperature [°C]", + ] + # INIT step (no coil actuation) + init_err = hk_data[0] + init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0] + init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0] + init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0] + init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0] + init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0] + init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0] + init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0] + init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0] + init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0] + init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0] + init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0] + init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0] + + # Actuation step + err = hk_data[43] + raw_mag_x = struct.unpack("!f", hk_data[44:48])[0] + raw_mag_y = struct.unpack("!f", hk_data[48:52])[0] + raw_mag_z = struct.unpack("!f", hk_data[52:56])[0] + cal_mag_x = struct.unpack("!f", hk_data[56:60])[0] + cal_mag_y = struct.unpack("!f", hk_data[60:64])[0] + cal_mag_z = struct.unpack("!f", hk_data[64:68])[0] + coil_x_current = struct.unpack("!f", hk_data[68:72])[0] + coil_y_current = struct.unpack("!f", hk_data[72:76])[0] + coil_z_current = struct.unpack("!f", hk_data[76:80])[0] + coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0] + coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0] + coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0] + + # FINA step (no coil actuation) + fina_err = hk_data[86] + fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0] + fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0] + fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0] + fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0] + fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0] + fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0] + fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0] + fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0] + fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0] + fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0] + fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0] + fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0] + + validity_buffer = hk_data[129:] + content_list = [ + init_err, + init_raw_mag_x, + init_raw_mag_y, + init_raw_mag_z, + init_cal_mag_x, + init_cal_mag_y, + init_cal_mag_z, + init_coil_x_current, + init_coil_y_current, + init_coil_z_current, + init_coil_x_temperature, + init_coil_y_temperature, + init_coil_z_temperature, + err, + raw_mag_x, + init_raw_mag_y, + raw_mag_z, + cal_mag_x, + cal_mag_y, + cal_mag_z, + coil_x_current, + coil_y_current, + coil_z_current, + coil_x_temperature, + coil_y_temperature, + coil_z_temperature, + fina_err, + fina_raw_mag_x, + fina_raw_mag_y, + fina_raw_mag_z, + fina_cal_mag_x, + fina_cal_mag_y, + fina_cal_mag_z, + fina_coil_x_current, + fina_coil_y_current, + fina_coil_z_current, + fina_coil_x_temperature, + fina_coil_y_temperature, + fina_coil_z_temperature, + ] + num_of_vars = len(header_list) + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) diff --git a/pus_tm/devs/mgms.py b/pus_tm/devs/mgms.py new file mode 100644 index 0000000..0a9db2c --- /dev/null +++ b/pus_tm/devs/mgms.py @@ -0,0 +1,46 @@ +import struct + +from pus_tm.defs import PrintWrapper +from pus_tc.devs.mgms import MgmRm3100SetIds, MgmLis3SetIds +from tmtccmd.utility import ObjectId +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter +import config.object_ids as obj_ids + + +def handle_mgm_hk_data( + object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes +): + if object_id in [obj_ids.MGM_0_LIS3_HANDLER_ID, obj_ids.MGM_2_LIS3_HANDLER_ID]: + handle_mgm_lis3_hk_data(object_id, printer, set_id, hk_data) + pass + + +def handle_mgm_lis3_hk_data( + object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes +): + if set_id == MgmLis3SetIds.CORE_HK: + pw = PrintWrapper(printer) + fmt_str = "!ffff" + inc_len = struct.calcsize(fmt_str) + (field_x, field_y, field_z, temp) = struct.unpack( + fmt_str, hk_data[0 : 0 + inc_len] + ) + pw.dlog(f"Received MGM LIS3 from object {object_id}") + pw.dlog( + f"Field strengths in micro Tesla X {field_x} | Y {field_y} | Z {field_z}" + ) + pw.dlog(f"Temperature {temp} C") + + +def handle_mgm_rm3100_hk_data( + object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes +): + if set_id == MgmRm3100SetIds.CORE_HK: + pw = PrintWrapper(printer) + fmt_str = f"!fff" + inc_len = struct.calcsize(fmt_str) + (field_x, field_y, field_z) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len]) + pw.dlog(f"Received MGM LIS3 from object {object_id}") + pw.dlog( + f"Field strengths in micro Tesla X {field_x} | Y {field_y} | Z {field_z}" + ) diff --git a/pus_tm/devs/pcdu.py b/pus_tm/devs/pcdu.py new file mode 100644 index 0000000..d33ddc6 --- /dev/null +++ b/pus_tm/devs/pcdu.py @@ -0,0 +1,294 @@ +import struct + +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter +from pus_tm.defs import PrintWrapper +from gomspace.gomspace_common import SetIds + +P60_INDEX_LIST = [ + "ACU VCC", + "PDU1 VCC", + "X3 IDLE VCC", + "PDU2 VCC", + "ACU VBAT", + "PDU1 VBAT", + "X3 IDLE VBAT", + "PDU2 VBAT", + "STACK VBAT", + "STACK 3V3", + "STACK 5V", + "GS3V3", + "GS5V", +] + +WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"] + +PDU1_CHANNELS_NAMES = [ + "TCS Board", + "Syrlinks", + "Startracker", + "MGT", + "SUS Nominal", + "SCEX", + "PLOC", + "ACS A Side", + "Unused Channel 8", +] + +PDU2_CHANNELS_NAMES = [ + "Q7S", + "Payload PCDU CH1", + "RW", + "TCS Heater In", + "SUS Redundant", + "Deployment Mechanism", + "Payload PCDU CH6", + "ACS B Side", + "Payload Camera", +] + +PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES] + + +class WdtInfo: + def __init__(self, pw: PrintWrapper): + self.wdt_reboots_list = [] + self.time_pings_left_list = [] + self.pw = pw + + def print(self): + wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)" + self.pw.dlog(wdt_info) + for idx in range(len(self.wdt_reboots_list)): + self.pw.dlog( + f"{WDT_LIST[idx].ljust(5)} | " + f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}", + ) + + def parse(self, wdt_data: bytes, current_idx: int) -> int: + priv_idx = 0 + self.wdt_reboots_list = [] + self.time_pings_left_list = [] + for idx in range(5): + self.wdt_reboots_list.append( + struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0] + ) + priv_idx += 4 + current_idx += 4 + for idx in range(3): + self.time_pings_left_list.append( + struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0] + ) + priv_idx += 4 + current_idx += 4 + for idx in range(2): + self.time_pings_left_list.append(wdt_data[priv_idx]) + current_idx += 1 + priv_idx += 1 + return current_idx + + +def handle_pdu_data( + printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes +): + pw = PrintWrapper(printer=printer) + current_idx = 0 + priv_idx = pdu_idx - 1 + if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX: + fmt_str = "!hhBBBIIH" + inc_len = struct.calcsize(fmt_str) + ( + vcc, + vbat, + conv_enb_0, + conv_enb_1, + conv_enb_2, + boot_cause, + uptime, + reset_cause, + ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV") + pw.dlog(f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]") + pw.dlog( + f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}", + ) + current_idx += inc_len + latchup_list = [] + pw.dlog("Latchups") + for idx in range(len(PDU1_CHANNELS_NAMES)): + latchup_list.append( + struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + ) + content_line = ( + f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}" + ) + pw.dlog(content_line) + current_idx += 2 + device_types = [] + for idx in range(len(PDU1_CHANNELS_NAMES)): + device_types.append(hk_data[current_idx]) + current_idx += 1 + device_statuses = [] + for idx in range(len(PDU1_CHANNELS_NAMES)): + device_statuses.append(hk_data[current_idx]) + current_idx += 1 + wdt = WdtInfo(pw=pw) + current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) + wdt.print() + if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE: + pw.dlog(f"Received PDU HK from PDU {pdu_idx}") + current_list = [] + for idx in range(len(PDU1_CHANNELS_NAMES)): + current_list.append( + struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0] + ) + current_idx += 2 + voltage_list = [] + for idx in range(len(PDU1_CHANNELS_NAMES)): + voltage_list.append( + struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + ) + current_idx += 2 + output_enb_list = [] + for idx in range(len(PDU1_CHANNELS_NAMES)): + output_enb_list.append(hk_data[current_idx]) + current_idx += 1 + header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" + print(header_str) + printer.file_logger.info(header_str) + for idx in range(len(PDU1_CHANNELS_NAMES)): + out_enb = f"{output_enb_list[idx]}".ljust(6) + content_line = ( + f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | " + f"{voltage_list[idx]:05} | {current_list[idx]:04}" + ) + pw.dlog(content_line) + fmt_str = "!IBh" + inc_len = struct.calcsize(fmt_str) + (boot_count, batt_mode, temperature) = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + info = ( + f"Boot Count {boot_count} | Battery Mode {batt_mode} | " + f"Temperature {temperature / 10.0}" + ) + pw.dlog(info) + + +def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + pw = PrintWrapper(printer=printer) + if set_id == SetIds.P60_CORE: + pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA") + current_idx = 0 + current_list = [] + for idx in range(13): + current_list.append( + struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0] + ) + current_idx += 2 + voltage_list = [] + for idx in range(13): + voltage_list.append( + struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + ) + current_idx += 2 + out_enb_list = [] + for idx in range(13): + out_enb_list.append(hk_data[current_idx]) + current_idx += 1 + header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" + print(header_str) + printer.file_logger.info(header_str) + for idx in range(13): + out_enb = f"{out_enb_list[idx]}".ljust(6) + content_line = ( + f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | " + f"{voltage_list[idx]:05} | {current_list[idx]:04}" + ) + pw.dlog(content_line) + fmt_str = "!IBhHhh" + inc_len = struct.calcsize(fmt_str) + ( + boot_count, + batt_mode, + batt_current, + batt_voltage, + temp_0, + temp_1, + ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + batt_info = ( + f"Batt: Mode {batt_mode} | Boot Count {boot_count} | " + f"Charge current {batt_current} | Voltage {batt_voltage}" + ) + temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | " + pw.dlog(temps) + pw.dlog(batt_info) + printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9) + if set_id == SetIds.P60_AUX: + pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA") + current_idx = 0 + latchup_list = [] + pw.dlog("P60 Dock Latchups") + for idx in range(0, 13): + latchup_list.append( + struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + ) + content_line = f"{P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}" + pw.dlog(content_line) + current_idx += 2 + fmt_str = "!IIHBBHHhhB" + inc_len = struct.calcsize(fmt_str) + ( + boot_cause, + uptime, + reset_cause, + heater_on, + conv_5v_on, + dock_vbat, + dock_vcc_c, + batt_temp_0, + batt_temp_1, + dearm_status, + ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + wdt = WdtInfo(pw=pw) + current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) + fmt_str = "!hhbb" + inc_len = struct.calcsize(fmt_str) + ( + batt_charge_current, + batt_discharge_current, + ant6_depl, + ar6_depl, + ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + device_types = [] + device_statuses = [] + for idx in range(8): + device_types.append(hk_data[current_idx]) + current_idx += 1 + for idx in range(8): + device_statuses.append(hk_data[current_idx]) + current_idx += 1 + util_info = ( + f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" + ) + util_info_2 = ( + f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | " + f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}" + ) + pw.dlog(util_info) + pw.dlog(util_info_2) + wdt.print() + misc_info = ( + f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}" + ) + pw.dlog(misc_info) + batt_info = ( + f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | " + f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" + ) + pw.dlog(batt_info) + printer.print_validity_buffer( + validity_buffer=hk_data[current_idx:], num_vars=27 + ) diff --git a/pus_tm/devs/syrlinks.py b/pus_tm/devs/syrlinks.py new file mode 100644 index 0000000..3c82f7a --- /dev/null +++ b/pus_tm/devs/syrlinks.py @@ -0,0 +1,67 @@ +import struct + +from pus_tm.defs import PrintWrapper +from pus_tc.devs.syrlinks_hk_handler import SetIds +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_syrlinks_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + if set_id == SetIds.RX_REGISTERS_DATASET: + return handle_syrlinks_rx_registers_dataset(printer, hk_data) + elif set_id == SetIds.TX_REGISTERS_DATASET: + return handle_syrlinks_tx_registers_dataset(printer, hk_data) + else: + pw = PrintWrapper(printer) + pw.dlog(f"Service 3 TM: Syrlinks handler reply with unknown set ID {set_id}") + + +def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + header_list = [ + "RX Status", + "RX Sensitivity", + "RX Frequency Shift", + "RX IQ Power", + "RX AGC Value", + "RX Demod Eb", + "RX Demod N0", + "RX Datarate", + ] + rx_status = hk_data[0] + rx_sensitivity = struct.unpack("!I", hk_data[1:5]) + rx_frequency_shift = struct.unpack("!I", hk_data[5:9]) + rx_iq_power = struct.unpack("!H", hk_data[9:11]) + rx_agc_value = struct.unpack("!H", hk_data[11:13]) + rx_demod_eb = struct.unpack("!I", hk_data[13:17]) + rx_demod_n0 = struct.unpack("!I", hk_data[17:21]) + rx_data_rate = hk_data[21] + content_list = [ + rx_status, + rx_sensitivity, + rx_frequency_shift, + rx_iq_power, + rx_agc_value, + rx_demod_eb, + rx_demod_n0, + rx_data_rate, + ] + validity_buffer = hk_data[22:] + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8) + + +def handle_syrlinks_tx_registers_dataset( + printer: FsfwTmTcPrinter, + hk_data: bytes, +): + pw = PrintWrapper(printer) + header_list = ["TX Status", "TX Waveform", "TX AGC value"] + tx_status = hk_data[0] + tx_waveform = hk_data[1] + tx_agc_value = struct.unpack("!H", hk_data[2:4]) + content_list = [tx_status, tx_waveform, tx_agc_value] + validity_buffer = hk_data[4:] + pw.dlog(str(header_list)) + pw.dlog(str(content_list)) + printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3) diff --git a/pus_tm/factory_hook.py b/pus_tm/factory_hook.py index afac7c0..8bd8fe1 100644 --- a/pus_tm/factory_hook.py +++ b/pus_tm/factory_hook.py @@ -33,7 +33,6 @@ def ccsds_tm_handler(apid: int, raw_tm_packet: bytes, _user_args: any) -> None: pus_factory_hook(raw_tm_packet=raw_tm_packet) - def pus_factory_hook(raw_tm_packet: bytes): if len(raw_tm_packet) < 8: LOGGER.warning("Detected packet shorter than 8 bytes!") @@ -81,6 +80,5 @@ def pus_factory_hook(raw_tm_packet: bytes): packet=raw_tm_packet, srv_subservice=(service_type, subservice_type) ) except ValueError: - # TODO: Log faulty packet LOGGER.warning("Invalid packet format detected") log_raw_unknown_packet(packet=raw_tm_packet, packet_type=PacketTypes.TM) diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index 53b7aab..a7178b0 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -1,35 +1,32 @@ """HK Handling for EIVE OBSW""" import struct -import os -import datetime - - +from pus_tm.system.tcs import handle_thermal_controller_hk_data, TM_TCP_SERVER from tmtccmd.config.definitions import HkReplyUnpacked from tmtccmd.tm.pus_3_fsfw_hk import ( Service3Base, HkContentType, Service3FsfwTm, ) -from tmtccmd.logging import get_console_logger -from pus_tc.devs.bpx_batt import BpxSetIds -from pus_tc.devs.syrlinks_hk_handler import SetIds -from pus_tc.devs.p60dock import SetIds -from pus_tc.devs.imtq import ImtqSetIds from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT -import config.object_ids as obj_ids +from tmtccmd.logging import get_console_logger +from pus_tm.devs.bpx_bat import handle_bpx_hk_data +from pus_tm.devs.gps import handle_gps_data +from pus_tm.devs.gyros import handle_gyros_hk_data +from pus_tm.devs.imtq_mgt import handle_self_test_data +from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data +from pus_tm.devs.syrlinks import handle_syrlinks_hk_data +from pus_tc.devs.imtq import ImtqSetIds from pus_tm.devs.reaction_wheels import handle_rw_hk_data from pus_tm.defs import FsfwTmTcPrinter, log_to_both - -from pus_tm.tm_tcp_server import TmTcpServer - +from pus_tm.system.core import handle_core_hk_data +from pus_tm.devs.mgms import handle_mgm_hk_data +import config.object_ids as obj_ids LOGGER = get_console_logger() -TM_TCP_SERVER = TmTcpServer.getInstance() - def handle_hk_packet( raw_tm: bytes, @@ -42,9 +39,9 @@ def handle_hk_packet( named_obj_id = tm_packet.object_id if tm_packet.subservice == 25 or tm_packet.subservice == 26: hk_data = tm_packet.tm_data[8:] - TM_TCP_SERVER.report_raw_hk_data(object_id=named_obj_id, - set_id=tm_packet.set_id, - hk_data=hk_data) + TM_TCP_SERVER.report_raw_hk_data( + object_id=named_obj_id, set_id=tm_packet.set_id, hk_data=hk_data + ) printer.generic_hk_tm_print( content_type=HkContentType.HK, object_id=named_obj_id, @@ -60,6 +57,7 @@ def handle_hk_packet( if tm_packet.subservice == 10 or tm_packet.subservice == 12: LOGGER.warning("HK definitions printout not implemented yet") + def handle_regular_hk_print( printer: FsfwTmTcPrinter, object_id: ObjectId, @@ -72,12 +70,7 @@ def handle_regular_hk_print( if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: handle_rw_hk_data(printer, object_id, set_id, hk_data) if objb == obj_ids.SYRLINKS_HANDLER_ID: - if set_id == SetIds.RX_REGISTERS_DATASET: - return handle_syrlinks_rx_registers_dataset(printer, hk_data) - elif set_id == SetIds.TX_REGISTERS_DATASET: - return handle_syrlinks_tx_registers_dataset(printer, hk_data) - else: - LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id") + handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) if objb == obj_ids.IMTQ_HANDLER_ID: if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and ( set_id <= ImtqSetIds.NEGATIVE_Z_TEST @@ -90,7 +83,7 @@ def handle_regular_hk_print( if objb == obj_ids.BPX_HANDLER_ID: handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) if objb == obj_ids.CORE_CONTROLLER_ID: - return handle_core_hk_data(printer=printer, hk_data=hk_data) + return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) if objb == obj_ids.PDU_1_HANDLER_ID: return handle_pdu_data( printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data @@ -105,635 +98,30 @@ def handle_regular_hk_print( ) if objb == obj_ids.P60_DOCK_HANDLER: handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) + if objb in [ + obj_ids.GYRO_0_ADIS_HANDLER_ID, + obj_ids.GYRO_1_L3G_HANDLER_ID, + obj_ids.GYRO_2_ADIS_HANDLER_ID, + obj_ids.GYRO_3_L3G_HANDLER_ID, + ]: + handle_gyros_hk_data( + object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id + ) + if objb in [ + obj_ids.MGM_0_LIS3_HANDLER_ID, + obj_ids.MGM_1_RM3100_HANDLER_ID, + obj_ids.MGM_2_LIS3_HANDLER_ID, + obj_ids.MGM_3_RM3100_HANDLER_ID, + ]: + handle_mgm_hk_data( + object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id + ) if objb == obj_ids.PL_PCDU_ID: log_to_both(printer, "Received PL PCDU HK data") if objb == obj_ids.THERMAL_CONTROLLER_ID: - handle_thermal_controller_hk_data( object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data) + handle_thermal_controller_hk_data( + object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data + ) else: LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") return HkReplyUnpacked() - -def handle_syrlinks_rx_registers_dataset( printer: FsfwTmTcPrinter, hk_data: bytes): - reply = HkReplyUnpacked() - header_list = [ - "RX Status", - "RX Sensitivity", - "RX Frequency Shift", - "RX IQ Power", - "RX AGC Value", - "RX Demod Eb", - "RX Demod N0", - "RX Datarate", - ] - rx_status = hk_data[0] - rx_sensitivity = struct.unpack("!I", hk_data[1:5]) - rx_frequency_shift = struct.unpack("!I", hk_data[5:9]) - rx_iq_power = struct.unpack("!H", hk_data[9:11]) - rx_agc_value = struct.unpack("!H", hk_data[11:13]) - rx_demod_eb = struct.unpack("!I", hk_data[13:17]) - rx_demod_n0 = struct.unpack("!I", hk_data[17:21]) - rx_data_rate = hk_data[21] - content_list = [ - rx_status, - rx_sensitivity, - rx_frequency_shift, - rx_iq_power, - rx_agc_value, - rx_demod_eb, - rx_demod_n0, - rx_data_rate, - ] - validity_buffer = hk_data[22:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8) - -def handle_syrlinks_tx_registers_dataset( - printer: FsfwTmTcPrinter, - hk_data: bytes, -): - reply = HkReplyUnpacked() - header_list = ["TX Status", "TX Waveform", "TX AGC value"] - tx_status = hk_data[0] - tx_waveform = hk_data[1] - tx_agc_value = struct.unpack("!H", hk_data[2:4]) - content_list = [tx_status, tx_waveform, tx_agc_value] - validity_buffer = hk_data[4:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3) - -def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes): - header_list = [ - "Init Err", - "Init Raw Mag X [nT]", - "Init Raw Mag Y [nT]", - "Init Raw Mag Z [nT]", - "Init Cal Mag X [nT]", - "Init Cal Mag Y [nT]", - "Init Cal Mag Z [nT]", - "Init Coil X Current [mA]", - "Init Coil Y Current [mA]", - "Init Coil Z Current [mA]", - "Init Coil X Temperature [°C]", - "Init Coil Y Temperature [°C]", - "Init Coil Z Temperature [°C]", - "Err", - "Raw Mag X [nT]", - "Raw Mag Y [nT]", - "Raw Mag Z [nT]", - "Cal Mag X [nT]", - "Cal Mag Y [nT]", - "Cal Mag Z [nT]", - "Coil X Current [mA]", - "Coil Y Current [mA]", - "Coil Z Current [mA]", - "Coil X Temperature [°C]", - "Coil Y Temperature [°C]", - "Coil Z Temperature [°C]", - "Fina Err", - "Fina Raw Mag X [nT]", - "Fina Raw Mag Y [nT]", - "Fina Raw Mag Z [nT]", - "Fina Cal Mag X [nT]", - "Fina Cal Mag Y [nT]", - "Fina Cal Mag Z [nT]", - "Fina Coil X Current [mA]", - "Fina Coil Y Current [mA]", - "Fina Coil Z Current [mA]", - "Fina Coil X Temperature [°C]", - "Fina Coil Y Temperature [°C]", - "Fina Coil Z Temperature [°C]", - ] - # INIT step (no coil actuation) - init_err = hk_data[0] - init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0] - init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0] - init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0] - init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0] - init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0] - init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0] - init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0] - init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0] - init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0] - init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0] - init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0] - init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0] - - # Actuation step - err = hk_data[43] - raw_mag_x = struct.unpack("!f", hk_data[44:48])[0] - raw_mag_y = struct.unpack("!f", hk_data[48:52])[0] - raw_mag_z = struct.unpack("!f", hk_data[52:56])[0] - cal_mag_x = struct.unpack("!f", hk_data[56:60])[0] - cal_mag_y = struct.unpack("!f", hk_data[60:64])[0] - cal_mag_z = struct.unpack("!f", hk_data[64:68])[0] - coil_x_current = struct.unpack("!f", hk_data[68:72])[0] - coil_y_current = struct.unpack("!f", hk_data[72:76])[0] - coil_z_current = struct.unpack("!f", hk_data[76:80])[0] - coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0] - coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0] - coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0] - - # FINA step (no coil actuation) - fina_err = hk_data[86] - fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0] - fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0] - fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0] - fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0] - fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0] - fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0] - fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0] - fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0] - fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0] - fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0] - fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0] - fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0] - - validity_buffer = hk_data[129:] - content_list = [ - init_err, - init_raw_mag_x, - init_raw_mag_y, - init_raw_mag_z, - init_cal_mag_x, - init_cal_mag_y, - init_cal_mag_z, - init_coil_x_current, - init_coil_y_current, - init_coil_z_current, - init_coil_x_temperature, - init_coil_y_temperature, - init_coil_z_temperature, - err, - raw_mag_x, - raw_mag_y, - raw_mag_z, - cal_mag_x, - cal_mag_y, - cal_mag_z, - coil_x_current, - coil_y_current, - coil_z_current, - coil_x_temperature, - coil_y_temperature, - coil_z_temperature, - fina_err, - fina_raw_mag_x, - fina_raw_mag_y, - fina_raw_mag_z, - fina_cal_mag_x, - fina_cal_mag_y, - fina_cal_mag_z, - fina_coil_x_current, - fina_coil_y_current, - fina_coil_z_current, - fina_coil_x_temperature, - fina_coil_y_temperature, - fina_coil_z_temperature, - ] - num_of_vars = len(header_list) - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) - -def handle_thermal_controller_hk_data(object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): - if set_id == 0: - LOGGER.info("Received Sensor Temperature data") - - # get all the floats - tm_data = struct.unpack("!ffffffffffffffff", hk_data[:16 * 4]) - parsed_data = {} - - # put them into an nice dictionary - parsed_data["SENSOR_PLOC_HEATSPREADER"] = tm_data[0] - parsed_data["SENSOR_PLOC_MISSIONBOARD"] = tm_data[1] - parsed_data["SENSOR_4K_CAMERA"] = tm_data[2] - parsed_data["SENSOR_DAC_HEATSPREADER"] = tm_data[3] - parsed_data["SENSOR_STARTRACKER"] = tm_data[4] - parsed_data["SENSOR_RW1"] = tm_data[5] - parsed_data["SENSOR_DRO"] = tm_data[6] - parsed_data["SENSOR_SCEX"] = tm_data[7] - parsed_data["SENSOR_X8"] = tm_data[8] - parsed_data["SENSOR_HPA"] = tm_data[9] - parsed_data["SENSOR_TX_MODUL"] = tm_data[10] - parsed_data["SENSOR_MPA"] = tm_data[11] - parsed_data["SENSOR_ACU"] = tm_data[12] - parsed_data["SENSOR_PLPCDU_HEATSPREADER"] = tm_data[13] - parsed_data["SENSOR_TCS_BOARD"] = tm_data[14] - parsed_data["SENSOR_MAGNETTORQUER"] = tm_data[15] - - TM_TCP_SERVER.report_parsed_hk_data(object_id, set_id, parsed_data) - -def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes): - LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}") - reply = HkReplyUnpacked() - var_index = 0 - header_list = [ - "Latitude", - "Longitude", - "Altitude", - "Fix Mode", - "Sats in Use", - "Date", - "Unix Seconds", - ] - latitude = struct.unpack("!d", hk_data[0:8])[0] - longitude = struct.unpack("!d", hk_data[8:16])[0] - altitude = struct.unpack("!d", hk_data[16:24])[0] - fix_mode = hk_data[24] - sat_in_use = hk_data[25] - year = struct.unpack("!H", hk_data[26:28])[0] - month = hk_data[28] - day = hk_data[29] - hours = hk_data[30] - minutes = hk_data[31] - seconds = hk_data[32] - date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}" - unix_seconds = struct.unpack("!I", hk_data[33:37])[0] - content_list = [ - latitude, - longitude, - altitude, - fix_mode, - sat_in_use, - date_string, - unix_seconds, - ] - var_index += 13 - reply.num_of_vars = var_index - if not os.path.isfile("gps_log.txt"): - with open("gps_log.txt", "w") as gps_file: - gps_file.write( - "Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, " - "Date, Unix Seconds\n" - ) - with open("gps_log.txt", "a") as gps_file: - gps_file.write( - f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, " - f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n" - ) - validity_buffer = hk_data[37:39] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) - -def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): - if set_id == BpxSetIds.GET_HK_SET: - fmt_str = "!HHHHhhhhIB" - inc_len = struct.calcsize(fmt_str) - ( - charge_current, - discharge_current, - heater_current, - batt_voltage, - batt_temp_1, - batt_temp_2, - batt_temp_3, - batt_temp_4, - reboot_cntr, - boot_cause, - ) = struct.unpack(fmt_str, hk_data[0:inc_len]) - header_list = [ - "Charge Current", - "Discharge Current", - "Heater Current", - "Battery Voltage", - "Batt Temp 1", - "Batt Temp 2", - "Batt Temp 3", - "Batt Temp 4", - "Reboot Counter", - "Boot Cause", - ] - content_list = [ - charge_current, - discharge_current, - heater_current, - batt_voltage, - batt_temp_1, - batt_temp_2, - batt_temp_3, - batt_temp_4, - reboot_cntr, - boot_cause, - ] - validity_buffer = hk_data[inc_len:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) - elif set_id == BpxSetIds.GET_CFG_SET: - battheat_mode = hk_data[0] - battheat_low = struct.unpack("!b", hk_data[1:2])[0] - battheat_high = struct.unpack("!b", hk_data[2:3])[0] - header_list = [ - "Battery Heater Mode", - "Battery Heater Low Limit", - "Battery Heater High Limit", - ] - content_list = [battheat_mode, battheat_low, battheat_high] - validity_buffer = hk_data[3:] - log_to_both(printer, str(header_list)) - log_to_both(printer, str(content_list)) - printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) - -def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes): - - fmt_str = "!fffH" - inc_len = struct.calcsize(fmt_str) - (temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack( - fmt_str, hk_data[0: 0 + inc_len] - ) - printout = ( - f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | " - f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}" - ) - log_to_both(printer, printout) - printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4) - -P60_INDEX_LIST = [ - "ACU VCC", - "PDU1 VCC", - "X3 IDLE VCC", - "PDU2 VCC", - "ACU VBAT", - "PDU1 VBAT", - "X3 IDLE VBAT", - "PDU2 VBAT", - "STACK VBAT", - "STACK 3V3", - "STACK 5V", - "GS3V3", - "GS5V", -] - -WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"] - -PDU1_CHANNELS_NAMES = [ - "TCS Board", - "Syrlinks", - "Startracker", - "MGT", - "SUS Nominal", - "SCEX", - "PLOC", - "ACS A Side", - "Unused Channel 8", -] - -PDU2_CHANNELS_NAMES = [ - "Q7S", - "Payload PCDU CH1", - "RW", - "TCS Heater In", - "SUS Redundant", - "Deployment Mechanism", - "Payload PCDU CH6", - "ACS B Side", - "Payload Camera", -] - -PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES] - -class WdtInfo: - def __init__(self): - self.wdt_reboots_list = [] - self.time_pings_left_list = [] - - def print(self, printer: FsfwTmTcPrinter): - wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)" - log_to_both(printer, wdt_info) - for idx in range(len(self.wdt_reboots_list)): - log_to_both( - printer, - f"{TmHandler.WDT_LIST[idx].ljust(5)} | " - f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}", - ) - - def parse(self, wdt_data: bytes, current_idx: int) -> int: - priv_idx = 0 - self.wdt_reboots_list = [] - self.time_pings_left_list = [] - for idx in range(5): - self.wdt_reboots_list.append( - struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0] - ) - priv_idx += 4 - current_idx += 4 - for idx in range(3): - self.time_pings_left_list.append( - struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0] - ) - priv_idx += 4 - current_idx += 4 - for idx in range(2): - self.time_pings_left_list.append(wdt_data[priv_idx]) - current_idx += 1 - priv_idx += 1 - return current_idx - -def handle_pdu_data( - printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes -): - current_idx = 0 - priv_idx = pdu_idx - 1 - if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX: - fmt_str = "!hhBBBIIH" - inc_len = struct.calcsize(fmt_str) - ( - vcc, - vbat, - conv_enb_0, - conv_enb_1, - conv_enb_2, - boot_cause, - uptime, - reset_cause, - ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) - log_to_both(printer, f"VCC {vcc} mV | VBAT {vbat} mV") - log_to_both( - printer, f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]" - ) - log_to_both( - printer, - f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}", - ) - current_idx += inc_len - latchup_list = [] - log_to_both(printer, "Latchups") - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - latchup_list.append( - struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] - ) - content_line = ( - f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}" - ) - log_to_both(printer, content_line) - current_idx += 2 - device_types = [] - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - device_types.append(hk_data[current_idx]) - current_idx += 1 - device_statuses = [] - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 - wdt = WdtInfo() - current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) - wdt.print(printer=printer) - if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE: - log_to_both(printer, f"Received PDU HK from PDU {pdu_idx}") - current_list = [] - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - current_list.append( - struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0] - ) - current_idx += 2 - voltage_list = [] - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - voltage_list.append( - struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] - ) - current_idx += 2 - output_enb_list = [] - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - output_enb_list.append(hk_data[current_idx]) - current_idx += 1 - header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) - for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)): - out_enb = f"{output_enb_list[idx]}".ljust(6) - content_line = ( - f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | " - f"{voltage_list[idx]:05} | {current_list[idx]:04}" - ) - log_to_both(printer, content_line) - fmt_str = "!IBh" - inc_len = struct.calcsize(fmt_str) - (boot_count, batt_mode, temperature) = struct.unpack( - fmt_str, hk_data[current_idx: current_idx + inc_len] - ) - info = ( - f"Boot Count {boot_count} | Battery Mode {batt_mode} | " - f"Temperature {temperature / 10.0}" - ) - log_to_both(printer, info) - -def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): - if set_id == SetIds.P60_CORE: - log_to_both(printer, "Received P60 Core HK. Voltages in mV, currents in mA") - current_idx = 0 - current_list = [] - for idx in range(13): - current_list.append( - struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0] - ) - current_idx += 2 - voltage_list = [] - for idx in range(13): - voltage_list.append( - struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] - ) - current_idx += 2 - out_enb_list = [] - for idx in range(13): - out_enb_list.append(hk_data[current_idx]) - current_idx += 1 - header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]" - print(header_str) - printer.file_logger.info(header_str) - for idx in range(13): - out_enb = f"{out_enb_list[idx]}".ljust(6) - content_line = ( - f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | " - f"{voltage_list[idx]:05} | {current_list[idx]:04}" - ) - log_to_both(printer, content_line) - fmt_str = "!IBhHhh" - inc_len = struct.calcsize(fmt_str) - ( - boot_count, - batt_mode, - batt_current, - batt_voltage, - temp_0, - temp_1, - ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) - current_idx += inc_len - batt_info = ( - f"Batt: Mode {batt_mode} | Boot Count {boot_count} | " - f"Charge current {batt_current} | Voltage {batt_voltage}" - ) - temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | " - log_to_both(printer, temps) - log_to_both(printer, batt_info) - printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9) - if set_id == SetIds.P60_AUX: - log_to_both(printer, "Received P60 AUX HK. Voltages in mV, currents in mA") - current_idx = 0 - latchup_list = [] - log_to_both(printer, "P60 Dock Latchups") - for idx in range(0, 13): - latchup_list.append( - struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0] - ) - content_line = f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}" - log_to_both(printer, content_line) - current_idx += 2 - fmt_str = "!IIHBBHHhhB" - inc_len = struct.calcsize(fmt_str) - ( - boot_cause, - uptime, - reset_cause, - heater_on, - conv_5v_on, - dock_vbat, - dock_vcc_c, - batt_temp_0, - batt_temp_1, - dearm_status, - ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) - current_idx += inc_len - wdt = WdtInfo() - current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) - fmt_str = "!hhbb" - inc_len = struct.calcsize(fmt_str) - ( - batt_charge_current, - batt_discharge_current, - ant6_depl, - ar6_depl, - ) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) - current_idx += inc_len - device_types = [] - device_statuses = [] - for idx in range(8): - device_types.append(hk_data[current_idx]) - current_idx += 1 - for idx in range(8): - device_statuses.append(hk_data[current_idx]) - current_idx += 1 - util_info = ( - f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}" - ) - util_info_2 = ( - f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | " - f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}" - ) - log_to_both(printer, util_info) - log_to_both(printer, util_info_2) - wdt.print(printer) - misc_info = ( - f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}" - ) - log_to_both(printer, misc_info) - batt_info = ( - f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | " - f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" - ) - log_to_both(printer, batt_info) - printer.print_validity_buffer( - validity_buffer=hk_data[current_idx:], num_vars=27 - ) diff --git a/pus_tm/system/__init__.py b/pus_tm/system/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pus_tm/system/core.py b/pus_tm/system/core.py new file mode 100644 index 0000000..883d044 --- /dev/null +++ b/pus_tm/system/core.py @@ -0,0 +1,21 @@ +import struct + +from pus_tm.defs import PrintWrapper +from pus_tc.system.core import SetIds +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): + if set_id == SetIds.HK: + pw = PrintWrapper(printer) + fmt_str = "!fff" + inc_len = struct.calcsize(fmt_str) + (temperature, ps_voltage, pl_voltage) = struct.unpack( + fmt_str, hk_data[0 : 0 + inc_len] + ) + printout = ( + f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | " + f"PL Voltage [mV] {pl_voltage}" + ) + pw.dlog(printout) + printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=3) diff --git a/pus_tm/system/tcs.py b/pus_tm/system/tcs.py new file mode 100644 index 0000000..867347c --- /dev/null +++ b/pus_tm/system/tcs.py @@ -0,0 +1,41 @@ +import struct + +from pus_tm.defs import PrintWrapper +from pus_tm.tm_tcp_server import TmTcpServer +from tmtccmd.utility import ObjectId +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter + + +TM_TCP_SERVER = TmTcpServer.getInstance() + + +def handle_thermal_controller_hk_data( + object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes +): + if set_id == 0: + pw = PrintWrapper(printer) + pw.dlog("Received Sensor Temperature data") + + # get all the floats + tm_data = struct.unpack("!ffffffffffffffff", hk_data[: 16 * 4]) + parsed_data = {} + + # put them into an nice dictionary + parsed_data["SENSOR_PLOC_HEATSPREADER"] = tm_data[0] + parsed_data["SENSOR_PLOC_MISSIONBOARD"] = tm_data[1] + parsed_data["SENSOR_4K_CAMERA"] = tm_data[2] + parsed_data["SENSOR_DAC_HEATSPREADER"] = tm_data[3] + parsed_data["SENSOR_STARTRACKER"] = tm_data[4] + parsed_data["SENSOR_RW1"] = tm_data[5] + parsed_data["SENSOR_DRO"] = tm_data[6] + parsed_data["SENSOR_SCEX"] = tm_data[7] + parsed_data["SENSOR_X8"] = tm_data[8] + parsed_data["SENSOR_HPA"] = tm_data[9] + parsed_data["SENSOR_TX_MODUL"] = tm_data[10] + parsed_data["SENSOR_MPA"] = tm_data[11] + parsed_data["SENSOR_ACU"] = tm_data[12] + parsed_data["SENSOR_PLPCDU_HEATSPREADER"] = tm_data[13] + parsed_data["SENSOR_TCS_BOARD"] = tm_data[14] + parsed_data["SENSOR_MAGNETTORQUER"] = tm_data[15] + + TM_TCP_SERVER.report_parsed_hk_data(object_id, set_id, parsed_data) diff --git a/pus_tm/tm_tcp_server.py b/pus_tm/tm_tcp_server.py index 4bf0138..f6d0e88 100644 --- a/pus_tm/tm_tcp_server.py +++ b/pus_tm/tm_tcp_server.py @@ -18,8 +18,7 @@ class TmTcpServer: _Instance = None - def __init__( - self): + def __init__(self): self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -65,7 +64,7 @@ class TmTcpServer: # dle encode the bytes # adding a newline because someone might want to look at it in a console - data_json_bytes = self.dle_encoder.encode(data_json_bytes + b'\n') + data_json_bytes = self.dle_encoder.encode(data_json_bytes + b"\n") try: sent_length = self.client_connection.send(data_json_bytes) @@ -76,27 +75,25 @@ class TmTcpServer: self.client_connection.close() self.client_connection = None - def report_raw_hk_data(self, object_id: ObjectId, - set_id: int, - hk_data: bytes): + def report_raw_hk_data(self, object_id: ObjectId, set_id: int, hk_data: bytes): - data_dict = {} - data_dict["type"] = "TM" - data_dict["tmType"] = "Raw HK" - data_dict["objectId"] = object_id.as_string - data_dict["setId"] = set_id - data_dict["rawData"] = base64.b64encode(hk_data).decode() + data_dict = { + "type": "TM", + "tmType": "Raw HK", + "objectId": object_id.as_string, + "setId": set_id, + "rawData": base64.b64encode(hk_data).decode(), + } self._send_dictionary_over_socket(data_dict) - def report_parsed_hk_data(self, object_id: ObjectId, - set_id: int, - data_dictionary): - data_dict = {} - data_dict["type"] = "TM" - data_dict["tmType"] = "Parsed HK" - data_dict["objectId"] = object_id.as_string - data_dict["setId"] = set_id - data_dict["content"] = data_dictionary + def report_parsed_hk_data(self, object_id: ObjectId, set_id: int, data_dictionary): + data_dict = { + "type": "TM", + "tmType": "Parsed HK", + "objectId": object_id.as_string, + "setId": set_id, + "content": data_dictionary, + } - self._send_dictionary_over_socket(data_dict) \ No newline at end of file + self._send_dictionary_over_socket(data_dict) diff --git a/tmtccmd b/tmtccmd index de8656e..208f577 160000 --- a/tmtccmd +++ b/tmtccmd @@ -1 +1 @@ -Subproject commit de8656e3a657782e457630ea10de0fce08856f35 +Subproject commit 208f5779327db817841f073d0bbdd96da491b5d9 diff --git a/tmtcloop.py b/tmtcloop.py index 52d1848..0a7fdc8 100755 --- a/tmtcloop.py +++ b/tmtcloop.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 """EIVE TMTC Commander""" -from distutils.log import debug import sys import traceback @@ -63,11 +62,11 @@ def main(): tmtc_backend.set_mode(CoreModeList.CONTINUOUS_MODE) - get_console_logger().info("Disabling console logger for continuous operation") - get_console_logger().setLevel("ERROR") + # get_console_logger().info("Disabling console logger for continuous operation") + # get_console_logger().setLevel("ERROR") tmtccmd.init_and_start_daemons(tmtc_backend=tmtc_backend) - tmtccmd.performOperation(tmtc_backend=tmtc_backend) + tmtc_backend.perform_operation() # remove cmdline args so that we can reuse code sys.argv = sys.argv[:1] @@ -81,7 +80,7 @@ def main(): tmtc_backend.set_opcode(args.op_code) tmtc_backend.set_mode(CoreModeList.CONTINUOUS_MODE) - tmtccmd.performOperation(tmtc_backend=tmtc_backend) + tmtc_backend.perform_operation() if __name__ == "__main__":