import datetime import enum import logging import struct from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.object_ids import PWR_CONTROLLER from eive_tmtc.pus_tm.defs import PrintWrapper from tmtccmd.config.tmtc import ( tmtc_definitions_provider, TmtcDefinitionWrapper, OpCodeEntry, ) from tmtccmd.tc import service_provider from tmtccmd.tc.queue import DefaultPusQueueHelper from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_command from tmtccmd.tc.decorator import ServiceProviderParams from tmtccmd.tc.pus_3_fsfw_hk import ( generate_one_hk_command, make_sid, enable_periodic_hk_command_with_interval, disable_periodic_hk_command, ) from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.tc.pus_20_fsfw_param import create_load_param_cmd from tmtccmd.pus.s20_fsfw_param_defs import ( create_scalar_float_parameter, create_scalar_double_parameter, ) _LOGGER = logging.getLogger(__name__) class SetId(enum.IntEnum): CORE_HK_SET = 0 ENABLE_PL_SET = 1 # class ActionId(enum.IntEnum): class ParamId(enum.IntEnum): BATTERY_INTERNAL_RESISTANCE = 0 BATTERY_MAXIMUM_CAPACITY = 1 COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD = 2 MAX_ALLOWED_TIME_DIFF = 3 PAYLOAD_OP_LIMIT_ON = 4 PAYLOAD_OP_LIMIT_LOW = 5 HIGHER_MODES_LIMIT = 6 class OpCodes: OFF = ["mode_off"] ON = ["mode_on"] NML = ["mode_normal"] SET_PARAMETER = ["set_parameter"] REQUEST_CORE_HK = ["core_hk"] ENABLE_CORE_HK = ["core_enable_hk"] DISABLE_CORE_HK = ["core_disable_hk"] REQUEST_ENABLE_PL_HK = ["enable_pl_hk"] ENABLE_ENABLE_PL_HK = ["enable_pl_enable_hk"] DISABLE_ENABLE_PL_HK = ["enable_pl_disable_hk"] class Info: OFF = "PWR Ctrl Mode to OFF" ON = "PWR Ctrl Mode to ON" NML = "PWR Ctrl Mode to NORMAL" SET_PARAMETER = "Set Parameter" REQUEST_CORE_HK = "Request Core HK once" ENABLE_CORE_HK = "Enable Core HK Data Generation" DISABLE_CORE_HK = "Disable Core HK Data Generation" REQUEST_ENABLE_PL_HK = "Request Enable PL HK once" ENABLE_ENABLE_PL_HK = "Enable Enable PL HK Data Generation" DISABLE_ENABLE_PL_HK = "Disable Enable PL HK Data Generation" @tmtc_definitions_provider def acs_cmd_defs(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(keys=OpCodes.OFF, info=Info.OFF) oce.add(keys=OpCodes.ON, info=Info.ON) oce.add(keys=OpCodes.NML, info=Info.NML) oce.add(keys=OpCodes.SET_PARAMETER, info=Info.SET_PARAMETER) oce.add(keys=OpCodes.REQUEST_CORE_HK, info=Info.REQUEST_CORE_HK) oce.add(keys=OpCodes.ENABLE_CORE_HK, info=Info.ENABLE_CORE_HK) oce.add(keys=OpCodes.DISABLE_CORE_HK, info=Info.DISABLE_CORE_HK) oce.add(keys=OpCodes.REQUEST_ENABLE_PL_HK, info=Info.REQUEST_ENABLE_PL_HK) oce.add(keys=OpCodes.ENABLE_ENABLE_PL_HK, info=Info.ENABLE_ENABLE_PL_HK) oce.add(keys=OpCodes.DISABLE_ENABLE_PL_HK, info=Info.DISABLE_ENABLE_PL_HK) defs.add_service( name=CustomServiceList.PWR_CTRL.value, info="PWR Controller", op_code_entry=oce ) @service_provider(CustomServiceList.PWR_CTRL.value) def pack_acs_ctrl_command(p: ServiceProviderParams): op_code = p.op_code q = p.queue_helper if op_code in OpCodes.OFF: q.add_log_cmd(f"{Info.OFF}") q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.OFF, 0)) elif op_code in OpCodes.ON: q.add_log_cmd(f"{Info.ON}") q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.ON, 0)) elif op_code in OpCodes.NML: q.add_log_cmd(f"{Info.NML}") q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.NORMAL, 0)) elif op_code in OpCodes.SET_PARAMETER: q.add_log_cmd(f"{Info.SET_PARAMETER}") set_pwr_ctrl_param(q) elif op_code in OpCodes.REQUEST_CORE_HK: q.add_log_cmd(Info.REQUEST_CORE_HK) q.add_pus_tc( generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET)) ) elif op_code in OpCodes.ENABLE_CORE_HK: interval = float(input("Please specify interval in floating point seconds: ")) q.add_log_cmd(Info.ENABLE_CORE_HK) cmd_tuple = enable_periodic_hk_command_with_interval( False, make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET), interval ) q.add_pus_tc(cmd_tuple[0]) q.add_pus_tc(cmd_tuple[1]) elif op_code in OpCodes.DISABLE_CORE_HK: q.add_log_cmd(Info.DISABLE_CORE_HK) q.add_pus_tc( disable_periodic_hk_command( False, make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET) ) ) elif op_code in OpCodes.REQUEST_ENABLE_PL_HK: q.add_log_cmd(Info.REQUEST_ENABLE_PL_HK) q.add_pus_tc( generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET)) ) elif op_code in OpCodes.ENABLE_ENABLE_PL_HK: interval = float(input("Please specify interval in floating point seconds: ")) q.add_log_cmd(Info.ENABLE_ENABLE_PL_HK) cmd_tuple = enable_periodic_hk_command_with_interval( False, make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET), interval ) q.add_pus_tc(cmd_tuple[0]) q.add_pus_tc(cmd_tuple[1]) elif op_code in OpCodes.DISABLE_ENABLE_PL_HK: q.add_log_cmd(Info.DISABLE_ENABLE_PL_HK) q.add_pus_tc( disable_periodic_hk_command( False, make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET) ) ) def set_pwr_ctrl_param(q: DefaultPusQueueHelper): for val in ParamId: print("{:<2}: {:<20}".format(val, val.name)) param = int(input("Specify parameter to set \n" "")) match param: case ParamId.BATTERY_INTERNAL_RESISTANCE: value = float(input("Specify parameter value to set [Ohm]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( object_id=PWR_CONTROLLER, domain_id=0, unique_id=ParamId.BATTERY_INTERNAL_RESISTANCE, parameter=value, ) ) ) case ParamId.BATTERY_MAXIMUM_CAPACITY: value = float(input("Specify parameter value to set [Ah]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( object_id=PWR_CONTROLLER, domain_id=0, unique_id=ParamId.BATTERY_MAXIMUM_CAPACITY, parameter=value, ) ) ) case ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD: value = float(input("Specify parameter value to set [V]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( object_id=PWR_CONTROLLER, domain_id=0, unique_id=ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD, parameter=value, ) ) ) case ParamId.MAX_ALLOWED_TIME_DIFF: value = float(input("Specify parameter value to set [s]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_double_parameter( object_id=PWR_CONTROLLER, domain_id=0, unique_id=ParamId.MAX_ALLOWED_TIME_DIFF, parameter=value, ) ) ) case ParamId.PAYLOAD_OP_LIMIT_ON: value = float(input("Specify parameter value to set [1]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( object_id=PWR_CONTROLLER, domain_id=0, unique_id=ParamId.PAYLOAD_OP_LIMIT_ON, parameter=value, ) ) ) case ParamId.PAYLOAD_OP_LIMIT_LOW: value = float(input("Specify parameter value to set [1]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( object_id=PWR_CONTROLLER, domain_id=0, unique_id=ParamId.PAYLOAD_OP_LIMIT_LOW, parameter=value, ) ) ) case ParamId.HIGHER_MODES_LIMIT: value = float(input("Specify parameter value to set [1]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( object_id=PWR_CONTROLLER, domain_id=0, unique_id=ParamId.HIGHER_MODES_LIMIT, parameter=value, ) ) ) def handle_pwr_ctrl_hk_data( pw: PrintWrapper, set_id: int, hk_data: bytes, packet_time: datetime.datetime, ): pw.ilog(_LOGGER, f"Received PWR CTRL HK with packet time {packet_time}") match set_id: case SetId.CORE_HK_SET: handle_core_hk_data(pw, hk_data) case SetId.ENABLE_PL_SET: handle_enable_pl_data(pw, hk_data) def handle_core_hk_data(pw: PrintWrapper, hk_data: bytes): pw.dlog("Received Core HK Set") fmt_int16 = "!h" fmt_float = "!f" inc_len_int16 = struct.calcsize(fmt_int16) inc_len_float = struct.calcsize(fmt_float) if len(hk_data) < inc_len_int16 + 2 * inc_len_float: pw.dlog("Received HK set too small") return current_idx = 0 total_battery_current = struct.unpack( fmt_int16, hk_data[current_idx : current_idx + inc_len_int16] )[0] current_idx += inc_len_int16 open_circuit_voltage_charge = struct.unpack( fmt_float, hk_data[current_idx : current_idx + inc_len_float] )[0] current_idx += inc_len_float coulomb_counter_charge = struct.unpack( fmt_float, hk_data[current_idx : current_idx + inc_len_float] )[0] current_idx += inc_len_float pw.dlog(f"Total Battery Current: {total_battery_current} [mA]") pw.dlog(f"Open Circuit Voltage Charge: {open_circuit_voltage_charge*100:8.3f} [%]") pw.dlog(f"Coulomb Counter Charge: {coulomb_counter_charge*100:8.3f} [%]") FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) def handle_enable_pl_data(pw: PrintWrapper, hk_data: bytes): pw.dlog("Received Enable PL HK Set") fmt_uint16 = "!B" inc_len_uint16 = struct.calcsize(fmt_uint16) if len(hk_data) < inc_len_uint16: pw.dlog("Received HK set too small") return current_idx = 0 pl_use_allowed = struct.unpack( fmt_uint16, hk_data[current_idx : current_idx + inc_len_uint16] )[0] current_idx += inc_len_uint16 pw.dlog(f"PL Use Allowed: {pl_use_allowed}") FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=1)