From 96c6feb7469418d4f985b57b6c56d9a22b3b02fa Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 24 May 2022 17:04:03 +0200 Subject: [PATCH] cleaned up a bit --- pus_tc/devs/plpcdu.py | 334 +++++++++++++++++++++--------------------- 1 file changed, 163 insertions(+), 171 deletions(-) diff --git a/pus_tc/devs/plpcdu.py b/pus_tc/devs/plpcdu.py index 05367e3..2ff3946 100644 --- a/pus_tc/devs/plpcdu.py +++ b/pus_tc/devs/plpcdu.py @@ -190,45 +190,6 @@ def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): ) -def submode_mask_to_submode(on_tgt: NormalSubmodesMask) -> int: - if on_tgt == NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON: - return 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON - if on_tgt == NormalSubmodesMask.DRO_ON: - return 1 << NormalSubmodesMask.DRO_ON | ( - 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON - ) - if on_tgt == NormalSubmodesMask.X8_ON: - return ( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - ) - if on_tgt == NormalSubmodesMask.TX_ON: - return ( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - ) - if on_tgt == NormalSubmodesMask.MPA_ON: - return ( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - | (1 << NormalSubmodesMask.MPA_ON) - ) - if on_tgt == NormalSubmodesMask.HPA_ON: - return ( - 1 << NormalSubmodesMask.DRO_ON - | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) - | (1 << NormalSubmodesMask.X8_ON) - | (1 << NormalSubmodesMask.TX_ON) - | (1 << NormalSubmodesMask.MPA_ON) - | (1 << NormalSubmodesMask.HPA_ON) - ) - - def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): if op_code in OpCodes.SWITCH_ON: pack_pl_pcdu_mode_cmd( @@ -283,138 +244,7 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON), ) if op_code in OpCodes.SWITCH_HPA_ON_PROC: - while True: - delay_dro_to_x8 = input( - "Please specify delay between DRO and switching " - "on X8 [default 900]: " - ) - if delay_dro_to_x8 == "": - delay_dro_to_x8 = 900 - break - if delay_dro_to_x8.isdigit(): - print("Invalid value, not a number") - continue - delay_dro_to_x8 = int(delay_dro_to_x8) - if delay_dro_to_x8 < 0: - print("Invalid number") - break - tc_queue.appendleft( - ( - QueueCommands.PRINT, - f"Starting procedure to switch on PL PCDU HPA with DRO to X8 delay of {delay_dro_to_x8} seconds", - ) - ) - pl_pcdu_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.ON, submode=0), - ) - ssr_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode( - NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON - ), - ), - ) - dro_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON), - ), - ) - x8_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON), - ), - ) - tx_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON), - ), - ) - mpa_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON), - ), - ) - hpa_on = PusTelecommand( - service=200, - subservice=Subservices.TC_MODE_COMMAND, - app_data=pack_mode_data( - object_id=PL_PCDU_ID, - mode=Modes.NORMAL, - submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON), - ), - ) - current_time = time.time() - - enb_sched = generate_enable_tc_sched_cmd(ssc=0) - - sched_time = current_time + 10 - tc_queue.appendleft(enb_sched.pack_command_tuple()) - tagged_on_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), - tc_to_insert=pl_pcdu_on, - ssc=1, - ) - tc_queue.appendleft(tagged_on_cmd.pack_command_tuple()) - - sched_time += 5 - tagged_ssr_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), - tc_to_insert=ssr_on, - ssc=2, - ) - tc_queue.appendleft(tagged_ssr_cmd.pack_command_tuple()) - - sched_time += 5 - tagged_dro_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=dro_on, ssc=3 - ) - tc_queue.appendleft(tagged_dro_cmd.pack_command_tuple()) - - sched_time += delay_dro_to_x8 - tagged_x8_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=x8_on, ssc=4 - ) - tc_queue.appendleft(tagged_x8_cmd.pack_command_tuple()) - - sched_time += 5 - tagged_tx_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=tx_on, ssc=5 - ) - tc_queue.appendleft(tagged_tx_cmd.pack_command_tuple()) - - sched_time += 5 - tagged_mpa_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=mpa_on, ssc=6 - ) - tc_queue.appendleft(tagged_mpa_cmd.pack_command_tuple()) - - sched_time += 5 - tagged_hpa_cmd = generate_time_tagged_cmd( - release_time=struct.pack("!I", sched_time), tc_to_insert=hpa_on, ssc=7 - ) - tc_queue.appendleft(tagged_hpa_cmd.pack_command_tuple()) + hpa_on_procedure(tc_queue) if op_code in OpCodes.INJECT_ALL_ON_FAILURE: pack_failure_injection_cmd( @@ -424,6 +254,129 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): ) +def hpa_on_procedure(tc_queue: TcQueueT): + delay_dro_to_x8 = request_wait_time() + if delay_dro_to_x8 is None: + delay_dro_to_x8 = 900 + tc_queue.appendleft( + ( + QueueCommands.PRINT, + f"Starting procedure to switch on PL PCDU HPA with DRO to X8 delay of {delay_dro_to_x8} seconds", + ) + ) + pl_pcdu_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.ON, submode=0), + ) + ssr_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode( + NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + ), + ), + ) + dro_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.DRO_ON), + ), + ) + x8_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.X8_ON), + ), + ) + tx_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.TX_ON), + ), + ) + mpa_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.MPA_ON), + ), + ) + hpa_on = PusTelecommand( + service=200, + subservice=Subservices.TC_MODE_COMMAND, + app_data=pack_mode_data( + object_id=PL_PCDU_ID, + mode=Modes.NORMAL, + submode=submode_mask_to_submode(NormalSubmodesMask.HPA_ON), + ), + ) + current_time = time.time() + + enb_sched = generate_enable_tc_sched_cmd(ssc=0) + + sched_time = current_time + 10 + tc_queue.appendleft(enb_sched.pack_command_tuple()) + tagged_on_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), + tc_to_insert=pl_pcdu_on, + ssc=1, + ) + tc_queue.appendleft(tagged_on_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_ssr_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), + tc_to_insert=ssr_on, + ssc=2, + ) + tc_queue.appendleft(tagged_ssr_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_dro_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=dro_on, ssc=3 + ) + tc_queue.appendleft(tagged_dro_cmd.pack_command_tuple()) + + sched_time += delay_dro_to_x8 + tagged_x8_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=x8_on, ssc=4 + ) + tc_queue.appendleft(tagged_x8_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_tx_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=tx_on, ssc=5 + ) + tc_queue.appendleft(tagged_tx_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_mpa_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=mpa_on, ssc=6 + ) + tc_queue.appendleft(tagged_mpa_cmd.pack_command_tuple()) + + sched_time += 5 + tagged_hpa_cmd = generate_time_tagged_cmd( + release_time=struct.pack("!I", sched_time), tc_to_insert=hpa_on, ssc=7 + ) + tc_queue.appendleft(tagged_hpa_cmd.pack_command_tuple()) + + def request_wait_time() -> Optional[float]: while True: wait_time = input("Please enter DRO to X8 wait time in seconds, x to cancel: ") @@ -440,6 +393,45 @@ def request_wait_time() -> Optional[float]: return wait_time +def submode_mask_to_submode(on_tgt: NormalSubmodesMask) -> int: + if on_tgt == NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON: + return 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + if on_tgt == NormalSubmodesMask.DRO_ON: + return 1 << NormalSubmodesMask.DRO_ON | ( + 1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON + ) + if on_tgt == NormalSubmodesMask.X8_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + ) + if on_tgt == NormalSubmodesMask.TX_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + ) + if on_tgt == NormalSubmodesMask.MPA_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + | (1 << NormalSubmodesMask.MPA_ON) + ) + if on_tgt == NormalSubmodesMask.HPA_ON: + return ( + 1 << NormalSubmodesMask.DRO_ON + | (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON) + | (1 << NormalSubmodesMask.X8_ON) + | (1 << NormalSubmodesMask.TX_ON) + | (1 << NormalSubmodesMask.MPA_ON) + | (1 << NormalSubmodesMask.HPA_ON) + ) + + def pack_wait_time_cmd(tc_queue: TcQueueT, param_id: int, print_str: str): wait_time = request_wait_time() tc_queue.appendleft(