Power Controller #238
@ -0,0 +1 @@
|
|||||||
|
from .subsystem import add_eps_subsystem_cmds
|
@ -52,17 +52,6 @@ class ParamId(enum.IntEnum):
|
|||||||
HIGHER_MODES_LIMIT = 6
|
HIGHER_MODES_LIMIT = 6
|
||||||
|
|
||||||
|
|
||||||
PARAM_DICT = {
|
|
||||||
ParamId.BATTERY_INTERNAL_RESISTANCE: "BATTERY_INTERNAL_RESISTANCE",
|
|
||||||
ParamId.BATTERY_MAXIMUM_CAPACITY: "BATTERY_MAXIMUM_CAPACITY",
|
|
||||||
ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD: "COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD",
|
|
||||||
ParamId.MAX_ALLOWED_TIME_DIFF: "MAX_ALLOWED_TIME_DIFF",
|
|
||||||
ParamId.PAYLOAD_OP_LIMIT_ON: "PAYLOAD_OP_LIMIT_ON",
|
|
||||||
ParamId.PAYLOAD_OP_LIMIT_LOW: "PAYLOAD_OP_LIMIT_LOW",
|
|
||||||
ParamId.HIGHER_MODES_LIMIT: "HIGHER_MODES_LIMIT",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class OpCodes:
|
class OpCodes:
|
||||||
OFF = ["mode_off"]
|
OFF = ["mode_off"]
|
||||||
ON = ["mode_on"]
|
ON = ["mode_on"]
|
||||||
@ -102,6 +91,9 @@ def acs_cmd_defs(defs: TmtcDefinitionWrapper):
|
|||||||
oce.add(keys=OpCodes.REQUEST_ENABLE_PL_HK, info=Info.REQUEST_ENABLE_PL_HK)
|
oce.add(keys=OpCodes.REQUEST_ENABLE_PL_HK, info=Info.REQUEST_ENABLE_PL_HK)
|
||||||
oce.add(keys=OpCodes.ENABLE_ENABLE_PL_HK, info=Info.ENABLE_ENABLE_PL_HK)
|
oce.add(keys=OpCodes.ENABLE_ENABLE_PL_HK, info=Info.ENABLE_ENABLE_PL_HK)
|
||||||
oce.add(keys=OpCodes.DISABLE_ENABLE_PL_HK, info=Info.DISABLE_ENABLE_PL_HK)
|
oce.add(keys=OpCodes.DISABLE_ENABLE_PL_HK, info=Info.DISABLE_ENABLE_PL_HK)
|
||||||
|
defs.add_service(
|
||||||
|
name=CustomServiceList.PWR_CTRL.value, info="PWR Controller", op_code_entry=oce
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@service_provider(CustomServiceList.PWR_CTRL.value)
|
@service_provider(CustomServiceList.PWR_CTRL.value)
|
||||||
@ -164,11 +156,11 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
|
|||||||
|
|
||||||
def set_pwr_ctrl_param(q: DefaultPusQueueHelper):
|
def set_pwr_ctrl_param(q: DefaultPusQueueHelper):
|
||||||
for val in ParamId:
|
for val in ParamId:
|
||||||
print("{:<2}: {:<20}".format(val, PARAM_DICT[val]))
|
print("{:<2}: {:<20}".format(val, val.name))
|
||||||
param = int(input(f"Specify parameter to set \n" f""))
|
param = int(input(f"Specify parameter to set \n" f""))
|
||||||
match param:
|
match param:
|
||||||
case ParamId.BATTERY_INTERNAL_RESISTANCE:
|
case ParamId.BATTERY_INTERNAL_RESISTANCE:
|
||||||
value = int(input("Specify parameter value to set [Ohm]: "))
|
value = float(input("Specify parameter value to set [Ohm]: "))
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
create_load_param_cmd(
|
create_load_param_cmd(
|
||||||
create_scalar_float_parameter(
|
create_scalar_float_parameter(
|
||||||
@ -180,7 +172,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
case ParamId.BATTERY_MAXIMUM_CAPACITY:
|
case ParamId.BATTERY_MAXIMUM_CAPACITY:
|
||||||
value = int(input("Specify parameter value to set [Ah]: "))
|
value = float(input("Specify parameter value to set [Ah]: "))
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
create_load_param_cmd(
|
create_load_param_cmd(
|
||||||
create_scalar_float_parameter(
|
create_scalar_float_parameter(
|
||||||
@ -192,7 +184,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
case ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD:
|
case ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD:
|
||||||
value = int(input("Specify parameter value to set [V]: "))
|
value = float(input("Specify parameter value to set [V]: "))
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
create_load_param_cmd(
|
create_load_param_cmd(
|
||||||
create_scalar_float_parameter(
|
create_scalar_float_parameter(
|
||||||
@ -204,7 +196,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
case ParamId.MAX_ALLOWED_TIME_DIFF:
|
case ParamId.MAX_ALLOWED_TIME_DIFF:
|
||||||
value = int(input("Specify parameter value to set [s]: "))
|
value = float(input("Specify parameter value to set [s]: "))
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
create_load_param_cmd(
|
create_load_param_cmd(
|
||||||
create_scalar_double_parameter(
|
create_scalar_double_parameter(
|
||||||
@ -216,7 +208,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
case ParamId.PAYLOAD_OP_LIMIT_ON:
|
case ParamId.PAYLOAD_OP_LIMIT_ON:
|
||||||
value = int(input("Specify parameter value to set [1]: "))
|
value = float(input("Specify parameter value to set [1]: "))
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
create_load_param_cmd(
|
create_load_param_cmd(
|
||||||
create_scalar_float_parameter(
|
create_scalar_float_parameter(
|
||||||
@ -228,7 +220,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
case ParamId.PAYLOAD_OP_LIMIT_LOW:
|
case ParamId.PAYLOAD_OP_LIMIT_LOW:
|
||||||
value = int(input("Specify parameter value to set [1]: "))
|
value = float(input("Specify parameter value to set [1]: "))
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
create_load_param_cmd(
|
create_load_param_cmd(
|
||||||
create_scalar_float_parameter(
|
create_scalar_float_parameter(
|
||||||
@ -240,7 +232,7 @@ def set_pwr_ctrl_param(q: DefaultPusQueueHelper):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
case ParamId.HIGHER_MODES_LIMIT:
|
case ParamId.HIGHER_MODES_LIMIT:
|
||||||
value = int(input("Specify parameter value to set [1]: "))
|
value = float(input("Specify parameter value to set [1]: "))
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
create_load_param_cmd(
|
create_load_param_cmd(
|
||||||
create_scalar_float_parameter(
|
create_scalar_float_parameter(
|
||||||
@ -277,30 +269,21 @@ def handle_core_hk_data(pw: PrintWrapper, hk_data: bytes):
|
|||||||
pw.dlog("Received HK set too small")
|
pw.dlog("Received HK set too small")
|
||||||
return
|
return
|
||||||
current_idx = 0
|
current_idx = 0
|
||||||
total_battery_current = [
|
total_battery_current = struct.unpack(
|
||||||
f"{val:d}"
|
fmt_int16, hk_data[current_idx : current_idx + inc_len_int16]
|
||||||
for val in struct.unpack(
|
)[0]
|
||||||
fmt_int16, hk_data[current_idx : current_idx + inc_len_int16][0]
|
|
||||||
)
|
|
||||||
]
|
|
||||||
current_idx += inc_len_int16
|
current_idx += inc_len_int16
|
||||||
open_circuit_voltage_charge = [
|
open_circuit_voltage_charge = struct.unpack(
|
||||||
f"{val:8.3f}"
|
fmt_float, hk_data[current_idx : current_idx + inc_len_float]
|
||||||
for val in struct.unpack(
|
)[0]
|
||||||
fmt_float, hk_data[current_idx : current_idx + inc_len_float][0]
|
|
||||||
)
|
|
||||||
]
|
|
||||||
current_idx += inc_len_float
|
current_idx += inc_len_float
|
||||||
coulomb_counter_charge = [
|
coulomb_counter_charge = struct.unpack(
|
||||||
f"{val:d}"
|
fmt_float, hk_data[current_idx : current_idx + inc_len_float]
|
||||||
for val in struct.unpack(
|
)[0]
|
||||||
fmt_float, hk_data[current_idx : current_idx + inc_len_float]
|
|
||||||
)
|
|
||||||
]
|
|
||||||
current_idx += inc_len_float
|
current_idx += inc_len_float
|
||||||
pw.dlog(f"Total Battery Current: {total_battery_current} [mA]")
|
pw.dlog(f"Total Battery Current: {total_battery_current} [mA]")
|
||||||
pw.dlog(f"Open Circuit Voltage Charge: {open_circuit_voltage_charge*100} [%]")
|
pw.dlog(f"Open Circuit Voltage Charge: {open_circuit_voltage_charge*100:8.3f} [%]")
|
||||||
pw.dlog(f"Coulomb Counter Charge: {coulomb_counter_charge*100} [%]")
|
pw.dlog(f"Coulomb Counter Charge: {coulomb_counter_charge*100:8.3f} [%]")
|
||||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
|
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
|
||||||
|
|
||||||
|
|
||||||
@ -312,12 +295,9 @@ def handle_enable_pl_data(pw: PrintWrapper, hk_data: bytes):
|
|||||||
pw.dlog("Received HK set too small")
|
pw.dlog("Received HK set too small")
|
||||||
return
|
return
|
||||||
current_idx = 0
|
current_idx = 0
|
||||||
pl_use_allowed = [
|
pl_use_allowed = struct.unpack(
|
||||||
f"{val:d}"
|
fmt_uint16, hk_data[current_idx : current_idx + inc_len_uint16]
|
||||||
for val in struct.unpack(
|
)[0]
|
||||||
fmt_uint16, hk_data[current_idx : current_idx + inc_len_uint16][0]
|
|
||||||
)
|
|
||||||
]
|
|
||||||
current_idx += inc_len_uint16
|
current_idx += inc_len_uint16
|
||||||
pw.dlog(f"PL Use Allowed: {pl_use_allowed}")
|
pw.dlog(f"PL Use Allowed: {pl_use_allowed}")
|
||||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=1)
|
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=1)
|
||||||
|
68
eive_tmtc/tmtc/power/subsystem.py
Normal file
68
eive_tmtc/tmtc/power/subsystem.py
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
import enum
|
||||||
|
from typing import Tuple, Dict
|
||||||
|
|
||||||
|
from spacepackets.ecss import PusTelecommand
|
||||||
|
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
|
||||||
|
from eive_tmtc.config.object_ids import EPS_SUBSYSTEM_ID
|
||||||
|
from eive_tmtc.config.definitions import CustomServiceList
|
||||||
|
from tmtccmd.config.tmtc import (
|
||||||
|
tmtc_definitions_provider,
|
||||||
|
TmtcDefinitionWrapper,
|
||||||
|
OpCodeEntry,
|
||||||
|
)
|
||||||
|
from tmtccmd.tc.pus_200_fsfw_mode import Subservice as ModeSubservices, Mode
|
||||||
|
from tmtccmd.tc import service_provider
|
||||||
|
from tmtccmd.tc.decorator import ServiceProviderParams
|
||||||
|
|
||||||
|
|
||||||
|
class OpCode(str, enum.Enum):
|
||||||
|
OFF = "off"
|
||||||
|
NML = "normal"
|
||||||
|
REPORT_ALL_MODES = "all_modes"
|
||||||
|
|
||||||
|
|
||||||
|
class Info(str, enum.Enum):
|
||||||
|
OFF = "Off Mode Command"
|
||||||
|
NML = "Normal Mode Command"
|
||||||
|
REPORT_ALL_MODES = "Report All Modes Recursively"
|
||||||
|
|
||||||
|
|
||||||
|
HANDLER_LIST: Dict[str, Tuple[int, int, str]] = {
|
||||||
|
OpCode.OFF: (Mode.OFF, 0, Info.OFF),
|
||||||
|
OpCode.NML: (Mode.NORMAL, 0, Info.NML),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@service_provider(CustomServiceList.EPS_SS.value)
|
||||||
|
def build_eps_subsystem_cmd(p: ServiceProviderParams):
|
||||||
|
op_code = p.op_code
|
||||||
|
q = p.queue_helper
|
||||||
|
info_prefix = "EPS Subsystem"
|
||||||
|
if op_code in OpCode.REPORT_ALL_MODES:
|
||||||
|
q.add_log_cmd(f"{info_prefix}: {Info.REPORT_ALL_MODES}")
|
||||||
|
q.add_pus_tc(
|
||||||
|
PusTelecommand(
|
||||||
|
service=200,
|
||||||
|
subservice=ModeSubservices.TC_MODE_ANNOUNCE_RECURSIVE,
|
||||||
|
app_data=EPS_SUBSYSTEM_ID,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
mode_info_tup = HANDLER_LIST.get(op_code)
|
||||||
|
if mode_info_tup is None:
|
||||||
|
return
|
||||||
|
pack_mode_cmd_with_info(
|
||||||
|
object_id=EPS_SUBSYSTEM_ID,
|
||||||
|
info=f"{info_prefix}: {mode_info_tup[2]}",
|
||||||
|
mode=mode_info_tup[0],
|
||||||
|
submode=mode_info_tup[1],
|
||||||
|
q=q,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@tmtc_definitions_provider
|
||||||
|
def add_eps_subsystem_cmds(defs: TmtcDefinitionWrapper):
|
||||||
|
oce = OpCodeEntry()
|
||||||
|
for op_code, (_, _, info) in HANDLER_LIST.items():
|
||||||
|
oce.add(op_code, info)
|
||||||
|
oce.add(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES)
|
||||||
|
defs.add_service(CustomServiceList.EPS_SS, "EPS Subsystem", oce)
|
Loading…
x
Reference in New Issue
Block a user