RW CMDs #61
0
pus_tm/devs/__init__.py
Normal file
0
pus_tm/devs/__init__.py
Normal file
115
pus_tm/devs/reaction_wheels.py
Normal file
115
pus_tm/devs/reaction_wheels.py
Normal file
@ -0,0 +1,115 @@
|
||||
import struct
|
||||
|
||||
from pus_tm.hk_handling import FsfwTmTcPrinter, PrintWrapper
|
||||
from tmtccmd.pus.obj_id import ObjectId
|
||||
|
||||
|
||||
def handle_rw_hk_data(
|
||||
printer: FsfwTmTcPrinter, object_id: ObjectId, set_id: int, hk_data: bytes
|
||||
):
|
||||
from pus_tc.devs.reaction_wheels import RwSetIds
|
||||
|
||||
pw = PrintWrapper(printer)
|
||||
current_idx = 0
|
||||
if set_id == RwSetIds.TEMPERATURE_SET_ID:
|
||||
pw.dlog(
|
||||
f"Received Temperature HK (ID {set_id}) from Reaction Wheel {object_id.name}"
|
||||
)
|
||||
temp = struct.unpack("!I", hk_data[0:4])
|
||||
pw.dlog(f"Temperature {temp}")
|
||||
current_idx += 4
|
||||
if set_id == RwSetIds.STATUS_SET_ID:
|
||||
pw.dlog(
|
||||
f"Received Status HK (ID {set_id}) from Reaction Wheel {object_id.name}"
|
||||
)
|
||||
fmt_str = "!iiBB"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(speed, ref_speed, state, clc_mode) = struct.unpack(
|
||||
fmt_str, hk_data[current_idx : current_idx + inc_len]
|
||||
)
|
||||
current_idx += inc_len
|
||||
pw.dlog(
|
||||
f"Speed {speed} rpm | Reference Speed {ref_speed} rpm | State {state} | CLC Mode {clc_mode}"
|
||||
)
|
||||
if set_id == RwSetIds.LAST_RESET:
|
||||
pw.dlog(
|
||||
f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}"
|
||||
)
|
||||
fmt_str = "!BB"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(last_not_cleared_reset_status, current_reset_status) = struct.unpack(
|
||||
fmt_str, hk_data[current_idx : current_idx + inc_len]
|
||||
)
|
||||
current_idx += inc_len
|
||||
pw.dlog(
|
||||
f"Last Non-Cleared (Cached) Reset Status {last_not_cleared_reset_status} | "
|
||||
f"Current Reset Status {current_reset_status}"
|
||||
)
|
||||
if set_id == RwSetIds.TM_SET:
|
||||
pw.dlog(f"Received TM HK (ID {set_id}) from Reaction Wheel {object_id.name}")
|
||||
fmt_str = "!BiffBBiiIIIIIIIIIIIIIIII"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
last_reset_status,
|
||||
mcu_temp,
|
||||
pressure_sens_temp,
|
||||
pressure,
|
||||
state,
|
||||
clc_mode,
|
||||
current_speed,
|
||||
ref_speed,
|
||||
num_invalid_crc_packets,
|
||||
num_invalid_len_packets,
|
||||
num_invalid_cmd_packets,
|
||||
num_of_cmd_executed_requests,
|
||||
num_of_cmd_replies,
|
||||
uart_num_of_bytes_written,
|
||||
uart_num_of_bytes_read,
|
||||
uart_num_parity_errors,
|
||||
uart_num_noise_errors,
|
||||
uart_num_frame_errors,
|
||||
uart_num_reg_overrun_errors,
|
||||
uart_total_num_errors,
|
||||
spi_num_bytes_written,
|
||||
spi_num_bytes_read,
|
||||
spi_num_reg_overrun_errors,
|
||||
spi_total_num_errors,
|
||||
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
|
||||
pw.dlog(
|
||||
f"MCU Temperature {mcu_temp} | Pressure Sensore Temperature {pressure_sens_temp} C"
|
||||
)
|
||||
pw.dlog(f"Last Reset Status {last_reset_status}")
|
||||
pw.dlog(
|
||||
f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
|
||||
f"1: High Current Mode (0.6 A)"
|
||||
)
|
||||
pw.dlog(f"Speed {current_speed} rpm | Reference Speed {ref_speed} rpm")
|
||||
pw.dlog(
|
||||
f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, "
|
||||
f"4: Running, speed changing"
|
||||
)
|
||||
pw.dlog(f"Number Of Invalid Packets:")
|
||||
pw.dlog("CRC | Length | CMD")
|
||||
pw.dlog(
|
||||
f"{num_invalid_crc_packets} | {num_invalid_len_packets} | {num_invalid_cmd_packets}"
|
||||
)
|
||||
pw.dlog(
|
||||
f"Num Of CMD Executed Requests {num_of_cmd_executed_requests} | "
|
||||
f"Num of CMD Replies {num_of_cmd_replies}"
|
||||
)
|
||||
pw.dlog("UART COM information:")
|
||||
pw.dlog(
|
||||
f"NumBytesWritten | NumBytesRead | ParityErr | NoiseErr | FrameErr | "
|
||||
f"RegOverrunErr | TotalErr"
|
||||
)
|
||||
pw.dlog(
|
||||
f"{uart_num_of_bytes_written} | {uart_num_of_bytes_read} | {uart_num_parity_errors} | "
|
||||
f"{uart_num_noise_errors} | {uart_num_frame_errors} | {uart_num_reg_overrun_errors} | "
|
||||
f"{uart_total_num_errors}"
|
||||
)
|
||||
pw.dlog("SPI COM Info:")
|
||||
if current_idx > 0:
|
||||
printer.print_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=27
|
||||
)
|
@ -18,10 +18,25 @@ from pus_tc.devs.imtq import ImtqSetIds
|
||||
from tmtccmd.pus.obj_id import ObjectId, ObjectIdDictT
|
||||
import config.object_ids as obj_ids
|
||||
|
||||
from .devs.reaction_wheel import handle_rw_hk_data
|
||||
|
||||
LOGGER = get_console_logger()
|
||||
|
||||
|
||||
class PrintWrapper:
|
||||
def __init__(self, printer: FsfwTmTcPrinter):
|
||||
self.printer = printer
|
||||
|
||||
def dlog(self, string: str):
|
||||
print(string)
|
||||
self.printer.file_logger.info(string)
|
||||
|
||||
|
||||
def log_to_both(printer: FsfwTmTcPrinter, string: str):
|
||||
print(string)
|
||||
printer.file_logger.info(string)
|
||||
|
||||
|
||||
def handle_hk_packet(
|
||||
raw_tm: bytes,
|
||||
obj_id_dict: ObjectIdDictT,
|
||||
@ -706,93 +721,3 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||
printer.print_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=27
|
||||
)
|
||||
|
||||
|
||||
def handle_rw_hk_data(
|
||||
printer: FsfwTmTcPrinter, object_id: ObjectId, set_id: int, hk_data: bytes
|
||||
):
|
||||
from pus_tc.devs.reaction_wheels import RwSetIds
|
||||
|
||||
current_idx = 0
|
||||
if set_id == RwSetIds.TEMPERATURE_SET_ID:
|
||||
log_to_both(
|
||||
printer,
|
||||
f"Received Temperature HK (ID {set_id}) from Reaction Wheel {object_id.name}",
|
||||
)
|
||||
temp = struct.unpack("!I", hk_data[0:4])
|
||||
log_to_both(printer, f"Temperature {temp}")
|
||||
current_idx += 4
|
||||
if set_id == RwSetIds.STATUS_SET_ID:
|
||||
log_to_both(
|
||||
printer,
|
||||
f"Received Status HK (ID {set_id}) from Reaction Wheel {object_id.name}",
|
||||
)
|
||||
fmt_str = "!iiBB"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(speed, ref_speed, state, clc_mode) = struct.unpack(
|
||||
fmt_str, hk_data[current_idx : current_idx + inc_len]
|
||||
)
|
||||
current_idx += inc_len
|
||||
log_to_both(
|
||||
printer,
|
||||
f"Speed {speed} rpm | Reference Speed {ref_speed} rpm | State {state} | CLC Mode {clc_mode}",
|
||||
)
|
||||
if set_id == RwSetIds.LAST_RESET:
|
||||
log_to_both(
|
||||
printer,
|
||||
f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}",
|
||||
)
|
||||
fmt_str = "!BB"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(last_reset_status, curr_reset_status) = struct.unpack(
|
||||
fmt_str, hk_data[current_idx : current_idx + inc_len]
|
||||
)
|
||||
current_idx += inc_len
|
||||
log_to_both(
|
||||
printer,
|
||||
f"Last Reset Status {last_reset_status} | Current Reset Status {curr_reset_status}",
|
||||
)
|
||||
if set_id == RwSetIds.TM_SET:
|
||||
log_to_both(
|
||||
printer,
|
||||
f"Received TM HK (ID {set_id}) from Reaction Wheel {object_id.name}",
|
||||
)
|
||||
fmt_str = "!BiffBBiiIIIIIIIIIIIIIIII"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(
|
||||
last_reset_status,
|
||||
mcu_temp,
|
||||
pressure_sens_temp,
|
||||
pressure,
|
||||
state,
|
||||
clc_mode,
|
||||
current_speed,
|
||||
ref_speed,
|
||||
num_invalid_crc_packets,
|
||||
num_invalid_len_packets,
|
||||
num_invalid_cmd_packets,
|
||||
num_of_cmd_executed_requests,
|
||||
num_of_cmd_replies,
|
||||
uart_num_of_bytes_written,
|
||||
uart_num_of_bytes_read,
|
||||
uart_num_parity_errors,
|
||||
uart_num_noise_errors,
|
||||
uart_num_frame_errors,
|
||||
uart_num_reg_overrun_errors,
|
||||
uart_total_num_errors,
|
||||
spi_num_bytes_written,
|
||||
spi_num_bytes_read,
|
||||
spi_num_reg_overrun_errors,
|
||||
spi_total_num_errors,
|
||||
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
pass
|
||||
if current_idx > 0:
|
||||
printer.print_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=27
|
||||
)
|
||||
pass
|
||||
|
||||
|
||||
def log_to_both(printer: FsfwTmTcPrinter, string: str):
|
||||
print(string)
|
||||
printer.file_logger.info(string)
|
||||
|
Loading…
Reference in New Issue
Block a user