import enum import struct from eive_tmtc.pus_tm.defs import PrintWrapper from spacepackets.ecss import PusTelecommand from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.object_ids import BPX_HANDLER_ID from tmtccmd.config.tmtc import ( tmtc_definitions_provider, TmtcDefinitionWrapper, OpCodeEntry, ) from tmtccmd.tmtc import service_provider from tmtccmd.tmtc.decorator import ServiceProviderParams from tmtccmd.pus.s8_fsfw_action import create_action_cmd from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter class BpxSetId(enum.IntEnum): GET_HK_SET = 0 GET_CFG_SET = 5 class BpxActionId: REBOOT = 2 RESET_COUNTERS = 3 CONFIG_CMD = 4 GET_CFG = 5 SET_CFG = 6 MAN_HEATER_ON = 10 MAN_HEATER_OFF = 11 class BpxHeaterModeSelect(enum.IntEnum): OFF = 0 AUTO = 1 class BpxOpCode: HK = "hk" OFF = "off" ON = "on" RST_CFG = "reset_cfg" SET_CFG = "set_cfg" MAN_HEATER_ON = "man_heater_on" MAN_HEATER_OFF = "man_heater_off" RST_BOOT_CNT = "rst_boot_cnt" REQUEST_CFG = "cfg" REQUEST_CFG_HK = "cfg_hk" REBOOT = "reboot" @tmtc_definitions_provider def add_bpx_cmd_definitions(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(keys=BpxOpCode.ON, info="On command") oce.add(keys=BpxOpCode.OFF, info="Off command") oce.add(keys=BpxOpCode.HK, info="Request BPX HK") oce.add(keys=BpxOpCode.RST_BOOT_CNT, info="Reset Boot Count") oce.add(keys=BpxOpCode.RST_CFG, info="Reset Config to stored default settings") oce.add(keys=BpxOpCode.SET_CFG, info="Set BPX configuration") oce.add(keys=BpxOpCode.MAN_HEATER_ON, info="Manual heater on") oce.add(keys=BpxOpCode.MAN_HEATER_OFF, info="Manual heater off") oce.add(keys=BpxOpCode.REQUEST_CFG, info="Request Configuration Struct (Step 1)") oce.add( keys=BpxOpCode.REQUEST_CFG_HK, info="Request Configuration Struct HK (Step 2)" ) oce.add(keys=BpxOpCode.REBOOT, info="Reboot Command") defs.add_service( name=CustomServiceList.BPX_BATTERY.value, info="BPX Battery Handler", op_code_entry=oce, ) @service_provider(CustomServiceList.BPX_BATTERY.value) def pack_bpx_commands(p: ServiceProviderParams): # noqa C901: Complexity is okay here. op_code = p.op_code q = p.queue_helper if op_code == BpxOpCode.HK: q.add_log_cmd("Requesting BPX battery HK set") sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetId.GET_HK_SET) q.add_pus_tc(generate_one_hk_command(sid=sid)) if op_code == BpxOpCode.OFF: q.add_log_cmd("Off mode") mode_cmd = pack_mode_data(BPX_HANDLER_ID, Mode.OFF, 0) q.add_pus_tc( PusTelecommand( service=200, subservice=ModeSubservices.TC_MODE_COMMAND, app_data=mode_cmd, ) ) if op_code == BpxOpCode.ON: q.add_log_cmd("On mode") mode_cmd = pack_mode_data(BPX_HANDLER_ID, Mode.ON, 0) q.add_pus_tc( PusTelecommand( service=200, subservice=ModeSubservices.TC_MODE_COMMAND, app_data=mode_cmd, ) ) if op_code == BpxOpCode.RST_BOOT_CNT: q.add_log_cmd("Resetting reboot counters") q.add_pus_tc( create_action_cmd( object_id=BPX_HANDLER_ID, action_id=BpxActionId.RESET_COUNTERS ) ) if op_code == BpxOpCode.RST_CFG: q.add_log_cmd("Reset BPX configuration") q.add_pus_tc( create_action_cmd( object_id=BPX_HANDLER_ID, action_id=BpxActionId.CONFIG_CMD ) ) if op_code == BpxOpCode.SET_CFG: q.add_log_cmd("Setting BPX configuration") user_data = bytearray() batt_mode = BpxHeaterModeSelect( int(input("BPX heater mode select, 0 for OFF 1 for AUTO: ")) ) user_data.append(batt_mode) lower_limit = int(input("Lower heater limit (-2 default): ")) user_data.append(struct.pack("!b", lower_limit)[0]) upper_limit = int(input("Upper heater limit (3 default): ")) user_data.append(struct.pack("!b", upper_limit)[0]) q.add_pus_tc( create_action_cmd( object_id=BPX_HANDLER_ID, action_id=BpxActionId.SET_CFG, user_data=user_data, ) ) if op_code == BpxOpCode.REQUEST_CFG: q.add_log_cmd("Requesting configuration struct") q.add_pus_tc( create_action_cmd(object_id=BPX_HANDLER_ID, action_id=BpxActionId.GET_CFG) ) if op_code == BpxOpCode.REQUEST_CFG_HK: q.add_log_cmd("Requesting configuration struct HK") sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetId.GET_CFG_SET) q.add_pus_tc(generate_one_hk_command(sid=sid)) if op_code == BpxOpCode.REBOOT: q.add_log_cmd("Rebooting BPX battery") q.add_pus_tc( create_action_cmd(object_id=BPX_HANDLER_ID, action_id=BpxActionId.REBOOT) ) if op_code == BpxOpCode.MAN_HEATER_ON: q.add_log_cmd("BPX manual heater on with seconds burntime") burn_time = int(input("BPX heater burn time in seconds [1-65535]: ")) if burn_time < 1 or burn_time > 65535: raise ValueError("Invalid burntime, smaller than 0 or larger than 65535") q.add_pus_tc( create_action_cmd( object_id=BPX_HANDLER_ID, action_id=BpxActionId.MAN_HEATER_ON, user_data=struct.pack("!H", burn_time), ) ) if op_code == BpxOpCode.MAN_HEATER_OFF: q.add_log_cmd("BPX manual heater off") q.add_pus_tc( create_action_cmd( object_id=BPX_HANDLER_ID, action_id=BpxActionId.MAN_HEATER_OFF ) ) HEADER_LIST = [ "Charge Current", "Discharge Current", "Heater Current", "Battery Voltage", "Batt Temp 1", "Batt Temp 2", "Batt Temp 3", "Batt Temp 4", "Reboot Counter", "Boot Cause", ] def handle_bpx_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes): if set_id == BpxSetId.GET_HK_SET: fmt_str = "!HHHHhhhhIB" inc_len = struct.calcsize(fmt_str) ( charge_current, discharge_current, heater_current, batt_voltage, batt_temp_1, batt_temp_2, batt_temp_3, batt_temp_4, reboot_cntr, boot_cause, ) = struct.unpack(fmt_str, hk_data[0:inc_len]) content_list = [ charge_current, discharge_current, heater_current, batt_voltage, batt_temp_1, batt_temp_2, batt_temp_3, batt_temp_4, reboot_cntr, boot_cause, ] validity_buffer = hk_data[inc_len:] pw.dlog(str(HEADER_LIST)) pw.dlog(str(content_list)) FsfwTmTcPrinter.get_validity_buffer( validity_buffer=validity_buffer, num_vars=10 ) elif set_id == BpxSetId.GET_CFG_SET: battheat_mode = hk_data[0] battheat_low = struct.unpack("!b", hk_data[1:2])[0] battheat_high = struct.unpack("!b", hk_data[2:3])[0] header_list = [ "Battery Heater Mode", "Battery Heater Low Limit", "Battery Heater High Limit", ] content_list = [battheat_mode, battheat_low, battheat_high] validity_buffer = hk_data[3:] pw.dlog(str(header_list)) pw.dlog(str(content_list)) FsfwTmTcPrinter.get_validity_buffer( validity_buffer=validity_buffer, num_vars=10 )