From 0c9547eb3ac2be0977d82ef84848ec9d4d6a0e03 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 24 May 2022 17:01:35 +0200 Subject: [PATCH 1/3] pl pcdu procedure --- pus_tc/cmd_definitions.py | 97 ++---------- pus_tc/devs/plpcdu.py | 325 +++++++++++++++++++++++++++++++++----- pus_tc/system/core.py | 4 +- 3 files changed, 295 insertions(+), 131 deletions(-) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 16bd542..63e4df0 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -1,7 +1,3 @@ -from pus_tc.devs.gps import GpsOpCodes -from pus_tc.devs.pcdu import add_pcdu_cmds -from pus_tc.devs.rad_sensor import add_rad_sens_cmds -from pus_tc.system.core import add_core_controller_definitions from tmtccmd.config import ( add_op_code_entry, add_service_op_code_entry, @@ -9,12 +5,19 @@ from tmtccmd.config import ( ServiceOpCodeDictT, OpCodeDictKeys, ) -from config.definitions import CustomServiceList +from tmtccmd.config.globals import get_default_service_op_code_dict + +from pus_tc.devs.gps import GpsOpCodes +from pus_tc.devs.pcdu import add_pcdu_cmds +from pus_tc.devs.plpcdu import add_pl_pcdu_cmds +from pus_tc.devs.rad_sensor import add_rad_sens_cmds +from pus_tc.system.core import add_core_controller_definitions from pus_tc.devs.heater import add_heater_cmds from pus_tc.devs.rtd import specify_rtd_cmds from pus_tc.devs.reaction_wheels import add_rw_cmds from pus_tc.devs.bpx_batt import BpxOpCodes -from tmtccmd.config.globals import get_default_service_op_code_dict + +from config.definitions import CustomServiceList def get_eive_service_op_code_dict() -> ServiceOpCodeDictT: @@ -322,88 +325,6 @@ def add_bpx_cmd_definitions(cmd_dict: ServiceOpCodeDictT): ) -def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): - from pus_tc.devs.plpcdu import OpCodes, Info - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_SSR, - info=Info.NORMAL_SSR, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_DRO, - info=Info.NORMAL_DRO, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_X8, - info=Info.NORMAL_X8, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_TX, - info=Info.NORMAL_TX, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_MPA, - info=Info.NORMAL_MPA, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_HPA, - info=Info.NORMAL_HPA, - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.UPDATE_DRO_TO_X8_WAIT, - info="Update DRO to X8 wait time", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_SSR_TO_DRO_FAILURE, - info="Inject failure SSR to DRO transition", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_DRO_TO_X8_FAILURE, - info="Inject failure in DRO to X8 transition", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_X8_TO_TX_FAILURE, - info="Inject failure in X8 to TX transition", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_TX_TO_MPA_FAILURE, - info="Inject failure in TX to MPA transition", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_MPA_TO_HPA_FAILURE, - info="Inject failure in MPA to HPA transition", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_ALL_ON_FAILURE, - info="Inject failure in all on mode", - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name=CustomServiceList.PL_PCDU.value, - info="PL PCDU", - op_code_entry=op_code_dict, - ) - - def add_time_cmds(cmd_dict: ServiceOpCodeDictT): from pus_tc.system.time import OpCodes, Info diff --git a/pus_tc/devs/plpcdu.py b/pus_tc/devs/plpcdu.py index 28775d0..05367e3 100644 --- a/pus_tc/devs/plpcdu.py +++ b/pus_tc/devs/plpcdu.py @@ -1,8 +1,20 @@ import enum +import struct +import time from typing import Optional -from tmtccmd.config import QueueCommands +from config.definitions import CustomServiceList +from tmtccmd.config import ( + QueueCommands, + ServiceOpCodeDictT, + add_op_code_entry, + add_service_op_code_entry, +) from tmtccmd.tc.definitions import TcQueueT +from tmtccmd.tc.pus_11_tc_sched import ( + generate_enable_tc_sched_cmd, + generate_time_tagged_cmd, +) from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices from tmtccmd.tc.pus_20_params import ( pack_scalar_double_param_app_data, @@ -26,18 +38,14 @@ class OpCodes: NORMAL_MPA = ["6", "nml-mpa"] NORMAL_HPA = ["7", "nml-hpa"] - INJECT_SSR_TO_DRO_FAILURE = ["10", "inject-ssr-dro-fault"] - INJECT_DRO_TO_X8_FAILURE = ["11", "inject-dro-x8-fault"] - INJECT_X8_TO_TX_FAILURE = ["12", "inject-x8-tx-fault"] - INJECT_TX_TO_MPA_FAILURE = ["13", "inject-tx-mpa-fault"] - INJECT_MPA_TO_HPA_FAILURE = ["14", "inject-mpa-hpa-fault"] - INJECT_ALL_ON_FAILURE = ["15", "inject-all-on-fault"] + SWITCH_HPA_ON_PROC = ["12", "proc-hpa"] - # The following commands might become deprecated in the future - UPDATE_DRO_TO_X8_WAIT = ["128", "dro-to-x8-wait"] - UPDATE_X8_TO_TX_WAIT_TIME = ["129", "x8-to-tx-wait"] - UPDATE_TX_TO_MPA_WAIT_TIME = ["130", "tx-to-mpa-wait"] - UPDATE_MPA_TO_HPA_WAIT_TIME = ["131", "mpa-to-hpa-wait"] + INJECT_SSR_TO_DRO_FAILURE = ["15", "inject-ssr-dro-fault"] + INJECT_DRO_TO_X8_FAILURE = ["16", "inject-dro-x8-fault"] + INJECT_X8_TO_TX_FAILURE = ["17", "inject-x8-tx-fault"] + INJECT_TX_TO_MPA_FAILURE = ["18", "inject-tx-mpa-fault"] + INJECT_MPA_TO_HPA_FAILURE = ["19", "inject-mpa-hpa-fault"] + INJECT_ALL_ON_FAILURE = ["20", "inject-all-on-fault"] class Info: @@ -51,6 +59,8 @@ class Info: NORMAL_MPA = f"{NORMAL}, MPA on" NORMAL_HPA = f"{NORMAL}, HPA on" + SWITCH_HPA_ON_PROC = "Full Procedure to switch HPA on" + class NormalSubmodesMask(enum.IntEnum): SOLID_STATE_RELAYS_ADC_ON = 0 @@ -99,6 +109,126 @@ class ParamIds(enum.IntEnum): INJECT_ALL_ON_FAILURE = 35 +def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): + op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_SSR, + info=Info.NORMAL_SSR, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_DRO, + info=Info.NORMAL_DRO, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_X8, + info=Info.NORMAL_X8, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_TX, + info=Info.NORMAL_TX, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_MPA, + info=Info.NORMAL_MPA, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_HPA, + info=Info.NORMAL_HPA, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.SWITCH_HPA_ON_PROC, + info=Info.SWITCH_HPA_ON_PROC, + ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_SSR_TO_DRO_FAILURE, + info="Inject failure SSR to DRO transition", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_DRO_TO_X8_FAILURE, + info="Inject failure in DRO to X8 transition", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_X8_TO_TX_FAILURE, + info="Inject failure in X8 to TX transition", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_TX_TO_MPA_FAILURE, + info="Inject failure in TX to MPA transition", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_MPA_TO_HPA_FAILURE, + info="Inject failure in MPA to HPA transition", + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.INJECT_ALL_ON_FAILURE, + info="Inject failure in all on mode", + ) + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name=CustomServiceList.PL_PCDU.value, + info="PL PCDU", + op_code_entry=op_code_dict, + ) + + +def submode_mask_to_submode(on_tgt: NormalSubmodesMask) -> int: + if on_tgt == NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON: + return 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + if on_tgt == NormalSubmodesMask.DRO_ON: + return 1 << NormalSubmodesMask.DRO_ON | ( + 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + ) + if on_tgt == NormalSubmodesMask.X8_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + ) + if on_tgt == NormalSubmodesMask.TX_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + ) + if on_tgt == NormalSubmodesMask.MPA_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + | (1 << NormalSubmodesMask.MPA_ON) + ) + if on_tgt == NormalSubmodesMask.HPA_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + | (1 << NormalSubmodesMask.MPA_ON) + | (1 << NormalSubmodesMask.HPA_ON) + ) + + def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): if op_code in OpCodes.SWITCH_ON: pack_pl_pcdu_mode_cmd( @@ -113,68 +243,179 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): tc_queue=tc_queue, info=Info.NORMAL_SSR, mode=Modes.NORMAL, - submode=(1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON), + submode=submode_mask_to_submode( + NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + ), ) if op_code in OpCodes.NORMAL_DRO: pack_pl_pcdu_mode_cmd( tc_queue=tc_queue, info=Info.NORMAL_DRO, mode=Modes.NORMAL, - submode=( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - ), + submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON), ) if op_code in OpCodes.NORMAL_X8: pack_pl_pcdu_mode_cmd( tc_queue=tc_queue, info=Info.NORMAL_X8, mode=Modes.NORMAL, - submode=( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - ), + submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON), ) if op_code in OpCodes.NORMAL_TX: pack_pl_pcdu_mode_cmd( tc_queue=tc_queue, info=Info.NORMAL_TX, mode=Modes.NORMAL, - submode=( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - ), + submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON), ) if op_code in OpCodes.NORMAL_MPA: pack_pl_pcdu_mode_cmd( tc_queue=tc_queue, info=Info.NORMAL_MPA, mode=Modes.NORMAL, - submode=( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - | (1 << NormalSubmodesMask.MPA_ON) - ), + submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON), ) if op_code in OpCodes.NORMAL_HPA: pack_pl_pcdu_mode_cmd( tc_queue=tc_queue, info=Info.NORMAL_HPA, mode=Modes.NORMAL, - submode=( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - | (1 << NormalSubmodesMask.MPA_ON) - | (1 << NormalSubmodesMask.HPA_ON) + submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON), + ) + if op_code in OpCodes.SWITCH_HPA_ON_PROC: + while True: + delay_dro_to_x8 = input( + "Please specify delay between DRO and switching " + "on X8 [default 900]: " + ) + if delay_dro_to_x8 == "": + delay_dro_to_x8 = 900 + break + if delay_dro_to_x8.isdigit(): + print("Invalid value, not a number") + continue + delay_dro_to_x8 = int(delay_dro_to_x8) + if delay_dro_to_x8 < 0: + print("Invalid number") + break + tc_queue.appendleft( + ( + QueueCommands.PRINT, + f"Starting procedure to switch on PL PCDU HPA with DRO to X8 delay of {delay_dro_to_x8} seconds", + ) + ) + pl_pcdu_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.ON, submode=0), + ) + ssr_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode( + NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + ), ), ) + dro_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON), + ), + ) + x8_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON), + ), + ) + tx_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON), + ), + ) + mpa_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON), + ), + ) + hpa_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON), + ), + ) + current_time = time.time() + + enb_sched = generate_enable_tc_sched_cmd(ssc=0) + + sched_time = current_time + 10 + tc_queue.appendleft(enb_sched.pack_command_tuple()) + tagged_on_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), + tc_to_insert=pl_pcdu_on, + ssc=1, + ) + tc_queue.appendleft(tagged_on_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_ssr_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), + tc_to_insert=ssr_on, + ssc=2, + ) + tc_queue.appendleft(tagged_ssr_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_dro_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=dro_on, ssc=3 + ) + tc_queue.appendleft(tagged_dro_cmd.pack_command_tuple()) + + sched_time += delay_dro_to_x8 + tagged_x8_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=x8_on, ssc=4 + ) + tc_queue.appendleft(tagged_x8_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_tx_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=tx_on, ssc=5 + ) + tc_queue.appendleft(tagged_tx_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_mpa_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=mpa_on, ssc=6 + ) + tc_queue.appendleft(tagged_mpa_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_hpa_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=hpa_on, ssc=7 + ) + tc_queue.appendleft(tagged_hpa_cmd.pack_command_tuple()) + if op_code in OpCodes.INJECT_ALL_ON_FAILURE: pack_failure_injection_cmd( tc_queue=tc_queue, diff --git a/pus_tc/system/core.py b/pus_tc/system/core.py index ece17c2..f44ad87 100644 --- a/pus_tc/system/core.py +++ b/pus_tc/system/core.py @@ -133,7 +133,9 @@ def pack_core_commands(tc_queue: TcQueueT, op_code: str): ) if op_code in OpCodes.REBOOT_FULL: tc_queue.appendleft((QueueCommands.PRINT, f"Core Command: {Info.REBOOT_FULL}")) - cmd = generate_action_command(object_id=CORE_CONTROLLER_ID, action_id=ActionIds.FULL_REBOOT) + cmd = generate_action_command( + object_id=CORE_CONTROLLER_ID, action_id=ActionIds.FULL_REBOOT + ) tc_queue.appendleft(cmd.pack_command_tuple()) if op_code in OpCodes.XSC_REBOOT_SELF: perform_reboot_cmd(tc_queue=tc_queue, reboot_self=True) From 96c6feb7469418d4f985b57b6c56d9a22b3b02fa Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 24 May 2022 17:04:03 +0200 Subject: [PATCH 2/3] cleaned up a bit --- pus_tc/devs/plpcdu.py | 334 +++++++++++++++++++++--------------------- 1 file changed, 163 insertions(+), 171 deletions(-) diff --git a/pus_tc/devs/plpcdu.py b/pus_tc/devs/plpcdu.py index 05367e3..2ff3946 100644 --- a/pus_tc/devs/plpcdu.py +++ b/pus_tc/devs/plpcdu.py @@ -190,45 +190,6 @@ def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): ) -def submode_mask_to_submode(on_tgt: NormalSubmodesMask) -> int: - if on_tgt == NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON: - return 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON - if on_tgt == NormalSubmodesMask.DRO_ON: - return 1 << NormalSubmodesMask.DRO_ON | ( - 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON - ) - if on_tgt == NormalSubmodesMask.X8_ON: - return ( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - ) - if on_tgt == NormalSubmodesMask.TX_ON: - return ( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - ) - if on_tgt == NormalSubmodesMask.MPA_ON: - return ( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - | (1 << NormalSubmodesMask.MPA_ON) - ) - if on_tgt == NormalSubmodesMask.HPA_ON: - return ( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - | (1 << NormalSubmodesMask.MPA_ON) - | (1 << NormalSubmodesMask.HPA_ON) - ) - - def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): if op_code in OpCodes.SWITCH_ON: pack_pl_pcdu_mode_cmd( @@ -283,138 +244,7 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON), ) if op_code in OpCodes.SWITCH_HPA_ON_PROC: - while True: - delay_dro_to_x8 = input( - "Please specify delay between DRO and switching " - "on X8 [default 900]: " - ) - if delay_dro_to_x8 == "": - delay_dro_to_x8 = 900 - break - if delay_dro_to_x8.isdigit(): - print("Invalid value, not a number") - continue - delay_dro_to_x8 = int(delay_dro_to_x8) - if delay_dro_to_x8 < 0: - print("Invalid number") - break - tc_queue.appendleft( - ( - QueueCommands.PRINT, - f"Starting procedure to switch on PL PCDU HPA with DRO to X8 delay of {delay_dro_to_x8} seconds", - ) - ) - pl_pcdu_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.ON, submode=0), - ) - ssr_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode( - NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON - ), - ), - ) - dro_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON), - ), - ) - x8_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON), - ), - ) - tx_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON), - ), - ) - mpa_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON), - ), - ) - hpa_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON), - ), - ) - current_time = time.time() - - enb_sched = generate_enable_tc_sched_cmd(ssc=0) - - sched_time = current_time + 10 - tc_queue.appendleft(enb_sched.pack_command_tuple()) - tagged_on_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), - tc_to_insert=pl_pcdu_on, - ssc=1, - ) - tc_queue.appendleft(tagged_on_cmd.pack_command_tuple()) - - sched_time += 5 - tagged_ssr_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), - tc_to_insert=ssr_on, - ssc=2, - ) - tc_queue.appendleft(tagged_ssr_cmd.pack_command_tuple()) - - sched_time += 5 - tagged_dro_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=dro_on, ssc=3 - ) - tc_queue.appendleft(tagged_dro_cmd.pack_command_tuple()) - - sched_time += delay_dro_to_x8 - tagged_x8_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=x8_on, ssc=4 - ) - tc_queue.appendleft(tagged_x8_cmd.pack_command_tuple()) - - sched_time += 5 - tagged_tx_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=tx_on, ssc=5 - ) - tc_queue.appendleft(tagged_tx_cmd.pack_command_tuple()) - - sched_time += 5 - tagged_mpa_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=mpa_on, ssc=6 - ) - tc_queue.appendleft(tagged_mpa_cmd.pack_command_tuple()) - - sched_time += 5 - tagged_hpa_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=hpa_on, ssc=7 - ) - tc_queue.appendleft(tagged_hpa_cmd.pack_command_tuple()) + hpa_on_procedure(tc_queue) if op_code in OpCodes.INJECT_ALL_ON_FAILURE: pack_failure_injection_cmd( @@ -424,6 +254,129 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): ) +def hpa_on_procedure(tc_queue: TcQueueT): + delay_dro_to_x8 = request_wait_time() + if delay_dro_to_x8 is None: + delay_dro_to_x8 = 900 + tc_queue.appendleft( + ( + QueueCommands.PRINT, + f"Starting procedure to switch on PL PCDU HPA with DRO to X8 delay of {delay_dro_to_x8} seconds", + ) + ) + pl_pcdu_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.ON, submode=0), + ) + ssr_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode( + NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + ), + ), + ) + dro_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON), + ), + ) + x8_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON), + ), + ) + tx_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON), + ), + ) + mpa_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON), + ), + ) + hpa_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON), + ), + ) + current_time = time.time() + + enb_sched = generate_enable_tc_sched_cmd(ssc=0) + + sched_time = current_time + 10 + tc_queue.appendleft(enb_sched.pack_command_tuple()) + tagged_on_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), + tc_to_insert=pl_pcdu_on, + ssc=1, + ) + tc_queue.appendleft(tagged_on_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_ssr_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), + tc_to_insert=ssr_on, + ssc=2, + ) + tc_queue.appendleft(tagged_ssr_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_dro_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=dro_on, ssc=3 + ) + tc_queue.appendleft(tagged_dro_cmd.pack_command_tuple()) + + sched_time += delay_dro_to_x8 + tagged_x8_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=x8_on, ssc=4 + ) + tc_queue.appendleft(tagged_x8_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_tx_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=tx_on, ssc=5 + ) + tc_queue.appendleft(tagged_tx_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_mpa_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=mpa_on, ssc=6 + ) + tc_queue.appendleft(tagged_mpa_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_hpa_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=hpa_on, ssc=7 + ) + tc_queue.appendleft(tagged_hpa_cmd.pack_command_tuple()) + + def request_wait_time() -> Optional[float]: while True: wait_time = input("Please enter DRO to X8 wait time in seconds, x to cancel: ") @@ -440,6 +393,45 @@ def request_wait_time() -> Optional[float]: return wait_time +def submode_mask_to_submode(on_tgt: NormalSubmodesMask) -> int: + if on_tgt == NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON: + return 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + if on_tgt == NormalSubmodesMask.DRO_ON: + return 1 << NormalSubmodesMask.DRO_ON | ( + 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + ) + if on_tgt == NormalSubmodesMask.X8_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + ) + if on_tgt == NormalSubmodesMask.TX_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + ) + if on_tgt == NormalSubmodesMask.MPA_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + | (1 << NormalSubmodesMask.MPA_ON) + ) + if on_tgt == NormalSubmodesMask.HPA_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + | (1 << NormalSubmodesMask.MPA_ON) + | (1 << NormalSubmodesMask.HPA_ON) + ) + + def pack_wait_time_cmd(tc_queue: TcQueueT, param_id: int, print_str: str): wait_time = request_wait_time() tc_queue.appendleft( From e6eee0d5181c671d7116c97a24f2a189989a523e Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 24 May 2022 17:07:13 +0200 Subject: [PATCH 3/3] some reordering --- pus_tc/devs/plpcdu.py | 36 +++++++++++++++++------------------- spacepackets | 2 +- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/pus_tc/devs/plpcdu.py b/pus_tc/devs/plpcdu.py index 2ff3946..1728753 100644 --- a/pus_tc/devs/plpcdu.py +++ b/pus_tc/devs/plpcdu.py @@ -29,16 +29,15 @@ LOGGER = get_console_logger() class OpCodes: - SWITCH_ON = ["0", "on"] - SWITCH_OFF = ["1", "off"] - NORMAL_SSR = ["2", "nml-ssr"] - NORMAL_DRO = ["3", "nml-dro"] - NORMAL_X8 = ["4", "nml-x8"] - NORMAL_TX = ["5", "nml-tx"] - NORMAL_MPA = ["6", "nml-mpa"] - NORMAL_HPA = ["7", "nml-hpa"] - - SWITCH_HPA_ON_PROC = ["12", "proc-hpa"] + SWITCH_HPA_ON_PROC = ["0", "proc-hpa"] + SWITCH_ON = ["2", "on"] + SWITCH_OFF = ["3", "off"] + NORMAL_SSR = ["4", "nml-ssr"] + NORMAL_DRO = ["5", "nml-dro"] + NORMAL_X8 = ["6", "nml-x8"] + NORMAL_TX = ["7", "nml-tx"] + NORMAL_MPA = ["8", "nml-mpa"] + NORMAL_HPA = ["9", "nml-hpa"] INJECT_SSR_TO_DRO_FAILURE = ["15", "inject-ssr-dro-fault"] INJECT_DRO_TO_X8_FAILURE = ["16", "inject-dro-x8-fault"] @@ -111,9 +110,17 @@ class ParamIds(enum.IntEnum): def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): op_code_dict = dict() + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.SWITCH_HPA_ON_PROC, + info=Info.SWITCH_HPA_ON_PROC, + ) add_op_code_entry( op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF + ) add_op_code_entry( op_code_dict=op_code_dict, keys=OpCodes.NORMAL_SSR, @@ -144,14 +151,6 @@ def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): keys=OpCodes.NORMAL_HPA, info=Info.NORMAL_HPA, ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.SWITCH_HPA_ON_PROC, - info=Info.SWITCH_HPA_ON_PROC, - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF - ) add_op_code_entry( op_code_dict=op_code_dict, keys=OpCodes.INJECT_SSR_TO_DRO_FAILURE, @@ -245,7 +244,6 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): ) if op_code in OpCodes.SWITCH_HPA_ON_PROC: hpa_on_procedure(tc_queue) - if op_code in OpCodes.INJECT_ALL_ON_FAILURE: pack_failure_injection_cmd( tc_queue=tc_queue, diff --git a/spacepackets b/spacepackets index d0c3f4a..9ee7922 160000 --- a/spacepackets +++ b/spacepackets @@ -1 +1 @@ -Subproject commit d0c3f4a802c3cddc5be2919af763b08fe6a6b05c +Subproject commit 9ee7922bf7b7a678f8e5ebd5926001defac9a3d4