|
|
|
@ -20,7 +20,8 @@ from tmtccmd.tc.pus_11_tc_sched import (
|
|
|
|
|
create_enable_tc_sched_cmd,
|
|
|
|
|
create_time_tagged_cmd,
|
|
|
|
|
)
|
|
|
|
|
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode, Subservice
|
|
|
|
|
from tmtccmd.pus.s200_fsfw_mode import Subservice
|
|
|
|
|
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode
|
|
|
|
|
from tmtccmd.tc.pus_20_fsfw_param import (
|
|
|
|
|
create_scalar_double_parameter,
|
|
|
|
|
create_load_param_cmd,
|
|
|
|
@ -35,18 +36,22 @@ _LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
class OpCode:
|
|
|
|
|
SWITCH_HPA_ON_PROC = ["0", "proc_hpa"]
|
|
|
|
|
SWITCH_ON = ["2", "on"]
|
|
|
|
|
SWITCH_OFF = ["3", "off"]
|
|
|
|
|
NORMAL_SSR = ["4", "nml_ssr"]
|
|
|
|
|
NORMAL_DRO = ["5", "nml_dro"]
|
|
|
|
|
NORMAL_X8 = ["6", "nml_x8"]
|
|
|
|
|
NORMAL_TX = ["7", "nml_tx"]
|
|
|
|
|
NORMAL_MPA = ["8", "nml_mpa"]
|
|
|
|
|
NORMAL_HPA = ["9", "nml_hpa"]
|
|
|
|
|
SWITCH_ON = "on"
|
|
|
|
|
SWITCH_OFF = "off"
|
|
|
|
|
NORMAL_SSR = "nml_ssr"
|
|
|
|
|
NORMAL_DRO = "nml_dro"
|
|
|
|
|
NORMAL_X8 = "nml_x8"
|
|
|
|
|
NORMAL_TX = "nml_tx"
|
|
|
|
|
NORMAL_MPA = "nml_mpa"
|
|
|
|
|
NORMAL_HPA = "nml_hpa"
|
|
|
|
|
|
|
|
|
|
ENABLE_HK = ["enable_hk"]
|
|
|
|
|
DISABLE_HK = ["disable_hk"]
|
|
|
|
|
REQ_OS_HK = ["hk_os"]
|
|
|
|
|
ENABLE_HK = "enable_hk"
|
|
|
|
|
DISABLE_HK = "disable_hk"
|
|
|
|
|
REQ_OS_HK = "hk_os"
|
|
|
|
|
|
|
|
|
|
UPDATE_I_UPPER_LIMIT = "update_i_upper_limit"
|
|
|
|
|
UPDATE_V_LOWER_LIMIT = "update_v_lower_limit"
|
|
|
|
|
UPDATE_V_UPPER_LIMIT = "update_v_upper_limit"
|
|
|
|
|
|
|
|
|
|
INJECT_SSR_TO_DRO_FAILURE = ["10", "inject_ssr_dro_fault"]
|
|
|
|
|
INJECT_DRO_TO_X8_FAILURE = ["11", "inject_dro_x8_fault"]
|
|
|
|
@ -70,6 +75,9 @@ class Info:
|
|
|
|
|
SWITCH_HPA_ON_PROC = "Switch HPA on procedure"
|
|
|
|
|
ENABLE_HK = "Enable HK"
|
|
|
|
|
DISABLE_HK = "Disable HK"
|
|
|
|
|
UPDATE_I_UPPER_LIMIT = "Update upper current parameter"
|
|
|
|
|
UPDATE_V_LOWER_LIMIT = "Update lower voltage parameter"
|
|
|
|
|
UPDATE_V_UPPER_LIMIT = "Update upper voltage parameter"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SetId(enum.IntEnum):
|
|
|
|
@ -119,7 +127,7 @@ class SubmodeForNormalMode(enum.IntEnum):
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ParamIds(enum.IntEnum):
|
|
|
|
|
class ParamId(enum.IntEnum):
|
|
|
|
|
NEG_V_LOWER_BOUND = 0
|
|
|
|
|
NEG_V_UPPER_BOUND = 1
|
|
|
|
|
|
|
|
|
@ -157,6 +165,15 @@ class ParamIds(enum.IntEnum):
|
|
|
|
|
INJECT_ALL_ON_FAILURE = 35
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DevSelect(enum.IntEnum):
|
|
|
|
|
SSR_NEG_V = 0
|
|
|
|
|
DRO = 1
|
|
|
|
|
X8 = 2
|
|
|
|
|
TX = 3
|
|
|
|
|
MPA = 4
|
|
|
|
|
HPA = 5
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@tmtc_definitions_provider
|
|
|
|
|
def add_pl_pcdu_cmds(defs: TmtcDefinitionWrapper):
|
|
|
|
|
oce = OpCodeEntry()
|
|
|
|
@ -171,6 +188,9 @@ def add_pl_pcdu_cmds(defs: TmtcDefinitionWrapper):
|
|
|
|
|
oce.add(keys=OpCode.NORMAL_HPA, info=Info.NORMAL_HPA)
|
|
|
|
|
oce.add(keys=OpCode.REQ_OS_HK, info=Info.REQ_OS_HK)
|
|
|
|
|
oce.add(keys=OpCode.ENABLE_HK, info=Info.ENABLE_HK)
|
|
|
|
|
oce.add(keys=OpCode.UPDATE_V_LOWER_LIMIT, info=Info.UPDATE_V_LOWER_LIMIT)
|
|
|
|
|
oce.add(keys=OpCode.UPDATE_V_UPPER_LIMIT, info=Info.UPDATE_V_UPPER_LIMIT)
|
|
|
|
|
oce.add(keys=OpCode.UPDATE_I_UPPER_LIMIT, info=Info.UPDATE_I_UPPER_LIMIT)
|
|
|
|
|
oce.add(
|
|
|
|
|
keys=OpCode.INJECT_SSR_TO_DRO_FAILURE,
|
|
|
|
|
info="Inject failure SSR to DRO transition",
|
|
|
|
@ -198,9 +218,9 @@ def add_pl_pcdu_cmds(defs: TmtcDefinitionWrapper):
|
|
|
|
|
def pack_pl_pcdu_commands( # noqa C901: Complexity is okay here.
|
|
|
|
|
q: DefaultPusQueueHelper, op_code: str
|
|
|
|
|
): # noqa C901: Complexity is okay here.
|
|
|
|
|
if op_code in OpCode.SWITCH_ON:
|
|
|
|
|
if op_code == OpCode.SWITCH_ON:
|
|
|
|
|
pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_ON, mode=Mode.ON, submode=0)
|
|
|
|
|
if op_code in OpCode.SWITCH_OFF:
|
|
|
|
|
if op_code == OpCode.SWITCH_OFF:
|
|
|
|
|
pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_OFF, mode=Mode.OFF, submode=0)
|
|
|
|
|
if op_code in OpCode.ENABLE_HK:
|
|
|
|
|
interval = float(
|
|
|
|
@ -212,13 +232,13 @@ def pack_pl_pcdu_commands( # noqa C901: Complexity is okay here.
|
|
|
|
|
q.add_log_cmd(f"Enable PL PCDU HK with interval of {interval} seconds")
|
|
|
|
|
for cmd in cmds:
|
|
|
|
|
q.add_pus_tc(cmd)
|
|
|
|
|
if op_code in OpCode.DISABLE_HK:
|
|
|
|
|
if op_code == OpCode.DISABLE_HK:
|
|
|
|
|
cmd = disable_periodic_hk_command(
|
|
|
|
|
diag=True, sid=make_sid(PL_PCDU_ID, SetId.ADC)
|
|
|
|
|
)
|
|
|
|
|
q.add_log_cmd("Disabling PL PCDU HK")
|
|
|
|
|
q.add_pus_tc(cmd)
|
|
|
|
|
if op_code in OpCode.NORMAL_SSR:
|
|
|
|
|
if op_code == OpCode.NORMAL_SSR:
|
|
|
|
|
pack_pl_pcdu_mode_cmd(
|
|
|
|
|
q=q,
|
|
|
|
|
info=Info.NORMAL_SSR,
|
|
|
|
@ -227,54 +247,90 @@ def pack_pl_pcdu_commands( # noqa C901: Complexity is okay here.
|
|
|
|
|
NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
if op_code in OpCode.NORMAL_DRO:
|
|
|
|
|
if op_code == OpCode.NORMAL_DRO:
|
|
|
|
|
pack_pl_pcdu_mode_cmd(
|
|
|
|
|
q=q,
|
|
|
|
|
info=Info.NORMAL_DRO,
|
|
|
|
|
mode=Mode.NORMAL,
|
|
|
|
|
submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON),
|
|
|
|
|
)
|
|
|
|
|
if op_code in OpCode.NORMAL_X8:
|
|
|
|
|
if op_code == OpCode.NORMAL_X8:
|
|
|
|
|
pack_pl_pcdu_mode_cmd(
|
|
|
|
|
q=q,
|
|
|
|
|
info=Info.NORMAL_X8,
|
|
|
|
|
mode=Mode.NORMAL,
|
|
|
|
|
submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON),
|
|
|
|
|
)
|
|
|
|
|
if op_code in OpCode.NORMAL_TX:
|
|
|
|
|
if op_code == OpCode.NORMAL_TX:
|
|
|
|
|
pack_pl_pcdu_mode_cmd(
|
|
|
|
|
q=q,
|
|
|
|
|
info=Info.NORMAL_TX,
|
|
|
|
|
mode=Mode.NORMAL,
|
|
|
|
|
submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON),
|
|
|
|
|
)
|
|
|
|
|
if op_code in OpCode.NORMAL_MPA:
|
|
|
|
|
if op_code == OpCode.NORMAL_MPA:
|
|
|
|
|
pack_pl_pcdu_mode_cmd(
|
|
|
|
|
q=q,
|
|
|
|
|
info=Info.NORMAL_MPA,
|
|
|
|
|
mode=Mode.NORMAL,
|
|
|
|
|
submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON),
|
|
|
|
|
)
|
|
|
|
|
if op_code in OpCode.NORMAL_HPA:
|
|
|
|
|
if op_code == OpCode.NORMAL_HPA:
|
|
|
|
|
pack_pl_pcdu_mode_cmd(
|
|
|
|
|
q=q,
|
|
|
|
|
info=Info.NORMAL_HPA,
|
|
|
|
|
mode=Mode.NORMAL,
|
|
|
|
|
submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON),
|
|
|
|
|
)
|
|
|
|
|
if op_code in OpCode.REQ_OS_HK:
|
|
|
|
|
if op_code == OpCode.REQ_OS_HK:
|
|
|
|
|
q.add_log_cmd(f"PL PCDU: {Info.REQ_OS_HK}")
|
|
|
|
|
q.add_pus_tc(
|
|
|
|
|
generate_one_diag_command(
|
|
|
|
|
sid=make_sid(object_id=PL_PCDU_ID, set_id=SetId.ADC)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
if op_code in OpCode.SWITCH_HPA_ON_PROC:
|
|
|
|
|
if op_code == OpCode.UPDATE_I_UPPER_LIMIT:
|
|
|
|
|
q.add_log_cmd(Info.UPDATE_I_UPPER_LIMIT)
|
|
|
|
|
print("Select device to update lower current limit for: ")
|
|
|
|
|
param_id = dev_select_to_upper_i_update_param_id(dev_select_prompt(True))
|
|
|
|
|
new_param_value = float(
|
|
|
|
|
input("Please specify new parameter value as a double: ")
|
|
|
|
|
)
|
|
|
|
|
q.add_pus_tc(
|
|
|
|
|
create_load_param_cmd(
|
|
|
|
|
create_scalar_double_parameter(PL_PCDU_ID, 0, param_id, new_param_value)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
if op_code == OpCode.UPDATE_V_LOWER_LIMIT:
|
|
|
|
|
q.add_log_cmd(Info.UPDATE_V_LOWER_LIMIT)
|
|
|
|
|
print("Select device to update lower voltage limit for: ")
|
|
|
|
|
param_id = dev_select_to_lower_u_update_param_id(dev_select_prompt(False))
|
|
|
|
|
new_param_value = float(
|
|
|
|
|
input("Please specify new parameter value as a double: ")
|
|
|
|
|
)
|
|
|
|
|
q.add_pus_tc(
|
|
|
|
|
create_load_param_cmd(
|
|
|
|
|
create_scalar_double_parameter(PL_PCDU_ID, 0, param_id, new_param_value)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
if op_code == OpCode.UPDATE_V_UPPER_LIMIT:
|
|
|
|
|
q.add_log_cmd(Info.UPDATE_V_UPPER_LIMIT)
|
|
|
|
|
print("Select device to update upper voltage limit for: ")
|
|
|
|
|
param_id = dev_select_to_upper_u_update_param_id(dev_select_prompt(False))
|
|
|
|
|
new_param_value = float(
|
|
|
|
|
input("Please specify new parameter value as a double: ")
|
|
|
|
|
)
|
|
|
|
|
q.add_pus_tc(
|
|
|
|
|
create_load_param_cmd(
|
|
|
|
|
create_scalar_double_parameter(PL_PCDU_ID, 0, param_id, new_param_value)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
if op_code == OpCode.SWITCH_HPA_ON_PROC:
|
|
|
|
|
hpa_on_procedure(q)
|
|
|
|
|
if op_code in OpCode.INJECT_ALL_ON_FAILURE:
|
|
|
|
|
if op_code == OpCode.INJECT_ALL_ON_FAILURE:
|
|
|
|
|
pack_failure_injection_cmd(
|
|
|
|
|
q=q,
|
|
|
|
|
param_id=ParamIds.INJECT_ALL_ON_FAILURE,
|
|
|
|
|
param_id=ParamId.INJECT_ALL_ON_FAILURE,
|
|
|
|
|
print_str="All On",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@ -510,3 +566,72 @@ def handle_plpcdu_hk(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
|
|
|
|
FsfwTmTcPrinter.get_validity_buffer(
|
|
|
|
|
validity_buffer=hk_data[current_idx:], num_vars=3
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dev_select_prompt(skip_ssr: bool) -> DevSelect:
|
|
|
|
|
while True:
|
|
|
|
|
for dev in DevSelect:
|
|
|
|
|
if skip_ssr and dev == DevSelect.SSR_NEG_V:
|
|
|
|
|
continue
|
|
|
|
|
print(f"{dev}: {dev.name}")
|
|
|
|
|
dev_select = int(input("Select device by index: "))
|
|
|
|
|
try:
|
|
|
|
|
return DevSelect(dev_select)
|
|
|
|
|
except IndexError:
|
|
|
|
|
_LOGGER.warn("Invalid paramter index, try again.")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dev_select_to_upper_i_update_param_id(dev_select: DevSelect) -> ParamId:
|
|
|
|
|
param_id = None
|
|
|
|
|
if dev_select == DevSelect.DRO:
|
|
|
|
|
param_id = ParamId.DRO_I_UPPER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.X8:
|
|
|
|
|
param_id = ParamId.X8_I_UPPER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.TX:
|
|
|
|
|
param_id = ParamId.TX_I_UPPER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.MPA:
|
|
|
|
|
param_id = ParamId.MPA_I_UPPER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.HPA:
|
|
|
|
|
param_id = ParamId.HPA_I_UPPER_BOUND
|
|
|
|
|
if param_id is None:
|
|
|
|
|
raise ValueError("invalid parameter ID")
|
|
|
|
|
return param_id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dev_select_to_lower_u_update_param_id(dev_select: DevSelect) -> ParamId:
|
|
|
|
|
param_id = None
|
|
|
|
|
if dev_select == DevSelect.SSR_NEG_V:
|
|
|
|
|
param_id = ParamId.NEG_V_LOWER_BOUND
|
|
|
|
|
if dev_select == DevSelect.DRO:
|
|
|
|
|
param_id = ParamId.DRO_U_LOWER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.X8:
|
|
|
|
|
param_id = ParamId.X8_U_LOWER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.TX:
|
|
|
|
|
param_id = ParamId.TX_U_LOWER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.MPA:
|
|
|
|
|
param_id = ParamId.MPA_U_LOWER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.HPA:
|
|
|
|
|
param_id = ParamId.HPA_U_LOWER_BOUND
|
|
|
|
|
if param_id is None:
|
|
|
|
|
raise ValueError("invalid parameter ID")
|
|
|
|
|
return param_id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dev_select_to_upper_u_update_param_id(dev_select: DevSelect) -> ParamId:
|
|
|
|
|
param_id = None
|
|
|
|
|
if dev_select == DevSelect.SSR_NEG_V:
|
|
|
|
|
param_id = ParamId.NEG_V_UPPER_BOUND
|
|
|
|
|
if dev_select == DevSelect.DRO:
|
|
|
|
|
param_id = ParamId.DRO_U_UPPER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.X8:
|
|
|
|
|
param_id = ParamId.X8_U_UPPER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.TX:
|
|
|
|
|
param_id = ParamId.TX_U_UPPER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.MPA:
|
|
|
|
|
param_id = ParamId.MPA_U_UPPER_BOUND
|
|
|
|
|
elif dev_select == DevSelect.HPA:
|
|
|
|
|
param_id = ParamId.HPA_U_UPPER_BOUND
|
|
|
|
|
if param_id is None:
|
|
|
|
|
raise ValueError("invalid parameter ID")
|
|
|
|
|
return param_id
|