diff --git a/eive_tmtc/tmtc/power/__init__.py b/eive_tmtc/tmtc/power/__init__.py index e69de29..7167fcc 100644 --- a/eive_tmtc/tmtc/power/__init__.py +++ b/eive_tmtc/tmtc/power/__init__.py @@ -0,0 +1 @@ +from .subsystem import add_eps_subsystem_cmds diff --git a/eive_tmtc/tmtc/power/pwr_ctrl.py b/eive_tmtc/tmtc/power/pwr_ctrl.py index 02e9dbb..4393a07 100644 --- a/eive_tmtc/tmtc/power/pwr_ctrl.py +++ b/eive_tmtc/tmtc/power/pwr_ctrl.py @@ -52,17 +52,6 @@ class ParamId(enum.IntEnum): HIGHER_MODES_LIMIT = 6 -PARAM_DICT = { - ParamId.BATTERY_INTERNAL_RESISTANCE: "BATTERY_INTERNAL_RESISTANCE", - ParamId.BATTERY_MAXIMUM_CAPACITY: "BATTERY_MAXIMUM_CAPACITY", - ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD: "COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD", - ParamId.MAX_ALLOWED_TIME_DIFF: "MAX_ALLOWED_TIME_DIFF", - ParamId.PAYLOAD_OP_LIMIT_ON: "PAYLOAD_OP_LIMIT_ON", - ParamId.PAYLOAD_OP_LIMIT_LOW: "PAYLOAD_OP_LIMIT_LOW", - ParamId.HIGHER_MODES_LIMIT: "HIGHER_MODES_LIMIT", -} - - class OpCodes: OFF = ["mode_off"] ON = ["mode_on"] @@ -102,6 +91,9 @@ def acs_cmd_defs(defs: TmtcDefinitionWrapper): oce.add(keys=OpCodes.REQUEST_ENABLE_PL_HK, info=Info.REQUEST_ENABLE_PL_HK) oce.add(keys=OpCodes.ENABLE_ENABLE_PL_HK, info=Info.ENABLE_ENABLE_PL_HK) oce.add(keys=OpCodes.DISABLE_ENABLE_PL_HK, info=Info.DISABLE_ENABLE_PL_HK) + defs.add_service( + name=CustomServiceList.PWR_CTRL.value, info="PWR Controller", op_code_entry=oce + ) @service_provider(CustomServiceList.PWR_CTRL.value) @@ -164,11 +156,11 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): def set_pwr_ctrl_param(q: DefaultPusQueueHelper): for val in ParamId: - print("{:<2}: {:<20}".format(val, PARAM_DICT[val])) + print("{:<2}: {:<20}".format(val, val.name)) param = int(input(f"Specify parameter to set \n" f"")) match param: case ParamId.BATTERY_INTERNAL_RESISTANCE: - value = int(input("Specify parameter value to set [Ohm]: ")) + value = float(input("Specify parameter value to set [Ohm]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( @@ -180,7 +172,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper): ) ) case ParamId.BATTERY_MAXIMUM_CAPACITY: - value = int(input("Specify parameter value to set [Ah]: ")) + value = float(input("Specify parameter value to set [Ah]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( @@ -192,7 +184,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper): ) ) case ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD: - value = int(input("Specify parameter value to set [V]: ")) + value = float(input("Specify parameter value to set [V]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( @@ -204,7 +196,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper): ) ) case ParamId.MAX_ALLOWED_TIME_DIFF: - value = int(input("Specify parameter value to set [s]: ")) + value = float(input("Specify parameter value to set [s]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_double_parameter( @@ -216,7 +208,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper): ) ) case ParamId.PAYLOAD_OP_LIMIT_ON: - value = int(input("Specify parameter value to set [1]: ")) + value = float(input("Specify parameter value to set [1]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( @@ -228,7 +220,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper): ) ) case ParamId.PAYLOAD_OP_LIMIT_LOW: - value = int(input("Specify parameter value to set [1]: ")) + value = float(input("Specify parameter value to set [1]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( @@ -240,7 +232,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper): ) ) case ParamId.HIGHER_MODES_LIMIT: - value = int(input("Specify parameter value to set [1]: ")) + value = float(input("Specify parameter value to set [1]: ")) q.add_pus_tc( create_load_param_cmd( create_scalar_float_parameter( @@ -277,30 +269,21 @@ def handle_core_hk_data(pw: PrintWrapper, hk_data: bytes): pw.dlog("Received HK set too small") return current_idx = 0 - total_battery_current = [ - f"{val:d}" - for val in struct.unpack( - fmt_int16, hk_data[current_idx : current_idx + inc_len_int16][0] - ) - ] + total_battery_current = struct.unpack( + fmt_int16, hk_data[current_idx : current_idx + inc_len_int16] + )[0] current_idx += inc_len_int16 - open_circuit_voltage_charge = [ - f"{val:8.3f}" - for val in struct.unpack( - fmt_float, hk_data[current_idx : current_idx + inc_len_float][0] - ) - ] + open_circuit_voltage_charge = struct.unpack( + fmt_float, hk_data[current_idx : current_idx + inc_len_float] + )[0] current_idx += inc_len_float - coulomb_counter_charge = [ - f"{val:d}" - for val in struct.unpack( - fmt_float, hk_data[current_idx : current_idx + inc_len_float] - ) - ] + coulomb_counter_charge = struct.unpack( + fmt_float, hk_data[current_idx : current_idx + inc_len_float] + )[0] current_idx += inc_len_float pw.dlog(f"Total Battery Current: {total_battery_current} [mA]") - pw.dlog(f"Open Circuit Voltage Charge: {open_circuit_voltage_charge*100} [%]") - pw.dlog(f"Coulomb Counter Charge: {coulomb_counter_charge*100} [%]") + pw.dlog(f"Open Circuit Voltage Charge: {open_circuit_voltage_charge*100:8.3f} [%]") + pw.dlog(f"Coulomb Counter Charge: {coulomb_counter_charge*100:8.3f} [%]") FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) @@ -312,12 +295,9 @@ def handle_enable_pl_data(pw: PrintWrapper, hk_data: bytes): pw.dlog("Received HK set too small") return current_idx = 0 - pl_use_allowed = [ - f"{val:d}" - for val in struct.unpack( - fmt_uint16, hk_data[current_idx : current_idx + inc_len_uint16][0] - ) - ] + pl_use_allowed = struct.unpack( + fmt_uint16, hk_data[current_idx : current_idx + inc_len_uint16] + )[0] current_idx += inc_len_uint16 pw.dlog(f"PL Use Allowed: {pl_use_allowed}") FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=1) diff --git a/eive_tmtc/tmtc/power/subsystem.py b/eive_tmtc/tmtc/power/subsystem.py new file mode 100644 index 0000000..830a353 --- /dev/null +++ b/eive_tmtc/tmtc/power/subsystem.py @@ -0,0 +1,68 @@ +import enum +from typing import Tuple, Dict + +from spacepackets.ecss import PusTelecommand +from eive_tmtc.tmtc.common import pack_mode_cmd_with_info +from eive_tmtc.config.object_ids import EPS_SUBSYSTEM_ID +from eive_tmtc.config.definitions import CustomServiceList +from tmtccmd.config.tmtc import ( + tmtc_definitions_provider, + TmtcDefinitionWrapper, + OpCodeEntry, +) +from tmtccmd.tc.pus_200_fsfw_mode import Subservice as ModeSubservices, Mode +from tmtccmd.tc import service_provider +from tmtccmd.tc.decorator import ServiceProviderParams + + +class OpCode(str, enum.Enum): + OFF = "off" + NML = "normal" + REPORT_ALL_MODES = "all_modes" + + +class Info(str, enum.Enum): + OFF = "Off Mode Command" + NML = "Normal Mode Command" + REPORT_ALL_MODES = "Report All Modes Recursively" + + +HANDLER_LIST: Dict[str, Tuple[int, int, str]] = { + OpCode.OFF: (Mode.OFF, 0, Info.OFF), + OpCode.NML: (Mode.NORMAL, 0, Info.NML), +} + + +@service_provider(CustomServiceList.EPS_SS.value) +def build_eps_subsystem_cmd(p: ServiceProviderParams): + op_code = p.op_code + q = p.queue_helper + info_prefix = "EPS Subsystem" + if op_code in OpCode.REPORT_ALL_MODES: + q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}") + q.add_pus_tc( + PusTelecommand( + service=200, + subservice=ModeSubservices.TC_MODE_ANNOUNCE_RECURSIVE, + app_data=EPS_SUBSYSTEM_ID, + ) + ) + mode_info_tup = HANDLER_LIST.get(op_code) + if mode_info_tup is None: + return + pack_mode_cmd_with_info( + object_id=EPS_SUBSYSTEM_ID, + info=f"{info_prefix}: {mode_info_tup[2]}", + mode=mode_info_tup[0], + submode=mode_info_tup[1], + q=q, + ) + + +@tmtc_definitions_provider +def add_eps_subsystem_cmds(defs: TmtcDefinitionWrapper): + oce = OpCodeEntry() + for op_code, (_, _, info) in HANDLER_LIST.items(): + oce.add(op_code, info) + oce.add(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES) + defs.add_service(CustomServiceList.EPS_SS, "EPS Subsystem", oce)