pwr ctrl init

This commit is contained in:
Marius Eggert 2023-10-09 11:10:39 +02:00
parent 88b0c33632
commit 11f7ed8436
1 changed files with 323 additions and 0 deletions

View File

@ -0,0 +1,323 @@
import datetime
import enum
import logging
import struct
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import PWR_CONTROLLER
from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
TmtcDefinitionWrapper,
OpCodeEntry,
)
from tmtccmd.tc import service_provider
from tmtccmd.tc.queue import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_command
from tmtccmd.tc.decorator import ServiceProviderParams
from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
make_sid,
enable_periodic_hk_command_with_interval,
disable_periodic_hk_command,
create_request_one_diag_command,
)
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.tc.pus_20_fsfw_param import create_load_param_cmd
from tmtccmd.pus.s20_fsfw_param_defs import (
create_scalar_float_parameter,
create_scalar_double_parameter,
)
_LOGGER = logging.getLogger(__name__)
class SetId(enum.IntEnum):
CORE_HK_SET = 0
ENABLE_PL_SET = 1
# class ActionId(enum.IntEnum):
class ParamId(enum.IntEnum):
BATTERY_INTERNAL_RESISTANCE = 0
BATTERY_MAXIMUM_CAPACITY = 1
COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD = 2
MAX_ALLOWED_TIME_DIFF = 3
PAYLOAD_OP_LIMIT_ON = 4
PAYLOAD_OP_LIMIT_LOW = 5
HIGHER_MODES_LIMIT = 6
PARAM_DICT = {
ParamId.BATTERY_INTERNAL_RESISTANCE: "BATTERY_INTERNAL_RESISTANCE",
ParamId.BATTERY_MAXIMUM_CAPACITY: "BATTERY_MAXIMUM_CAPACITY",
ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD: "COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD",
ParamId.MAX_ALLOWED_TIME_DIFF: "MAX_ALLOWED_TIME_DIFF",
ParamId.PAYLOAD_OP_LIMIT_ON: "PAYLOAD_OP_LIMIT_ON",
ParamId.PAYLOAD_OP_LIMIT_LOW: "PAYLOAD_OP_LIMIT_LOW",
ParamId.HIGHER_MODES_LIMIT: "HIGHER_MODES_LIMIT",
}
class OpCodes:
OFF = ["mode_off"]
ON = ["mode_on"]
NML = ["mode_normal"]
SET_PARAMETER = ["set_parameter"]
REQUEST_CORE_HK = ["core_hk"]
ENABLE_CORE_HK = ["core_enable_hk"]
DISABLE_CORE_HK = ["core_disable_hk"]
REQUEST_ENABLE_PL_HK = ["enable_pl_hk"]
ENABLE_ENABLE_PL_HK = ["enable_pl_enable_hk"]
DISABLE_ENABLE_PL_HK = ["enable_pl_disable_hk"]
class Info:
OFF = "PWR Ctrl Mode to OFF"
ON = "PWR Ctrl Mode to ON"
NML = "PWR Ctrl Mode to NORMAL"
SET_PARAMETER = "Set Parameter"
REQUEST_CORE_HK = "Request Core HK once"
ENABLE_CORE_HK = "Enable Core HK Data Generation"
DISABLE_CORE_HK = "Disable Core HK Data Generation"
REQUEST_ENABLE_PL_HK = "Request Enable PL HK once"
ENABLE_ENABLE_PL_HK = "Enable Enable PL HK Data Generation"
DISABLE_ENABLE_PL_HK = "Disable Enable PL HK Data Generation"
@tmtc_definitions_provider
def acs_cmd_defs(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCodes.OFF, info=Info.OFF)
oce.add(keys=OpCodes.ON, info=Info.ON)
oce.add(keys=OpCodes.NML, info=Info.NML)
oce.add(keys=OpCodes.SET_PARAMETER, info=Info.SET_PARAMETER)
oce.add(keys=OpCodes.REQUEST_CORE_HK, info=Info.REQUEST_CORE_HK)
oce.add(keys=OpCodes.ENABLE_CORE_HK, info=Info.ENABLE_CORE_HK)
oce.add(keys=OpCodes.DISABLE_CORE_HK, info=Info.DISABLE_CORE_HK)
oce.add(keys=OpCodes.REQUEST_ENABLE_PL_HK, info=Info.REQUEST_ENABLE_PL_HK)
oce.add(keys=OpCodes.ENABLE_ENABLE_PL_HK, info=Info.ENABLE_ENABLE_PL_HK)
oce.add(keys=OpCodes.DISABLE_ENABLE_PL_HK, info=Info.DISABLE_ENABLE_PL_HK)
@service_provider(CustomServiceList.PWR_CTRL.value)
def pack_acs_ctrl_command(p: ServiceProviderParams):
op_code = p.op_code
q = p.queue_helper
if op_code in OpCodes.OFF:
q.add_log_cmd(f"{Info.OFF}")
q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.OFF, 0))
elif op_code in OpCodes.ON:
q.add_log_cmd(f"{Info.ON}")
q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.ON, 0))
elif op_code in OpCodes.NML:
q.add_log_cmd(f"{Info.NML}")
q.add_pus_tc(pack_mode_command(PWR_CONTROLLER, Mode.NORMAL, 0))
elif op_code in OpCodes.SET_PARAMETER:
q.add_log_cmd(f"{Info.SET_PARAMETER}")
set_pwr_ctrl_param(q)
elif op_code in OpCodes.REQUEST_CORE_HK:
q.add_log_cmd(Info.REQUEST_CORE_HK)
q.add_pus_tc(
generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET))
)
elif op_code in OpCodes.ENABLE_CORE_HK:
interval = float(input("Please specify interval in floating point seconds: "))
q.add_log_cmd(Info.ENABLE_CORE_HK)
cmd_tuple = enable_periodic_hk_command_with_interval(
False, make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET), interval
)
q.add_pus_tc(cmd_tuple[0])
q.add_pus_tc(cmd_tuple[1])
elif op_code in OpCodes.DISABLE_CORE_HK:
q.add_log_cmd(Info.DISABLE_CORE_HK)
q.add_pus_tc(
disable_periodic_hk_command(
False, make_sid(PWR_CONTROLLER, SetId.CORE_HK_SET)
)
)
elif op_code in OpCodes.REQUEST_ENABLE_PL_HK:
q.add_log_cmd(Info.REQUEST_ENABLE_PL_HK)
q.add_pus_tc(
generate_one_hk_command(make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET))
)
elif op_code in OpCodes.ENABLE_ENABLE_PL_HK:
interval = float(input("Please specify interval in floating point seconds: "))
q.add_log_cmd(Info.ENABLE_ENABLE_PL_HK)
cmd_tuple = enable_periodic_hk_command_with_interval(
False, make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET), interval
)
q.add_pus_tc(cmd_tuple[0])
q.add_pus_tc(cmd_tuple[1])
elif op_code in OpCodes.DISABLE_ENABLE_PL_HK:
q.add_log_cmd(Info.DISABLE_ENABLE_PL_HK)
q.add_pus_tc(
disable_periodic_hk_command(
False, make_sid(PWR_CONTROLLER, SetId.ENABLE_PL_SET)
)
)
def set_pwr_ctrl_param(q: DefaultPusQueueHelper):
for val in ParamId:
print("{:<2}: {:<20}".format(val, PARAM_DICT[val]))
param = int(input(f"Specify parameter to set \n" f""))
match param:
case ParamId.BATTERY_INTERNAL_RESISTANCE:
value = int(input("Specify parameter value to set [Ohm]: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_float_parameter(
object_id=PWR_CONTROLLER,
domain_id=0,
unique_id=ParamId.BATTERY_INTERNAL_RESISTANCE,
parameter=value,
)
)
)
case ParamId.BATTERY_MAXIMUM_CAPACITY:
value = int(input("Specify parameter value to set [Ah]: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_float_parameter(
object_id=PWR_CONTROLLER,
domain_id=0,
unique_id=ParamId.BATTERY_MAXIMUM_CAPACITY,
parameter=value,
)
)
)
case ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD:
value = int(input("Specify parameter value to set [V]: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_float_parameter(
object_id=PWR_CONTROLLER,
domain_id=0,
unique_id=ParamId.COULOMB_COUNTER_VOLTAGE_UPPER_THRESHOLD,
parameter=value,
)
)
)
case ParamId.MAX_ALLOWED_TIME_DIFF:
value = int(input("Specify parameter value to set [s]: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_double_parameter(
object_id=PWR_CONTROLLER,
domain_id=0,
unique_id=ParamId.MAX_ALLOWED_TIME_DIFF,
parameter=value,
)
)
)
case ParamId.PAYLOAD_OP_LIMIT_ON:
value = int(input("Specify parameter value to set [1]: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_float_parameter(
object_id=PWR_CONTROLLER,
domain_id=0,
unique_id=ParamId.PAYLOAD_OP_LIMIT_ON,
parameter=value,
)
)
)
case ParamId.PAYLOAD_OP_LIMIT_LOW:
value = int(input("Specify parameter value to set [1]: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_float_parameter(
object_id=PWR_CONTROLLER,
domain_id=0,
unique_id=ParamId.PAYLOAD_OP_LIMIT_LOW,
parameter=value,
)
)
)
case ParamId.HIGHER_MODES_LIMIT:
value = int(input("Specify parameter value to set [1]: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_float_parameter(
object_id=PWR_CONTROLLER,
domain_id=0,
unique_id=ParamId.HIGHER_MODES_LIMIT,
parameter=value,
)
)
)
def handle_pwr_ctrl_hk_data(
pw: PrintWrapper,
set_id: int,
hk_data: bytes,
packet_time: datetime.datetime,
):
pw.ilog(_LOGGER, f"Received PWR CTRL HK with packet time {packet_time}")
match set_id:
case SetId.CORE_HK_SET:
handle_core_hk_data(pw, hk_data)
case SetId.ENABLE_PL_SET:
handle_enable_pl_data(pw, hk_data)
def handle_core_hk_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog("Received Core HK Set")
fmt_int16 = "!h"
fmt_float = "!f"
inc_len_int16 = struct.calcsize(fmt_int16)
inc_len_float = struct.calcsize(fmt_float)
if len(hk_data) < inc_len_int16 + 2 * inc_len_float:
pw.dlog("Received HK set too small")
return
current_idx = 0
total_battery_current = [
f"{val:d}"
for val in struct.unpack(
fmt_int16, hk_data[current_idx : current_idx + inc_len_int16][0]
)
]
current_idx += inc_len_int16
open_circuit_voltage_charge = [
f"{val:8.3f}"
for val in struct.unpack(
fmt_float, hk_data[current_idx : current_idx + inc_len_float][0]
)
]
current_idx += inc_len_float
coulomb_counter_charge = [
f"{val:d}"
for val in struct.unpack(
fmt_float, hk_data[current_idx : current_idx + inc_len_float]
)
]
current_idx += inc_len_float
pw.dlog(f"Total Battery Current: {total_battery_current} [mA]")
pw.dlog(f"Open Circuit Voltage Charge: {open_circuit_voltage_charge*100} [%]")
pw.dlog(f"Coulomb Counter Charge: {coulomb_counter_charge*100} [%]")
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
def handle_enable_pl_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog("Received Enable PL HK Set")
fmt_uint16 = "!B"
inc_len_uint16 = struct.calcsize(fmt_uint16)
if len(hk_data) < inc_len_uint16:
pw.dlog("Received HK set too small")
return
current_idx = 0
pl_use_allowed = [
f"{val:d}"
for val in struct.unpack(
fmt_uint16, hk_data[current_idx : current_idx + inc_len_uint16][0]
)
]
current_idx += inc_len_uint16
pw.dlog(f"PL Use Allowed: {pl_use_allowed}")
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=1)