New sus event #199

Closed
muellerr wants to merge 67 commits from relax-sus-fdir into main
15 changed files with 88 additions and 111 deletions
Showing only changes of commit e9e43f03d2 - Show all commits

View File

@ -1,6 +1,5 @@
import logging
import datetime
import struct
import sys
from eive_tmtc.config.events import get_event_dict
@ -12,15 +11,13 @@ from tmtccmd.tc.pus_200_fsfw_mode import Mode
from tmtccmd.tc.pus_201_fsfw_health import FsfwHealth
from tmtccmd.tm import Service5Tm
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.fsfw import EventInfo
from spacepackets.ccsds.time import CdsShortTimestamp
_LOGGER = logging.getLogger(__name__)
def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter):
pw = PrintWrapper(printer)
def handle_event_packet(raw_tm: bytes, pw: PrintWrapper):
tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty())
event_dict = get_event_dict()
event_def = tm.event_definition
@ -38,7 +35,7 @@ def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter):
obj_name = obj_id_obj.name
generic_event_string = f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x}) at {tm.time_provider.as_date_time()}"
_LOGGER.info(generic_event_string)
pw.printer.file_logger.info(
pw.file_logger.info(
f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
)
specific_handler = False

View File

@ -104,32 +104,26 @@ def handle_regular_hk_print(
elif objb == obj_ids.IMTQ_HANDLER_ID:
return handle_imtq_hk(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.GPS_CONTROLLER:
return handle_gps_data(printer=printer, hk_data=hk_data)
return handle_gps_data(pw=pw, hk_data=hk_data)
elif objb == obj_ids.PCDU_HANDLER_ID:
return handle_pcdu_hk(printer=printer, set_id=set_id, hk_data=hk_data)
return handle_pcdu_hk(pw=pw, set_id=set_id, hk_data=hk_data)
elif objb == obj_ids.BPX_HANDLER_ID:
return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
elif objb == obj_ids.CORE_CONTROLLER_ID:
return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.PDU_1_HANDLER_ID:
return handle_pdu_data(
printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data
)
return handle_pdu_data(pw=pw, pdu_idx=1, set_id=set_id, hk_data=hk_data)
elif objb == obj_ids.PDU_2_HANDLER_ID:
return handle_pdu_data(
printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data
)
return handle_pdu_data(pw=pw, pdu_idx=2, set_id=set_id, hk_data=hk_data)
elif objb == obj_ids.PLOC_MPSOC_ID:
return handle_ploc_mpsoc_hk_data(
printer=printer, hk_data=hk_data, set_id=set_id
)
return handle_ploc_mpsoc_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.ACU_HANDLER_ID:
return handle_acu_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
return handle_acu_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.RAD_SENSOR_ID:
return handle_rad_sensor_data(printer=printer, hk_data=hk_data, set_id=set_id)
return handle_rad_sensor_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
return handle_rw_hk_data(
printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data
pw=pw, object_id=object_id, set_id=set_id, hk_data=hk_data
)
if objb in [
obj_ids.SUS_0_N_LOC_XFYFZM_PT_XF,
@ -145,13 +139,11 @@ def handle_regular_hk_print(
obj_ids.SUS_10_R_LOC_XMYBZF_PT_ZF,
obj_ids.SUS_11_R_LOC_XBYMZB_PT_ZB,
]:
return handle_sus_hk(
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
)
return handle_sus_hk(object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id)
elif objb in RTD_NAMES.keys():
return handle_rtd_hk(object_id=objb, hk_data=hk_data, printer=printer)
return handle_rtd_hk(object_id=objb, hk_data=hk_data, pw=pw)
elif objb == obj_ids.P60_DOCK_HANDLER:
return handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
return handle_p60_hk_data(pw=pw, set_id=set_id, hk_data=hk_data)
elif objb in [
obj_ids.GYRO_0_ADIS_HANDLER_ID,
obj_ids.GYRO_1_L3G_HANDLER_ID,
@ -159,7 +151,7 @@ def handle_regular_hk_print(
obj_ids.GYRO_3_L3G_HANDLER_ID,
]:
return handle_gyros_hk_data(
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id
)
elif objb in [
obj_ids.MGM_0_LIS3_HANDLER_ID,
@ -168,21 +160,21 @@ def handle_regular_hk_print(
obj_ids.MGM_3_RM3100_HANDLER_ID,
]:
return handle_mgm_hk_data(
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id
)
elif objb == obj_ids.PL_PCDU_ID:
return handle_plpcdu_hk(set_id=set_id, hk_data=hk_data, printer=printer)
return handle_plpcdu_hk(set_id=set_id, hk_data=hk_data, pw=pw)
elif objb == obj_ids.THERMAL_CONTROLLER_ID:
return handle_thermal_controller_hk_data(
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data
)
elif objb == obj_ids.STAR_TRACKER_ID:
return handle_str_hk_data(set_id=set_id, hk_data=hk_data, printer=printer)
return handle_str_hk_data(set_id=set_id, hk_data=hk_data, pw=pw)
elif objb == obj_ids.PLOC_SUPV_ID:
return handle_supv_hk_data(set_id=set_id, hk_data=hk_data, printer=printer)
return handle_supv_hk_data(set_id=set_id, hk_data=hk_data, pw=pw)
elif objb == obj_ids.ACS_CONTROLLER:
return handle_acs_ctrl_hk_data(
printer=printer, set_id=set_id, hk_data=hk_data, packet_time=packet_dt
pw=pw, set_id=set_id, hk_data=hk_data, packet_time=packet_dt
)
else:
_LOGGER.info(

View File

@ -672,12 +672,11 @@ def set_acs_ctrl_param_matrix(q: DefaultPusQueueHelper):
def handle_acs_ctrl_hk_data(
printer: FsfwTmTcPrinter,
pw: PrintWrapper,
set_id: int,
hk_data: bytes,
packet_time: datetime.datetime,
):
pw = PrintWrapper(printer)
pw.ilog(_LOGGER, f"Received ACS CTRL HK with packet time {packet_time}")
match set_id:
case SetId.MGM_RAW_SET:
@ -721,7 +720,7 @@ def handle_acs_ctrl_sus_raw_data(pw: PrintWrapper, hk_data: bytes):
sus_list_formatted = vec_fmt.format(*sus_list)
current_idx += length
pw.dlog(f"SUS {idx} RAW: {sus_list_formatted}")
pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=12)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=12)
def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
@ -757,7 +756,7 @@ def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
sun_ijk_model = vec_fmt.format(*sun_ijk_model)
current_idx += inc_len
pw.dlog(f"{'SUS ijk Model'.ljust(25)}: {sun_ijk_model}")
pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=15)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=15)
def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
@ -812,7 +811,7 @@ def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}")
current_idx += 1
assert current_idx == 61
pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=6)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6)
def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
@ -866,7 +865,7 @@ def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
current_idx += inc_len
if PERFORM_MGM_CALIBRATION:
perform_mgm_calibration(pw, mgm_3)
pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=8)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=8)
def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes):
@ -900,7 +899,7 @@ def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"{'GYR 1 L3'.ljust(15)}: {float_str_fmt.format(*gyr_1_l3)}")
pw.dlog(f"{'GYR 2 ADIS'.ljust(15)}: {float_str_fmt.format(*gyr_2_adis)}")
pw.dlog(f"{'GYR 3 L3'.ljust(15)}: {float_str_fmt.format(*gyr_3_l3)}")
pw.printer.get_validity_buffer(hk_data[current_idx:], 4)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 4)
GYR_NAMES = ["GYR 0 ADIS", "GYR 1 L3", "GYR 2 ADIS", "GYR 3 L3"]
@ -926,7 +925,7 @@ def handle_gyr_data_processed(pw: PrintWrapper, hk_data: bytes):
]
pw.dlog(f"GYR Vec Total: {gyr_vec_tot}")
current_idx += inc_len
pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=5)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5)
def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
@ -979,7 +978,7 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"GPS Altitude: {alt} [m]")
pw.dlog(f"GPS Position: {pos} [m]")
pw.dlog(f"GPS Velocity: {velo} [m/s]")
pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=5)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5)
def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
@ -1023,7 +1022,7 @@ def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"{'MEKF Raw Status (key unknown)'.ljust(25)}: {status}")
pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}")
pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rates)}")
pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=3)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
@ -1091,7 +1090,7 @@ def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Control Values Error Quaternion: {err_quat}")
pw.dlog(f"Control Values Error Angle: {err_ang} [deg]")
pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [deg/s]")
pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=5)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5)
def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
@ -1130,7 +1129,7 @@ def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Actuator Commands RW Target Torque: {rw_tgt_torque}")
pw.dlog(f"Actuator Commands RW Target Speed: {rw_tgt_speed}")
pw.dlog(f"Actuator Commands MTQ Target Dipole: {mtq_tgt_dipole}")
pw.printer.get_validity_buffer(hk_data[current_idx:], num_vars=3)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
def perform_mgm_calibration(pw: PrintWrapper, mgm_tuple: Tuple):

