diff --git a/pus_tc/system/acs_ctrl.py b/pus_tc/system/acs_ctrl.py index 63972b2..01a8daf 100644 --- a/pus_tc/system/acs_ctrl.py +++ b/pus_tc/system/acs_ctrl.py @@ -1,12 +1,18 @@ import enum +import struct from config.definitions import CustomServiceList from config.object_ids import ACS_CONTROLLER +from pus_tm.defs import PrintWrapper from tmtccmd import DefaultProcedureInfo, get_console_logger -from tmtccmd.config.tmtc import tmtc_definitions_provider, TmtcDefinitionWrapper, OpCodeEntry +from tmtccmd.config.tmtc import ( + tmtc_definitions_provider, + TmtcDefinitionWrapper, + OpCodeEntry, +) from tmtccmd.tc import DefaultPusQueueHelper, service_provider from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid - +from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter LOGGER = get_console_logger() @@ -34,17 +40,13 @@ def acs_cmd_defs(defs: TmtcDefinitionWrapper): oce.add(keys=OpCodes.ENABLE_MGM_HK, info=Info.ENABLE_MGM_HK) oce.add(keys=OpCodes.DISABLE_MGM_HK, info=Info.DISABLE_MGM_HK) defs.add_service( - name=CustomServiceList.ACS_CTRL.value, - info="ACS Controller", - op_code_entry=oce + name=CustomServiceList.ACS_CTRL.value, info="ACS Controller", op_code_entry=oce ) @service_provider(CustomServiceList.ACS_CTRL.value) def pack_acs_ctrl_command( - _info: DefaultProcedureInfo, - q: DefaultPusQueueHelper, - op_code: str + _info: DefaultProcedureInfo, q: DefaultPusQueueHelper, op_code: str ): if op_code in OpCodes.REQUEST_MGM_HK: sid = make_sid(ACS_CONTROLLER, SetIds.MGM_SET) @@ -52,3 +54,64 @@ def pack_acs_ctrl_command( q.add_pus_tc(generate_one_hk_command(sid)) else: LOGGER.info(f"Unknown op code {op_code}") + + +def handle_acs_ctrl_mgm_data(printer: FsfwTmTcPrinter, hk_data: bytes): + print("Received ACS CTRL data") + current_idx = 0 + pw = PrintWrapper(printer) + + if len(hk_data) < 61: + pw.dlog( + f"Received MGM HK data with length {len(hk_data)} shorter than expected 61 bytes" + ) + pw.dlog(f"Raw Data: {hk_data.hex(sep=',')}") + return + + def unpack_float_tuple(idx: int) -> (tuple, int): + f_tuple = struct.unpack( + float_tuple_fmt_str, + hk_data[idx : idx + struct.calcsize(float_tuple_fmt_str)], + ) + idx += struct.calcsize(float_tuple_fmt_str) + return f_tuple, idx + + float_tuple_fmt_str = "!fff" + mgm_0_lis3_floats_ut, current_idx = unpack_float_tuple(current_idx) + mgm_1_rm3100_floats_ut, current_idx = unpack_float_tuple(current_idx) + mgm_2_lis3_floats_ut, current_idx = unpack_float_tuple(current_idx) + mgm_3_rm3100_floats_ut, current_idx = unpack_float_tuple(current_idx) + i32_tuple_fmt = "!iii" + imtq_mgm_nt = struct.unpack( + i32_tuple_fmt, + hk_data[current_idx : current_idx + struct.calcsize(i32_tuple_fmt)], + ) + imtq_mgm_ut = tuple(val / 1000.0 for val in imtq_mgm_nt) + current_idx += struct.calcsize(i32_tuple_fmt) + print("MGM values [X,Y,Z] in floating point uT: ") + mgm_lists = [ + mgm_0_lis3_floats_ut, + mgm_1_rm3100_floats_ut, + mgm_2_lis3_floats_ut, + mgm_3_rm3100_floats_ut, + imtq_mgm_ut, + ] + formatted_list = [] + # Reserve 8 decimal digits, use precision 3 + float_str_fmt = "[{:8.3f}, {:8.3f}, {:8.3f}]" + for mgm_entry in mgm_lists[0:4]: + formatted_list.append(float_str_fmt.format(*mgm_entry)) + formatted_list.append(hk_data[current_idx]) + formatted_list.append(float_str_fmt.format(*mgm_lists[4])) + print_str_list = [ + "ACS Board MGM 0 LIS3MDL", + "ACS Board MGM 1 RM3100", + "ACS Board MGM 2 LIS3MDL", + "ACS Board MGM 3 RM3100", + "IMTQ Actuation Status:", + "IMTQ MGM:", + ] + for entry in zip(print_str_list, formatted_list): + pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}") + current_idx += 1 + assert current_idx == 61 diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index 109a222..8a7e022 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -1,5 +1,5 @@ """HK Handling for EIVE OBSW""" - +from pus_tc.system.acs_ctrl import handle_acs_ctrl_mgm_data from pus_tm.devs.plpcdu import handle_plpcdu_hk from pus_tm.devs.rad_sensor import handle_rad_sensor_data from pus_tm.devs.sus import handle_sus_hk @@ -161,8 +161,7 @@ def handle_regular_hk_print( object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data ) elif objb == obj_ids.ACS_CONTROLLER: - print("Received ACS CTRL data") - print(hk_data.hex(sep=',')) + handle_acs_ctrl_mgm_data(printer, hk_data) else: LOGGER.info( f"Service 3 TM: Parsing for object {object_id} and set ID {set_id} "