import enum from typing import Optional import struct from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.pus_tm.defs import PrintWrapper from spacepackets.ecss import PusTelecommand from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry from tmtccmd.config.tmtc import tmtc_definitions_provider from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.util import ObjectIdU32 from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_data, Subservice import eive_tmtc.config.object_ids as oids from eive_tmtc.config.object_ids import get_object_ids from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter RTD_IDS = [ oids.RTD_0_PLOC_HSPD, oids.RTD_1_PLOC_MISSIONBRD, oids.RTD_2_4K_CAM, oids.RTD_3_DAC_HSPD, oids.RTD_4_STR, oids.RTD_5_RW1_MX_MY, oids.RTD_6_DRO, oids.RTD_7_SCEX, oids.RTD_8_X8, oids.RTD_9_HPA, oids.RTD_10_PL_TX, oids.RTD_11_MPA, oids.RTD_12_ACU, oids.RTD_13_PLPCDU_HSPD, oids.RTD_14_TCS_BRD, oids.RTD_15_IMTQ, ] RTD_NAMES = { oids.RTD_0_PLOC_HSPD: "RTD 0 PLOC Heatspreader", oids.RTD_1_PLOC_MISSIONBRD: "RTD 1 PLOC Missionboard", oids.RTD_2_4K_CAM: "RTD 2 4K Camera", oids.RTD_3_DAC_HSPD: "RTD 3 DAC HSPC", oids.RTD_4_STR: "RTD 4 Startracker", oids.RTD_5_RW1_MX_MY: "RTD 5 RW1 MX MY", oids.RTD_6_DRO: "RTD 6 DRO", oids.RTD_7_SCEX: "RTD 7 SCEX", oids.RTD_8_X8: "RTD 8 X8", oids.RTD_9_HPA: "RTD 9 HPA", oids.RTD_10_PL_TX: "RTD 10 PL TX", oids.RTD_11_MPA: "RTD 11 MPA", oids.RTD_12_ACU: "RTD 12 ACU", oids.RTD_13_PLPCDU_HSPD: "RTD 13 PL PCDU Heatspreader", oids.RTD_14_TCS_BRD: "RTD 14 TCS Board", oids.RTD_15_IMTQ: "RTD 15 iMTQ", } class CommandId: WRITE_CONFIG = 6 class RtdId(enum.IntEnum): RTD_0_PLOC_HSPC = 0 RTD_1_PLOC_MISSIONBRD = 1 RTD_2_4K_CAM = 2 RTD_3_DAC_HSPD = 3 RTD_4_STR = 4 RTD_5_RW1_MX_MY = 5 RTD_6_DRO = 6 RTD_7_SCEX = 7 RTD_8_X8 = 8 RTD_9_HPA = 9 RTD_10_PL_TX = 10 RTD_11_MPA = 11 RTD_12_ACU = 12 RTD_13_PLPCDU_HSPD = 13 RTD_14_TCS_BRD = 14 RTD_15_IMTQ = 15 class SetId(enum.IntEnum): TEMPERATURE = 1 class OpCode: ON = ["0", "on"] OFF = ["1", "off"] NORMAL = ["2", "normal"] WRITE_CONFIG = ["3", "Write config"] class Info: ON = "Switch handler on" OFF = "Switch handler off" NORMAL = "Switch handler normal" WRITE_CONFIG = "Write config" @tmtc_definitions_provider def specify_rtd_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(keys=OpCode.ON, info=Info.ON) oce.add(keys=OpCode.NORMAL, info=Info.NORMAL) oce.add(keys=OpCode.OFF, info=Info.OFF) oce.add(keys=OpCode.WRITE_CONFIG, info=Info.WRITE_CONFIG) defs.add_service( name=CustomServiceList.RTD.value, info="RTD commands", op_code_entry=oce ) def pack_rtd_commands( op_code: str, object_id: Optional[ObjectIdU32], q: DefaultPusQueueHelper ): if object_id is not None and object_id not in RTD_IDS: print("Specified object ID not a valid RTD ID") object_id = None if object_id is None: tgt_rtd_idx = prompt_rtd_idx() object_id_dict = get_object_ids() object_id = object_id_dict.get(RTD_IDS[tgt_rtd_idx]) if op_code in OpCode.ON: app_data = pack_mode_data(object_id=object_id.as_bytes, mode=Mode.ON, submode=0) q.add_pus_tc( PusTelecommand( service=200, subservice=Subservice.TC_MODE_COMMAND, app_data=app_data ) ) if op_code in OpCode.NORMAL: app_data = pack_mode_data( object_id=object_id.as_bytes, mode=Mode.NORMAL, submode=0 ) q.add_pus_tc( PusTelecommand( service=200, subservice=Subservice.TC_MODE_COMMAND, app_data=app_data ) ) if op_code in OpCode.OFF: app_data = pack_mode_data( object_id=object_id.as_bytes, mode=Mode.OFF, submode=0 ) q.add_pus_tc( PusTelecommand( service=200, subservice=Subservice.TC_MODE_COMMAND, app_data=app_data ) ) if op_code in OpCode.WRITE_CONFIG: command = object_id.as_bytes + struct.pack("!I", CommandId.WRITE_CONFIG) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) def handle_rtd_hk(object_id: bytes, hk_data: bytes, printer: FsfwTmTcPrinter): pw = PrintWrapper(printer) rtd_name = RTD_NAMES.get(object_id) if rtd_name is None: rtd_name = "unknown RTD device" pw.dlog(f"Received RTD HK for RTD {rtd_name} with object ID {object_id}") fmt_str = "!ffBB" fmt_len = struct.calcsize(fmt_str) (rtd_val, temp_celcius, last_err_byte, error_byte) = struct.unpack( fmt_str, hk_data[0 : 0 + fmt_len] ) pw.dlog(f"Temperature Celcius: {temp_celcius}") pw.dlog(f"RTD Value: {rtd_val}") pw.dlog(f"Error Byte: {error_byte}") pw.dlog(f"Last Error Byte: {last_err_byte}") pw.printer.print_validity_buffer(hk_data[fmt_len:], 4) def prompt_rtd_idx(): while True: rtd_idx = input("Please specify RTD index [0-15]: ") if not rtd_idx.isdigit(): print("Invalid input") continue rtd_idx = int(rtd_idx) if rtd_idx < 0 or rtd_idx > 15: print("Invalid device index") continue return rtd_idx