Rework logging handling #194

Merged
muellerr merged 7 commits from rework_logging_handling into v4.0.0-dev 2023-06-09 12:44:00 +02:00
24 changed files with 217 additions and 237 deletions

View File

@ -8,7 +8,7 @@ from eive_tmtc.tmtc.payload.ploc_supervisor import SupvActionId
from eive_tmtc.tmtc.acs.star_tracker import StarTrackerActionId from eive_tmtc.tmtc.acs.star_tracker import StarTrackerActionId
from eive_tmtc.tmtc.power.tm import handle_get_param_data_reply from eive_tmtc.tmtc.power.tm import handle_get_param_data_reply
from tmtccmd.tm import Service8FsfwTm from tmtccmd.tm import Service8FsfwTm
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from spacepackets.ccsds.time import CdsShortTimestamp from spacepackets.ccsds.time import CdsShortTimestamp
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -24,7 +24,7 @@ def handle_action_reply(
raw_telemetry=raw_tm, time_reader=CdsShortTimestamp.empty() raw_telemetry=raw_tm, time_reader=CdsShortTimestamp.empty()
) )
object_id = obj_id_dict.get(tm_packet.source_object_id_as_bytes) object_id = obj_id_dict.get(tm_packet.source_object_id_as_bytes)
pw = PrintWrapper(printer) pw = PrintWrapper(printer.file_logger)
custom_data = tm_packet.custom_data custom_data = tm_packet.custom_data
action_id = tm_packet.action_id action_id = tm_packet.action_id
generic_print_str = printer.generic_action_packet_tm_print( generic_print_str = printer.generic_action_packet_tm_print(
@ -32,15 +32,15 @@ def handle_action_reply(
) )
pw.dlog(generic_print_str) pw.dlog(generic_print_str)
if object_id.as_bytes == IMTQ_HANDLER_ID: if object_id.as_bytes == IMTQ_HANDLER_ID:
return handle_imtq_replies(action_id, printer, custom_data) return handle_imtq_replies(action_id, pw, custom_data)
elif object_id.as_bytes == PLOC_MPSOC_ID: elif object_id.as_bytes == PLOC_MPSOC_ID:
return handle_mpsoc_data_reply(action_id, printer, custom_data) return handle_mpsoc_data_reply(action_id, pw, custom_data)
elif object_id.as_bytes == PLOC_SUPV_ID: elif object_id.as_bytes == PLOC_SUPV_ID:
return handle_supervisor_replies(action_id, printer, custom_data) return handle_supervisor_replies(action_id, pw, custom_data)
elif object_id.as_bytes == CORE_CONTROLLER_ID: elif object_id.as_bytes == CORE_CONTROLLER_ID:
return handle_core_ctrl_action_replies(action_id, printer, custom_data) return handle_core_ctrl_action_replies(action_id, pw, custom_data)
elif object_id.as_bytes == STAR_TRACKER_ID: elif object_id.as_bytes == STAR_TRACKER_ID:
return handle_startracker_replies(action_id, printer, custom_data) return handle_startracker_replies(action_id, pw, custom_data)
elif object_id.as_bytes in [ elif object_id.as_bytes in [
ACU_HANDLER_ID, ACU_HANDLER_ID,
PDU_1_HANDLER_ID, PDU_1_HANDLER_ID,
@ -53,9 +53,7 @@ def handle_action_reply(
pw.dlog(f"Raw Data: {tm_packet.custom_data.hex(sep=',')}") pw.dlog(f"Raw Data: {tm_packet.custom_data.hex(sep=',')}")
def handle_imtq_replies( def handle_imtq_replies(action_id: int, pw: PrintWrapper, custom_data: bytearray):
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
if action_id == struct.unpack("!I", ImtqActionId.get_commanded_dipole)[0]: if action_id == struct.unpack("!I", ImtqActionId.get_commanded_dipole)[0]:
header_list = [ header_list = [
"Commanded X-Dipole", "Commanded X-Dipole",
@ -64,33 +62,25 @@ def handle_imtq_replies(
] ]
[x_dipole, y_dipole, z_dipole] = struct.unpack("!HHH", custom_data[0:6]) [x_dipole, y_dipole, z_dipole] = struct.unpack("!HHH", custom_data[0:6])
content_list = [x_dipole, y_dipole, z_dipole] content_list = [x_dipole, y_dipole, z_dipole]
print(header_list) pw.dlog(f"{header_list}")
print(content_list) pw.dlog(f"{content_list}")
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
def handle_supervisor_replies( def handle_supervisor_replies(action_id: int, pw: PrintWrapper, custom_data: bytearray):
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
if action_id == SupvActionId.DUMP_MRAM: if action_id == SupvActionId.DUMP_MRAM:
header_list = ["MRAM Dump"] header_list = ["MRAM Dump"]
content_list = [custom_data[: len(custom_data)]] content_list = [custom_data[: len(custom_data)]]
print(header_list) pw.dlog(f"{header_list}")
print(content_list) pw.dlog(f"{content_list}")
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
elif action_id == SupvActionId.READ_GPIO: elif action_id == SupvActionId.READ_GPIO:
header_list = ["GPIO state"] header_list = ["GPIO state"]
content_list = [struct.unpack("!H", custom_data[:2])[0]] content_list = [struct.unpack("!H", custom_data[:2])[0]]
print(header_list) pw.dlog(f"{header_list}")
print(content_list) pw.dlog(f"{content_list}")
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
def handle_startracker_replies( def handle_startracker_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray action_id: int, pw: PrintWrapper, custom_data: bytearray
): ):
if action_id == StarTrackerActionId.CHECKSUM: if action_id == StarTrackerActionId.CHECKSUM:
if len(custom_data) != 5: if len(custom_data) != 5:
@ -102,7 +92,5 @@ def handle_startracker_replies(
print(custom_data[4]) print(custom_data[4])
checksum_valid_flag = custom_data[4] >> 8 checksum_valid_flag = custom_data[4] >> 8
content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag] content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag]
print(header_list) pw.dlog(f"{header_list}")
print(content_list) pw.dlog(f"{content_list}")
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)

View File

@ -1,19 +1,27 @@
import logging import logging
from typing import Optional
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
class PrintWrapper: class PrintWrapper:
def __init__(self, printer: FsfwTmTcPrinter): def __init__(self, file_logger: Optional[logging.Logger]):
self.printer = printer self.file_logger = file_logger
def dlog(self, string: str): def dlog(self, string: str):
print(string) print(string)
self.printer.file_logger.info(string) if self.file_logger:
self.file_logger.info(string)
def wlog(self, logger: logging.Logger, string: str):
logger.warning(string)
if self.file_logger:
self.file_logger.warning(string)
def ilog(self, logger: logging.Logger, string: str): def ilog(self, logger: logging.Logger, string: str):
logger.info(string) logger.info(string)
self.printer.file_logger.info(string) if self.file_logger:
self.file_logger.info(string)
def log_to_both(printer: FsfwTmTcPrinter, string: str): def log_to_both(printer: FsfwTmTcPrinter, string: str):

View File

@ -1,6 +1,5 @@
import logging import logging
import datetime import datetime
import struct
import sys import sys
from eive_tmtc.config.events import get_event_dict from eive_tmtc.config.events import get_event_dict
@ -12,15 +11,13 @@ from tmtccmd.tc.pus_200_fsfw_mode import Mode
from tmtccmd.tc.pus_201_fsfw_health import FsfwHealth from tmtccmd.tc.pus_201_fsfw_health import FsfwHealth
from tmtccmd.tm import Service5Tm from tmtccmd.tm import Service5Tm
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.fsfw import EventInfo from tmtccmd.fsfw import EventInfo
from spacepackets.ccsds.time import CdsShortTimestamp from spacepackets.ccsds.time import CdsShortTimestamp
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter): def handle_event_packet(raw_tm: bytes, pw: PrintWrapper):
pw = PrintWrapper(printer)
tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty()) tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty())
event_dict = get_event_dict() event_dict = get_event_dict()
event_def = tm.event_definition event_def = tm.event_definition
@ -38,7 +35,7 @@ def handle_event_packet(raw_tm: bytes, printer: FsfwTmTcPrinter):
obj_name = obj_id_obj.name obj_name = obj_id_obj.name
generic_event_string = f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x}) at {tm.time_provider.as_date_time()}" generic_event_string = f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x}) at {tm.time_provider.as_date_time()}"
_LOGGER.info(generic_event_string) _LOGGER.info(generic_event_string)
pw.printer.file_logger.info( pw.file_logger.info(
f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}" f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
) )
specific_handler = False specific_handler = False

