# -*- coding: utf-8 -*-
"""
@file   syrlinks_hk_handler.py
@brief  Syrlinks Hk Handler tests
@author J. Meier
@date   13.12.2020
"""
import enum
import struct

from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config.tmtc import (
    tmtc_definitions_provider,
    TmtcDefinitionWrapper,
    OpCodeEntry,
)
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
    make_sid,
    create_request_one_diag_command,
    create_enable_periodic_hk_command_with_interval,
    create_disable_periodic_hk_command,
)
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Mode
from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter


class SetId:
    """Housekeeping dataset IDs used to build SIDs and to dispatch HK replies."""

    RX_REGISTERS_DATASET = 1
    TX_REGISTERS_DATASET = 2
    TEMPERATURE_SET_ID = 3


class OpCode:
    """Symbolic op code strings accepted by :func:`pack_syrlinks_command`."""

    OFF = "off"
    ON = "on"
    NORMAL = "nml"
    STANDBY = "set_tx_standby"
    SET_CW = "set_tx_carrier_wave"
    MODULATION = "modulation"
    HK_RX_REGS = "hk_rx_regs"
    ENABLE_HK_RX_REGS = "enable_hk_rx"
    DISABLE_HK_RX_REGS = "disable_hk_rx"
    HK_TX_REGS = "hk_tx_regs"
    TX_STATUS = "tx_status"
    RX_STATUS = "rx_status"


class Info:
    """Human-readable descriptions for the symbolic op codes."""

    HK_RX_REGS = "Request RX register set"
    HK_TX_REGS = "Request TX register set"
    ENABLE_HK_RX_REGS = "Enable periodic RX register HK"
    DISABLE_HK_RX_REGS = "Disable periodic RX register HK"
    ENABLE_HK_TX_REGS = "Enable periodic TX register HK"
    # Misspelled historical name, kept so existing references keep working.
    EMABLE_HK_TX_REGS = ENABLE_HK_TX_REGS
    TX_STATUS = "Read TX status (always read in normal mode)"
    RX_STATUS = "Read RX status (always read in normal mode)"
    SET_CW = "Set TX carrier wave"


class CommandId(enum.IntEnum):
    """Command IDs packed into the application data of the action telecommands."""

    READ_RX_STATUS_REGISTERS = 2
    SET_TX_MODE_STANDBY = 3
    SET_TX_MODE_MODULATION = 4
    SET_TX_MODE_CW = 5
    READ_TX_STATUS = 7
    READ_TX_WAVEFORM = 8
    READ_TX_AGC_VALUE_HIGH_BYTE = 9
    READ_TX_AGC_VALUE_LOW_BYTE = 10
    WRITE_LCL_CONFIG = 11
    READ_LCL_CONFIG_REGISTER = 12
    SET_WAVEFORM_OQPSK = 17
    SET_WAVEFORM_BPSK = 18
    SET_SECOND_CONFIG = 19
    ENABLE_DEBUG = 20
    DISABLE_DEBUG = 21


# Prefix put in front of every per-command log line.
_LOG_PREFIX = "Syrlinks"

# Interval handed to create_enable_periodic_hk_command_with_interval.
# NOTE(review): presumably seconds - confirm against the tmtccmd API.
_RX_HK_INTERVAL = 2.0

# op code -> (log text, mode) for the mode telecommands.
_MODE_COMMANDS = {
    OpCode.OFF: ("Set mode off", Mode.OFF),
    OpCode.ON: ("Set mode on", Mode.ON),
    OpCode.NORMAL: ("Mode Normal", Mode.NORMAL),
}

# op code -> (log text, command ID) for the action telecommands.
_ACTION_COMMANDS = {
    OpCode.STANDBY: ("Set TX mode standby", CommandId.SET_TX_MODE_STANDBY),
    OpCode.MODULATION: ("Set TX mode modulation", CommandId.SET_TX_MODE_MODULATION),
    OpCode.SET_CW: (Info.SET_CW, CommandId.SET_TX_MODE_CW),
    OpCode.TX_STATUS: (Info.TX_STATUS, CommandId.READ_TX_STATUS),
    OpCode.RX_STATUS: (Info.RX_STATUS, CommandId.READ_RX_STATUS_REGISTERS),
    "9": ("Read TX waveform", CommandId.READ_TX_WAVEFORM),
    "10": ("Read TX AGC value high byte", CommandId.READ_TX_AGC_VALUE_HIGH_BYTE),
    "11": ("Read TX AGC value low byte", CommandId.READ_TX_AGC_VALUE_LOW_BYTE),
    "12": ("Write LCL config", CommandId.WRITE_LCL_CONFIG),
    "13": ("Read RX status registers", CommandId.READ_RX_STATUS_REGISTERS),
    "14": ("Read LCL config register", CommandId.READ_LCL_CONFIG_REGISTER),
    "15": ("Set waveform OQPSK", CommandId.SET_WAVEFORM_OQPSK),
    "16": ("Set waveform BPSK", CommandId.SET_WAVEFORM_BPSK),
    "17": ("Set second config", CommandId.SET_SECOND_CONFIG),
    "18": ("Enable debug printout", CommandId.ENABLE_DEBUG),
    "19": ("Disable debug printout", CommandId.DISABLE_DEBUG),
}


