import enum import logging import struct from spacepackets.ecss import PusTelecommand from tmtccmd.config import CmdTreeNode from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.pus.s201_fsfw_health import pack_set_health_cmd_data, FsfwHealth from tmtccmd.pus.s201_fsfw_health import Subservice import eive_tmtc.config.object_ids as obj_ids from tmtccmd.pus.s200_fsfw_mode import create_mode_command, Mode from tmtccmd.pus.tc.s3_fsfw_hk import create_request_one_hk_command, make_sid from tmtccmd.config.tmtc import ( tmtc_definitions_provider, OpCodeEntry, TmtcDefinitionWrapper, ) from eive_tmtc.config.object_ids import ( GYRO_0_ADIS_HANDLER_ID, GYRO_1_L3G_HANDLER_ID, GYRO_2_ADIS_HANDLER_ID, GYRO_3_L3G_HANDLER_ID, ) from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.pus_tm.defs import PrintWrapper from tmtccmd.util import ObjectIdU32 class OpCode: NML = "normal" OFF = "off" CORE_HK = "core_hk" CFG_HK = "cfg_hk" SET_FAULTY = "set_faulty" class AdisGyroSetId(enum.IntEnum): CORE_HK = 0 CFG_HK = 1 class L3gGyroSetId(enum.IntEnum): CORE_HK = 0 class GyrSel(enum.IntEnum): GYR_0_ADIS = 0 GYR_1_L3G = 1 GYR_2_ADIS = 2 GYR_3_L3G = 3 GYR_SEL_DICT = { GyrSel.GYR_0_ADIS: ("GYRO_0_ADIS", GYRO_0_ADIS_HANDLER_ID), GyrSel.GYR_1_L3G: ("GYRO_1_L3G", GYRO_1_L3G_HANDLER_ID), GyrSel.GYR_2_ADIS: ("GYRO_2_ADIS", GYRO_2_ADIS_HANDLER_ID), GyrSel.GYR_3_L3G: ("GYRO_3_L3G", GYRO_3_L3G_HANDLER_ID), } def handle_gyr_cmd(q: DefaultPusQueueHelper, cmd_str: str): print("Please select the Gyro Device") for k, v in GYR_SEL_DICT.items(): print(f"{k}: {v[0]}") sel_idx = int(input("Select gyro device by index: ")) gyr_info = GYR_SEL_DICT[GyrSel(sel_idx)] gyr_obj_id = gyr_info[1] is_adis = False if sel_idx == GyrSel.GYR_0_ADIS or sel_idx == GyrSel.GYR_2_ADIS: is_adis = True core_hk_id = AdisGyroSetId.CORE_HK else: core_hk_id = L3gGyroSetId.CORE_HK if cmd_str == OpCode.NML: q.add_log_cmd(f"Gyro {gyr_info[0]} NORMAL mode") q.add_pus_tc(create_mode_command(gyr_obj_id, Mode.NORMAL, 0)) if cmd_str == OpCode.OFF: q.add_log_cmd(f"Gyro {gyr_info[0]} OFF mode") q.add_pus_tc(create_mode_command(gyr_obj_id, Mode.OFF, 0)) elif cmd_str == OpCode.CORE_HK: q.add_log_cmd(f"Gyro {gyr_info[0]} Core HK") q.add_pus_tc(create_request_one_hk_command(make_sid(gyr_obj_id, core_hk_id))) elif cmd_str == OpCode.CFG_HK: if not is_adis: raise ValueError("No config HK for L3 device") q.add_log_cmd(f"Gyro {gyr_info[0]} CFG HK") q.add_pus_tc( create_request_one_hk_command(make_sid(gyr_obj_id, AdisGyroSetId.CFG_HK)) ) elif cmd_str == OpCode.SET_FAULTY: q.add_log_cmd(f"Gyro {gyr_info[0]} set faulty") q.add_pus_tc( PusTelecommand( service=201, subservice=Subservice.TC_SET_HEALTH, app_data=pack_set_health_cmd_data( object_id=gyr_obj_id, health=FsfwHealth.FAULTY ), ) ) else: logging.getLogger(__name__).warning( f"invalid op code {cmd_str} for gyro command" ) def handle_gyros_hk_data( object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes ): if object_id.as_bytes in [ obj_ids.GYRO_0_ADIS_HANDLER_ID, obj_ids.GYRO_2_ADIS_HANDLER_ID, ]: handle_adis_gyro_hk(object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data) elif object_id.as_bytes in [ obj_ids.GYRO_1_L3G_HANDLER_ID, obj_ids.GYRO_3_L3G_HANDLER_ID, ]: handle_l3g_gyro_hk(object_id=object_id, pw=pw, set_id=set_id, hk_data=hk_data) def handle_adis_gyro_hk( object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes ): if set_id == AdisGyroSetId.CORE_HK: fmt_str = "!ddddddf" inc_len = struct.calcsize(fmt_str) ( ang_veloc_x, ang_veloc_y, ang_veloc_z, accel_x, accel_y, accel_z, temp, ) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len]) pw.dlog(f"Received ADIS1650X Gyro HK data from object {object_id}") pw.dlog( f"Angular Velocities (degrees per second): X {ang_veloc_x} | " f"Y {ang_veloc_y} | Z {ang_veloc_z}" ) pw.dlog(f"Acceleration (m/s^2): X {accel_x} | Y {accel_y} | Z {accel_z}") pw.dlog(f"Temperature {temp} C") if set_id == AdisGyroSetId.CFG_HK: fmt_str = "!HBHHH" inc_len = struct.calcsize(fmt_str) print(len(hk_data)) ( diag_stat_reg, filter_setting, range_mdl, msc_ctrl_reg, dec_rate_reg, ) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len]) pw.dlog(f"Diagnostic Status Register {diag_stat_reg:#018b}") pw.dlog(f"Range MDL {range_mdl}") pw.dlog(f"Filter Settings {filter_setting:#010b}") pw.dlog(f"Miscellaneous Control Register {msc_ctrl_reg:#018b}") pw.dlog(f"Decimation Rate {dec_rate_reg:#06x}") def handle_l3g_gyro_hk( object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes ): if set_id == L3gGyroSetId.CORE_HK: fmt_str = "!ffff" inc_len = struct.calcsize(fmt_str) (angVelocX, angVelocY, angVelocZ, temp) = struct.unpack( fmt_str, hk_data[0 : 0 + inc_len] ) pw.dlog(f"Received L3GD20H Gyro HK data from object {object_id}") pw.dlog( f"Angular Velocities (degrees per second): X {angVelocX} | " f"Y {angVelocY} | Z {angVelocZ}" ) pw.dlog(f"Temperature {temp} °C") def create_gyros_node() -> CmdTreeNode: node = CmdTreeNode("gyros", "Gyroscope devices") node.add_child(CmdTreeNode(OpCode.CORE_HK, "Request Core HK")) node.add_child(CmdTreeNode(OpCode.CFG_HK, "Request CFG HK")) node.add_child(CmdTreeNode(OpCode.NML, "Normal Mode")) node.add_child(CmdTreeNode(OpCode.OFF, "Off Mode")) node.add_child(CmdTreeNode(OpCode.SET_FAULTY, "Set Faulty")) return node @tmtc_definitions_provider def add_gyr_cmd_defs(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(keys=OpCode.CORE_HK, info="Request Core HK") oce.add(keys=OpCode.CFG_HK, info="Request CFG HK") oce.add(keys=OpCode.NML, info="Normal Mode") oce.add(keys=OpCode.OFF, info="Off Mode") oce.add(keys=OpCode.SET_FAULTY, info="Set Faulty") defs.add_service(CustomServiceList.GYRO, info="Gyro", op_code_entry=oce)