View File

@ -13,7 +13,7 @@ from tmtccmd.tm import Service20FsfwTm, Service200FsfwTm
from tmtccmd.tm.pus_20_fsfw_param import Service20ParamDumpWrapper from tmtccmd.tm.pus_20_fsfw_param import Service20ParamDumpWrapper
from tmtccmd.pus.s20_fsfw_param_defs import CustomSubservice as ParamSubservice from tmtccmd.pus.s20_fsfw_param_defs import CustomSubservice as ParamSubservice
from tmtccmd.tm.pus_200_fsfw_mode import Subservice as ModeSubservice from tmtccmd.tm.pus_200_fsfw_mode import Subservice as ModeSubservice
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from .defs import PrintWrapper from .defs import PrintWrapper
from .event_handler import handle_event_packet from .event_handler import handle_event_packet
@ -42,14 +42,14 @@ def pus_factory_hook(
return return
service = tm_packet.service service = tm_packet.service
obj_id_dict = get_object_ids() obj_id_dict = get_object_ids()
pw = PrintWrapper(printer) pw = PrintWrapper(printer.file_logger)
dedicated_handler = True dedicated_handler = True
if service == 1: if service == 1:
handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet) handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet)
elif service == 3: elif service == 3:
handle_hk_packet(printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict) handle_hk_packet(printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict)
elif service == 5: elif service == 5:
handle_event_packet(raw_tm=packet, printer=printer) handle_event_packet(raw_tm=packet, pw=pw)
elif service == 8: elif service == 8:
handle_action_reply(raw_tm=packet, printer=printer, obj_id_dict=obj_id_dict) handle_action_reply(raw_tm=packet, printer=printer, obj_id_dict=obj_id_dict)
elif service == 17: elif service == 17:

View File

