diff --git a/eive_tmtc/tmtc/power/pwr_ctrl.py b/eive_tmtc/tmtc/power/pwr_ctrl.py new file mode 100644 index 0000000..02e9dbb --- /dev/null +++ b/eive_tmtc/tmtc/power/pwr_ctrl.py @@ -0,0 +1,323 @@ +import datetime +import enum +import logging +import struct + +from eive_tmtc.config.definitions import CustomServiceList +from eive_tmtc.config.object_ids import PWR_CONTROLLER +from eive_tmtc.pus_tm.defs import PrintWrapper +from tmtccmd.config.tmtc import ( + tmtc_definitions_provider, + TmtcDefinitionWrapper, + OpCodeEntry, +) +from tmtccmd.tc import service_provider +from tmtccmd.tc.queue import DefaultPusQueueHelper +from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_command +from tmtccmd.tc.decorator import ServiceProviderParams +from tmtccmd.tc.pus_3_fsfw_hk import ( + generate_one_hk_command, + make_sid, + enable_periodic_hk_command_with_interval, + disable_periodic_hk_command, + create_request_one_diag_command, +) +from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter + +from tmtccmd.tc.pus_20_fsfw_param import create_load_param_cmd + +from tmtccmd.pus.s20_fsfw_param_defs import ( + create_scalar_float_parameter, + create_scalar_double_parameter, +) + +_LOGGER = logging.getLogger(__name__) + + +class SetId(enum.IntEnum): + CORE_HK_SET = 0 + ENABLE_PL_SET = 1 + + +# class ActionId(enum.IntEnum): + + +class ParamId(enum.IntEnum): + BATTERY_INTERNAL_RESISTANCE = 0 + BATTERY_MAXIMUM_CAPACITY = 1 + COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD = 2 + MAX_ALLOWED_TIME_DIFF = 3 + PAYLOAD_OP_LIMIT_ON = 4 + PAYLOAD_OP_LIMIT_LOW = 5 + HIGHER_MODES_LIMIT = 6 + + +PARAM_DICT = { + ParamId.BATTERY_INTERNAL_RESISTANCE: "BATTERY_INTERNAL_RESISTANCE", + ParamId.BATTERY_MAXIMUM_CAPACITY: "BATTERY_MAXIMUM_CAPACITY", + ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD: "COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD", + ParamId.MAX_ALLOWED_TIME_DIFF: "MAX_ALLOWED_TIME_DIFF", + ParamId.PAYLOAD_OP_LIMIT_ON: "PAYLOAD_OP_LIMIT_ON", + ParamId.PAYLOAD_OP_LIMIT_LOW: "PAYLOAD_OP_LIMIT_LOW", + ParamId.HIGHER_MODES_LIMIT: "HIGHER_MODES_LIMIT", +} + + +class OpCodes: + OFF = ["mode_off"] + ON = ["mode_on"] + NML = ["mode_normal"] + SET_PARAMETER = ["set_parameter"] + REQUEST_CORE_HK = ["core_hk"] + ENABLE_CORE_HK = ["core_enable_hk"] + DISABLE_CORE_HK = ["core_disable_hk"] + REQUEST_ENABLE_PL_HK = ["enable_pl_hk"] + ENABLE_ENABLE_PL_HK = ["enable_pl_enable_hk"] + DISABLE_ENABLE_PL_HK = ["enable_pl_disable_hk"] + + +class Info: + OFF = "PWR Ctrl Mode to OFF" + ON = "PWR Ctrl Mode to ON" + NML = "PWR Ctrl Mode to NORMAL" + SET_PARAMETER = "Set Parameter" + REQUEST_CORE_HK = "Request Core HK once" + ENABLE_CORE_HK = "Enable Core HK Data Generation" + DISABLE_CORE_HK = "Disable Core HK Data Generation" + REQUEST_ENABLE_PL_HK = "Request Enable PL HK once" + ENABLE_ENABLE_PL_HK = "Enable Enable PL HK Data Generation" + DISABLE_ENABLE_PL_HK = "Disable Enable PL HK Data Generation" + + +@tmtc_definitions_provider +def acs_cmd_defs(defs: TmtcDefinitionWrapper): + oce = OpCodeEntry() + oce.add(keys=OpCodes.OFF, info=Info.OFF) + oce.add(keys=OpCodes.ON, info=Info.ON) + oce.add(keys=OpCodes.NML, info=Info.NML) + oce.add(keys=OpCodes.SET_PARAMETER, info=Info.SET_PARAMETER) + oce.add(keys=OpCodes.REQUEST_CORE_HK, info=Info.REQUEST_CORE_HK) + oce.add(keys=OpCodes.ENABLE_CORE_HK, info=Info.ENABLE_CORE_HK) + oce.add(keys=OpCodes.DISABLE_CORE_HK, info=Info.DISABLE_CORE_HK) + oce.add(keys=OpCodes.REQUEST_ENABLE_PL_HK, info=Info.REQUEST_ENABLE_PL_HK) + oce.add(keys=OpCodes.ENABLE_ENABLE_PL_HK, info=Info.ENABLE_ENABLE_PL_HK) + oce.add(keys=OpCodes.DISABLE_ENABLE_PL_HK, info=Info.DISABLE_ENABLE_PL_HK) + + +@service_provider(CustomServiceList.PWR_CTRL.value) +def pack_acs_ctrl_command(p: ServiceProviderParams): + op_code = p.op_code + q = p.queue_helper + if op_code in OpCodes.OFF: + q.add_log_cmd(f"{Info.OFF}") + q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.OFF, 0)) + elif op_code in OpCodes.ON: + q.add_log_cmd(f"{Info.ON}") + q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.ON, 0)) + elif op_code in OpCodes.NML: + q.add_log_cmd(f"{Info.NML}") + q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.NORMAL, 0)) + elif op_code in OpCodes.SET_PARAMETER: + q.add_log_cmd(f"{Info.SET_PARAMETER}") + set_pwr_ctrl_param(q) + elif op_code in OpCodes.REQUEST_CORE_HK: + q.add_log_cmd(Info.REQUEST_CORE_HK) + q.add_pus_tc( + generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET)) + ) + elif op_code in OpCodes.ENABLE_CORE_HK: + interval = float(input("Please specify interval in floating point seconds: ")) + q.add_log_cmd(Info.ENABLE_CORE_HK) + cmd_tuple = enable_periodic_hk_command_with_interval( + False, make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET), interval + ) + q.add_pus_tc(cmd_tuple[0]) + q.add_pus_tc(cmd_tuple[1]) + elif op_code in OpCodes.DISABLE_CORE_HK: + q.add_log_cmd(Info.DISABLE_CORE_HK) + q.add_pus_tc( + disable_periodic_hk_command( + False, make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET) + ) + ) + elif op_code in OpCodes.REQUEST_ENABLE_PL_HK: + q.add_log_cmd(Info.REQUEST_ENABLE_PL_HK) + q.add_pus_tc( + generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET)) + ) + elif op_code in OpCodes.ENABLE_ENABLE_PL_HK: + interval = float(input("Please specify interval in floating point seconds: ")) + q.add_log_cmd(Info.ENABLE_ENABLE_PL_HK) + cmd_tuple = enable_periodic_hk_command_with_interval( + False, make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET), interval + ) + q.add_pus_tc(cmd_tuple[0]) + q.add_pus_tc(cmd_tuple[1]) + elif op_code in OpCodes.DISABLE_ENABLE_PL_HK: + q.add_log_cmd(Info.DISABLE_ENABLE_PL_HK) + q.add_pus_tc( + disable_periodic_hk_command( + False, make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET) + ) + ) + + +def set_pwr_ctrl_param(q: DefaultPusQueueHelper): + for val in ParamId: + print("{:<2}: {:<20}".format(val, PARAM_DICT[val])) + param = int(input(f"Specify parameter to set \n" f"")) + match param: + case ParamId.BATTERY_INTERNAL_RESISTANCE: + value = int(input("Specify parameter value to set [Ohm]: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_float_parameter( + object_id=PWR_CONTROLLER, + domain_id=0, + unique_id=ParamId.BATTERY_INTERNAL_RESISTANCE, + parameter=value, + ) + ) + ) + case ParamId.BATTERY_MAXIMUM_CAPACITY: + value = int(input("Specify parameter value to set [Ah]: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_float_parameter( + object_id=PWR_CONTROLLER, + domain_id=0, + unique_id=ParamId.BATTERY_MAXIMUM_CAPACITY, + parameter=value, + ) + ) + ) + case ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD: + value = int(input("Specify parameter value to set [V]: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_float_parameter( + object_id=PWR_CONTROLLER, + domain_id=0, + unique_id=ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD, + parameter=value, + ) + ) + ) + case ParamId.MAX_ALLOWED_TIME_DIFF: + value = int(input("Specify parameter value to set [s]: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_double_parameter( + object_id=PWR_CONTROLLER, + domain_id=0, + unique_id=ParamId.MAX_ALLOWED_TIME_DIFF, + parameter=value, + ) + ) + ) + case ParamId.PAYLOAD_OP_LIMIT_ON: + value = int(input("Specify parameter value to set [1]: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_float_parameter( + object_id=PWR_CONTROLLER, + domain_id=0, + unique_id=ParamId.PAYLOAD_OP_LIMIT_ON, + parameter=value, + ) + ) + ) + case ParamId.PAYLOAD_OP_LIMIT_LOW: + value = int(input("Specify parameter value to set [1]: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_float_parameter( + object_id=PWR_CONTROLLER, + domain_id=0, + unique_id=ParamId.PAYLOAD_OP_LIMIT_LOW, + parameter=value, + ) + ) + ) + case ParamId.HIGHER_MODES_LIMIT: + value = int(input("Specify parameter value to set [1]: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_float_parameter( + object_id=PWR_CONTROLLER, + domain_id=0, + unique_id=ParamId.HIGHER_MODES_LIMIT, + parameter=value, + ) + ) + ) + + +def handle_pwr_ctrl_hk_data( + pw: PrintWrapper, + set_id: int, + hk_data: bytes, + packet_time: datetime.datetime, +): + pw.ilog(_LOGGER, f"Received PWR CTRL HK with packet time {packet_time}") + match set_id: + case SetId.CORE_HK_SET: + handle_core_hk_data(pw, hk_data) + case SetId.ENABLE_PL_SET: + handle_enable_pl_data(pw, hk_data) + + +def handle_core_hk_data(pw: PrintWrapper, hk_data: bytes): + pw.dlog("Received Core HK Set") + fmt_int16 = "!h" + fmt_float = "!f" + inc_len_int16 = struct.calcsize(fmt_int16) + inc_len_float = struct.calcsize(fmt_float) + if len(hk_data) < inc_len_int16 + 2 * inc_len_float: + pw.dlog("Received HK set too small") + return + current_idx = 0 + total_battery_current = [ + f"{val:d}" + for val in struct.unpack( + fmt_int16, hk_data[current_idx : current_idx + inc_len_int16][0] + ) + ] + current_idx += inc_len_int16 + open_circuit_voltage_charge = [ + f"{val:8.3f}" + for val in struct.unpack( + fmt_float, hk_data[current_idx : current_idx + inc_len_float][0] + ) + ] + current_idx += inc_len_float + coulomb_counter_charge = [ + f"{val:d}" + for val in struct.unpack( + fmt_float, hk_data[current_idx : current_idx + inc_len_float] + ) + ] + current_idx += inc_len_float + pw.dlog(f"Total Battery Current: {total_battery_current} [mA]") + pw.dlog(f"Open Circuit Voltage Charge: {open_circuit_voltage_charge*100} [%]") + pw.dlog(f"Coulomb Counter Charge: {coulomb_counter_charge*100} [%]") + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) + + +def handle_enable_pl_data(pw: PrintWrapper, hk_data: bytes): + pw.dlog("Received Enable PL HK Set") + fmt_uint16 = "!B" + inc_len_uint16 = struct.calcsize(fmt_uint16) + if len(hk_data) < inc_len_uint16: + pw.dlog("Received HK set too small") + return + current_idx = 0 + pl_use_allowed = [ + f"{val:d}" + for val in struct.unpack( + fmt_uint16, hk_data[current_idx : current_idx + inc_len_uint16][0] + ) + ] + current_idx += inc_len_uint16 + pw.dlog(f"PL Use Allowed: {pl_use_allowed}") + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=1)