@tmtc_definitions_provider
def add_syrlinks_cmds(defs: TmtcDefinitionWrapper) -> None:
    """Register all Syrlinks op codes under the Syrlinks service entry.

    The numeric op codes registered here are the ones
    :func:`pack_syrlinks_command` actually acts on, so every listed entry
    sends the command its description names.

    :param defs: Definition wrapper the service entry is added to.
    """
    oce = OpCodeEntry()
    oce.add(OpCode.OFF, "Syrlinks Handler: Set mode off")
    oce.add(OpCode.ON, "Syrlinks Handler: Set mode on")
    oce.add(OpCode.NORMAL, "Syrlinks Handler: Set mode normal")
    oce.add(OpCode.STANDBY, "Syrlinks Handler: Set TX standby")
    oce.add(OpCode.MODULATION, "Syrlinks Handler: Set TX modulation")
    oce.add(OpCode.HK_RX_REGS, Info.HK_RX_REGS)
    oce.add(OpCode.HK_TX_REGS, Info.HK_TX_REGS)
    oce.add(OpCode.SET_CW, Info.SET_CW)
    oce.add(OpCode.TX_STATUS, Info.TX_STATUS)
    oce.add(OpCode.RX_STATUS, Info.RX_STATUS)
    oce.add(OpCode.ENABLE_HK_RX_REGS, Info.ENABLE_HK_RX_REGS)
    oce.add(OpCode.DISABLE_HK_RX_REGS, Info.DISABLE_HK_RX_REGS)
    oce.add("9", "Syrlinks Handler: Read TX waveform")
    oce.add("10", "Syrlinks Handler: Read TX AGC value high byte")
    oce.add("11", "Syrlinks Handler: Read TX AGC value low byte")
    oce.add("12", "Syrlinks Handler: Write LCL config")
    oce.add("13", "Syrlinks Handler: Read RX status registers")
    oce.add("14", "Syrlinks Handler: Read LCL config register")
    oce.add("15", "Syrlinks Handler: Set waveform OQPSK")
    oce.add("16", "Syrlinks Handler: Set waveform BPSK")
    oce.add("17", "Syrlinks Handler: Set second config")
    oce.add("18", "Syrlinks Handler: Enable debug output")
    oce.add("19", "Syrlinks Handler: Disable debug output")
    defs.add_service(CustomServiceList.SYRLINKS.value, "Syrlinks Handler", oce)


def _add_mode_cmd(q: DefaultPusQueueHelper, obyt: bytes, mode) -> None:
    """Queue a service 200 / subservice 1 telecommand for ``mode`` (submode 0)."""
    data = pack_mode_data(obyt, mode, 0)
    q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))


def _add_action_cmd(q: DefaultPusQueueHelper, obyt: bytes, command_id: int) -> None:
    """Queue a service 8 / subservice 128 telecommand for ``command_id``.

    The application data is the object ID followed by the command ID packed
    as a big-endian unsigned 32-bit integer.
    """
    data = obyt + struct.pack("!I", command_id)
    q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))


def _add_hk_cmd(q: DefaultPusQueueHelper, obyt: bytes, op_code: str) -> bool:
    """Queue the housekeeping telecommand(s) for ``op_code``, if it is one.

    :return: ``True`` when ``op_code`` was a housekeeping op code.
    """
    if op_code == OpCode.HK_RX_REGS:
        q.add_log_cmd(f"{_LOG_PREFIX}: {Info.HK_RX_REGS}")
        sid = make_sid(obyt, SetId.RX_REGISTERS_DATASET)
        q.add_pus_tc(create_request_one_diag_command(sid))
        return True
    if op_code == OpCode.ENABLE_HK_RX_REGS:
        q.add_log_cmd(f"{_LOG_PREFIX}: {Info.ENABLE_HK_RX_REGS}")
        sid = make_sid(obyt, SetId.RX_REGISTERS_DATASET)
        # The helper returns several telecommands; queue them in order.
        for cmd in create_enable_periodic_hk_command_with_interval(
            True, sid, _RX_HK_INTERVAL
        ):
            q.add_pus_tc(cmd)
        return True
    if op_code == OpCode.DISABLE_HK_RX_REGS:
        q.add_log_cmd(f"{_LOG_PREFIX}: {Info.DISABLE_HK_RX_REGS}")
        sid = make_sid(obyt, SetId.RX_REGISTERS_DATASET)
        q.add_pus_tc(create_disable_periodic_hk_command(True, sid))
        return True
    if op_code == OpCode.HK_TX_REGS:
        q.add_log_cmd(f"{_LOG_PREFIX}: {Info.HK_TX_REGS}")
        sid = make_sid(obyt, SetId.TX_REGISTERS_DATASET)
        q.add_pus_tc(create_request_one_diag_command(sid))
        return True
    return False