@ -34,7 +34,7 @@ from eive_tmtc.tmtc.power.tm import (
from eive_tmtc.tmtc.acs.imtq import ( from eive_tmtc.tmtc.acs.imtq import (
handle_imtq_hk, handle_imtq_hk,
) )
from eive_tmtc.pus_tm.defs import FsfwTmTcPrinter from eive_tmtc.pus_tm.defs import FsfwTmTcPrinter, PrintWrapper
from eive_tmtc.tmtc.core import handle_core_hk_data from eive_tmtc.tmtc.core import handle_core_hk_data
from eive_tmtc.tmtc.acs.mgms import handle_mgm_hk_data from eive_tmtc.tmtc.acs.mgms import handle_mgm_hk_data
import eive_tmtc.config.object_ids as obj_ids import eive_tmtc.config.object_ids as obj_ids
@ -95,40 +95,35 @@ def handle_regular_hk_print(
objb = object_id.as_bytes objb = object_id.as_bytes
set_id = hk_packet.set_id set_id = hk_packet.set_id
packet_dt = tm.time_provider.as_date_time() packet_dt = tm.time_provider.as_date_time()
pw = PrintWrapper(printer.file_logger)
"""This function is called when a Service 3 Housekeeping packet is received.""" """This function is called when a Service 3 Housekeeping packet is received."""
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
return handle_rw_hk_data(printer, object_id, set_id, hk_data) return handle_rw_hk_data(pw, object_id, set_id, hk_data)
elif objb == obj_ids.SYRLINKS_HANDLER_ID: elif objb == obj_ids.SYRLINKS_HANDLER_ID:
return handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) return handle_syrlinks_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.IMTQ_HANDLER_ID: elif objb == obj_ids.IMTQ_HANDLER_ID:
return handle_imtq_hk(printer=printer, hk_data=hk_data, set_id=set_id) return handle_imtq_hk(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.GPS_CONTROLLER: elif objb == obj_ids.GPS_CONTROLLER:
return handle_gps_data(printer=printer, hk_data=hk_data) return handle_gps_data(pw=pw, hk_data=hk_data)
elif objb == obj_ids.PCDU_HANDLER_ID: elif objb == obj_ids.PCDU_HANDLER_ID:
return handle_pcdu_hk(printer=printer, set_id=set_id, hk_data=hk_data) return handle_pcdu_hk(pw=pw, set_id=set_id, hk_data=hk_data)
elif objb == obj_ids.BPX_HANDLER_ID: elif objb == obj_ids.BPX_HANDLER_ID:
return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer) return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, pw=pw)
elif objb == obj_ids.CORE_CONTROLLER_ID: elif objb == obj_ids.CORE_CONTROLLER_ID:
return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) return handle_core_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.PDU_1_HANDLER_ID: elif objb == obj_ids.PDU_1_HANDLER_ID:
return handle_pdu_data( return handle_pdu_data(pw=pw, pdu_idx=1, set_id=set_id, hk_data=hk_data)
printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data
)
elif objb == obj_ids.PDU_2_HANDLER_ID: elif objb == obj_ids.PDU_2_HANDLER_ID:
return handle_pdu_data( return handle_pdu_data(pw=pw, pdu_idx=2, set_id=set_id, hk_data=hk_data)
printer=printer, pdu_idx=2, set_id=set_id, hk_data=hk_data
)
elif objb == obj_ids.PLOC_MPSOC_ID: elif objb == obj_ids.PLOC_MPSOC_ID:
return handle_ploc_mpsoc_hk_data( return handle_ploc_mpsoc_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
printer=printer, hk_data=hk_data, set_id=set_id
)
elif objb == obj_ids.ACU_HANDLER_ID: elif objb == obj_ids.ACU_HANDLER_ID:
return handle_acu_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) return handle_acu_hk_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb == obj_ids.RAD_SENSOR_ID: elif objb == obj_ids.RAD_SENSOR_ID:
return handle_rad_sensor_data(printer=printer, hk_data=hk_data, set_id=set_id) return handle_rad_sensor_data(pw=pw, hk_data=hk_data, set_id=set_id)
elif objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: elif objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
return handle_rw_hk_data( return handle_rw_hk_data(
printer=printer, object_id=object_id, set_id=set_id, hk_data=hk_data pw=pw, object_id=object_id, set_id=set_id, hk_data=hk_data
) )
if objb in [ if objb in [
obj_ids.SUS_0_N_LOC_XFYFZM_PT_XF, obj_ids.SUS_0_N_LOC_XFYFZM_PT_XF,
@ -144,13 +139,11 @@ def handle_regular_hk_print(
obj_ids.SUS_10_R_LOC_XMYBZF_PT_ZF, obj_ids.SUS_10_R_LOC_XMYBZF_PT_ZF,
obj_ids.SUS_11_R_LOC_XBYMZB_PT_ZB, obj_ids.SUS_11_R_LOC_XBYMZB_PT_ZB,
]: ]:
return handle_sus_hk( return handle_sus_hk(object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id)
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
)
elif objb in RTD_NAMES.keys(): elif objb in RTD_NAMES.keys():
return handle_rtd_hk(object_id=objb, hk_data=hk_data, printer=printer) return handle_rtd_hk(object_id=objb, hk_data=hk_data, pw=pw)
elif objb == obj_ids.P60_DOCK_HANDLER: elif objb == obj_ids.P60_DOCK_HANDLER:
return handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data) return handle_p60_hk_data(pw=pw, set_id=set_id, hk_data=hk_data)
elif objb in [ elif objb in [
obj_ids.GYRO_0_ADIS_HANDLER_ID, obj_ids.GYRO_0_ADIS_HANDLER_ID,
obj_ids.GYRO_1_L3G_HANDLER_ID, obj_ids.GYRO_1_L3G_HANDLER_ID,
@ -158,7 +151,7 @@ def handle_regular_hk_print(
obj_ids.GYRO_3_L3G_HANDLER_ID, obj_ids.GYRO_3_L3G_HANDLER_ID,
]: ]:
return handle_gyros_hk_data( return handle_gyros_hk_data(
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id
) )
elif objb in [ elif objb in [
obj_ids.MGM_0_LIS3_HANDLER_ID, obj_ids.MGM_0_LIS3_HANDLER_ID,
@ -167,21 +160,21 @@ def handle_regular_hk_print(
obj_ids.MGM_3_RM3100_HANDLER_ID, obj_ids.MGM_3_RM3100_HANDLER_ID,
]: ]:
return handle_mgm_hk_data( return handle_mgm_hk_data(
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id object_id=object_id, hk_data=hk_data, pw=pw, set_id=set_id
) )
elif objb == obj_ids.PL_PCDU_ID: elif objb == obj_ids.PL_PCDU_ID:
return handle_plpcdu_hk(set_id=set_id, hk_data=hk_data, printer=printer) return handle_plpcdu_hk(set_id=set_id, hk_data=hk_data, pw=pw)
elif objb == obj_ids.THERMAL_CONTROLLER_ID: elif objb == obj_ids.THERMAL_CONTROLLER_ID:
return handle_thermal_controller_hk_data( return handle_thermal_controller_hk_data(
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data
) )
elif objb == obj_ids.STAR_TRACKER_ID: elif objb == obj_ids.STAR_TRACKER_ID:
return handle_str_hk_data(set_id=set_id, hk_data=hk_data, printer=printer) return handle_str_hk_data(set_id=set_id, hk_data=hk_data, pw=pw)
elif objb == obj_ids.PLOC_SUPV_ID: elif objb == obj_ids.PLOC_SUPV_ID:
return handle_supv_hk_data(set_id=set_id, hk_data=hk_data, printer=printer) return handle_supv_hk_data(set_id=set_id, hk_data=hk_data, pw=pw)
elif objb == obj_ids.ACS_CONTROLLER: elif objb == obj_ids.ACS_CONTROLLER:
return handle_acs_ctrl_hk_data( return handle_acs_ctrl_hk_data(
printer=printer, set_id=set_id, hk_data=hk_data, packet_time=packet_dt pw=pw, set_id=set_id, hk_data=hk_data, packet_time=packet_dt
) )
else: else:
_LOGGER.info( _LOGGER.info(

View File

@ -28,7 +28,7 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
create_request_one_diag_command, create_request_one_diag_command,
) )
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.tc.pus_20_fsfw_param import create_load_param_cmd from tmtccmd.tc.pus_20_fsfw_param import create_load_param_cmd
@ -672,12 +672,11 @@ def set_acs_ctrl_param_matrix(q: DefaultPusQueueHelper):
def handle_acs_ctrl_hk_data( def handle_acs_ctrl_hk_data(
printer: FsfwTmTcPrinter, pw: PrintWrapper,
set_id: int, set_id: int,
hk_data: bytes, hk_data: bytes,
packet_time: datetime.datetime, packet_time: datetime.datetime,
): ):
pw = PrintWrapper(printer)
pw.ilog(_LOGGER, f"Received ACS CTRL HK with packet time {packet_time}") pw.ilog(_LOGGER, f"Received ACS CTRL HK with packet time {packet_time}")
match set_id: match set_id:
case SetId.MGM_RAW_SET: case SetId.MGM_RAW_SET:
@ -721,7 +720,7 @@ def handle_acs_ctrl_sus_raw_data(pw: PrintWrapper, hk_data: bytes):
sus_list_formatted = vec_fmt.format(*sus_list) sus_list_formatted = vec_fmt.format(*sus_list)
current_idx += length current_idx += length
pw.dlog(f"SUS {idx} RAW: {sus_list_formatted}") pw.dlog(f"SUS {idx} RAW: {sus_list_formatted}")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=12) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=12)
def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes): def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
@ -757,7 +756,7 @@ def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
sun_ijk_model = vec_fmt.format(*sun_ijk_model) sun_ijk_model = vec_fmt.format(*sun_ijk_model)
current_idx += inc_len current_idx += inc_len
pw.dlog(f"{'SUS ijk Model'.ljust(25)}: {sun_ijk_model}") pw.dlog(f"{'SUS ijk Model'.ljust(25)}: {sun_ijk_model}")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=15) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=15)
def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes): def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
@ -812,7 +811,7 @@ def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}") pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}")
current_idx += 1 current_idx += 1
assert current_idx == 61 assert current_idx == 61
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=6) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6)
def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes): def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
@ -866,7 +865,7 @@ def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
current_idx += inc_len current_idx += inc_len
if PERFORM_MGM_CALIBRATION: if PERFORM_MGM_CALIBRATION:
perform_mgm_calibration(pw, mgm_3) perform_mgm_calibration(pw, mgm_3)
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=8) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=8)
def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes): def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes):
@ -900,7 +899,7 @@ def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"{'GYR 1 L3'.ljust(15)}: {float_str_fmt.format(*gyr_1_l3)}") pw.dlog(f"{'GYR 1 L3'.ljust(15)}: {float_str_fmt.format(*gyr_1_l3)}")
pw.dlog(f"{'GYR 2 ADIS'.ljust(15)}: {float_str_fmt.format(*gyr_2_adis)}") pw.dlog(f"{'GYR 2 ADIS'.ljust(15)}: {float_str_fmt.format(*gyr_2_adis)}")
pw.dlog(f"{'GYR 3 L3'.ljust(15)}: {float_str_fmt.format(*gyr_3_l3)}") pw.dlog(f"{'GYR 3 L3'.ljust(15)}: {float_str_fmt.format(*gyr_3_l3)}")
pw.printer.print_validity_buffer(hk_data[current_idx:], 4) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 4)
GYR_NAMES = ["GYR 0 ADIS", "GYR 1 L3", "GYR 2 ADIS", "GYR 3 L3"] GYR_NAMES = ["GYR 0 ADIS", "GYR 1 L3", "GYR 2 ADIS", "GYR 3 L3"]
@ -926,7 +925,7 @@ def handle_gyr_data_processed(pw: PrintWrapper, hk_data: bytes):
] ]
pw.dlog(f"GYR Vec Total: {gyr_vec_tot}") pw.dlog(f"GYR Vec Total: {gyr_vec_tot}")
current_idx += inc_len current_idx += inc_len
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=5) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5)
def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes): def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
@ -979,7 +978,7 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"GPS Altitude: {alt} [m]") pw.dlog(f"GPS Altitude: {alt} [m]")
pw.dlog(f"GPS Position: {pos} [m]") pw.dlog(f"GPS Position: {pos} [m]")
pw.dlog(f"GPS Velocity: {velo} [m/s]") pw.dlog(f"GPS Velocity: {velo} [m/s]")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=5) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5)
def handle_mekf_data(pw: PrintWrapper, hk_data: bytes): def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
@ -1023,7 +1022,7 @@ def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"{'MEKF Raw Status (key unknown)'.ljust(25)}: {status}") pw.dlog(f"{'MEKF Raw Status (key unknown)'.ljust(25)}: {status}")
pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}") pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}")
pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rates)}") pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rates)}")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes): def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
@ -1091,7 +1090,7 @@ def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Control Values Error Quaternion: {err_quat}") pw.dlog(f"Control Values Error Quaternion: {err_quat}")
pw.dlog(f"Control Values Error Angle: {err_ang} [deg]") pw.dlog(f"Control Values Error Angle: {err_ang} [deg]")
pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [deg/s]") pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [deg/s]")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=5) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5)
def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes): def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
@ -1130,7 +1129,7 @@ def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Actuator Commands RW Target Torque: {rw_tgt_torque}") pw.dlog(f"Actuator Commands RW Target Torque: {rw_tgt_torque}")
pw.dlog(f"Actuator Commands RW Target Speed: {rw_tgt_speed}") pw.dlog(f"Actuator Commands RW Target Speed: {rw_tgt_speed}")
pw.dlog(f"Actuator Commands MTQ Target Dipole: {mtq_tgt_dipole}") pw.dlog(f"Actuator Commands MTQ Target Dipole: {mtq_tgt_dipole}")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
def perform_mgm_calibration(pw: PrintWrapper, mgm_tuple: Tuple): def perform_mgm_calibration(pw: PrintWrapper, mgm_tuple: Tuple):

View File