View File

@ -87,8 +87,7 @@ def pack_gps_command(object_id: bytes, q: DefaultPusQueueHelper, op_code: str):
q.add_pus_tc(create_mode_command(object_id, Mode.OFF, 0))
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
pw = PrintWrapper(printer)
def handle_gps_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Received GPS data, HK data length {len(hk_data)}")
current_idx = 0
fmt_str = "!ddddBBBHBBBBBI"
@ -124,4 +123,6 @@ def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
)
pw.dlog(f"GNSS Date: {date_string}")
pw.dlog(f"Unix seconds {unix_seconds}")
printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=14)
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=14
)

View File

@ -25,7 +25,6 @@ from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
class OpCode:
@ -107,29 +106,24 @@ def handle_gyr_cmd(q: DefaultPusQueueHelper, op_code: str):
def handle_gyros_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
):
if object_id.as_bytes in [
obj_ids.GYRO_0_ADIS_HANDLER_ID,
obj_ids.GYRO_2_ADIS_HANDLER_ID,
]:
handle_adis_gyro_hk(
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
)
handle_adis_gyro_hk(object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data)
elif object_id.as_bytes in [
obj_ids.GYRO_1_L3G_HANDLER_ID,
obj_ids.GYRO_3_L3G_HANDLER_ID,
]:
handle_l3g_gyro_hk(
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
)
handle_l3g_gyro_hk(object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data)
def handle_adis_gyro_hk(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
):
if set_id == AdisGyroSetId.CORE_HK:
pw = PrintWrapper(printer)
fmt_str = "!ddddddf"
inc_len = struct.calcsize(fmt_str)
(
@ -149,7 +143,6 @@ def handle_adis_gyro_hk(
pw.dlog(f"Acceleration (m/s^2): X {accel_x} | Y {accel_y} | Z {accel_z}")
pw.dlog(f"Temperature {temp} C")
if set_id == AdisGyroSetId.CFG_HK:
pw = PrintWrapper(printer)
fmt_str = "!HBHHH"
inc_len = struct.calcsize(fmt_str)
print(len(hk_data))
@ -168,10 +161,9 @@ def handle_adis_gyro_hk(
def handle_l3g_gyro_hk(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
):
if set_id == L3gGyroSetId.CORE_HK:
pw = PrintWrapper(printer)
fmt_str = "!ffff"
inc_len = struct.calcsize(fmt_str)
(angVelocX, angVelocY, angVelocZ, temp) = struct.unpack(

View File

@ -64,26 +64,25 @@ def handle_mgm_cmd(q: DefaultPusQueueHelper, op_code: str):
def handle_mgm_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
):
if object_id.as_bytes in [
obj_ids.MGM_0_LIS3_HANDLER_ID,
obj_ids.MGM_2_LIS3_HANDLER_ID,
]:
handle_mgm_lis3_hk_data(object_id, printer, set_id, hk_data)
handle_mgm_lis3_hk_data(object_id, pw, set_id, hk_data)
elif object_id.as_bytes in [
obj_ids.MGM_1_RM3100_HANDLER_ID,
obj_ids.MGM_3_RM3100_HANDLER_ID,
]:
handle_mgm_rm3100_hk_data(object_id, printer, set_id, hk_data)
handle_mgm_rm3100_hk_data(object_id, pw, set_id, hk_data)
pass
def handle_mgm_lis3_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
):
if set_id == MgmLis3SetId.CORE_HK:
pw = PrintWrapper(printer)
fmt_str = "!ffff"
inc_len = struct.calcsize(fmt_str)
(field_x, field_y, field_z, temp) = struct.unpack(
@ -97,10 +96,9 @@ def handle_mgm_lis3_hk_data(
def handle_mgm_rm3100_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
):
if set_id == MgmRm3100SetId.CORE_HK:
pw = PrintWrapper(printer)
fmt_str = f"!fff"
inc_len = struct.calcsize(fmt_str)
(field_x, field_y, field_z) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len])

View File

@ -707,8 +707,7 @@ def get_upload_image() -> str:
return image
def handle_str_hk_data(set_id: int, hk_data: bytes, printer: FsfwTmTcPrinter):
pw = PrintWrapper(printer)
def handle_str_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper):
pw.dlog(f"Received STR HK set with set ID {set_id}")
if set_id == SetId.SOLUTION:
handle_solution_set(hk_data, pw)
@ -747,7 +746,7 @@ def handle_temperature_set(hk_data: bytes, pw: PrintWrapper):
pw.dlog(f"CMOS Temperature: {cmos_temp}")
pw.dlog(f"FPGA Temperature: {fpga_temp}")
current_idx += fmt_len
pw.printer.get_validity_buffer(hk_data[current_idx:], 5)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 5)
def handle_solution_set(hk_data: bytes, pw: PrintWrapper):
@ -821,7 +820,7 @@ def handle_solution_set(hk_data: bytes, pw: PrintWrapper):
solution_strategy = hk_data[current_idx]
pw.dlog(f"Solution strategy: {solution_strategy}")
current_idx += 1
pw.printer.get_validity_buffer(hk_data[current_idx:], 23)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 23)
@tmtc_definitions_provider

