From b5a9dac8f6d3733779a67a72b14a20091069a346 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 30 Mar 2022 12:08:50 +0200 Subject: [PATCH] new commands for PL PCDU mode commanding --- pus_tc/cmd_definitions.py | 39 +++++++--- pus_tc/devs/ploc_mpsoc.py | 80 ++++++++++++++------- pus_tc/devs/ploc_supervisor.py | 8 +-- pus_tc/devs/plpcdu.py | 126 +++++++++++++++++++++++---------- 4 files changed, 173 insertions(+), 80 deletions(-) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index c6eb1a9..8436bb4 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -97,25 +97,44 @@ def add_core_controller_definitions(cmd_dict: ServiceOpCodeDictT): def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): - from pus_tc.devs.plpcdu import OpCodes + from pus_tc.devs.plpcdu import OpCodes, Info op_code_dict = dict() add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info="Switch PL PCDU on" + op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON ) add_op_code_entry( op_code_dict=op_code_dict, - keys=OpCodes.SWITCH_ADC_NORMAL, - info="Switch PL PCDU ADC normal, submode ADC ON", - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info="Switch PL PCDU off" + keys=OpCodes.NORMAL_SSR, + info=Info.NORMAL_SSR, ) add_op_code_entry( op_code_dict=op_code_dict, - keys=OpCodes.SWITCH_ALL_NORMAL, - info="Switch all PL PCDU modules normal, submode ALL ON", - options=generate_op_code_options(enter_listener_mode=True), + keys=OpCodes.NORMAL_DRO, + info=Info.NORMAL_DRO, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_X8, + info=Info.NORMAL_X8, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_TX, + info=Info.NORMAL_TX, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_MPA, + info=Info.NORMAL_MPA, + ) + add_op_code_entry( + op_code_dict=op_code_dict, + keys=OpCodes.NORMAL_HPA, + info=Info.NORMAL_HPA, + ) + add_op_code_entry( + op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF ) add_op_code_entry( op_code_dict=op_code_dict, diff --git a/pus_tc/devs/ploc_mpsoc.py b/pus_tc/devs/ploc_mpsoc.py index 9f123c7..1895bfb 100644 --- a/pus_tc/devs/ploc_mpsoc.py +++ b/pus_tc/devs/ploc_mpsoc.py @@ -58,7 +58,7 @@ class PlocReplyIds: def pack_ploc_mpsoc_commands( - object_id: bytearray, tc_queue: TcQueueT, op_code: str + object_id: bytearray, tc_queue: TcQueueT, op_code: str ) -> TcQueueT: tc_queue.appendleft( ( @@ -69,7 +69,9 @@ def pack_ploc_mpsoc_commands( if op_code == "0": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: TC mem write test")) - memory_address = int(input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16) + memory_address = int( + input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16 + ) memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16) # TODO: implement variable length mem write command mem_len = 1 # 1 32-bit word @@ -100,7 +102,7 @@ def pack_ploc_mpsoc_commands( tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "5": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay stop")) - command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_STOP) + command = object_id + struct.pack("!I", CommandIds.TC_REPLAY_STOP) command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "6": @@ -110,7 +112,7 @@ def pack_ploc_mpsoc_commands( tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "7": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr off")) - command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_OFF) + command = object_id + struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_OFF) command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "8": @@ -119,20 +121,26 @@ def pack_ploc_mpsoc_commands( command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "9": - tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Reset OBSW sequence count")) - command = object_id + struct.pack('!I', CommandIds.OBSW_RESET_SEQ_COUNT) + tc_queue.appendleft( + (QueueCommands.PRINT, "PLOC MPSoC: Reset OBSW sequence count") + ) + command = object_id + struct.pack("!I", CommandIds.OBSW_RESET_SEQ_COUNT) command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "10": num_words = 1 tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Read DEADBEEF address")) - command = object_id + struct.pack('!I', CommandIds.TC_MEM_READ) + struct.pack("!I", MemAddresses.DEADBEEF) + \ - struct.pack('!H', num_words) + command = ( + object_id + + struct.pack("!I", CommandIds.TC_MEM_READ) + + struct.pack("!I", MemAddresses.DEADBEEF) + + struct.pack("!H", num_words) + ) command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) elif op_code == "11": tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Tc mode replay")) - command = object_id + struct.pack('!I', CommandIds.TC_MODE_REPLAY) + command = object_id + struct.pack("!I", CommandIds.TC_MODE_REPLAY) command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) @@ -140,7 +148,7 @@ def pack_ploc_mpsoc_commands( def generate_write_mem_command( - object_id: bytearray, memory_address: int, memory_data: int, mem_len: int + object_id: bytearray, memory_address: int, memory_data: int, mem_len: int ) -> bytearray: """This function generates the command to write to a memory address within the PLOC @param object_id The object id of the PlocHandler @@ -148,11 +156,11 @@ def generate_write_mem_command( @param memory_data The data to write to the memory address specified by the bytearray memory_address. """ command = ( - object_id - + struct.pack('!I', CommandIds.TC_MEM_WRITE) - + struct.pack('!I', memory_address) - + struct.pack('!H', mem_len) - + struct.pack("!I", memory_data) + object_id + + struct.pack("!I", CommandIds.TC_MEM_WRITE) + + struct.pack("!I", memory_address) + + struct.pack("!H", mem_len) + + struct.pack("!I", memory_data) ) return command @@ -161,8 +169,10 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray: memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16) num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: ")) command = ( - object_id + struct.pack('!I', CommandIds.TC_MEM_READ) + struct.pack("!I", memory_address) + struct.pack( - '!H', num_words) + object_id + + struct.pack("!I", CommandIds.TC_MEM_READ) + + struct.pack("!I", memory_address) + + struct.pack("!H", num_words) ) return command @@ -170,28 +180,44 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray: def prepare_flash_write_cmd(object_id: bytearray) -> bytearray: obcFile = get_obc_file() mpsocFile = get_mpsoc_file() - command = object_id + struct.pack('!I', CommandIds.FLASH_WRITE) + bytearray(obcFile, 'utf-8') + bytearray(mpsocFile, - 'utf-8') + command = ( + object_id + + struct.pack("!I", CommandIds.FLASH_WRITE) + + bytearray(obcFile, "utf-8") + + bytearray(mpsocFile, "utf-8") + ) return command def prepare_flash_delete_cmd(object_id: bytearray) -> bytearray: file = get_mpsoc_file() - command = object_id + struct.pack('!I', CommandIds.TC_FLASH_DELETE) + bytearray(file, 'utf-8') + command = ( + object_id + + struct.pack("!I", CommandIds.TC_FLASH_DELETE) + + bytearray(file, "utf-8") + ) return command def prepare_replay_start_cmd(object_id: bytearray) -> bytearray: replay = int(input("Specify replay mode (0 - once, 1 - repeated): ")) - command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_START) + struct.pack('!B', replay) + command = ( + object_id + + struct.pack("!I", CommandIds.TC_REPLAY_START) + + struct.pack("!B", replay) + ) return command def prepare_downlink_pwr_on_cmd(object_id: bytearray) -> bytearray: mode = int(input("Specify JESD mode (0 - 5): ")) lane_rate = int(input("Specify lane rate (0 - 9): ")) - command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_ON) + struct.pack('!B', mode) \ - + struct.pack('!B', lane_rate) + command = ( + object_id + + struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_ON) + + struct.pack("!B", mode) + + struct.pack("!B", lane_rate) + ) return command @@ -199,8 +225,12 @@ def prepare_replay_write_sequence_cmd(object_id: bytearray) -> bytearray: null_terminator = 0 use_decoding = int(input("Use decoding (set to 1): ")) file = get_sequence_file() - command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_WRITE_SEQUENCE) + struct.pack('!B', use_decoding) + \ - bytearray(file, 'utf-8') + command = ( + object_id + + struct.pack("!I", CommandIds.TC_REPLAY_WRITE_SEQUENCE) + + struct.pack("!B", use_decoding) + + bytearray(file, "utf-8") + ) return command diff --git a/pus_tc/devs/ploc_supervisor.py b/pus_tc/devs/ploc_supervisor.py index 6f4e689..afc9170 100644 --- a/pus_tc/devs/ploc_supervisor.py +++ b/pus_tc/devs/ploc_supervisor.py @@ -86,16 +86,12 @@ def pack_ploc_supv_commands( ) ) if op_code == "0": - tc_queue.appendleft( - (QueueCommands.PRINT, "PLOC Supervisor: Set mode off") - ) + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set mode off")) command = pack_mode_data(object_id, Modes.OFF, 0) command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) if op_code == "1": - tc_queue.appendleft( - (QueueCommands.PRINT, "PLOC Supervisor: Set mode on") - ) + tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set mode on")) command = pack_mode_data(object_id, Modes.ON, 0) command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command) tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/devs/plpcdu.py b/pus_tc/devs/plpcdu.py index 9473db8..2e631fa 100644 --- a/pus_tc/devs/plpcdu.py +++ b/pus_tc/devs/plpcdu.py @@ -18,13 +18,13 @@ LOGGER = get_console_logger() class OpCodes: SWITCH_ON = ["0", "on"] - SWITCH_ADC_NORMAL = ["1", "adc-normal"] - SWITCH_ALL_NORMAL = ["2", "all-normal"] - SWITCH_OFF = ["3", "off"] - UPDATE_DRO_TO_X8_WAIT = ["6", "dro-to-x8-wait"] - UPDATE_X8_TO_TX_WAIT_TIME = ["7", "x8-to-tx-wait"] - UPDATE_TX_TO_MPA_WAIT_TIME = ["8", "tx-to-mpa-wait"] - UPDATE_MPA_TO_HPA_WAIT_TIME = ["9", "mpa-to-hpa-wait"] + SWITCH_OFF = ["1", "off"] + NORMAL_SSR = ["2", "nml-ssr"] + NORMAL_DRO = ["3", "nml-dro"] + NORMAL_X8 = ["4", "nml-x8"] + NORMAL_TX = ["5", "nml-tx"] + NORMAL_MPA = ["6", "nml-mpa"] + NORMAL_HPA = ["7", "nml-hpa"] INJECT_SSR_TO_DRO_FAILURE = ["10", "inject-ssr-dro-fault"] INJECT_DRO_TO_X8_FAILURE = ["11", "inject-dro-x8-fault"] @@ -33,10 +33,33 @@ class OpCodes: INJECT_MPA_TO_HPA_FAILURE = ["14", "inject-mpa-hpa-fault"] INJECT_ALL_ON_FAILURE = ["15", "inject-all-on-fault"] + # The following commands might become deprecated in the future + UPDATE_DRO_TO_X8_WAIT = ["128", "dro-to-x8-wait"] + UPDATE_X8_TO_TX_WAIT_TIME = ["129", "x8-to-tx-wait"] + UPDATE_TX_TO_MPA_WAIT_TIME = ["130", "tx-to-mpa-wait"] + UPDATE_MPA_TO_HPA_WAIT_TIME = ["131", "mpa-to-hpa-wait"] -class Submodes(enum.IntEnum): - ADC_ON = 0 - ALL_ON = 1 + +class Info: + NORMAL = "PL PCDU ADC modules normal" + SWITCH_ON = "Switching PL PCDU on" + SWITCH_OFF = "Switching PL PCDU off" + NORMAL_SSR = f"{NORMAL}, SSR on" + NORMAL_DRO = f"{NORMAL},DRO on" + NORMAL_X8 = f"{NORMAL}, X8 on" + NORMAL_TX = f"{NORMAL}, TX on" + NORMAL_MPA = f"{NORMAL}, MPA on" + NORMAL_HPA = f"{NORMAL}, HPA on" + + +class NormalSubmodes(enum.IntEnum): + ALL_OFF = 0 + SOLID_STATE_RELAYS_ADC_ON = 1 + DRO_ON = 2 + X8_ON = 3 + TX_ON = 4 + MPA_ON = 5 + HPA_ON = 6 class ParamIds(enum.IntEnum): @@ -79,44 +102,55 @@ class ParamIds(enum.IntEnum): def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): if op_code in OpCodes.SWITCH_ON: - tc_queue.appendleft((QueueCommands.PRINT, "Switching PL PCDU on")) - mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.ON, submode=0) - mode_cmd = PusTelecommand( - service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data + pack_pl_pcdu_mode_cmd( + tc_queue=tc_queue, info=Info.SWITCH_ON, mode=Modes.ON, submode=0 ) - tc_queue.appendleft(mode_cmd.pack_command_tuple()) if op_code in OpCodes.SWITCH_OFF: - tc_queue.appendleft((QueueCommands.PRINT, "Switching PL PCDU off")) - mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.OFF, submode=0) - mode_cmd = PusTelecommand( - service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data + pack_pl_pcdu_mode_cmd( + tc_queue=tc_queue, info=Info.SWITCH_OFF, mode=Modes.OFF, submode=0 ) - tc_queue.appendleft(mode_cmd.pack_command_tuple()) - if op_code in OpCodes.SWITCH_ADC_NORMAL: - tc_queue.appendleft( - (QueueCommands.PRINT, "Switching PL PCDU ADC module normal, submode ADC ON") + if op_code in OpCodes.NORMAL_SSR: + pack_pl_pcdu_mode_cmd( + tc_queue=tc_queue, + info=Info.NORMAL_SSR, + mode=Modes.NORMAL, + submode=NormalSubmodes.SOLID_STATE_RELAYS_ADC_ON, ) - mode_data = pack_mode_data( - object_id=PL_PCDU_ID, mode=Modes.NORMAL, submode=Submodes.ADC_ON + if op_code in OpCodes.NORMAL_DRO: + pack_pl_pcdu_mode_cmd( + tc_queue=tc_queue, + info=Info.NORMAL_DRO, + mode=Modes.NORMAL, + submode=NormalSubmodes.DRO_ON, ) - mode_cmd = PusTelecommand( - service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data + if op_code in OpCodes.NORMAL_X8: + pack_pl_pcdu_mode_cmd( + tc_queue=tc_queue, + info=Info.NORMAL_X8, + mode=Modes.NORMAL, + submode=NormalSubmodes.X8_ON, ) - tc_queue.appendleft(mode_cmd.pack_command_tuple()) - if op_code in OpCodes.SWITCH_ALL_NORMAL: - tc_queue.appendleft( - ( - QueueCommands.PRINT, - "Switching all PL PCDU modules normal, submode ALL ON", - ) + if op_code in OpCodes.NORMAL_TX: + pack_pl_pcdu_mode_cmd( + tc_queue=tc_queue, + info=Info.NORMAL_TX, + mode=Modes.NORMAL, + submode=NormalSubmodes.TX_ON, ) - mode_data = pack_mode_data( - object_id=PL_PCDU_ID, mode=Modes.NORMAL, submode=Submodes.ALL_ON + if op_code in OpCodes.NORMAL_MPA: + pack_pl_pcdu_mode_cmd( + tc_queue=tc_queue, + info=Info.NORMAL_MPA, + mode=Modes.NORMAL, + submode=NormalSubmodes.MPA_ON, ) - mode_cmd = PusTelecommand( - service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data + if op_code in OpCodes.NORMAL_HPA: + pack_pl_pcdu_mode_cmd( + tc_queue=tc_queue, + info=Info.NORMAL_HPA, + mode=Modes.NORMAL, + submode=NormalSubmodes.HPA_ON, ) - tc_queue.appendleft(mode_cmd.pack_command_tuple()) if op_code in OpCodes.UPDATE_DRO_TO_X8_WAIT: pack_wait_time_cmd( tc_queue=tc_queue, @@ -189,3 +223,17 @@ def pack_failure_injection_cmd(tc_queue: TcQueueT, param_id: int, print_str: str ) cmd = pack_fsfw_load_param_cmd(ssc=0, app_data=param_data) tc_queue.appendleft(cmd.pack_command_tuple()) + + +def pack_pl_pcdu_mode_cmd(tc_queue: TcQueueT, info: str, mode: int, submode: int): + tc_queue.appendleft( + ( + QueueCommands.PRINT, + info, + ) + ) + mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode) + mode_cmd = PusTelecommand( + service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data + ) + tc_queue.appendleft(mode_cmd.pack_command_tuple())