@ -15,7 +15,7 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
create_enable_periodic_hk_command_with_interval, create_enable_periodic_hk_command_with_interval,
create_disable_periodic_hk_command, create_disable_periodic_hk_command,
) )
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -87,8 +87,7 @@ def pack_gps_command(object_id: bytes, q: DefaultPusQueueHelper, op_code: str):
q.add_pus_tc(create_mode_command(object_id, Mode.OFF, 0)) q.add_pus_tc(create_mode_command(object_id, Mode.OFF, 0))
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_gps_data(pw: PrintWrapper, hk_data: bytes):
pw = PrintWrapper(printer)
pw.dlog(f"Received GPS data, HK data length {len(hk_data)}") pw.dlog(f"Received GPS data, HK data length {len(hk_data)}")
current_idx = 0 current_idx = 0
fmt_str = "!ddddBBBHBBBBBI" fmt_str = "!ddddBBBHBBBBBI"
@ -124,4 +123,6 @@ def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
) )
pw.dlog(f"GNSS Date: {date_string}") pw.dlog(f"GNSS Date: {date_string}")
pw.dlog(f"Unix seconds {unix_seconds}") pw.dlog(f"Unix seconds {unix_seconds}")
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=14) FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=14
)

View File

@ -25,7 +25,6 @@ from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
class OpCode: class OpCode:
@ -107,29 +106,24 @@ def handle_gyr_cmd(q: DefaultPusQueueHelper, op_code: str):
def handle_gyros_hk_data( def handle_gyros_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
): ):
if object_id.as_bytes in [ if object_id.as_bytes in [
obj_ids.GYRO_0_ADIS_HANDLER_ID, obj_ids.GYRO_0_ADIS_HANDLER_ID,
obj_ids.GYRO_2_ADIS_HANDLER_ID, obj_ids.GYRO_2_ADIS_HANDLER_ID,
]: ]:
handle_adis_gyro_hk( handle_adis_gyro_hk(object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data)
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
)
elif object_id.as_bytes in [ elif object_id.as_bytes in [
obj_ids.GYRO_1_L3G_HANDLER_ID, obj_ids.GYRO_1_L3G_HANDLER_ID,
obj_ids.GYRO_3_L3G_HANDLER_ID, obj_ids.GYRO_3_L3G_HANDLER_ID,
]: ]:
handle_l3g_gyro_hk( handle_l3g_gyro_hk(object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data)
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
)
def handle_adis_gyro_hk( def handle_adis_gyro_hk(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
): ):
if set_id == AdisGyroSetId.CORE_HK: if set_id == AdisGyroSetId.CORE_HK:
pw = PrintWrapper(printer)
fmt_str = "!ddddddf" fmt_str = "!ddddddf"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)
( (
@ -149,7 +143,6 @@ def handle_adis_gyro_hk(
pw.dlog(f"Acceleration (m/s^2): X {accel_x} | Y {accel_y} | Z {accel_z}") pw.dlog(f"Acceleration (m/s^2): X {accel_x} | Y {accel_y} | Z {accel_z}")
pw.dlog(f"Temperature {temp} C") pw.dlog(f"Temperature {temp} C")
if set_id == AdisGyroSetId.CFG_HK: if set_id == AdisGyroSetId.CFG_HK:
pw = PrintWrapper(printer)
fmt_str = "!HBHHH" fmt_str = "!HBHHH"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)
print(len(hk_data)) print(len(hk_data))
@ -168,10 +161,9 @@ def handle_adis_gyro_hk(
def handle_l3g_gyro_hk( def handle_l3g_gyro_hk(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
): ):
if set_id == L3gGyroSetId.CORE_HK: if set_id == L3gGyroSetId.CORE_HK:
pw = PrintWrapper(printer)
fmt_str = "!ffff" fmt_str = "!ffff"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)
(angVelocX, angVelocY, angVelocZ, temp) = struct.unpack( (angVelocX, angVelocY, angVelocZ, temp) = struct.unpack(

View File

@ -30,7 +30,7 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
) )
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -426,11 +426,11 @@ ENG_HK_HEADERS = [
] ]
def handle_imtq_hk(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int): def handle_imtq_hk(pw: PrintWrapper, hk_data: bytes, set_id: int):
if (set_id >= ImtqSetId.POSITIVE_X_TEST) and (set_id <= ImtqSetId.NEGATIVE_Z_TEST): if (set_id >= ImtqSetId.POSITIVE_X_TEST) and (set_id <= ImtqSetId.NEGATIVE_Z_TEST):
return handle_self_test_data(printer, hk_data) return handle_self_test_data(pw, hk_data)
elif set_id == ImtqSetId.ENG_HK_NO_TORQUE: elif set_id == ImtqSetId.ENG_HK_NO_TORQUE:
return handle_eng_set(printer, hk_data, False) return handle_eng_set(pw, hk_data, False)
elif set_id == ImtqSetId.ENG_HK_SET_WITH_TORQUE: elif set_id == ImtqSetId.ENG_HK_SET_WITH_TORQUE:
return handle_eng_set(printer, hk_data, True) return handle_eng_set(printer, hk_data, True)
elif set_id == ImtqSetId.CAL_MTM_SET: elif set_id == ImtqSetId.CAL_MTM_SET:
@ -444,8 +444,9 @@ def handle_imtq_hk(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int):
elif set_id == ImtqSetId.STATUS_SET: elif set_id == ImtqSetId.STATUS_SET:
return handle_status_set(printer, hk_data) return handle_status_set(printer, hk_data)
else: else:
_LOGGER.warning( pw.wlog(
f"IMTQ handler HK reply with unknown or unimplemented set id {set_id}" _LOGGER,
f"IMTQ handler HK reply with unknown or unimplemented set id {set_id}",
) )
@ -469,7 +470,7 @@ def handle_dipole_set(printer: FsfwTmTcPrinter, hk_data: bytes):
pw.dlog(f"Dipole Y: {dipole_y}") pw.dlog(f"Dipole Y: {dipole_y}")
pw.dlog(f"Dipole Z: {dipole_z}") pw.dlog(f"Dipole Z: {dipole_z}")
pw.dlog(f"Current torque duration: {current_torque_duration}") pw.dlog(f"Current torque duration: {current_torque_duration}")
pw.printer.print_validity_buffer(hk_data[fmt_len:], 2) pw.printer.get_validity_buffer(hk_data[fmt_len:], 2)
def unpack_eng_hk(hk_data: bytes) -> List: def unpack_eng_hk(hk_data: bytes) -> List:
@ -500,8 +501,7 @@ def unpack_eng_hk(hk_data: bytes) -> List:
return content_list return content_list
def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes, torque_on: bool): def handle_eng_set(pw: PrintWrapper, hk_data: bytes, torque_on: bool):
pw = PrintWrapper(printer)
pw.dlog(f"Found engineering HK. Torque Status: {torque_on}") pw.dlog(f"Found engineering HK. Torque Status: {torque_on}")
content_list = unpack_eng_hk(hk_data) content_list = unpack_eng_hk(hk_data)
validity_buffer = hk_data[32:] validity_buffer = hk_data[32:]
@ -509,22 +509,28 @@ def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes, torque_on: bool):
num_of_vars = len(ENG_HK_HEADERS) num_of_vars = len(ENG_HK_HEADERS)
for k, v in zip(ENG_HK_HEADERS, content_list): for k, v in zip(ENG_HK_HEADERS, content_list):
pw.dlog(f"{k.ljust(30)}: {v}") pw.dlog(f"{k.ljust(30)}: {v}")
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=num_of_vars
)
)
def handle_status_set(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_status_set(pw: PrintWrapper, hk_data: bytes):
pw = PrintWrapper(printer)
content_list = unpack_status_set(hk_data) content_list = unpack_status_set(hk_data)
validity_buffer = hk_data[7:] validity_buffer = hk_data[7:]
num_of_vars = 4 num_of_vars = 4
for k, v in zip(STATUS_HEADERS, content_list): for k, v in zip(STATUS_HEADERS, content_list):
pw.dlog(f"{k.ljust(30)}: {v}") pw.dlog(f"{k.ljust(30)}: {v}")
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=num_of_vars
)
)
def handle_calibrated_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_calibrated_mtm_measurement(pw: PrintWrapper, hk_data: bytes):
pw = PrintWrapper(printer)
header_list = [ header_list = [
"Calibrated MTM X [nT]", "Calibrated MTM X [nT]",
"Calibrated MTM Y [nT]", "Calibrated MTM Y [nT]",
@ -540,13 +546,14 @@ def handle_calibrated_mtm_measurement(printer: FsfwTmTcPrinter, hk_data: bytes):
num_of_vars = len(header_list) num_of_vars = len(header_list)
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=num_of_vars
)
)
def handle_raw_mtm_measurement( def handle_raw_mtm_measurement(pw: PrintWrapper, hk_data: bytes, torque_status: bool):
printer: FsfwTmTcPrinter, hk_data: bytes, torque_status: bool
):
pw = PrintWrapper(printer)
pw.dlog(f"Found raw MTM measurement. Torque Status: {torque_status}") pw.dlog(f"Found raw MTM measurement. Torque Status: {torque_status}")
header_list = [ header_list = [
"Raw MTM X [nT]", "Raw MTM X [nT]",
@ -563,11 +570,14 @@ def handle_raw_mtm_measurement(
num_of_vars = 2 num_of_vars = 2
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=num_of_vars
)
)
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_self_test_data(pw: PrintWrapper, hk_data: bytes):
pw = PrintWrapper(printer)
header_list = [ header_list = [
"Init Err", "Init Err",
"Init Raw Mag X [nT]", "Init Raw Mag X [nT]",
@ -699,4 +709,8 @@ def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
num_of_vars = len(header_list) num_of_vars = len(header_list)
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=num_of_vars
)
)

View File

@ -17,7 +17,7 @@ from tmtccmd.config.tmtc import tmtc_definitions_provider, TmtcDefinitionWrapper
from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command, Mode from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command, Mode
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
class OpCode: class OpCode:
@ -64,26 +64,25 @@ def handle_mgm_cmd(q: DefaultPusQueueHelper, op_code: str):
def handle_mgm_hk_data( def handle_mgm_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
): ):
if object_id.as_bytes in [ if object_id.as_bytes in [
obj_ids.MGM_0_LIS3_HANDLER_ID, obj_ids.MGM_0_LIS3_HANDLER_ID,
obj_ids.MGM_2_LIS3_HANDLER_ID, obj_ids.MGM_2_LIS3_HANDLER_ID,
]: ]:
handle_mgm_lis3_hk_data(object_id, printer, set_id, hk_data) handle_mgm_lis3_hk_data(object_id, pw, set_id, hk_data)
elif object_id.as_bytes in [ elif object_id.as_bytes in [
obj_ids.MGM_1_RM3100_HANDLER_ID, obj_ids.MGM_1_RM3100_HANDLER_ID,
obj_ids.MGM_3_RM3100_HANDLER_ID, obj_ids.MGM_3_RM3100_HANDLER_ID,
]: ]:
handle_mgm_rm3100_hk_data(object_id, printer, set_id, hk_data) handle_mgm_rm3100_hk_data(object_id, pw, set_id, hk_data)
pass pass
def handle_mgm_lis3_hk_data( def handle_mgm_lis3_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
): ):
if set_id == MgmLis3SetId.CORE_HK: if set_id == MgmLis3SetId.CORE_HK:
pw = PrintWrapper(printer)
fmt_str = "!ffff" fmt_str = "!ffff"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)
(field_x, field_y, field_z, temp) = struct.unpack( (field_x, field_y, field_z, temp) = struct.unpack(
@ -97,10 +96,9 @@ def handle_mgm_lis3_hk_data(
def handle_mgm_rm3100_hk_data( def handle_mgm_rm3100_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
): ):
if set_id == MgmRm3100SetId.CORE_HK: if set_id == MgmRm3100SetId.CORE_HK:
pw = PrintWrapper(printer)
fmt_str = f"!fff" fmt_str = f"!fff"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)
(field_x, field_y, field_z) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len]) (field_x, field_y, field_z) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len])