View File

@ -11,9 +11,8 @@ class SetId(enum.IntEnum):
def handle_sus_hk(
object_id: ObjectIdU32, hk_data: bytes, printer: FsfwTmTcPrinter, set_id: int
object_id: ObjectIdU32, hk_data: bytes, pw: PrintWrapper, set_id: int
):
pw = PrintWrapper(printer)
pw.dlog(f"Received SUS HK data from {object_id}")
if set_id == SetId.HK:
current_idx = 0
@ -27,4 +26,6 @@ def handle_sus_hk(
pw.dlog(f"AIN Channel | Raw Value (hex) | Raw Value (dec)")
for idx, val in enumerate(channels):
pw.dlog(f"{idx} | {val[0]:#06x} |" + str(val[0]).ljust(5))
printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=7)
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=7
)

View File

@ -25,7 +25,6 @@ from tmtccmd.tc.decorator import ServiceProviderParams
from eive_tmtc.utility.input_helper import InputHelper
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.tc.pus_8_fsfw_funccmd import create_action_cmd
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
_LOGGER = logging.getLogger(__name__)

View File

@ -721,8 +721,7 @@ def get_event_buffer_path() -> str:
return file
def handle_supv_hk_data(set_id: int, hk_data: bytes, printer: FsfwTmTcPrinter):
pw = PrintWrapper(printer)
def handle_supv_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper):
current_idx = 0
if set_id == SetIds.HK_REPORT:
pass
@ -752,7 +751,7 @@ def handle_supv_hk_data(set_id: int, hk_data: bytes, printer: FsfwTmTcPrinter):
f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}"
)
pw.dlog(f"Boot State {boot_state} | Boot Cycles {boot_cycles}")
pw.printer.get_validity_buffer(hk_data[current_idx:], 10)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 10)
else:
pw.dlog(f"PLOC SUPV: HK handling not implemented for set ID {set_id}")
pw.dlog(f"Raw Data: 0x[{hk_data.hex(sep=',')}]")

