continued hk parsing
This commit is contained in:
parent
480e0f07e0
commit
260a083091
@ -5,6 +5,7 @@ from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID
|
||||
|
||||
from .common import command_mode
|
||||
|
||||
|
||||
class AcsOpCodes:
|
||||
ACS_ASS_A_SIDE = ["0", "acs-a"]
|
||||
ACS_ASS_B_SIDE = ["1", "acs-b"]
|
||||
|
@ -2,7 +2,6 @@ from ast import Pass
|
||||
from tmtccmd.tc.definitions import TcQueueT
|
||||
|
||||
|
||||
|
||||
from .common import command_mode
|
||||
import config.object_ids as obj_ids
|
||||
|
||||
@ -32,6 +31,7 @@ def pack_controller_commands(tc_queue: TcQueueT, op_code: str):
|
||||
info=op_code + " to " + str(mode) + "," + str(submode),
|
||||
)
|
||||
|
||||
|
||||
def get_object_from_op_code(op_code: str):
|
||||
try:
|
||||
return bytes.fromhex(op_code)
|
||||
|
66
pus_tm/devs/bpx_bat.py
Normal file
66
pus_tm/devs/bpx_bat.py
Normal file
@ -0,0 +1,66 @@
|
||||
import struct
|
||||
|
||||
from pus_tc.devs.bpx_batt import BpxSetIds
|
||||
from pus_tm.defs import PrintWrapper
|
||||
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
if set_id == BpxSetIds.GET_HK_SET:
|
||||
fmt_str = "!HHHHhhhhIB"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
charge_current,
|
||||
discharge_current,
|
||||
heater_current,
|
||||
batt_voltage,
|
||||
batt_temp_1,
|
||||
batt_temp_2,
|
||||
batt_temp_3,
|
||||
batt_temp_4,
|
||||
reboot_cntr,
|
||||
boot_cause,
|
||||
) = struct.unpack(fmt_str, hk_data[0:inc_len])
|
||||
header_list = [
|
||||
"Charge Current",
|
||||
"Discharge Current",
|
||||
"Heater Current",
|
||||
"Battery Voltage",
|
||||
"Batt Temp 1",
|
||||
"Batt Temp 2",
|
||||
"Batt Temp 3",
|
||||
"Batt Temp 4",
|
||||
"Reboot Counter",
|
||||
"Boot Cause",
|
||||
]
|
||||
content_list = [
|
||||
charge_current,
|
||||
discharge_current,
|
||||
heater_current,
|
||||
batt_voltage,
|
||||
batt_temp_1,
|
||||
batt_temp_2,
|
||||
batt_temp_3,
|
||||
batt_temp_4,
|
||||
reboot_cntr,
|
||||
boot_cause,
|
||||
]
|
||||
validity_buffer = hk_data[inc_len:]
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
||||
elif set_id == BpxSetIds.GET_CFG_SET:
|
||||
battheat_mode = hk_data[0]
|
||||
battheat_low = struct.unpack("!b", hk_data[1:2])[0]
|
||||
battheat_high = struct.unpack("!b", hk_data[2:3])[0]
|
||||
header_list = [
|
||||
"Battery Heater Mode",
|
||||
"Battery Heater Low Limit",
|
||||
"Battery Heater High Limit",
|
||||
]
|
||||
content_list = [battheat_mode, battheat_low, battheat_high]
|
||||
validity_buffer = hk_data[3:]
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
59
pus_tm/devs/gps.py
Normal file
59
pus_tm/devs/gps.py
Normal file
@ -0,0 +1,59 @@
|
||||
import os
|
||||
import struct
|
||||
from datetime import datetime
|
||||
|
||||
from pus_tm.defs import PrintWrapper
|
||||
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
pw.dlog(f"Received GPS data, HK data length {len(hk_data)}")
|
||||
var_index = 0
|
||||
header_list = [
|
||||
"Latitude",
|
||||
"Longitude",
|
||||
"Altitude",
|
||||
"Fix Mode",
|
||||
"Sats in Use",
|
||||
"Date",
|
||||
"Unix Seconds",
|
||||
]
|
||||
latitude = struct.unpack("!d", hk_data[0:8])[0]
|
||||
longitude = struct.unpack("!d", hk_data[8:16])[0]
|
||||
altitude = struct.unpack("!d", hk_data[16:24])[0]
|
||||
fix_mode = hk_data[24]
|
||||
sat_in_use = hk_data[25]
|
||||
year = struct.unpack("!H", hk_data[26:28])[0]
|
||||
month = hk_data[28]
|
||||
day = hk_data[29]
|
||||
hours = hk_data[30]
|
||||
minutes = hk_data[31]
|
||||
seconds = hk_data[32]
|
||||
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
|
||||
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
|
||||
content_list = [
|
||||
latitude,
|
||||
longitude,
|
||||
altitude,
|
||||
fix_mode,
|
||||
sat_in_use,
|
||||
date_string,
|
||||
unix_seconds,
|
||||
]
|
||||
var_index += 13
|
||||
if not os.path.isfile("gps_log.txt"):
|
||||
with open("gps_log.txt", "w") as gps_file:
|
||||
gps_file.write(
|
||||
"Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, "
|
||||
"Date, Unix Seconds\n"
|
||||
)
|
||||
with open("gps_log.txt", "a") as gps_file:
|
||||
gps_file.write(
|
||||
f"{datetime.now()}, {latitude}, {longitude}, {altitude}, "
|
||||
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
|
||||
)
|
||||
validity_buffer = hk_data[37:39]
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
8
pus_tm/devs/gyros.py
Normal file
8
pus_tm/devs/gyros.py
Normal file
@ -0,0 +1,8 @@
|
||||
from tmtccmd.utility import ObjectId
|
||||
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
def handle_gyros_hk_data(
|
||||
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
|
||||
):
|
||||
pass
|
140
pus_tm/devs/imtq_mgt.py
Normal file
140
pus_tm/devs/imtq_mgt.py
Normal file
@ -0,0 +1,140 @@
|
||||
import struct
|
||||
|
||||
from pus_tm.defs import PrintWrapper
|
||||
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
header_list = [
|
||||
"Init Err",
|
||||
"Init Raw Mag X [nT]",
|
||||
"Init Raw Mag Y [nT]",
|
||||
"Init Raw Mag Z [nT]",
|
||||
"Init Cal Mag X [nT]",
|
||||
"Init Cal Mag Y [nT]",
|
||||
"Init Cal Mag Z [nT]",
|
||||
"Init Coil X Current [mA]",
|
||||
"Init Coil Y Current [mA]",
|
||||
"Init Coil Z Current [mA]",
|
||||
"Init Coil X Temperature [°C]",
|
||||
"Init Coil Y Temperature [°C]",
|
||||
"Init Coil Z Temperature [°C]",
|
||||
"Err",
|
||||
"Raw Mag X [nT]",
|
||||
"Raw Mag Y [nT]",
|
||||
"Raw Mag Z [nT]",
|
||||
"Cal Mag X [nT]",
|
||||
"Cal Mag Y [nT]",
|
||||
"Cal Mag Z [nT]",
|
||||
"Coil X Current [mA]",
|
||||
"Coil Y Current [mA]",
|
||||
"Coil Z Current [mA]",
|
||||
"Coil X Temperature [°C]",
|
||||
"Coil Y Temperature [°C]",
|
||||
"Coil Z Temperature [°C]",
|
||||
"Fina Err",
|
||||
"Fina Raw Mag X [nT]",
|
||||
"Fina Raw Mag Y [nT]",
|
||||
"Fina Raw Mag Z [nT]",
|
||||
"Fina Cal Mag X [nT]",
|
||||
"Fina Cal Mag Y [nT]",
|
||||
"Fina Cal Mag Z [nT]",
|
||||
"Fina Coil X Current [mA]",
|
||||
"Fina Coil Y Current [mA]",
|
||||
"Fina Coil Z Current [mA]",
|
||||
"Fina Coil X Temperature [°C]",
|
||||
"Fina Coil Y Temperature [°C]",
|
||||
"Fina Coil Z Temperature [°C]",
|
||||
]
|
||||
# INIT step (no coil actuation)
|
||||
init_err = hk_data[0]
|
||||
init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0]
|
||||
init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0]
|
||||
init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0]
|
||||
init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0]
|
||||
init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0]
|
||||
init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0]
|
||||
init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0]
|
||||
init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0]
|
||||
init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0]
|
||||
init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0]
|
||||
init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0]
|
||||
init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0]
|
||||
|
||||
# Actuation step
|
||||
err = hk_data[43]
|
||||
raw_mag_x = struct.unpack("!f", hk_data[44:48])[0]
|
||||
raw_mag_y = struct.unpack("!f", hk_data[48:52])[0]
|
||||
raw_mag_z = struct.unpack("!f", hk_data[52:56])[0]
|
||||
cal_mag_x = struct.unpack("!f", hk_data[56:60])[0]
|
||||
cal_mag_y = struct.unpack("!f", hk_data[60:64])[0]
|
||||
cal_mag_z = struct.unpack("!f", hk_data[64:68])[0]
|
||||
coil_x_current = struct.unpack("!f", hk_data[68:72])[0]
|
||||
coil_y_current = struct.unpack("!f", hk_data[72:76])[0]
|
||||
coil_z_current = struct.unpack("!f", hk_data[76:80])[0]
|
||||
coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0]
|
||||
coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0]
|
||||
coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0]
|
||||
|
||||
# FINA step (no coil actuation)
|
||||
fina_err = hk_data[86]
|
||||
fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0]
|
||||
fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0]
|
||||
fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0]
|
||||
fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0]
|
||||
fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0]
|
||||
fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0]
|
||||
fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0]
|
||||
fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0]
|
||||
fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0]
|
||||
fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0]
|
||||
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
|
||||
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
|
||||
|
||||
validity_buffer = hk_data[129:]
|
||||
content_list = [
|
||||
init_err,
|
||||
init_raw_mag_x,
|
||||
init_raw_mag_y,
|
||||
init_raw_mag_z,
|
||||
init_cal_mag_x,
|
||||
init_cal_mag_y,
|
||||
init_cal_mag_z,
|
||||
init_coil_x_current,
|
||||
init_coil_y_current,
|
||||
init_coil_z_current,
|
||||
init_coil_x_temperature,
|
||||
init_coil_y_temperature,
|
||||
init_coil_z_temperature,
|
||||
err,
|
||||
raw_mag_x,
|
||||
init_raw_mag_y,
|
||||
raw_mag_z,
|
||||
cal_mag_x,
|
||||
cal_mag_y,
|
||||
cal_mag_z,
|
||||
coil_x_current,
|
||||
coil_y_current,
|
||||
coil_z_current,
|
||||
coil_x_temperature,
|
||||
coil_y_temperature,
|
||||
coil_z_temperature,
|
||||
fina_err,
|
||||
fina_raw_mag_x,
|
||||
fina_raw_mag_y,
|
||||
fina_raw_mag_z,
|
||||
fina_cal_mag_x,
|
||||
fina_cal_mag_y,
|
||||
fina_cal_mag_z,
|
||||
fina_coil_x_current,
|
||||
fina_coil_y_current,
|
||||
fina_coil_z_current,
|
||||
fina_coil_x_temperature,
|
||||
fina_coil_y_temperature,
|
||||
fina_coil_z_temperature,
|
||||
]
|
||||
num_of_vars = len(header_list)
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
8
pus_tm/devs/mgms.py
Normal file
8
pus_tm/devs/mgms.py
Normal file
@ -0,0 +1,8 @@
|
||||
from tmtccmd.utility import ObjectId
|
||||
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
def handle_mgm_hk_data(
|
||||
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
|
||||
):
|
||||
pass
|
294
pus_tm/devs/pcdu.py
Normal file
294
pus_tm/devs/pcdu.py
Normal file
@ -0,0 +1,294 @@
|
||||
import struct
|
||||
|
||||
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||
from pus_tm.defs import PrintWrapper
|
||||
from gomspace.gomspace_common import SetIds
|
||||
|
||||
P60_INDEX_LIST = [
|
||||
"ACU VCC",
|
||||
"PDU1 VCC",
|
||||
"X3 IDLE VCC",
|
||||
"PDU2 VCC",
|
||||
"ACU VBAT",
|
||||
"PDU1 VBAT",
|
||||
"X3 IDLE VBAT",
|
||||
"PDU2 VBAT",
|
||||
"STACK VBAT",
|
||||
"STACK 3V3",
|
||||
"STACK 5V",
|
||||
"GS3V3",
|
||||
"GS5V",
|
||||
]
|
||||
|
||||
WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]
|
||||
|
||||
PDU1_CHANNELS_NAMES = [
|
||||
"TCS Board",
|
||||
"Syrlinks",
|
||||
"Startracker",
|
||||
"MGT",
|
||||
"SUS Nominal",
|
||||
"SCEX",
|
||||
"PLOC",
|
||||
"ACS A Side",
|
||||
"Unused Channel 8",
|
||||
]
|
||||
|
||||
PDU2_CHANNELS_NAMES = [
|
||||
"Q7S",
|
||||
"Payload PCDU CH1",
|
||||
"RW",
|
||||
"TCS Heater In",
|
||||
"SUS Redundant",
|
||||
"Deployment Mechanism",
|
||||
"Payload PCDU CH6",
|
||||
"ACS B Side",
|
||||
"Payload Camera",
|
||||
]
|
||||
|
||||
PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES]
|
||||
|
||||
|
||||
class WdtInfo:
|
||||
def __init__(self, pw: PrintWrapper):
|
||||
self.wdt_reboots_list = []
|
||||
self.time_pings_left_list = []
|
||||
self.pw = pw
|
||||
|
||||
def print(self):
|
||||
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
|
||||
self.pw.dlog(wdt_info)
|
||||
for idx in range(len(self.wdt_reboots_list)):
|
||||
self.pw.dlog(
|
||||
f"{WDT_LIST[idx].ljust(5)} | "
|
||||
f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}",
|
||||
)
|
||||
|
||||
def parse(self, wdt_data: bytes, current_idx: int) -> int:
|
||||
priv_idx = 0
|
||||
self.wdt_reboots_list = []
|
||||
self.time_pings_left_list = []
|
||||
for idx in range(5):
|
||||
self.wdt_reboots_list.append(
|
||||
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
|
||||
)
|
||||
priv_idx += 4
|
||||
current_idx += 4
|
||||
for idx in range(3):
|
||||
self.time_pings_left_list.append(
|
||||
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
|
||||
)
|
||||
priv_idx += 4
|
||||
current_idx += 4
|
||||
for idx in range(2):
|
||||
self.time_pings_left_list.append(wdt_data[priv_idx])
|
||||
current_idx += 1
|
||||
priv_idx += 1
|
||||
return current_idx
|
||||
|
||||
|
||||
def handle_pdu_data(
|
||||
printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes
|
||||
):
|
||||
pw = PrintWrapper(printer=printer)
|
||||
current_idx = 0
|
||||
priv_idx = pdu_idx - 1
|
||||
if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX:
|
||||
fmt_str = "!hhBBBIIH"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
vcc,
|
||||
vbat,
|
||||
conv_enb_0,
|
||||
conv_enb_1,
|
||||
conv_enb_2,
|
||||
boot_cause,
|
||||
uptime,
|
||||
reset_cause,
|
||||
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV")
|
||||
pw.dlog(f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]")
|
||||
pw.dlog(
|
||||
f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}",
|
||||
)
|
||||
current_idx += inc_len
|
||||
latchup_list = []
|
||||
pw.dlog("Latchups")
|
||||
for idx in range(len(PDU1_CHANNELS_NAMES)):
|
||||
latchup_list.append(
|
||||
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
|
||||
)
|
||||
content_line = (
|
||||
f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}"
|
||||
)
|
||||
pw.dlog(content_line)
|
||||
current_idx += 2
|
||||
device_types = []
|
||||
for idx in range(len(PDU1_CHANNELS_NAMES)):
|
||||
device_types.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
device_statuses = []
|
||||
for idx in range(len(PDU1_CHANNELS_NAMES)):
|
||||
device_statuses.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
wdt = WdtInfo(pw=pw)
|
||||
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
|
||||
wdt.print()
|
||||
if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE:
|
||||
pw.dlog(f"Received PDU HK from PDU {pdu_idx}")
|
||||
current_list = []
|
||||
for idx in range(len(PDU1_CHANNELS_NAMES)):
|
||||
current_list.append(
|
||||
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
|
||||
)
|
||||
current_idx += 2
|
||||
voltage_list = []
|
||||
for idx in range(len(PDU1_CHANNELS_NAMES)):
|
||||
voltage_list.append(
|
||||
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
|
||||
)
|
||||
current_idx += 2
|
||||
output_enb_list = []
|
||||
for idx in range(len(PDU1_CHANNELS_NAMES)):
|
||||
output_enb_list.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
|
||||
print(header_str)
|
||||
printer.file_logger.info(header_str)
|
||||
for idx in range(len(PDU1_CHANNELS_NAMES)):
|
||||
out_enb = f"{output_enb_list[idx]}".ljust(6)
|
||||
content_line = (
|
||||
f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | "
|
||||
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
|
||||
)
|
||||
pw.dlog(content_line)
|
||||
fmt_str = "!IBh"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(boot_count, batt_mode, temperature) = struct.unpack(
|
||||
fmt_str, hk_data[current_idx : current_idx + inc_len]
|
||||
)
|
||||
info = (
|
||||
f"Boot Count {boot_count} | Battery Mode {batt_mode} | "
|
||||
f"Temperature {temperature / 10.0}"
|
||||
)
|
||||
pw.dlog(info)
|
||||
|
||||
|
||||
def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||
pw = PrintWrapper(printer=printer)
|
||||
if set_id == SetIds.P60_CORE:
|
||||
pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA")
|
||||
current_idx = 0
|
||||
current_list = []
|
||||
for idx in range(13):
|
||||
current_list.append(
|
||||
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
|
||||
)
|
||||
current_idx += 2
|
||||
voltage_list = []
|
||||
for idx in range(13):
|
||||
voltage_list.append(
|
||||
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
|
||||
)
|
||||
current_idx += 2
|
||||
out_enb_list = []
|
||||
for idx in range(13):
|
||||
out_enb_list.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
|
||||
print(header_str)
|
||||
printer.file_logger.info(header_str)
|
||||
for idx in range(13):
|
||||
out_enb = f"{out_enb_list[idx]}".ljust(6)
|
||||
content_line = (
|
||||
f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | "
|
||||
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
|
||||
)
|
||||
pw.dlog(content_line)
|
||||
fmt_str = "!IBhHhh"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
boot_count,
|
||||
batt_mode,
|
||||
batt_current,
|
||||
batt_voltage,
|
||||
temp_0,
|
||||
temp_1,
|
||||
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
current_idx += inc_len
|
||||
batt_info = (
|
||||
f"Batt: Mode {batt_mode} | Boot Count {boot_count} | "
|
||||
f"Charge current {batt_current} | Voltage {batt_voltage}"
|
||||
)
|
||||
temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | "
|
||||
pw.dlog(temps)
|
||||
pw.dlog(batt_info)
|
||||
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9)
|
||||
if set_id == SetIds.P60_AUX:
|
||||
pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA")
|
||||
current_idx = 0
|
||||
latchup_list = []
|
||||
pw.dlog("P60 Dock Latchups")
|
||||
for idx in range(0, 13):
|
||||
latchup_list.append(
|
||||
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
|
||||
)
|
||||
content_line = f"{P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}"
|
||||
pw.dlog(content_line)
|
||||
current_idx += 2
|
||||
fmt_str = "!IIHBBHHhhB"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
boot_cause,
|
||||
uptime,
|
||||
reset_cause,
|
||||
heater_on,
|
||||
conv_5v_on,
|
||||
dock_vbat,
|
||||
dock_vcc_c,
|
||||
batt_temp_0,
|
||||
batt_temp_1,
|
||||
dearm_status,
|
||||
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
current_idx += inc_len
|
||||
wdt = WdtInfo(pw=pw)
|
||||
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
|
||||
fmt_str = "!hhbb"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
batt_charge_current,
|
||||
batt_discharge_current,
|
||||
ant6_depl,
|
||||
ar6_depl,
|
||||
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
current_idx += inc_len
|
||||
device_types = []
|
||||
device_statuses = []
|
||||
for idx in range(8):
|
||||
device_types.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
for idx in range(8):
|
||||
device_statuses.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
util_info = (
|
||||
f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}"
|
||||
)
|
||||
util_info_2 = (
|
||||
f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | "
|
||||
f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}"
|
||||
)
|
||||
pw.dlog(util_info)
|
||||
pw.dlog(util_info_2)
|
||||
wdt.print()
|
||||
misc_info = (
|
||||
f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}"
|
||||
)
|
||||
pw.dlog(misc_info)
|
||||
batt_info = (
|
||||
f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | "
|
||||
f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}"
|
||||
)
|
||||
pw.dlog(batt_info)
|
||||
printer.print_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=27
|
||||
)
|
67
pus_tm/devs/syrlinks.py
Normal file
67
pus_tm/devs/syrlinks.py
Normal file
@ -0,0 +1,67 @@
|
||||
import struct
|
||||
|
||||
from pus_tm.defs import PrintWrapper
|
||||
from pus_tc.devs.syrlinks_hk_handler import SetIds
|
||||
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
def handle_syrlinks_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||
if set_id == SetIds.RX_REGISTERS_DATASET:
|
||||
return handle_syrlinks_rx_registers_dataset(printer, hk_data)
|
||||
elif set_id == SetIds.TX_REGISTERS_DATASET:
|
||||
return handle_syrlinks_tx_registers_dataset(printer, hk_data)
|
||||
else:
|
||||
pw = PrintWrapper(printer)
|
||||
pw.dlog(f"Service 3 TM: Syrlinks handler reply with unknown set ID {set_id}")
|
||||
|
||||
|
||||
def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
header_list = [
|
||||
"RX Status",
|
||||
"RX Sensitivity",
|
||||
"RX Frequency Shift",
|
||||
"RX IQ Power",
|
||||
"RX AGC Value",
|
||||
"RX Demod Eb",
|
||||
"RX Demod N0",
|
||||
"RX Datarate",
|
||||
]
|
||||
rx_status = hk_data[0]
|
||||
rx_sensitivity = struct.unpack("!I", hk_data[1:5])
|
||||
rx_frequency_shift = struct.unpack("!I", hk_data[5:9])
|
||||
rx_iq_power = struct.unpack("!H", hk_data[9:11])
|
||||
rx_agc_value = struct.unpack("!H", hk_data[11:13])
|
||||
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
|
||||
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
|
||||
rx_data_rate = hk_data[21]
|
||||
content_list = [
|
||||
rx_status,
|
||||
rx_sensitivity,
|
||||
rx_frequency_shift,
|
||||
rx_iq_power,
|
||||
rx_agc_value,
|
||||
rx_demod_eb,
|
||||
rx_demod_n0,
|
||||
rx_data_rate,
|
||||
]
|
||||
validity_buffer = hk_data[22:]
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
|
||||
|
||||
|
||||
def handle_syrlinks_tx_registers_dataset(
|
||||
printer: FsfwTmTcPrinter,
|
||||
hk_data: bytes,
|
||||
):
|
||||
pw = PrintWrapper(printer)
|
||||
header_list = ["TX Status", "TX Waveform", "TX AGC value"]
|
||||
tx_status = hk_data[0]
|
||||
tx_waveform = hk_data[1]
|
||||
tx_agc_value = struct.unpack("!H", hk_data[2:4])
|
||||
content_list = [tx_status, tx_waveform, tx_agc_value]
|
||||
validity_buffer = hk_data[4:]
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
|
@ -33,7 +33,6 @@ def ccsds_tm_handler(apid: int, raw_tm_packet: bytes, _user_args: any) -> None:
|
||||
pus_factory_hook(raw_tm_packet=raw_tm_packet)
|
||||
|
||||
|
||||
|
||||
def pus_factory_hook(raw_tm_packet: bytes):
|
||||
if len(raw_tm_packet) < 8:
|
||||
LOGGER.warning("Detected packet shorter than 8 bytes!")
|
||||
@ -81,6 +80,5 @@ def pus_factory_hook(raw_tm_packet: bytes):
|
||||
packet=raw_tm_packet, srv_subservice=(service_type, subservice_type)
|
||||
)
|
||||
except ValueError:
|
||||
# TODO: Log faulty packet
|
||||
LOGGER.warning("Invalid packet format detected")
|
||||
log_raw_unknown_packet(packet=raw_tm_packet, packet_type=PacketTypes.TM)
|
||||
|
@ -1,31 +1,32 @@
|
||||
"""HK Handling for EIVE OBSW"""
|
||||
import struct
|
||||
import os
|
||||
import datetime
|
||||
|
||||
|
||||
|
||||
from pus_tm.system.tcs import handle_thermal_controller_hk_data
|
||||
from tmtccmd.config.definitions import HkReplyUnpacked
|
||||
from tmtccmd.tm.pus_3_fsfw_hk import (
|
||||
Service3Base,
|
||||
HkContentType,
|
||||
Service3FsfwTm,
|
||||
)
|
||||
from tmtccmd.logging import get_console_logger
|
||||
from pus_tc.devs.bpx_batt import BpxSetIds
|
||||
from pus_tc.devs.syrlinks_hk_handler import SetIds
|
||||
from pus_tc.devs.p60dock import SetIds
|
||||
from pus_tc.devs.imtq import ImtqSetIds
|
||||
from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT
|
||||
import config.object_ids as obj_ids
|
||||
from tmtccmd.logging import get_console_logger
|
||||
|
||||
from pus_tm.devs.bpx_bat import handle_bpx_hk_data
|
||||
from pus_tm.devs.gps import handle_gps_data
|
||||
from pus_tm.devs.gyros import handle_gyros_hk_data
|
||||
from pus_tm.devs.imtq_mgt import handle_self_test_data
|
||||
from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data
|
||||
from pus_tm.devs.syrlinks import handle_syrlinks_hk_data
|
||||
from pus_tc.devs.imtq import ImtqSetIds
|
||||
from pus_tm.devs.reaction_wheels import handle_rw_hk_data
|
||||
from pus_tm.defs import FsfwTmTcPrinter, log_to_both
|
||||
from pus_tm.system.core import handle_core_hk_data
|
||||
from pus_tm.devs.mgms import handle_mgm_hk_data
|
||||
import config.object_ids as obj_ids
|
||||
|
||||
from pus_tm.tm_tcp_server import TmTcpServer
|
||||
|
||||
|
||||
|
||||
LOGGER = get_console_logger()
|
||||
|
||||
TM_TCP_SERVER = TmTcpServer.getInstance()
|
||||
@ -42,9 +43,9 @@ def handle_hk_packet(
|
||||
named_obj_id = tm_packet.object_id
|
||||
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
|
||||
hk_data = tm_packet.tm_data[8:]
|
||||
TM_TCP_SERVER.report_raw_hk_data(object_id=named_obj_id,
|
||||
set_id=tm_packet.set_id,
|
||||
hk_data=hk_data)
|
||||
TM_TCP_SERVER.report_raw_hk_data(
|
||||
object_id=named_obj_id, set_id=tm_packet.set_id, hk_data=hk_data
|
||||
)
|
||||
printer.generic_hk_tm_print(
|
||||
content_type=HkContentType.HK,
|
||||
object_id=named_obj_id,
|
||||
@ -60,6 +61,7 @@ def handle_hk_packet(
|
||||
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
|
||||
LOGGER.warning("HK definitions printout not implemented yet")
|
||||
|
||||
|
||||
def handle_regular_hk_print(
|
||||
printer: FsfwTmTcPrinter,
|
||||
object_id: ObjectId,
|
||||
@ -72,12 +74,7 @@ def handle_regular_hk_print(
|
||||
if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
|
||||
handle_rw_hk_data(printer, object_id, set_id, hk_data)
|
||||
if objb == obj_ids.SYRLINKS_HANDLER_ID:
|
||||
if set_id == SetIds.RX_REGISTERS_DATASET:
|
||||
return handle_syrlinks_rx_registers_dataset(printer, hk_data)
|
||||
elif set_id == SetIds.TX_REGISTERS_DATASET:
|
||||
return handle_syrlinks_tx_registers_dataset(printer, hk_data)
|
||||
else:
|
||||
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
|
||||
handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
|
||||
if objb == obj_ids.IMTQ_HANDLER_ID:
|
||||
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
|
||||
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
|
||||
@ -90,7 +87,7 @@ def handle_regular_hk_print(
|
||||
if objb == obj_ids.BPX_HANDLER_ID:
|
||||
handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
|
||||
if objb == obj_ids.CORE_CONTROLLER_ID:
|
||||
return handle_core_hk_data(printer=printer, hk_data=hk_data)
|
||||
return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
|
||||
if objb == obj_ids.PDU_1_HANDLER_ID:
|
||||
return handle_pdu_data(
|
||||
printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data
|
||||
@ -105,636 +102,30 @@ def handle_regular_hk_print(
|
||||
)
|
||||
if objb == obj_ids.P60_DOCK_HANDLER:
|
||||
handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
|
||||
if objb in [
|
||||
obj_ids.GYRO_0_HANDLER_ID,
|
||||
obj_ids.GYRO_1_HANDLER_ID,
|
||||
obj_ids.GYRO_2_HANDLER_ID,
|
||||
obj_ids.GYRO_3_HANDLER_ID,
|
||||
]:
|
||||
handle_gyros_hk_data(
|
||||
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
|
||||
)
|
||||
if objb in [
|
||||
obj_ids.MGM_0_HANDLER_ID,
|
||||
obj_ids.MGM_1_HANDLER_ID,
|
||||
obj_ids.MGM_2_HANDLER_ID,
|
||||
obj_ids.MGM_3_HANDLER_ID,
|
||||
]:
|
||||
handle_mgm_hk_data(
|
||||
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
|
||||
)
|
||||
if objb == obj_ids.PL_PCDU_ID:
|
||||
log_to_both(printer, "Received PL PCDU HK data")
|
||||
if objb == obj_ids.THERMAL_CONTROLLER_ID:
|
||||
handle_thermal_controller_hk_data( object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data)
|
||||
handle_thermal_controller_hk_data(
|
||||
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
|
||||
)
|
||||
else:
|
||||
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
|
||||
return HkReplyUnpacked()
|
||||
|
||||
def handle_syrlinks_rx_registers_dataset( printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
reply = HkReplyUnpacked()
|
||||
header_list = [
|
||||
"RX Status",
|
||||
"RX Sensitivity",
|
||||
"RX Frequency Shift",
|
||||
"RX IQ Power",
|
||||
"RX AGC Value",
|
||||
"RX Demod Eb",
|
||||
"RX Demod N0",
|
||||
"RX Datarate",
|
||||
]
|
||||
rx_status = hk_data[0]
|
||||
rx_sensitivity = struct.unpack("!I", hk_data[1:5])
|
||||
rx_frequency_shift = struct.unpack("!I", hk_data[5:9])
|
||||
rx_iq_power = struct.unpack("!H", hk_data[9:11])
|
||||
rx_agc_value = struct.unpack("!H", hk_data[11:13])
|
||||
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
|
||||
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
|
||||
rx_data_rate = hk_data[21]
|
||||
content_list = [
|
||||
rx_status,
|
||||
rx_sensitivity,
|
||||
rx_frequency_shift,
|
||||
rx_iq_power,
|
||||
rx_agc_value,
|
||||
rx_demod_eb,
|
||||
rx_demod_n0,
|
||||
rx_data_rate,
|
||||
]
|
||||
validity_buffer = hk_data[22:]
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
|
||||
|
||||
def handle_syrlinks_tx_registers_dataset(
|
||||
printer: FsfwTmTcPrinter,
|
||||
hk_data: bytes,
|
||||
):
|
||||
reply = HkReplyUnpacked()
|
||||
header_list = ["TX Status", "TX Waveform", "TX AGC value"]
|
||||
tx_status = hk_data[0]
|
||||
tx_waveform = hk_data[1]
|
||||
tx_agc_value = struct.unpack("!H", hk_data[2:4])
|
||||
content_list = [tx_status, tx_waveform, tx_agc_value]
|
||||
validity_buffer = hk_data[4:]
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
|
||||
|
||||
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
header_list = [
|
||||
"Init Err",
|
||||
"Init Raw Mag X [nT]",
|
||||
"Init Raw Mag Y [nT]",
|
||||
"Init Raw Mag Z [nT]",
|
||||
"Init Cal Mag X [nT]",
|
||||
"Init Cal Mag Y [nT]",
|
||||
"Init Cal Mag Z [nT]",
|
||||
"Init Coil X Current [mA]",
|
||||
"Init Coil Y Current [mA]",
|
||||
"Init Coil Z Current [mA]",
|
||||
"Init Coil X Temperature [°C]",
|
||||
"Init Coil Y Temperature [°C]",
|
||||
"Init Coil Z Temperature [°C]",
|
||||
"Err",
|
||||
"Raw Mag X [nT]",
|
||||
"Raw Mag Y [nT]",
|
||||
"Raw Mag Z [nT]",
|
||||
"Cal Mag X [nT]",
|
||||
"Cal Mag Y [nT]",
|
||||
"Cal Mag Z [nT]",
|
||||
"Coil X Current [mA]",
|
||||
"Coil Y Current [mA]",
|
||||
"Coil Z Current [mA]",
|
||||
"Coil X Temperature [°C]",
|
||||
"Coil Y Temperature [°C]",
|
||||
"Coil Z Temperature [°C]",
|
||||
"Fina Err",
|
||||
"Fina Raw Mag X [nT]",
|
||||
"Fina Raw Mag Y [nT]",
|
||||
"Fina Raw Mag Z [nT]",
|
||||
"Fina Cal Mag X [nT]",
|
||||
"Fina Cal Mag Y [nT]",
|
||||
"Fina Cal Mag Z [nT]",
|
||||
"Fina Coil X Current [mA]",
|
||||
"Fina Coil Y Current [mA]",
|
||||
"Fina Coil Z Current [mA]",
|
||||
"Fina Coil X Temperature [°C]",
|
||||
"Fina Coil Y Temperature [°C]",
|
||||
"Fina Coil Z Temperature [°C]",
|
||||
]
|
||||
# INIT step (no coil actuation)
|
||||
init_err = hk_data[0]
|
||||
init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0]
|
||||
init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0]
|
||||
init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0]
|
||||
init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0]
|
||||
init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0]
|
||||
init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0]
|
||||
init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0]
|
||||
init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0]
|
||||
init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0]
|
||||
init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0]
|
||||
init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0]
|
||||
init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0]
|
||||
|
||||
# Actuation step
|
||||
err = hk_data[43]
|
||||
raw_mag_x = struct.unpack("!f", hk_data[44:48])[0]
|
||||
raw_mag_y = struct.unpack("!f", hk_data[48:52])[0]
|
||||
raw_mag_z = struct.unpack("!f", hk_data[52:56])[0]
|
||||
cal_mag_x = struct.unpack("!f", hk_data[56:60])[0]
|
||||
cal_mag_y = struct.unpack("!f", hk_data[60:64])[0]
|
||||
cal_mag_z = struct.unpack("!f", hk_data[64:68])[0]
|
||||
coil_x_current = struct.unpack("!f", hk_data[68:72])[0]
|
||||
coil_y_current = struct.unpack("!f", hk_data[72:76])[0]
|
||||
coil_z_current = struct.unpack("!f", hk_data[76:80])[0]
|
||||
coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0]
|
||||
coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0]
|
||||
coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0]
|
||||
|
||||
# FINA step (no coil actuation)
|
||||
fina_err = hk_data[86]
|
||||
fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0]
|
||||
fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0]
|
||||
fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0]
|
||||
fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0]
|
||||
fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0]
|
||||
fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0]
|
||||
fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0]
|
||||
fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0]
|
||||
fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0]
|
||||
fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0]
|
||||
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
|
||||
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
|
||||
|
||||
validity_buffer = hk_data[129:]
|
||||
content_list = [
|
||||
init_err,
|
||||
init_raw_mag_x,
|
||||
init_raw_mag_y,
|
||||
init_raw_mag_z,
|
||||
init_cal_mag_x,
|
||||
init_cal_mag_y,
|
||||
init_cal_mag_z,
|
||||
init_coil_x_current,
|
||||
init_coil_y_current,
|
||||
init_coil_z_current,
|
||||
init_coil_x_temperature,
|
||||
init_coil_y_temperature,
|
||||
init_coil_z_temperature,
|
||||
err,
|
||||
raw_mag_x,
|
||||
raw_mag_y,
|
||||
raw_mag_z,
|
||||
cal_mag_x,
|
||||
cal_mag_y,
|
||||
cal_mag_z,
|
||||
coil_x_current,
|
||||
coil_y_current,
|
||||
coil_z_current,
|
||||
coil_x_temperature,
|
||||
coil_y_temperature,
|
||||
coil_z_temperature,
|
||||
fina_err,
|
||||
fina_raw_mag_x,
|
||||
fina_raw_mag_y,
|
||||
fina_raw_mag_z,
|
||||
fina_cal_mag_x,
|
||||
fina_cal_mag_y,
|
||||
fina_cal_mag_z,
|
||||
fina_coil_x_current,
|
||||
fina_coil_y_current,
|
||||
fina_coil_z_current,
|
||||
fina_coil_x_temperature,
|
||||
fina_coil_y_temperature,
|
||||
fina_coil_z_temperature,
|
||||
]
|
||||
num_of_vars = len(header_list)
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
|
||||
def handle_thermal_controller_hk_data(object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||
if set_id == 0:
|
||||
LOGGER.info("Received Sensor Temperature data")
|
||||
|
||||
# get all the floats
|
||||
tm_data = struct.unpack("!ffffffffffffffff", hk_data[:16 * 4])
|
||||
parsed_data = {}
|
||||
|
||||
# put them into an list with their names
|
||||
parsed_data = []
|
||||
parsed_data.append({"SENSOR_PLOC_HEATSPREADER": tm_data[0]})
|
||||
parsed_data.append({"SENSOR_PLOC_MISSIONBOARD": tm_data[1]})
|
||||
parsed_data.append({"SENSOR_4K_CAMERA": tm_data[2]})
|
||||
parsed_data.append({"SENSOR_DAC_HEATSPREADER": tm_data[3]})
|
||||
parsed_data.append({"SENSOR_STARTRACKER": tm_data[4]})
|
||||
parsed_data.append({"SENSOR_RW1": tm_data[5]})
|
||||
parsed_data.append({"SENSOR_DRO": tm_data[6]})
|
||||
parsed_data.append({"SENSOR_SCEX": tm_data[7]})
|
||||
parsed_data.append({"SENSOR_X8": tm_data[8]})
|
||||
parsed_data.append({"SENSOR_HPA": tm_data[9]})
|
||||
parsed_data.append({"SENSOR_TX_MODUL": tm_data[10]})
|
||||
parsed_data.append({"SENSOR_MPA": tm_data[11]})
|
||||
parsed_data.append({"SENSOR_ACU": tm_data[12]})
|
||||
parsed_data.append({"SENSOR_PLPCDU_HEATSPREADER": tm_data[13]})
|
||||
parsed_data.append({"SENSOR_TCS_BOARD": tm_data[14]})
|
||||
parsed_data.append({"SENSOR_MAGNETTORQUER": tm_data[15]})
|
||||
|
||||
TM_TCP_SERVER.report_parsed_hk_data(object_id, set_id, parsed_data)
|
||||
|
||||
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
|
||||
reply = HkReplyUnpacked()
|
||||
var_index = 0
|
||||
header_list = [
|
||||
"Latitude",
|
||||
"Longitude",
|
||||
"Altitude",
|
||||
"Fix Mode",
|
||||
"Sats in Use",
|
||||
"Date",
|
||||
"Unix Seconds",
|
||||
]
|
||||
latitude = struct.unpack("!d", hk_data[0:8])[0]
|
||||
longitude = struct.unpack("!d", hk_data[8:16])[0]
|
||||
altitude = struct.unpack("!d", hk_data[16:24])[0]
|
||||
fix_mode = hk_data[24]
|
||||
sat_in_use = hk_data[25]
|
||||
year = struct.unpack("!H", hk_data[26:28])[0]
|
||||
month = hk_data[28]
|
||||
day = hk_data[29]
|
||||
hours = hk_data[30]
|
||||
minutes = hk_data[31]
|
||||
seconds = hk_data[32]
|
||||
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
|
||||
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
|
||||
content_list = [
|
||||
latitude,
|
||||
longitude,
|
||||
altitude,
|
||||
fix_mode,
|
||||
sat_in_use,
|
||||
date_string,
|
||||
unix_seconds,
|
||||
]
|
||||
var_index += 13
|
||||
reply.num_of_vars = var_index
|
||||
if not os.path.isfile("gps_log.txt"):
|
||||
with open("gps_log.txt", "w") as gps_file:
|
||||
gps_file.write(
|
||||
"Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, "
|
||||
"Date, Unix Seconds\n"
|
||||
)
|
||||
with open("gps_log.txt", "a") as gps_file:
|
||||
gps_file.write(
|
||||
f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, "
|
||||
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
|
||||
)
|
||||
validity_buffer = hk_data[37:39]
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
||||
|
||||
def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||
if set_id == BpxSetIds.GET_HK_SET:
|
||||
fmt_str = "!HHHHhhhhIB"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
charge_current,
|
||||
discharge_current,
|
||||
heater_current,
|
||||
batt_voltage,
|
||||
batt_temp_1,
|
||||
batt_temp_2,
|
||||
batt_temp_3,
|
||||
batt_temp_4,
|
||||
reboot_cntr,
|
||||
boot_cause,
|
||||
) = struct.unpack(fmt_str, hk_data[0:inc_len])
|
||||
header_list = [
|
||||
"Charge Current",
|
||||
"Discharge Current",
|
||||
"Heater Current",
|
||||
"Battery Voltage",
|
||||
"Batt Temp 1",
|
||||
"Batt Temp 2",
|
||||
"Batt Temp 3",
|
||||
"Batt Temp 4",
|
||||
"Reboot Counter",
|
||||
"Boot Cause",
|
||||
]
|
||||
content_list = [
|
||||
charge_current,
|
||||
discharge_current,
|
||||
heater_current,
|
||||
batt_voltage,
|
||||
batt_temp_1,
|
||||
batt_temp_2,
|
||||
batt_temp_3,
|
||||
batt_temp_4,
|
||||
reboot_cntr,
|
||||
boot_cause,
|
||||
]
|
||||
validity_buffer = hk_data[inc_len:]
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
||||
elif set_id == BpxSetIds.GET_CFG_SET:
|
||||
battheat_mode = hk_data[0]
|
||||
battheat_low = struct.unpack("!b", hk_data[1:2])[0]
|
||||
battheat_high = struct.unpack("!b", hk_data[2:3])[0]
|
||||
header_list = [
|
||||
"Battery Heater Mode",
|
||||
"Battery Heater Low Limit",
|
||||
"Battery Heater High Limit",
|
||||
]
|
||||
content_list = [battheat_mode, battheat_low, battheat_high]
|
||||
validity_buffer = hk_data[3:]
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
||||
|
||||
def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
|
||||
fmt_str = "!fffH"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack(
|
||||
fmt_str, hk_data[0: 0 + inc_len]
|
||||
)
|
||||
printout = (
|
||||
f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
|
||||
f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}"
|
||||
)
|
||||
log_to_both(printer, printout)
|
||||
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4)
|
||||
|
||||
P60_INDEX_LIST = [
|
||||
"ACU VCC",
|
||||
"PDU1 VCC",
|
||||
"X3 IDLE VCC",
|
||||
"PDU2 VCC",
|
||||
"ACU VBAT",
|
||||
"PDU1 VBAT",
|
||||
"X3 IDLE VBAT",
|
||||
"PDU2 VBAT",
|
||||
"STACK VBAT",
|
||||
"STACK 3V3",
|
||||
"STACK 5V",
|
||||
"GS3V3",
|
||||
"GS5V",
|
||||
]
|
||||
|
||||
WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]
|
||||
|
||||
PDU1_CHANNELS_NAMES = [
|
||||
"TCS Board",
|
||||
"Syrlinks",
|
||||
"Startracker",
|
||||
"MGT",
|
||||
"SUS Nominal",
|
||||
"SCEX",
|
||||
"PLOC",
|
||||
"ACS A Side",
|
||||
"Unused Channel 8",
|
||||
]
|
||||
|
||||
PDU2_CHANNELS_NAMES = [
|
||||
"Q7S",
|
||||
"Payload PCDU CH1",
|
||||
"RW",
|
||||
"TCS Heater In",
|
||||
"SUS Redundant",
|
||||
"Deployment Mechanism",
|
||||
"Payload PCDU CH6",
|
||||
"ACS B Side",
|
||||
"Payload Camera",
|
||||
]
|
||||
|
||||
PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES]
|
||||
|
||||
class WdtInfo:
|
||||
def __init__(self):
|
||||
self.wdt_reboots_list = []
|
||||
self.time_pings_left_list = []
|
||||
|
||||
def print(self, printer: FsfwTmTcPrinter):
|
||||
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
|
||||
log_to_both(printer, wdt_info)
|
||||
for idx in range(len(self.wdt_reboots_list)):
|
||||
log_to_both(
|
||||
printer,
|
||||
f"{TmHandler.WDT_LIST[idx].ljust(5)} | "
|
||||
f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}",
|
||||
)
|
||||
|
||||
def parse(self, wdt_data: bytes, current_idx: int) -> int:
|
||||
priv_idx = 0
|
||||
self.wdt_reboots_list = []
|
||||
self.time_pings_left_list = []
|
||||
for idx in range(5):
|
||||
self.wdt_reboots_list.append(
|
||||
struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0]
|
||||
)
|
||||
priv_idx += 4
|
||||
current_idx += 4
|
||||
for idx in range(3):
|
||||
self.time_pings_left_list.append(
|
||||
struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0]
|
||||
)
|
||||
priv_idx += 4
|
||||
current_idx += 4
|
||||
for idx in range(2):
|
||||
self.time_pings_left_list.append(wdt_data[priv_idx])
|
||||
current_idx += 1
|
||||
priv_idx += 1
|
||||
return current_idx
|
||||
|
||||
def handle_pdu_data(
|
||||
printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes
|
||||
):
|
||||
current_idx = 0
|
||||
priv_idx = pdu_idx - 1
|
||||
if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX:
|
||||
fmt_str = "!hhBBBIIH"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
vcc,
|
||||
vbat,
|
||||
conv_enb_0,
|
||||
conv_enb_1,
|
||||
conv_enb_2,
|
||||
boot_cause,
|
||||
uptime,
|
||||
reset_cause,
|
||||
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
|
||||
log_to_both(printer, f"VCC {vcc} mV | VBAT {vbat} mV")
|
||||
log_to_both(
|
||||
printer, f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]"
|
||||
)
|
||||
log_to_both(
|
||||
printer,
|
||||
f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}",
|
||||
)
|
||||
current_idx += inc_len
|
||||
latchup_list = []
|
||||
log_to_both(printer, "Latchups")
|
||||
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
||||
latchup_list.append(
|
||||
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
|
||||
)
|
||||
content_line = (
|
||||
f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}"
|
||||
)
|
||||
log_to_both(printer, content_line)
|
||||
current_idx += 2
|
||||
device_types = []
|
||||
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
||||
device_types.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
device_statuses = []
|
||||
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
||||
device_statuses.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
wdt = WdtInfo()
|
||||
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
|
||||
wdt.print(printer=printer)
|
||||
if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE:
|
||||
log_to_both(printer, f"Received PDU HK from PDU {pdu_idx}")
|
||||
current_list = []
|
||||
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
||||
current_list.append(
|
||||
struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0]
|
||||
)
|
||||
current_idx += 2
|
||||
voltage_list = []
|
||||
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
||||
voltage_list.append(
|
||||
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
|
||||
)
|
||||
current_idx += 2
|
||||
output_enb_list = []
|
||||
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
||||
output_enb_list.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
|
||||
print(header_str)
|
||||
printer.file_logger.info(header_str)
|
||||
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
|
||||
out_enb = f"{output_enb_list[idx]}".ljust(6)
|
||||
content_line = (
|
||||
f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | "
|
||||
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
|
||||
)
|
||||
log_to_both(printer, content_line)
|
||||
fmt_str = "!IBh"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(boot_count, batt_mode, temperature) = struct.unpack(
|
||||
fmt_str, hk_data[current_idx: current_idx + inc_len]
|
||||
)
|
||||
info = (
|
||||
f"Boot Count {boot_count} | Battery Mode {batt_mode} | "
|
||||
f"Temperature {temperature / 10.0}"
|
||||
)
|
||||
log_to_both(printer, info)
|
||||
|
||||
def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||
if set_id == SetIds.P60_CORE:
|
||||
log_to_both(printer, "Received P60 Core HK. Voltages in mV, currents in mA")
|
||||
current_idx = 0
|
||||
current_list = []
|
||||
for idx in range(13):
|
||||
current_list.append(
|
||||
struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0]
|
||||
)
|
||||
current_idx += 2
|
||||
voltage_list = []
|
||||
for idx in range(13):
|
||||
voltage_list.append(
|
||||
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
|
||||
)
|
||||
current_idx += 2
|
||||
out_enb_list = []
|
||||
for idx in range(13):
|
||||
out_enb_list.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
|
||||
print(header_str)
|
||||
printer.file_logger.info(header_str)
|
||||
for idx in range(13):
|
||||
out_enb = f"{out_enb_list[idx]}".ljust(6)
|
||||
content_line = (
|
||||
f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | "
|
||||
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
|
||||
)
|
||||
log_to_both(printer, content_line)
|
||||
fmt_str = "!IBhHhh"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
boot_count,
|
||||
batt_mode,
|
||||
batt_current,
|
||||
batt_voltage,
|
||||
temp_0,
|
||||
temp_1,
|
||||
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
|
||||
current_idx += inc_len
|
||||
batt_info = (
|
||||
f"Batt: Mode {batt_mode} | Boot Count {boot_count} | "
|
||||
f"Charge current {batt_current} | Voltage {batt_voltage}"
|
||||
)
|
||||
temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | "
|
||||
log_to_both(printer, temps)
|
||||
log_to_both(printer, batt_info)
|
||||
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9)
|
||||
if set_id == SetIds.P60_AUX:
|
||||
log_to_both(printer, "Received P60 AUX HK. Voltages in mV, currents in mA")
|
||||
current_idx = 0
|
||||
latchup_list = []
|
||||
log_to_both(printer, "P60 Dock Latchups")
|
||||
for idx in range(0, 13):
|
||||
latchup_list.append(
|
||||
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
|
||||
)
|
||||
content_line = f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}"
|
||||
log_to_both(printer, content_line)
|
||||
current_idx += 2
|
||||
fmt_str = "!IIHBBHHhhB"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
boot_cause,
|
||||
uptime,
|
||||
reset_cause,
|
||||
heater_on,
|
||||
conv_5v_on,
|
||||
dock_vbat,
|
||||
dock_vcc_c,
|
||||
batt_temp_0,
|
||||
batt_temp_1,
|
||||
dearm_status,
|
||||
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
|
||||
current_idx += inc_len
|
||||
wdt = WdtInfo()
|
||||
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
|
||||
fmt_str = "!hhbb"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
batt_charge_current,
|
||||
batt_discharge_current,
|
||||
ant6_depl,
|
||||
ar6_depl,
|
||||
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
|
||||
current_idx += inc_len
|
||||
device_types = []
|
||||
device_statuses = []
|
||||
for idx in range(8):
|
||||
device_types.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
for idx in range(8):
|
||||
device_statuses.append(hk_data[current_idx])
|
||||
current_idx += 1
|
||||
util_info = (
|
||||
f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}"
|
||||
)
|
||||
util_info_2 = (
|
||||
f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | "
|
||||
f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}"
|
||||
)
|
||||
log_to_both(printer, util_info)
|
||||
log_to_both(printer, util_info_2)
|
||||
wdt.print(printer)
|
||||
misc_info = (
|
||||
f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}"
|
||||
)
|
||||
log_to_both(printer, misc_info)
|
||||
batt_info = (
|
||||
f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | "
|
||||
f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}"
|
||||
)
|
||||
log_to_both(printer, batt_info)
|
||||
printer.print_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=27
|
||||
)
|
||||
|
0
pus_tm/system/__init__.py
Normal file
0
pus_tm/system/__init__.py
Normal file
21
pus_tm/system/core.py
Normal file
21
pus_tm/system/core.py
Normal file
@ -0,0 +1,21 @@
|
||||
import struct
|
||||
|
||||
from pus_tm.defs import PrintWrapper
|
||||
from pus_tc.system.core import SetIds
|
||||
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||
if set_id == SetIds.HK:
|
||||
pw = PrintWrapper(printer)
|
||||
fmt_str = "!fff"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(temperature, ps_voltage, pl_voltage) = struct.unpack(
|
||||
fmt_str, hk_data[0 : 0 + inc_len]
|
||||
)
|
||||
printout = (
|
||||
f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
|
||||
f"PL Voltage [mV] {pl_voltage}"
|
||||
)
|
||||
pw.dlog(printout)
|
||||
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4)
|
39
pus_tm/system/tcs.py
Normal file
39
pus_tm/system/tcs.py
Normal file
@ -0,0 +1,39 @@
|
||||
import struct
|
||||
|
||||
from pus_tm.defs import PrintWrapper
|
||||
from pus_tm.hk_handling import TM_TCP_SERVER
|
||||
from tmtccmd.utility import ObjectId
|
||||
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
def handle_thermal_controller_hk_data(
|
||||
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
|
||||
):
|
||||
if set_id == 0:
|
||||
pw = PrintWrapper(printer)
|
||||
pw.dlog("Received Sensor Temperature data")
|
||||
|
||||
# get all the floats
|
||||
tm_data = struct.unpack("!ffffffffffffffff", hk_data[: 16 * 4])
|
||||
|
||||
# put them into an list with their names
|
||||
parsed_data = [
|
||||
{"SENSOR_PLOC_HEATSPREADER": tm_data[0]},
|
||||
{"SENSOR_PLOC_MISSIONBOARD": tm_data[1]},
|
||||
{"SENSOR_4K_CAMERA": tm_data[2]},
|
||||
{"SENSOR_DAC_HEATSPREADER": tm_data[3]},
|
||||
{"SENSOR_STARTRACKER": tm_data[4]},
|
||||
{"SENSOR_RW1": tm_data[5]},
|
||||
{"SENSOR_DRO": tm_data[6]},
|
||||
{"SENSOR_SCEX": tm_data[7]},
|
||||
{"SENSOR_X8": tm_data[8]},
|
||||
{"SENSOR_HPA": tm_data[9]},
|
||||
{"SENSOR_TX_MODUL": tm_data[10]},
|
||||
{"SENSOR_MPA": tm_data[11]},
|
||||
{"SENSOR_ACU": tm_data[12]},
|
||||
{"SENSOR_PLPCDU_HEATSPREADER": tm_data[13]},
|
||||
{"SENSOR_TCS_BOARD": tm_data[14]},
|
||||
{"SENSOR_MAGNETTORQUER": tm_data[15]},
|
||||
]
|
||||
|
||||
TM_TCP_SERVER.report_parsed_hk_data(object_id, set_id, parsed_data)
|
@ -18,8 +18,7 @@ class TmTcpServer:
|
||||
|
||||
_Instance = None
|
||||
|
||||
def __init__(
|
||||
self):
|
||||
def __init__(self):
|
||||
|
||||
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
@ -65,7 +64,7 @@ class TmTcpServer:
|
||||
|
||||
# dle encode the bytes
|
||||
# adding a newline because someone might want to look at it in a console
|
||||
data_json_bytes = self.dle_encoder.encode(data_json_bytes + b'\n')
|
||||
data_json_bytes = self.dle_encoder.encode(data_json_bytes + b"\n")
|
||||
|
||||
try:
|
||||
sent_length = self.client_connection.send(data_json_bytes)
|
||||
@ -76,27 +75,25 @@ class TmTcpServer:
|
||||
self.client_connection.close()
|
||||
self.client_connection = None
|
||||
|
||||
def report_raw_hk_data(self, object_id: ObjectId,
|
||||
set_id: int,
|
||||
hk_data: bytes):
|
||||
def report_raw_hk_data(self, object_id: ObjectId, set_id: int, hk_data: bytes):
|
||||
|
||||
data_dict = {}
|
||||
data_dict["type"] = "TM"
|
||||
data_dict["tmType"] = "Raw HK"
|
||||
data_dict["objectId"] = object_id.as_string
|
||||
data_dict["setId"] = set_id
|
||||
data_dict["rawData"] = base64.b64encode(hk_data).decode()
|
||||
data_dict = {
|
||||
"type": "TM",
|
||||
"tmType": "Raw HK",
|
||||
"objectId": object_id.as_string,
|
||||
"setId": set_id,
|
||||
"rawData": base64.b64encode(hk_data).decode(),
|
||||
}
|
||||
|
||||
self._send_dictionary_over_socket(data_dict)
|
||||
|
||||
def report_parsed_hk_data(self, object_id: ObjectId,
|
||||
set_id: int,
|
||||
data_dictionary):
|
||||
data_dict = {}
|
||||
data_dict["type"] = "TM"
|
||||
data_dict["tmType"] = "Parsed HK"
|
||||
data_dict["objectId"] = object_id.as_string
|
||||
data_dict["setId"] = set_id
|
||||
data_dict["content"] = data_dictionary
|
||||
def report_parsed_hk_data(self, object_id: ObjectId, set_id: int, data_dictionary):
|
||||
data_dict = {
|
||||
"type": "TM",
|
||||
"tmType": "Parsed HK",
|
||||
"objectId": object_id.as_string,
|
||||
"setId": set_id,
|
||||
"content": data_dictionary,
|
||||
}
|
||||
|
||||
self._send_dictionary_over_socket(data_dict)
|
2
tmtccmd
2
tmtccmd
@ -1 +1 @@
|
||||
Subproject commit 862fdf23bc4a90ced47ee1c811de4237e3508536
|
||||
Subproject commit 443328422a03395bd51d072d3360de3397e1d88b
|
@ -10,3 +10,4 @@ def main():
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
c
|
||||
|
Loading…
Reference in New Issue
Block a user