View File

@ -25,7 +25,7 @@ from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode, Subservice from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode, Subservice
from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
class OpCodesDev: class OpCodesDev:
@ -288,10 +288,9 @@ def pack_set_speed_command(
def handle_rw_hk_data( def handle_rw_hk_data(
printer: FsfwTmTcPrinter, object_id: ObjectIdU32, set_id: int, hk_data: bytes pw: PrintWrapper, object_id: ObjectIdU32, set_id: int, hk_data: bytes
): ):
pw = PrintWrapper(printer)
current_idx = 0 current_idx = 0
if set_id == RwSetId.STATUS_SET_ID: if set_id == RwSetId.STATUS_SET_ID:
pw.dlog( pw.dlog(
@ -316,7 +315,7 @@ def handle_rw_hk_data(
f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), " f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
f"1: High Current Mode (0.6 A)" f"1: High Current Mode (0.6 A)"
) )
printer.print_validity_buffer(hk_data[current_idx:], 5) pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 5))
if set_id == RwSetId.LAST_RESET: if set_id == RwSetId.LAST_RESET:
pw.dlog( pw.dlog(
f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}" f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}"
@ -401,8 +400,10 @@ def handle_rw_hk_data(
f"{spi_total_num_errors}" f"{spi_total_num_errors}"
) )
if current_idx > 0: if current_idx > 0:
printer.print_validity_buffer( pw.dlog(
validity_buffer=hk_data[current_idx:], num_vars=27 FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=27
)
) )

View File

@ -20,7 +20,7 @@ from tmtccmd.tc.pus_3_fsfw_hk import create_request_one_diag_command, make_sid
from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from eive_tmtc.config.object_ids import STR_ASSEMBLY, STAR_TRACKER_ID from eive_tmtc.config.object_ids import STR_ASSEMBLY, STAR_TRACKER_ID
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -707,8 +707,7 @@ def get_upload_image() -> str:
return image return image
def handle_str_hk_data(set_id: int, hk_data: bytes, printer: FsfwTmTcPrinter): def handle_str_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper):
pw = PrintWrapper(printer)
pw.dlog(f"Received STR HK set with set ID {set_id}") pw.dlog(f"Received STR HK set with set ID {set_id}")
if set_id == SetId.SOLUTION: if set_id == SetId.SOLUTION:
handle_solution_set(hk_data, pw) handle_solution_set(hk_data, pw)
@ -747,7 +746,7 @@ def handle_temperature_set(hk_data: bytes, pw: PrintWrapper):
pw.dlog(f"CMOS Temperature: {cmos_temp}") pw.dlog(f"CMOS Temperature: {cmos_temp}")
pw.dlog(f"FPGA Temperature: {fpga_temp}") pw.dlog(f"FPGA Temperature: {fpga_temp}")
current_idx += fmt_len current_idx += fmt_len
pw.printer.print_validity_buffer(hk_data[current_idx:], 5) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 5)
def handle_solution_set(hk_data: bytes, pw: PrintWrapper): def handle_solution_set(hk_data: bytes, pw: PrintWrapper):
@ -821,7 +820,7 @@ def handle_solution_set(hk_data: bytes, pw: PrintWrapper):
solution_strategy = hk_data[current_idx] solution_strategy = hk_data[current_idx]
pw.dlog(f"Solution strategy: {solution_strategy}") pw.dlog(f"Solution strategy: {solution_strategy}")
current_idx += 1 current_idx += 1
pw.printer.print_validity_buffer(hk_data[current_idx:], 23) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 23)
@tmtc_definitions_provider @tmtc_definitions_provider

View File

@ -3,7 +3,7 @@ import struct
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
class SetId(enum.IntEnum): class SetId(enum.IntEnum):
@ -11,9 +11,8 @@ class SetId(enum.IntEnum):
def handle_sus_hk( def handle_sus_hk(
object_id: ObjectIdU32, hk_data: bytes, printer: FsfwTmTcPrinter, set_id: int object_id: ObjectIdU32, hk_data: bytes, pw: PrintWrapper, set_id: int
): ):
pw = PrintWrapper(printer)
pw.dlog(f"Received SUS HK data from {object_id}") pw.dlog(f"Received SUS HK data from {object_id}")
if set_id == SetId.HK: if set_id == SetId.HK:
current_idx = 0 current_idx = 0
@ -27,4 +26,6 @@ def handle_sus_hk(
pw.dlog(f"AIN Channel | Raw Value (hex) | Raw Value (dec)") pw.dlog(f"AIN Channel | Raw Value (hex) | Raw Value (dec)")
for idx, val in enumerate(channels): for idx, val in enumerate(channels):
pw.dlog(f"{idx} | {val[0]:#06x} |" + str(val[0]).ljust(5)) pw.dlog(f"{idx} | {val[0]:#06x} |" + str(val[0]).ljust(5))
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=7) FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=7
)

View File