View File

@ -100,9 +100,8 @@ def rad_sensor_mode_cmd(
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
def handle_rad_sensor_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
def handle_rad_sensor_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.HK:
pw = PrintWrapper(printer)
current_idx = 0
pw.dlog("Received Radiation Sensor HK data")
fmt_str = "!fHHHHHH"
@ -116,4 +115,6 @@ def handle_rad_sensor_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
for idx, val in ain_dict.items():
pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}")
current_idx += inc_len
printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=7)
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=7
)

View File

@ -22,9 +22,9 @@ from tmtccmd.tc.pus_11_tc_sched import (
)
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode, Subservice
from tmtccmd.tc.pus_20_fsfw_param import (
pack_scalar_double_param_app_data,
create_scalar_double_parameter,
create_load_param_cmd,
pack_boolean_parameter_app_data,
create_scalar_boolean_parameter,
)
from spacepackets.ecss.tc import PusTelecommand
from eive_tmtc.config.object_ids import PL_PCDU_ID
@ -423,21 +423,21 @@ def pack_wait_time_cmd(q: DefaultPusQueueHelper, param_id: int, print_str: str):
q.add_log_cmd(f"Updating {print_str} wait time to {wait_time}")
if wait_time is None:
return
param_data = pack_scalar_double_param_app_data(
param_data = create_scalar_double_parameter(
object_id=PL_PCDU_ID,
domain_id=0,
unique_id=param_id,
parameter=wait_time,
)
q.add_pus_tc(create_load_param_cmd(app_data=param_data))
q.add_pus_tc(create_load_param_cmd(param_data))
def pack_failure_injection_cmd(q: DefaultPusQueueHelper, param_id: int, print_str: str):
q.add_log_cmd(f"Inserting {print_str} error")
param_data = pack_boolean_parameter_app_data(
param_data = create_scalar_boolean_parameter(
object_id=PL_PCDU_ID, domain_id=0, unique_id=param_id, parameter=True
)
q.add_pus_tc(create_load_param_cmd(app_data=param_data))
q.add_pus_tc(create_load_param_cmd(param_data))
def pack_pl_pcdu_mode_cmd(
@ -468,9 +468,8 @@ ADC_CHANNELS_NAMED = [
]
def handle_plpcdu_hk(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
def handle_plpcdu_hk(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.ADC:
pw = PrintWrapper(printer)
current_idx = 0
pw.dlog("Received PL PCDU ADC HK data")
channels = []
@ -496,4 +495,6 @@ def handle_plpcdu_hk(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw.dlog(ch_print)
for i in range(12):
pw.dlog(f"{ADC_CHANNELS_NAMED[i].ljust(24)} | {processed_vals[i]}")
printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=3)
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=3
)

View File

@ -146,10 +146,7 @@ class DevicesInfoParser:
return "Unknown Type"
def handle_pdu_data(
printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes
):
pw = PrintWrapper(printer=printer)
def handle_pdu_data(pw: PrintWrapper, pdu_idx: int, set_id: int, hk_data: bytes):
current_idx = 0
priv_idx = pdu_idx - 1
if set_id == SetId.AUX or set_id == SetId.AUX:
@ -228,8 +225,7 @@ def handle_pdu_data(
pw.dlog(info)
def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer=printer)
def handle_p60_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.CORE:
pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA")
current_idx = 0
@ -276,7 +272,9 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
temps = f"In C: Temp 0 {temp_0} | Temp 1 {temp_1} | "
pw.dlog(temps)
pw.dlog(batt_info)
printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9)
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=9
)
if set_id == SetId.AUX:
pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA")
current_idx = 0
@ -341,7 +339,9 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
"6:TempSens(BatPack)|7:TempSens(BatPack)"
)
dev_parser.print(pw=pw)
printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=27)
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=27
)
def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[int]]:
@ -352,8 +352,7 @@ def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[
return current_idx, u16_list
def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer=printer)
def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.CORE:
mppt_mode = hk_data[0]
current_idx = 1
@ -395,7 +394,9 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
f"Boot Count {bootcnt} | Uptime {uptime} sec | "
f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec"
)
printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=12)
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=12
)
if set_id == SetId.AUX:
current_idx = 0
fmt_str = "!BBB"
@ -430,7 +431,9 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
f"5:DAC|6:TempSens|7:Reserved"
)
dev_parser.print(pw=pw)
printer.get_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=8)
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=8
)
def handle_get_param_data_reply(
@ -553,8 +556,7 @@ def parse_name_list(data: bytes, name_len: int):
return ch_list
def handle_pcdu_hk(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer)
def handle_pcdu_hk(pw: PrintWrapper, set_id: int, hk_data: bytes):
pw.dlog("Received PCDU HK")
if set_id == PcduSetIds.SWITCHER_SET:
current_idx = 0
@ -574,4 +576,4 @@ def handle_pcdu_hk(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw.dlog(f"{name.ljust(25)}: {val}")
pw.dlog(f"{'P60 Dock 5V Stack'.ljust(25)}: {p60_stack_5v_val}")
pw.dlog(f"{'P60 Dock 3V3 Stack'.ljust(25)}: {p60_stack_3v3_val}")
pw.printer.get_validity_buffer(hk_data[current_idx:], 4)
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 4)

View File

@ -147,8 +147,7 @@ def pack_rtd_commands(
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def handle_rtd_hk(object_id: bytes, hk_data: bytes, printer: FsfwTmTcPrinter):
pw = PrintWrapper(printer)
def handle_rtd_hk(object_id: bytes, hk_data: bytes, pw: PrintWrapper):
rtd_name = RTD_NAMES.get(object_id)
if rtd_name is None:
rtd_name = "unknown RTD device"
@ -162,7 +161,7 @@ def handle_rtd_hk(object_id: bytes, hk_data: bytes, printer: FsfwTmTcPrinter):
pw.dlog(f"RTD Value: {rtd_val}")
pw.dlog(f"Error Byte: {error_byte}")
pw.dlog(f"Last Error Byte: {last_err_byte}")
pw.printer.get_validity_buffer(hk_data[fmt_len:], 4)
FsfwTmTcPrinter.get_validity_buffer(hk_data[fmt_len:], 4)
def prompt_rtd_idx():

View File

@ -14,7 +14,7 @@ _LOGGER = logging.getLogger(__name__)
def handle_thermal_controller_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
):
# need a better solutuon for this is this is used again..
"""
@ -24,7 +24,6 @@ def handle_thermal_controller_hk_data(
TCP_TEMP_DEV_SERVER = TmTcpServer("localhost", 7306)
"""
if set_id == CtrlSetId.PRIMARY_SENSORS:
pw = PrintWrapper(printer)
pw.dlog("Received sensor temperature data")
# get all the floats
@ -58,7 +57,6 @@ def handle_thermal_controller_hk_data(
for idx, (k, v) in enumerate(parsed_data.items()):
print(f"{str(k).ljust(30)}: Valid: {valid_list[idx]}, Value: {v}")
elif set_id == CtrlSetId.DEVICE_SENSORS:
pw = PrintWrapper(printer)
pw.dlog("Received device temperature data")
fmt_str = "!fhhhhiiiifffhffffffffffffff"
fmt_len = struct.calcsize(fmt_str)
@ -94,7 +92,6 @@ def handle_thermal_controller_hk_data(
for idx, (k, v) in enumerate(parsed_data.items()):
print(f"{str(k).ljust(30)}: Valid: {valid_list[idx]}, Value: {v}")
elif set_id == CtrlSetId.SUS_TEMP_SENSORS:
pw = PrintWrapper(printer)
pw.dlog("Received SUS temperature data")
fmt_str = "!ffffffffffff"
fmt_len = struct.calcsize(fmt_str)