v1.9.0 #53
@ -23,7 +23,9 @@ def handle_action_reply(
|
||||
object_id = obj_id_dict.get(tm_packet.source_object_id)
|
||||
custom_data = tm_packet.custom_data
|
||||
action_id = tm_packet.action_id
|
||||
generic_print_str = printer.generic_action_packet_tm_print(packet=tm_packet, obj_id=object_id)
|
||||
generic_print_str = printer.generic_action_packet_tm_print(
|
||||
packet=tm_packet, obj_id=object_id
|
||||
)
|
||||
print(generic_print_str)
|
||||
printer.file_logger.info(generic_print_str)
|
||||
if object_id == IMTQ_HANDLER_ID:
|
||||
@ -37,9 +39,7 @@ def handle_action_reply(
|
||||
|
||||
|
||||
def handle_imtq_replies(
|
||||
action_id: int,
|
||||
printer: FsfwTmTcPrinter,
|
||||
custom_data: bytearray
|
||||
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
|
||||
):
|
||||
if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]:
|
||||
header_list = [
|
||||
@ -47,11 +47,7 @@ def handle_imtq_replies(
|
||||
"Commanded Y-Dipole",
|
||||
"Commanded Z-Dipole",
|
||||
]
|
||||
[
|
||||
x_dipole,
|
||||
y_dipole,
|
||||
z_dipole
|
||||
] = struct.unpack("!HHH", custom_data[0:6])
|
||||
[x_dipole, y_dipole, z_dipole] = struct.unpack("!HHH", custom_data[0:6])
|
||||
content_list = [x_dipole, y_dipole, z_dipole]
|
||||
print(header_list)
|
||||
print(content_list)
|
||||
@ -60,9 +56,7 @@ def handle_imtq_replies(
|
||||
|
||||
|
||||
def handle_ploc_replies(
|
||||
action_id: int,
|
||||
printer: FsfwTmTcPrinter,
|
||||
custom_data: bytearray
|
||||
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
|
||||
):
|
||||
if action_id == PlocReplyIds.tm_mem_read_report:
|
||||
header_list = [
|
||||
@ -82,9 +76,7 @@ def handle_ploc_replies(
|
||||
|
||||
|
||||
def handle_supervisor_replies(
|
||||
action_id: int,
|
||||
printer: FsfwTmTcPrinter,
|
||||
custom_data: bytearray
|
||||
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
|
||||
):
|
||||
reply = DataReplyUnpacked()
|
||||
if action_id == SupvActionIds.DUMP_MRAM:
|
||||
@ -97,9 +89,7 @@ def handle_supervisor_replies(
|
||||
|
||||
|
||||
def handle_startracker_replies(
|
||||
action_id: int,
|
||||
printer: FsfwTmTcPrinter,
|
||||
custom_data: bytearray
|
||||
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
|
||||
):
|
||||
if action_id == StarTrackerActionIds.CHECKSUM:
|
||||
if len(custom_data) != 5:
|
||||
|
@ -66,22 +66,22 @@ def handle_regular_hk_print(
|
||||
"""This function is called when a Service 3 Housekeeping packet is received."""
|
||||
if object_id == SYRLINKS_HANDLER_ID:
|
||||
if set_id == SetIds.RX_REGISTERS_DATASET:
|
||||
return handle_syrlinks_rx_registers_dataset(hk_data)
|
||||
return handle_syrlinks_rx_registers_dataset(printer, hk_data)
|
||||
elif set_id == SetIds.TX_REGISTERS_DATASET:
|
||||
return handle_syrlinks_tx_registers_dataset(hk_data)
|
||||
return handle_syrlinks_tx_registers_dataset(printer, hk_data)
|
||||
else:
|
||||
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
|
||||
elif object_id == IMTQ_HANDLER_ID:
|
||||
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
|
||||
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
|
||||
):
|
||||
return handle_self_test_data(hk_data)
|
||||
return handle_self_test_data(printer, hk_data)
|
||||
else:
|
||||
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
|
||||
elif object_id == GPS_HANDLER_0_ID or object_id == GPS_HANDLER_1_ID:
|
||||
return handle_gps_data(hk_data=hk_data)
|
||||
return handle_gps_data(printer=printer, hk_data=hk_data)
|
||||
elif object_id == BPX_HANDLER_ID:
|
||||
return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id)
|
||||
return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
|
||||
elif object_id == CORE_CONTROLLER_ID:
|
||||
return handle_core_hk_data(printer=printer, hk_data=hk_data)
|
||||
elif object_id == P60_DOCK_HANDLER:
|
||||
@ -91,11 +91,9 @@ def handle_regular_hk_print(
|
||||
return HkReplyUnpacked()
|
||||
|
||||
|
||||
def handle_syrlinks_rx_registers_dataset(
|
||||
hk_data: bytes,
|
||||
) -> HkReplyUnpacked:
|
||||
def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
reply = HkReplyUnpacked()
|
||||
reply.header_list = [
|
||||
header_list = [
|
||||
"RX Status",
|
||||
"RX Sensitivity",
|
||||
"RX Frequency Shift",
|
||||
@ -113,7 +111,7 @@ def handle_syrlinks_rx_registers_dataset(
|
||||
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
|
||||
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
|
||||
rx_data_rate = hk_data[21]
|
||||
reply.content_list = [
|
||||
content_list = [
|
||||
rx_status,
|
||||
rx_sensitivity,
|
||||
rx_frequency_shift,
|
||||
@ -123,28 +121,30 @@ def handle_syrlinks_rx_registers_dataset(
|
||||
rx_demod_n0,
|
||||
rx_data_rate,
|
||||
]
|
||||
reply.validity_buffer = hk_data[22:]
|
||||
reply.num_of_vars = 8
|
||||
return reply
|
||||
validity_buffer = hk_data[22:]
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
|
||||
|
||||
|
||||
def handle_syrlinks_tx_registers_dataset(
|
||||
printer: FsfwTmTcPrinter,
|
||||
hk_data: bytes,
|
||||
) -> HkReplyUnpacked:
|
||||
):
|
||||
reply = HkReplyUnpacked()
|
||||
reply.header_list = ["TX Status", "TX Waveform", "TX AGC value"]
|
||||
header_list = ["TX Status", "TX Waveform", "TX AGC value"]
|
||||
tx_status = hk_data[0]
|
||||
tx_waveform = hk_data[1]
|
||||
tx_agc_value = struct.unpack("!H", hk_data[2:4])
|
||||
reply.content_list = [tx_status, tx_waveform, tx_agc_value]
|
||||
reply.validity_buffer = hk_data[4:]
|
||||
reply.num_of_vars = 3
|
||||
return reply
|
||||
content_list = [tx_status, tx_waveform, tx_agc_value]
|
||||
validity_buffer = hk_data[4:]
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
|
||||
|
||||
|
||||
def handle_self_test_data(hk_data: bytes) -> HkReplyUnpacked:
|
||||
reply = HkReplyUnpacked()
|
||||
reply.hk_header = [
|
||||
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
header_list = [
|
||||
"Init Err",
|
||||
"Init Raw Mag X [nT]",
|
||||
"Init Raw Mag Y [nT]",
|
||||
@ -230,8 +230,8 @@ def handle_self_test_data(hk_data: bytes) -> HkReplyUnpacked:
|
||||
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
|
||||
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
|
||||
|
||||
reply.validity_buffer = hk_data[129:]
|
||||
reply.content_list = [
|
||||
validity_buffer = hk_data[129:]
|
||||
content_list = [
|
||||
init_err,
|
||||
init_raw_mag_x,
|
||||
init_raw_mag_y,
|
||||
@ -272,17 +272,17 @@ def handle_self_test_data(hk_data: bytes) -> HkReplyUnpacked:
|
||||
fina_coil_y_temperature,
|
||||
fina_coil_z_temperature,
|
||||
]
|
||||
reply.num_of_vars = len(reply.hk_header)
|
||||
return reply
|
||||
num_of_vars = len(header_list)
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
|
||||
|
||||
def handle_gps_data(hk_data: bytes) -> HkReplyUnpacked:
|
||||
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
|
||||
reply = HkReplyUnpacked()
|
||||
var_index = 0
|
||||
header_array = []
|
||||
content_array = []
|
||||
reply.header_list = [
|
||||
header_list = [
|
||||
"Latitude",
|
||||
"Longitude",
|
||||
"Altitude",
|
||||
@ -304,7 +304,7 @@ def handle_gps_data(hk_data: bytes) -> HkReplyUnpacked:
|
||||
seconds = hk_data[32]
|
||||
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
|
||||
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
|
||||
content_array = [
|
||||
content_list = [
|
||||
latitude,
|
||||
longitude,
|
||||
altitude,
|
||||
@ -326,27 +326,29 @@ def handle_gps_data(hk_data: bytes) -> HkReplyUnpacked:
|
||||
f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, "
|
||||
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
|
||||
)
|
||||
reply.header_list = header_array
|
||||
reply.content_list = content_array
|
||||
reply.validity_buffer = hk_data[37:39]
|
||||
return reply
|
||||
validity_buffer = hk_data[37:39]
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
||||
|
||||
|
||||
def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
|
||||
LOGGER.info(f"Received BPX data, HK data length {len(hk_data)}")
|
||||
reply = HkReplyUnpacked()
|
||||
def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||
if set_id == BpxSetIds.GET_HK_SET:
|
||||
charge_current = struct.unpack("!H", hk_data[0:2])[0]
|
||||
discharge_current = struct.unpack("!H", hk_data[2:4])[0]
|
||||
heater_current = struct.unpack("!H", hk_data[4:6])[0]
|
||||
batt_voltage = struct.unpack("!H", hk_data[6:8])[0]
|
||||
batt_temp_1 = struct.unpack("!h", hk_data[8:10])[0]
|
||||
batt_temp_2 = struct.unpack("!h", hk_data[10:12])[0]
|
||||
batt_temp_3 = struct.unpack("!h", hk_data[12:14])[0]
|
||||
batt_temp_4 = struct.unpack("!h", hk_data[14:16])[0]
|
||||
reboot_cntr = struct.unpack("!I", hk_data[16:20])[0]
|
||||
boot_cause = hk_data[20]
|
||||
reply.header_list = [
|
||||
fmt_str = "!HHHHhhhhI"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
charge_current,
|
||||
discharge_current,
|
||||
heater_current,
|
||||
batt_voltage,
|
||||
batt_temp_1,
|
||||
batt_temp_2,
|
||||
batt_temp_3,
|
||||
batt_temp_4,
|
||||
reboot_cntr,
|
||||
boot_cause,
|
||||
) = struct.unpack(fmt_str, hk_data[0:inc_len])
|
||||
header_list = [
|
||||
"Charge Current",
|
||||
"Discharge Current",
|
||||
"Heater Current",
|
||||
@ -358,7 +360,7 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
|
||||
"Reboot Counter",
|
||||
"Boot Cause",
|
||||
]
|
||||
reply.content_list = [
|
||||
content_list = [
|
||||
charge_current,
|
||||
discharge_current,
|
||||
heater_current,
|
||||
@ -370,19 +372,24 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
|
||||
reboot_cntr,
|
||||
boot_cause,
|
||||
]
|
||||
reply.validity_buffer = hk_data[21:]
|
||||
validity_buffer = hk_data[inc_len:]
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
||||
elif set_id == BpxSetIds.GET_CFG_SET:
|
||||
battheat_mode = hk_data[0]
|
||||
battheat_low = struct.unpack("!b", hk_data[1:2])[0]
|
||||
battheat_high = struct.unpack("!b", hk_data[2:3])[0]
|
||||
reply.header_list = [
|
||||
header_list = [
|
||||
"Battery Heater Mode",
|
||||
"Battery Heater Low Limit",
|
||||
"Battery Heater High Limit",
|
||||
]
|
||||
reply.content_list = [battheat_mode, battheat_low, battheat_high]
|
||||
reply.validity_buffer = hk_data[3:]
|
||||
return reply
|
||||
content_list = [battheat_mode, battheat_low, battheat_high]
|
||||
validity_buffer = hk_data[3:]
|
||||
log_to_both(printer, str(header_list))
|
||||
log_to_both(printer, str(content_list))
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
|
||||
|
||||
|
||||
def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
|
Loading…
x
Reference in New Issue
Block a user