@ -31,7 +31,7 @@ from eive_tmtc.config.object_ids import SYRLINKS_HANDLER_ID
import struct import struct
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
class SetId(enum.IntEnum): class SetId(enum.IntEnum):
@ -254,31 +254,28 @@ def pack_syrlinks_command(
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def handle_syrlinks_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): def handle_syrlinks_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.RX_REGISTERS_DATASET: if set_id == SetId.RX_REGISTERS_DATASET:
return handle_syrlinks_rx_registers_dataset(printer, hk_data) return handle_syrlinks_rx_registers_dataset(pw, hk_data)
elif set_id == SetId.TX_REGISTERS_DATASET: elif set_id == SetId.TX_REGISTERS_DATASET:
return handle_syrlinks_tx_registers_dataset(printer, hk_data) return handle_syrlinks_tx_registers_dataset(pw, hk_data)
elif set_id == SetId.TEMPERATURE_SET_ID: elif set_id == SetId.TEMPERATURE_SET_ID:
return handle_syrlinks_temp_dataset(printer, hk_data) return handle_syrlinks_temp_dataset(pw, hk_data)
else: else:
pw = PrintWrapper(printer)
pw.dlog(f"Service 3 TM: Syrlinks handler reply with unknown set ID {set_id}") pw.dlog(f"Service 3 TM: Syrlinks handler reply with unknown set ID {set_id}")
def handle_syrlinks_temp_dataset(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_syrlinks_temp_dataset(pw: PrintWrapper, hk_data: bytes):
pw = PrintWrapper(printer)
if len(hk_data) < 8: if len(hk_data) < 8:
raise ValueError("expected at least 8 bytes of HK data") raise ValueError("expected at least 8 bytes of HK data")
temp_power_amplifier = struct.unpack("!f", hk_data[0:4])[0] temp_power_amplifier = struct.unpack("!f", hk_data[0:4])[0]
temp_baseband_board = struct.unpack("!f", hk_data[4:8])[0] temp_baseband_board = struct.unpack("!f", hk_data[4:8])[0]
pw.dlog(f"Temperatur Power Amplifier [C]: {temp_power_amplifier}") pw.dlog(f"Temperatur Power Amplifier [C]: {temp_power_amplifier}")
pw.dlog(f"Temperatur Baseband Board [C]: {temp_baseband_board}") pw.dlog(f"Temperatur Baseband Board [C]: {temp_baseband_board}")
printer.print_validity_buffer(hk_data[8:], 2) pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[8:], 2))
def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes): def handle_syrlinks_rx_registers_dataset(pw: PrintWrapper, hk_data: bytes):
pw = PrintWrapper(printer)
header_list = [ header_list = [
"RX Status", "RX Status",
"RX Sensitivity", "RX Sensitivity",
@ -338,7 +335,9 @@ def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: byte
validity_buffer = hk_data[22:] validity_buffer = hk_data[22:]
for header, content in zip(header_list, content_list): for header, content in zip(header_list, content_list):
pw.dlog(f"{header}: {content}") pw.dlog(f"{header}: {content}")
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
)
pw.dlog(f"Carrier Detect: {carrier_detect}") pw.dlog(f"Carrier Detect: {carrier_detect}")
pw.dlog(f"Carrier Lock: {carrier_lock}") pw.dlog(f"Carrier Lock: {carrier_lock}")
pw.dlog(f"Data Lock (data clock recovery loop lock status): {data_lock}") pw.dlog(f"Data Lock (data clock recovery loop lock status): {data_lock}")
@ -372,10 +371,9 @@ WAVEFORM_STRINGS = ["OFF", "CW", "QPSK", "0QPSK", "PCM/PM", "PSK/PM", "BPSK"]
def handle_syrlinks_tx_registers_dataset( def handle_syrlinks_tx_registers_dataset(
printer: FsfwTmTcPrinter, pw: PrintWrapper,
hk_data: bytes, hk_data: bytes,
): ):
pw = PrintWrapper(printer)
header_list = ["TX Status Raw", "TX Waveform", "TX AGC value"] header_list = ["TX Status Raw", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0] tx_status = hk_data[0]
""" """
@ -412,7 +410,9 @@ def handle_syrlinks_tx_registers_dataset(
validity_buffer = hk_data[4:] validity_buffer = hk_data[4:]
for header, content in zip(header_list, content_list): for header, content in zip(header_list, content_list):
pw.dlog(f"{header}: {content}") pw.dlog(f"{header}: {content}")
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3) pw.dlog(
FsfwTmTcPrinter.get_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
)
# pw.dlog(f"TX CONV: {tx_conv!r}") # pw.dlog(f"TX CONV: {tx_conv!r}")
# pw.dlog(f"TX DIFF (differential encoder enable): {tx_diff_encoder_enable}") # pw.dlog(f"TX DIFF (differential encoder enable): {tx_diff_encoder_enable}")
pw.dlog(f"TX Status: {tx_status_status!r}") pw.dlog(f"TX Status: {tx_status_status!r}")

View File

@ -19,7 +19,7 @@ from tmtccmd.tc.pus_20_fsfw_param import (
) )
from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -581,9 +581,8 @@ def create_xsc_reboot_cmds(
) )
def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): def handle_core_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.HK: if set_id == SetId.HK:
pw = PrintWrapper(printer)
fmt_str = "!fff" fmt_str = "!fff"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)
(temperature, ps_voltage, pl_voltage) = struct.unpack( (temperature, ps_voltage, pl_voltage) = struct.unpack(
@ -594,13 +593,14 @@ def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
f"PL Voltage [mV] {pl_voltage}" f"PL Voltage [mV] {pl_voltage}"
) )
pw.dlog(printout) pw.dlog(printout)
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=3) FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[inc_len:], num_vars=3
)
def handle_core_ctrl_action_replies( def handle_core_ctrl_action_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytes action_id: int, pw: PrintWrapper, custom_data: bytes
): ):
pw = PrintWrapper(printer)
if action_id == ActionId.LIST_DIR_DUMP_DIRECTLY: if action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
if len(custom_data) < 4: if len(custom_data) < 4:
_LOGGER.warning("Data unexpectedly small") _LOGGER.warning("Data unexpectedly small")

View File

@ -25,7 +25,6 @@ from tmtccmd.tc.decorator import ServiceProviderParams
from eive_tmtc.utility.input_helper import InputHelper from eive_tmtc.utility.input_helper import InputHelper
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.tc.pus_8_fsfw_funccmd import create_action_cmd from tmtccmd.tc.pus_8_fsfw_funccmd import create_action_cmd
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -524,8 +523,7 @@ def get_sequence_file() -> str:
return file return file
def handle_ploc_mpsoc_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int): def handle_ploc_mpsoc_hk_data(pw: PrintWrapper, hk_data: bytes, set_id: int):
pw = PrintWrapper(printer)
if set_id == SetId.HK_ID: if set_id == SetId.HK_ID:
fmt_str = "!IBBBBBBB" fmt_str = "!IBBBBBBB"
current_idx = 0 current_idx = 0
@ -640,10 +638,7 @@ class DirElement:
size: int size: int
def handle_mpsoc_data_reply( def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytearray):
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
pw = PrintWrapper(printer)
if action_id == ActionId.TM_MEM_READ_RPT: if action_id == ActionId.TM_MEM_READ_RPT:
header_list = [ header_list = [
"PLOC Memory Address", "PLOC Memory Address",
@ -655,20 +650,16 @@ def handle_mpsoc_data_reply(
struct.unpack("!H", custom_data[4:6])[0], struct.unpack("!H", custom_data[4:6])[0],
"0x" + custom_data[6:10].hex(), "0x" + custom_data[6:10].hex(),
] ]
print(header_list) pw.dlog(f"{header_list}")
print(content_list) pw.dlog(f"{content_list}")
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
elif action_id == ActionId.TM_CAM_CMD_RPT: elif action_id == ActionId.TM_CAM_CMD_RPT:
header_list = ["Camera reply string", "ACK"] header_list = ["Camera reply string", "ACK"]
content_list = [ content_list = [
custom_data[: len(custom_data) - 1].decode("utf-8"), custom_data[: len(custom_data) - 1].decode("utf-8"),
hex(custom_data[-1]), hex(custom_data[-1]),
] ]
print(header_list) pw.dlog(f"{header_list}")
print(content_list) pw.dlog(f"{content_list}")
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
elif action_id == ActionId.TM_FLASH_DIRECTORY_CONTENT: elif action_id == ActionId.TM_FLASH_DIRECTORY_CONTENT:
if len(custom_data) < 16: if len(custom_data) < 16:
_LOGGER.warning( _LOGGER.warning(

View File

@ -21,7 +21,7 @@ from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams from tmtccmd.tc.decorator import ServiceProviderParams
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from eive_tmtc.utility.input_helper import InputHelper from eive_tmtc.utility.input_helper import InputHelper
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -721,8 +721,7 @@ def get_event_buffer_path() -> str:
return file return file
def handle_supv_hk_data(set_id: int, hk_data: bytes, printer: FsfwTmTcPrinter): def handle_supv_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper):
pw = PrintWrapper(printer)
current_idx = 0 current_idx = 0
if set_id == SetIds.HK_REPORT: if set_id == SetIds.HK_REPORT:
pass pass
@ -752,7 +751,7 @@ def handle_supv_hk_data(set_id: int, hk_data: bytes, printer: FsfwTmTcPrinter):
f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}" f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}"
) )
pw.dlog(f"Boot State {boot_state} | Boot Cycles {boot_cycles}") pw.dlog(f"Boot State {boot_state} | Boot Cycles {boot_cycles}")
pw.printer.print_validity_buffer(hk_data[current_idx:], 10) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 10)
else: else:
pw.dlog(f"PLOC SUPV: HK handling not implemented for set ID {set_id}") pw.dlog(f"PLOC SUPV: HK handling not implemented for set ID {set_id}")
pw.dlog(f"Raw Data: 0x[{hk_data.hex(sep=',')}]") pw.dlog(f"Raw Data: 0x[{hk_data.hex(sep=',')}]")

