import enum import struct from eive_tmtc.config.definitions import CustomServiceList from tmtccmd.config import OpCodeEntry import eive_tmtc.config.object_ids as obj_ids from eive_tmtc.config.object_ids import ( MGM_0_LIS3_HANDLER_ID, MGM_1_RM3100_HANDLER_ID, MGM_2_LIS3_HANDLER_ID, MGM_3_RM3100_HANDLER_ID, ) from eive_tmtc.pus_tm.defs import PrintWrapper from tmtccmd.config.tmtc import tmtc_definitions_provider, TmtcDefinitionWrapper from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command, Mode from tmtccmd.util import ObjectIdU32 from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter class OpCode: NORMAL = "normal" OFF = "off" class MgmLis3SetId(enum.IntEnum): CORE_HK = 0 class MgmRm3100SetId(enum.IntEnum): CORE_HK = 0 class MgmSel(enum.IntEnum): MGM_0_LIS3 = 0 MGM_1_RM3100 = 1 MGM_2_LIS3 = 2 MGM_3_RM3100 = 3 MGM_SEL_DICT = { MgmSel.MGM_0_LIS3: ("MGM_0_LIS3", MGM_0_LIS3_HANDLER_ID), MgmSel.MGM_1_RM3100: ("MGM_1_RM3100", MGM_1_RM3100_HANDLER_ID), MgmSel.MGM_2_LIS3: ("MGM_2_LIS3", MGM_2_LIS3_HANDLER_ID), MgmSel.MGM_3_RM3100: ("MGM_3_RM3100", MGM_3_RM3100_HANDLER_ID), } def handle_mgm_cmd(q: DefaultPusQueueHelper, op_code: str): print("Please select the MGM Device") for (k, v) in MGM_SEL_DICT.items(): print(f"{k}: {v[0]}") sel_idx = int(input("Select MGM device by index: ")) mgm_info = MGM_SEL_DICT[MgmSel(sel_idx)] mgm_obj_id = mgm_info[1] if op_code == OpCode.NORMAL: q.add_log_cmd(f"Gyro {mgm_info[0]} NORMAL mode") q.add_pus_tc(create_mode_command(mgm_obj_id, Mode.NORMAL, 0)) if op_code == OpCode.OFF: q.add_log_cmd(f"Gyro {mgm_info[0]} OFF mode") q.add_pus_tc(create_mode_command(mgm_obj_id, Mode.OFF, 0)) def handle_mgm_hk_data( object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes ): if object_id.as_bytes in [ obj_ids.MGM_0_LIS3_HANDLER_ID, obj_ids.MGM_2_LIS3_HANDLER_ID, ]: handle_mgm_lis3_hk_data(object_id, pw, set_id, hk_data) elif object_id.as_bytes in [ obj_ids.MGM_1_RM3100_HANDLER_ID, obj_ids.MGM_3_RM3100_HANDLER_ID, ]: handle_mgm_rm3100_hk_data(object_id, pw, set_id, hk_data) pass def handle_mgm_lis3_hk_data( object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes ): if set_id == MgmLis3SetId.CORE_HK: fmt_str = "!ffff" inc_len = struct.calcsize(fmt_str) (field_x, field_y, field_z, temp) = struct.unpack( fmt_str, hk_data[0 : 0 + inc_len] ) pw.dlog(f"Received MGM LIS3 from object {object_id}") pw.dlog( f"Field strengths in micro Tesla X {field_x} | Y {field_y} | Z {field_z}" ) pw.dlog(f"Temperature {temp} C") def handle_mgm_rm3100_hk_data( object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes ): if set_id == MgmRm3100SetId.CORE_HK: fmt_str = f"!fff" inc_len = struct.calcsize(fmt_str) (field_x, field_y, field_z) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len]) pw.dlog(f"Received MGM RM3100 from object {object_id}") pw.dlog( f"Field strengths in micro Tesla X {field_x} | Y {field_y} | Z {field_z}" ) @tmtc_definitions_provider def add_mgm_cmd_defs(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(keys=OpCode.NORMAL, info="Normal Mode") oce.add(keys=OpCode.OFF, info="Off Mode") defs.add_service(CustomServiceList.MGMS, info="MGMs", op_code_entry=oce)