def pack_syrlinks_command(
    object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
) -> None:
    """Queue the telecommand(s) that belong to ``op_code``.

    Op codes are matched exactly. A log entry naming the object ID is always
    queued first; an unknown op code queues nothing beyond that entry.

    :param object_id: Object ID of the addressed Syrlinks handler.
    :param q: Queue helper receiving the log entries and telecommands.
    :param op_code: One of the :class:`OpCode` strings or one of the numeric
        op code strings "9" to "19".
    """
    obyt = object_id.as_bytes
    q.add_log_cmd(f"Testing Syrlinks with object id: {object_id.as_hex_string}")

    mode_entry = _MODE_COMMANDS.get(op_code)
    if mode_entry is not None:
        log_text, mode = mode_entry
        q.add_log_cmd(f"{_LOG_PREFIX}: {log_text}")
        _add_mode_cmd(q, obyt, mode)
        return

    if _add_hk_cmd(q, obyt, op_code):
        return

    action_entry = _ACTION_COMMANDS.get(op_code)
    if action_entry is not None:
        log_text, command_id = action_entry
        q.add_log_cmd(f"{_LOG_PREFIX}: {log_text}")
        _add_action_cmd(q, obyt, command_id)


def handle_syrlinks_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
    """Dispatch a Syrlinks housekeeping reply to the handler for its set ID.

    Only the RX and TX register datasets are handled; any other set ID
    (including ``SetId.TEMPERATURE_SET_ID``) is logged as unknown.

    :param printer: Printer used for log output.
    :param set_id: Set ID of the received dataset.
    :param hk_data: Raw dataset bytes, ending with the validity buffer.
    """
    if set_id == SetId.RX_REGISTERS_DATASET:
        return handle_syrlinks_rx_registers_dataset(printer, hk_data)
    elif set_id == SetId.TX_REGISTERS_DATASET:
        return handle_syrlinks_tx_registers_dataset(printer, hk_data)
    else:
        pw = PrintWrapper(printer)
        pw.dlog(f"Service 3 TM: Syrlinks handler reply with unknown set ID {set_id}")


def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes):
    """Decode and log the RX register dataset.

    Layout read from ``hk_data`` (multi-byte fields are big-endian unsigned):
    byte 0 status, 1-4 sensitivity, 5-8 frequency shift, 9-10 IQ power,
    11-12 AGC value, 13-16 demod Eb, 17-20 demod N0, byte 21 data rate.
    Everything from byte 22 onwards is passed on as the validity buffer.

    :param printer: Printer used for log output and the validity buffer.
    :param hk_data: Raw dataset bytes.
    """
    pw = PrintWrapper(printer)
    header_list = [
        "RX Status",
        "RX Sensitivity",
        "RX Frequency Shift",
        "RX IQ Power",
        "RX AGC Value",
        "RX Demod Eb",
        "RX Demod N0",
        "RX Datarate",
    ]
    rx_status = hk_data[0]
    rx_sensitivity = struct.unpack("!I", hk_data[1:5])[0]
    rx_frequency_shift = struct.unpack("!I", hk_data[5:9])[0]
    rx_iq_power = struct.unpack("!H", hk_data[9:11])[0]
    rx_agc_value = struct.unpack("!H", hk_data[11:13])[0]
    rx_demod_eb = struct.unpack("!I", hk_data[13:17])[0]
    rx_demod_n0 = struct.unpack("!I", hk_data[17:21])[0]
    rx_data_rate = hk_data[21]
    content_list = [
        rx_status,
        rx_sensitivity,
        rx_frequency_shift,
        rx_iq_power,
        rx_agc_value,
        rx_demod_eb,
        rx_demod_n0,
        rx_data_rate,
    ]
    validity_buffer = hk_data[22:]
    for header, content in zip(header_list, content_list):
        pw.dlog(f"{header}: {content}")
    printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)


def handle_syrlinks_tx_registers_dataset(
    printer: FsfwTmTcPrinter,
    hk_data: bytes,
):
    """Decode and log the TX register dataset.

    Layout read from ``hk_data``: byte 0 status, byte 1 waveform, bytes 2-3
    AGC value (big-endian unsigned 16-bit). Everything from byte 4 onwards is
    passed on as the validity buffer.

    :param printer: Printer used for log output and the validity buffer.
    :param hk_data: Raw dataset bytes.
    """
    pw = PrintWrapper(printer)
    header_list = ["TX Status", "TX Waveform", "TX AGC value"]
    tx_status = hk_data[0]
    tx_waveform = hk_data[1]
    # struct.unpack always returns a tuple; take the single decoded value so
    # the log shows the number rather than "(value,)".
    tx_agc_value = struct.unpack("!H", hk_data[2:4])[0]
    content_list = [tx_status, tx_waveform, tx_agc_value]
    validity_buffer = hk_data[4:]
    for header, content in zip(header_list, content_list):
        pw.dlog(f"{header}: {content}")
    printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)