View File

@ -18,7 +18,7 @@ from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_data from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_data
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
class SetId(enum.IntEnum): class SetId(enum.IntEnum):
@ -100,9 +100,8 @@ def rad_sensor_mode_cmd(
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
def handle_rad_sensor_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): def handle_rad_sensor_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.HK: if set_id == SetId.HK:
pw = PrintWrapper(printer)
current_idx = 0 current_idx = 0
pw.dlog("Received Radiation Sensor HK data") pw.dlog("Received Radiation Sensor HK data")
fmt_str = "!fHHHHHH" fmt_str = "!fHHHHHH"
@ -116,4 +115,6 @@ def handle_rad_sensor_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
for idx, val in ain_dict.items(): for idx, val in ain_dict.items():
pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}") pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}")
current_idx += inc_len current_idx += inc_len
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=7) FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=7
)

View File

@ -17,7 +17,7 @@ from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.tc.pus_200_fsfw_mode import Subservice as ModeSubservices from tmtccmd.tc.pus_200_fsfw_mode import Subservice as ModeSubservices
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
class BpxSetId(enum.IntEnum): class BpxSetId(enum.IntEnum):
@ -126,8 +126,7 @@ HEADER_LIST = [
] ]
def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): def handle_bpx_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer)
if set_id == BpxSetId.GET_HK_SET: if set_id == BpxSetId.GET_HK_SET:
fmt_str = "!HHHHhhhhIB" fmt_str = "!HHHHhhhhIB"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)
@ -158,7 +157,9 @@ def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
validity_buffer = hk_data[inc_len:] validity_buffer = hk_data[inc_len:]
pw.dlog(str(HEADER_LIST)) pw.dlog(str(HEADER_LIST))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=10
)
elif set_id == BpxSetId.GET_CFG_SET: elif set_id == BpxSetId.GET_CFG_SET:
battheat_mode = hk_data[0] battheat_mode = hk_data[0]
battheat_low = struct.unpack("!b", hk_data[1:2])[0] battheat_low = struct.unpack("!b", hk_data[1:2])[0]
@ -172,4 +173,6 @@ def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
validity_buffer = hk_data[3:] validity_buffer = hk_data[3:]
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10) FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=validity_buffer, num_vars=10
)

View File

@ -22,13 +22,13 @@ from tmtccmd.tc.pus_11_tc_sched import (
) )
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode, Subservice from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode, Subservice
from tmtccmd.tc.pus_20_fsfw_param import ( from tmtccmd.tc.pus_20_fsfw_param import (
pack_scalar_double_param_app_data, create_scalar_double_parameter,
create_load_param_cmd, create_load_param_cmd,
pack_boolean_parameter_app_data, create_scalar_boolean_parameter,
) )
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from eive_tmtc.config.object_ids import PL_PCDU_ID from eive_tmtc.config.object_ids import PL_PCDU_ID
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -423,21 +423,21 @@ def pack_wait_time_cmd(q: DefaultPusQueueHelper, param_id: int, print_str: str):
q.add_log_cmd(f"Updating {print_str} wait time to {wait_time}") q.add_log_cmd(f"Updating {print_str} wait time to {wait_time}")
if wait_time is None: if wait_time is None:
return return
param_data = pack_scalar_double_param_app_data( param_data = create_scalar_double_parameter(
object_id=PL_PCDU_ID, object_id=PL_PCDU_ID,
domain_id=0, domain_id=0,
unique_id=param_id, unique_id=param_id,
parameter=wait_time, parameter=wait_time,
) )
q.add_pus_tc(create_load_param_cmd(app_data=param_data)) q.add_pus_tc(create_load_param_cmd(param_data))
def pack_failure_injection_cmd(q: DefaultPusQueueHelper, param_id: int, print_str: str): def pack_failure_injection_cmd(q: DefaultPusQueueHelper, param_id: int, print_str: str):
q.add_log_cmd(f"Inserting {print_str} error") q.add_log_cmd(f"Inserting {print_str} error")
param_data = pack_boolean_parameter_app_data( param_data = create_scalar_boolean_parameter(
object_id=PL_PCDU_ID, domain_id=0, unique_id=param_id, parameter=True object_id=PL_PCDU_ID, domain_id=0, unique_id=param_id, parameter=True
) )
q.add_pus_tc(create_load_param_cmd(app_data=param_data)) q.add_pus_tc(create_load_param_cmd(param_data))
def pack_pl_pcdu_mode_cmd( def pack_pl_pcdu_mode_cmd(
@ -468,9 +468,8 @@ ADC_CHANNELS_NAMED = [
] ]
def handle_plpcdu_hk(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): def handle_plpcdu_hk(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.ADC: if set_id == SetId.ADC:
pw = PrintWrapper(printer)
current_idx = 0 current_idx = 0
pw.dlog("Received PL PCDU ADC HK data") pw.dlog("Received PL PCDU ADC HK data")
channels = [] channels = []
@ -496,4 +495,6 @@ def handle_plpcdu_hk(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw.dlog(ch_print) pw.dlog(ch_print)
for i in range(12): for i in range(12):
pw.dlog(f"{ADC_CHANNELS_NAMED[i].ljust(24)} | {processed_vals[i]}") pw.dlog(f"{ADC_CHANNELS_NAMED[i].ljust(24)} | {processed_vals[i]}")
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=3) FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=3
)

View File

@ -9,7 +9,7 @@ from eive_tmtc.tmtc.power.common_power import (
) )
from eive_tmtc.tmtc.power.power import PcduSetIds from eive_tmtc.tmtc.power.power import PcduSetIds
from tmtccmd.util import ObjectIdBase from tmtccmd.util import ObjectIdBase
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.gomspace.gomspace_common import GomspaceDeviceActionId from eive_tmtc.gomspace.gomspace_common import GomspaceDeviceActionId
from eive_tmtc.config.object_ids import ( from eive_tmtc.config.object_ids import (
@ -146,10 +146,7 @@ class DevicesInfoParser:
return "Unknown Type" return "Unknown Type"
def handle_pdu_data( def handle_pdu_data(pw: PrintWrapper, pdu_idx: int, set_id: int, hk_data: bytes):
printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes
):
pw = PrintWrapper(printer=printer)
current_idx = 0 current_idx = 0
priv_idx = pdu_idx - 1 priv_idx = pdu_idx - 1
if set_id == SetId.AUX or set_id == SetId.AUX: if set_id == SetId.AUX or set_id == SetId.AUX:
@ -225,8 +222,7 @@ def handle_pdu_data(
pw.dlog(info) pw.dlog(info)
def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): def handle_p60_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer=printer)
if set_id == SetId.CORE: if set_id == SetId.CORE:
pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA") pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA")
current_idx = 0 current_idx = 0
@ -273,7 +269,9 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
temps = f"In C: Temp 0 {temp_0} | Temp 1 {temp_1} | " temps = f"In C: Temp 0 {temp_0} | Temp 1 {temp_1} | "
pw.dlog(temps) pw.dlog(temps)
pw.dlog(batt_info) pw.dlog(batt_info)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9) FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=9
)
if set_id == SetId.AUX: if set_id == SetId.AUX:
pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA") pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA")
current_idx = 0 current_idx = 0
@ -338,7 +336,7 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
"6:TempSens(BatPack)|7:TempSens(BatPack)" "6:TempSens(BatPack)|7:TempSens(BatPack)"
) )
dev_parser.print(pw=pw) dev_parser.print(pw=pw)
printer.print_validity_buffer( FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=27 validity_buffer=hk_data[current_idx:], num_vars=27
) )
@ -351,8 +349,7 @@ def gen_six_entry_u16_list(hk_data: bytes, current_idx: int) -> Tuple[int, List[
return current_idx, u16_list return current_idx, u16_list
def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer=printer)
if set_id == SetId.CORE: if set_id == SetId.CORE:
mppt_mode = hk_data[0] mppt_mode = hk_data[0]
current_idx = 1 current_idx = 1
@ -394,7 +391,7 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
f"Boot Count {bootcnt} | Uptime {uptime} sec | " f"Boot Count {bootcnt} | Uptime {uptime} sec | "
f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec" f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec"
) )
printer.print_validity_buffer( FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=12 validity_buffer=hk_data[current_idx:], num_vars=12
) )
if set_id == SetId.AUX: if set_id == SetId.AUX:
@ -431,7 +428,9 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
f"5:DAC|6:TempSens|7:Reserved" f"5:DAC|6:TempSens|7:Reserved"
) )
dev_parser.print(pw=pw) dev_parser.print(pw=pw)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=8) FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=8
)
def handle_get_param_data_reply( def handle_get_param_data_reply(
@ -554,8 +553,7 @@ def parse_name_list(data: bytes, name_len: int):
return ch_list return ch_list
def handle_pcdu_hk(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): def handle_pcdu_hk(pw: PrintWrapper, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer)
pw.dlog("Received PCDU HK") pw.dlog("Received PCDU HK")
if set_id == PcduSetIds.SWITCHER_SET: if set_id == PcduSetIds.SWITCHER_SET:
current_idx = 0 current_idx = 0
@ -575,4 +573,4 @@ def handle_pcdu_hk(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw.dlog(f"{name.ljust(25)}: {val}") pw.dlog(f"{name.ljust(25)}: {val}")
pw.dlog(f"{'P60 Dock 5V Stack'.ljust(25)}: {p60_stack_5v_val}") pw.dlog(f"{'P60 Dock 5V Stack'.ljust(25)}: {p60_stack_5v_val}")
pw.dlog(f"{'P60 Dock 3V3 Stack'.ljust(25)}: {p60_stack_3v3_val}") pw.dlog(f"{'P60 Dock 3V3 Stack'.ljust(25)}: {p60_stack_3v3_val}")
pw.printer.print_validity_buffer(hk_data[current_idx:], 4) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 4)

View File

@ -13,7 +13,7 @@ from tmtccmd.util import ObjectIdU32
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_data, Subservice from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_data, Subservice
import eive_tmtc.config.object_ids as oids import eive_tmtc.config.object_ids as oids
from eive_tmtc.config.object_ids import get_object_ids from eive_tmtc.config.object_ids import get_object_ids
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
RTD_IDS = [ RTD_IDS = [
oids.RTD_0_PLOC_HSPD, oids.RTD_0_PLOC_HSPD,
@ -147,8 +147,7 @@ def pack_rtd_commands(
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def handle_rtd_hk(object_id: bytes, hk_data: bytes, printer: FsfwTmTcPrinter): def handle_rtd_hk(object_id: bytes, hk_data: bytes, pw: PrintWrapper):
pw = PrintWrapper(printer)
rtd_name = RTD_NAMES.get(object_id) rtd_name = RTD_NAMES.get(object_id)
if rtd_name is None: if rtd_name is None:
rtd_name = "unknown RTD device" rtd_name = "unknown RTD device"
@ -162,7 +161,7 @@ def handle_rtd_hk(object_id: bytes, hk_data: bytes, printer: FsfwTmTcPrinter):
pw.dlog(f"RTD Value: {rtd_val}") pw.dlog(f"RTD Value: {rtd_val}")
pw.dlog(f"Error Byte: {error_byte}") pw.dlog(f"Error Byte: {error_byte}")
pw.dlog(f"Last Error Byte: {last_err_byte}") pw.dlog(f"Last Error Byte: {last_err_byte}")
pw.printer.print_validity_buffer(hk_data[fmt_len:], 4) FsfwTmTcPrinter.get_validity_buffer(hk_data[fmt_len:], 4)
def prompt_rtd_idx(): def prompt_rtd_idx():

View File

@ -5,7 +5,7 @@ import struct
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.fsfw import validity_buffer_list from tmtccmd.fsfw import validity_buffer_list
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from .defs import CtrlSetId from .defs import CtrlSetId
from .heater import HEATER_LOCATION from .heater import HEATER_LOCATION
@ -14,7 +14,7 @@ _LOGGER = logging.getLogger(__name__)
def handle_thermal_controller_hk_data( def handle_thermal_controller_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
): ):
# need a better solutuon for this is this is used again.. # need a better solutuon for this is this is used again..
""" """
@ -24,7 +24,6 @@ def handle_thermal_controller_hk_data(
TCP_TEMP_DEV_SERVER = TmTcpServer("localhost", 7306) TCP_TEMP_DEV_SERVER = TmTcpServer("localhost", 7306)
""" """
if set_id == CtrlSetId.PRIMARY_SENSORS: if set_id == CtrlSetId.PRIMARY_SENSORS:
pw = PrintWrapper(printer)
pw.dlog("Received sensor temperature data") pw.dlog("Received sensor temperature data")
# get all the floats # get all the floats
@ -58,7 +57,6 @@ def handle_thermal_controller_hk_data(
for idx, (k, v) in enumerate(parsed_data.items()): for idx, (k, v) in enumerate(parsed_data.items()):
print(f"{str(k).ljust(30)}: Valid: {valid_list[idx]}, Value: {v}") print(f"{str(k).ljust(30)}: Valid: {valid_list[idx]}, Value: {v}")
elif set_id == CtrlSetId.DEVICE_SENSORS: elif set_id == CtrlSetId.DEVICE_SENSORS:
pw = PrintWrapper(printer)
pw.dlog("Received device temperature data") pw.dlog("Received device temperature data")
fmt_str = "!fhhhhiiiifffhffffffffffffff" fmt_str = "!fhhhhiiiifffhffffffffffffff"
fmt_len = struct.calcsize(fmt_str) fmt_len = struct.calcsize(fmt_str)
@ -94,7 +92,6 @@ def handle_thermal_controller_hk_data(
for idx, (k, v) in enumerate(parsed_data.items()): for idx, (k, v) in enumerate(parsed_data.items()):
print(f"{str(k).ljust(30)}: Valid: {valid_list[idx]}, Value: {v}") print(f"{str(k).ljust(30)}: Valid: {valid_list[idx]}, Value: {v}")
elif set_id == CtrlSetId.SUS_TEMP_SENSORS: elif set_id == CtrlSetId.SUS_TEMP_SENSORS:
pw = PrintWrapper(printer)
pw.dlog("Received SUS temperature data") pw.dlog("Received SUS temperature data")
fmt_str = "!ffffffffffff" fmt_str = "!ffffffffffff"
fmt_len = struct.calcsize(fmt_str) fmt_len = struct.calcsize(fmt_str)

View File

@ -56,7 +56,7 @@ except ImportError as error:
from spacepackets.ecss import PusVerificator from spacepackets.ecss import PusVerificator
from tmtccmd import TcHandlerBase, BackendBase from tmtccmd import TcHandlerBase, BackendBase
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.logging.pus import ( from tmtccmd.logging.pus import (
RawTmtcTimedLogWrapper, RawTmtcTimedLogWrapper,