Merge pull request 'Update PL PCDU Commands' (#50) from mueller/new-pl-pcdu-cmds into develop

Reviewed-on: #50
This commit is contained in:
Jakob Meier 2022-04-07 11:36:01 +02:00
commit 43be3fd370
5 changed files with 168 additions and 101 deletions

View File

@ -97,25 +97,44 @@ def add_core_controller_definitions(cmd_dict: ServiceOpCodeDictT):
def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
from pus_tc.devs.plpcdu import OpCodes from pus_tc.devs.plpcdu import OpCodes, Info
op_code_dict = dict() op_code_dict = dict()
add_op_code_entry( add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info="Switch PL PCDU on" op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON
) )
add_op_code_entry( add_op_code_entry(
op_code_dict=op_code_dict, op_code_dict=op_code_dict,
keys=OpCodes.SWITCH_ADC_NORMAL, keys=OpCodes.NORMAL_SSR,
info="Switch PL PCDU ADC normal, submode ADC ON", info=Info.NORMAL_SSR,
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info="Switch PL PCDU off"
) )
add_op_code_entry( add_op_code_entry(
op_code_dict=op_code_dict, op_code_dict=op_code_dict,
keys=OpCodes.SWITCH_ALL_NORMAL, keys=OpCodes.NORMAL_DRO,
info="Switch all PL PCDU modules normal, submode ALL ON", info=Info.NORMAL_DRO,
options=generate_op_code_options(enter_listener_mode=True), )
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_X8,
info=Info.NORMAL_X8,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_TX,
info=Info.NORMAL_TX,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_MPA,
info=Info.NORMAL_MPA,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_HPA,
info=Info.NORMAL_HPA,
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF
) )
add_op_code_entry( add_op_code_entry(
op_code_dict=op_code_dict, op_code_dict=op_code_dict,

View File

@ -60,7 +60,7 @@ class PlocReplyIds:
def pack_ploc_mpsoc_commands( def pack_ploc_mpsoc_commands(
object_id: bytearray, tc_queue: TcQueueT, op_code: str object_id: bytearray, tc_queue: TcQueueT, op_code: str
) -> TcQueueT: ) -> TcQueueT:
tc_queue.appendleft( tc_queue.appendleft(
( (
@ -90,7 +90,9 @@ def pack_ploc_mpsoc_commands(
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3": if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: TC mem write test")) tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: TC mem write test"))
memory_address = int(input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16) memory_address = int(
input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16
)
memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16) memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16)
# TODO: implement variable length mem write command # TODO: implement variable length mem write command
mem_len = 1 # 1 32-bit word mem_len = 1 # 1 32-bit word
@ -121,7 +123,7 @@ def pack_ploc_mpsoc_commands(
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "8": elif op_code == "8":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay stop")) tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay stop"))
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_STOP) command = object_id + struct.pack("!I", CommandIds.TC_REPLAY_STOP)
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "9": elif op_code == "9":
@ -131,7 +133,7 @@ def pack_ploc_mpsoc_commands(
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "10": elif op_code == "10":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr off")) tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr off"))
command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_OFF) command = object_id + struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_OFF)
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "11": elif op_code == "11":
@ -147,13 +149,17 @@ def pack_ploc_mpsoc_commands(
elif op_code == "13": elif op_code == "13":
num_words = 1 num_words = 1
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Read DEADBEEF address")) tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Read DEADBEEF address"))
command = object_id + struct.pack('!I', CommandIds.TC_MEM_READ) + struct.pack("!I", MemAddresses.DEADBEEF) + \ command = (
struct.pack('!H', num_words) object_id
+ struct.pack("!I", CommandIds.TC_MEM_READ)
+ struct.pack("!I", MemAddresses.DEADBEEF)
+ struct.pack("!H", num_words)
)
command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "14": elif op_code == "14":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Tc mode replay")) tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Tc mode replay"))
command = object_id + struct.pack('!I', CommandIds.TC_MODE_REPLAY) command = object_id + struct.pack("!I", CommandIds.TC_MODE_REPLAY)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
@ -161,7 +167,7 @@ def pack_ploc_mpsoc_commands(
def generate_write_mem_command( def generate_write_mem_command(
object_id: bytearray, memory_address: int, memory_data: int, mem_len: int object_id: bytearray, memory_address: int, memory_data: int, mem_len: int
) -> bytearray: ) -> bytearray:
"""This function generates the command to write to a memory address within the PLOC """This function generates the command to write to a memory address within the PLOC
@param object_id The object id of the PlocHandler @param object_id The object id of the PlocHandler
@ -169,11 +175,11 @@ def generate_write_mem_command(
@param memory_data The data to write to the memory address specified by the bytearray memory_address. @param memory_data The data to write to the memory address specified by the bytearray memory_address.
""" """
command = ( command = (
object_id object_id
+ struct.pack('!I', CommandIds.TC_MEM_WRITE) + struct.pack("!I", CommandIds.TC_MEM_WRITE)
+ struct.pack('!I', memory_address) + struct.pack("!I", memory_address)
+ struct.pack('!H', mem_len) + struct.pack("!H", mem_len)
+ struct.pack("!I", memory_data) + struct.pack("!I", memory_data)
) )
return command return command
@ -182,8 +188,10 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray:
memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16) memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16)
num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: ")) num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: "))
command = ( command = (
object_id + struct.pack('!I', CommandIds.TC_MEM_READ) + struct.pack("!I", memory_address) + struct.pack( object_id
'!H', num_words) + struct.pack("!I", CommandIds.TC_MEM_READ)
+ struct.pack("!I", memory_address)
+ struct.pack("!H", num_words)
) )
return command return command
@ -191,28 +199,44 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray:
def prepare_flash_write_cmd(object_id: bytearray) -> bytearray: def prepare_flash_write_cmd(object_id: bytearray) -> bytearray:
obcFile = get_obc_file() obcFile = get_obc_file()
mpsocFile = get_mpsoc_file() mpsocFile = get_mpsoc_file()
command = object_id + struct.pack('!I', CommandIds.FLASH_WRITE) + bytearray(obcFile, 'utf-8') + bytearray(mpsocFile, command = (
'utf-8') object_id
+ struct.pack("!I", CommandIds.FLASH_WRITE)
+ bytearray(obcFile, "utf-8")
+ bytearray(mpsocFile, "utf-8")
)
return command return command
def prepare_flash_delete_cmd(object_id: bytearray) -> bytearray: def prepare_flash_delete_cmd(object_id: bytearray) -> bytearray:
file = get_mpsoc_file() file = get_mpsoc_file()
command = object_id + struct.pack('!I', CommandIds.TC_FLASH_DELETE) + bytearray(file, 'utf-8') command = (
object_id
+ struct.pack("!I", CommandIds.TC_FLASH_DELETE)
+ bytearray(file, "utf-8")
)
return command return command
def prepare_replay_start_cmd(object_id: bytearray) -> bytearray: def prepare_replay_start_cmd(object_id: bytearray) -> bytearray:
replay = int(input("Specify replay mode (0 - once, 1 - repeated): ")) replay = int(input("Specify replay mode (0 - once, 1 - repeated): "))
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_START) + struct.pack('!B', replay) command = (
object_id
+ struct.pack("!I", CommandIds.TC_REPLAY_START)
+ struct.pack("!B", replay)
)
return command return command
def prepare_downlink_pwr_on_cmd(object_id: bytearray) -> bytearray: def prepare_downlink_pwr_on_cmd(object_id: bytearray) -> bytearray:
mode = int(input("Specify JESD mode (0 - 5): ")) mode = int(input("Specify JESD mode (0 - 5): "))
lane_rate = int(input("Specify lane rate (0 - 9): ")) lane_rate = int(input("Specify lane rate (0 - 9): "))
command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_ON) + struct.pack('!B', mode) \ command = (
+ struct.pack('!B', lane_rate) object_id
+ struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_ON)
+ struct.pack("!B", mode)
+ struct.pack("!B", lane_rate)
)
return command return command
@ -220,8 +244,12 @@ def prepare_replay_write_sequence_cmd(object_id: bytearray) -> bytearray:
null_terminator = 0 null_terminator = 0
use_decoding = int(input("Use decoding (set to 1): ")) use_decoding = int(input("Use decoding (set to 1): "))
file = get_sequence_file() file = get_sequence_file()
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_WRITE_SEQUENCE) + struct.pack('!B', use_decoding) + \ command = (
bytearray(file, 'utf-8') object_id
+ struct.pack("!I", CommandIds.TC_REPLAY_WRITE_SEQUENCE)
+ struct.pack("!B", use_decoding)
+ bytearray(file, "utf-8")
)
return command return command

View File

@ -86,16 +86,12 @@ def pack_ploc_supv_commands(
) )
) )
if op_code == "0": if op_code == "0":
tc_queue.appendleft( tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set mode off"))
(QueueCommands.PRINT, "PLOC Supervisor: Set mode off")
)
command = pack_mode_data(object_id, Modes.OFF, 0) command = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command) command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1": if op_code == "1":
tc_queue.appendleft( tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set mode on"))
(QueueCommands.PRINT, "PLOC Supervisor: Set mode on")
)
command = pack_mode_data(object_id, Modes.ON, 0) command = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command) command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())

View File

@ -18,13 +18,13 @@ LOGGER = get_console_logger()
class OpCodes: class OpCodes:
SWITCH_ON = ["0", "on"] SWITCH_ON = ["0", "on"]
SWITCH_ADC_NORMAL = ["1", "adc-normal"] SWITCH_OFF = ["1", "off"]
SWITCH_ALL_NORMAL = ["2", "all-normal"] NORMAL_SSR = ["2", "nml-ssr"]
SWITCH_OFF = ["3", "off"] NORMAL_DRO = ["3", "nml-dro"]
UPDATE_DRO_TO_X8_WAIT = ["6", "dro-to-x8-wait"] NORMAL_X8 = ["4", "nml-x8"]
UPDATE_X8_TO_TX_WAIT_TIME = ["7", "x8-to-tx-wait"] NORMAL_TX = ["5", "nml-tx"]
UPDATE_TX_TO_MPA_WAIT_TIME = ["8", "tx-to-mpa-wait"] NORMAL_MPA = ["6", "nml-mpa"]
UPDATE_MPA_TO_HPA_WAIT_TIME = ["9", "mpa-to-hpa-wait"] NORMAL_HPA = ["7", "nml-hpa"]
INJECT_SSR_TO_DRO_FAILURE = ["10", "inject-ssr-dro-fault"] INJECT_SSR_TO_DRO_FAILURE = ["10", "inject-ssr-dro-fault"]
INJECT_DRO_TO_X8_FAILURE = ["11", "inject-dro-x8-fault"] INJECT_DRO_TO_X8_FAILURE = ["11", "inject-dro-x8-fault"]
@ -33,10 +33,33 @@ class OpCodes:
INJECT_MPA_TO_HPA_FAILURE = ["14", "inject-mpa-hpa-fault"] INJECT_MPA_TO_HPA_FAILURE = ["14", "inject-mpa-hpa-fault"]
INJECT_ALL_ON_FAILURE = ["15", "inject-all-on-fault"] INJECT_ALL_ON_FAILURE = ["15", "inject-all-on-fault"]
# The following commands might become deprecated in the future
UPDATE_DRO_TO_X8_WAIT = ["128", "dro-to-x8-wait"]
UPDATE_X8_TO_TX_WAIT_TIME = ["129", "x8-to-tx-wait"]
UPDATE_TX_TO_MPA_WAIT_TIME = ["130", "tx-to-mpa-wait"]
UPDATE_MPA_TO_HPA_WAIT_TIME = ["131", "mpa-to-hpa-wait"]
class Submodes(enum.IntEnum):
ADC_ON = 0 class Info:
ALL_ON = 1 NORMAL = "PL PCDU ADC modules normal"
SWITCH_ON = "Switching PL PCDU on"
SWITCH_OFF = "Switching PL PCDU off"
NORMAL_SSR = f"{NORMAL}, SSR on"
NORMAL_DRO = f"{NORMAL},DRO on"
NORMAL_X8 = f"{NORMAL}, X8 on"
NORMAL_TX = f"{NORMAL}, TX on"
NORMAL_MPA = f"{NORMAL}, MPA on"
NORMAL_HPA = f"{NORMAL}, HPA on"
class NormalSubmodes(enum.IntEnum):
ALL_OFF = 0
SOLID_STATE_RELAYS_ADC_ON = 1
DRO_ON = 2
X8_ON = 3
TX_ON = 4
MPA_ON = 5
HPA_ON = 6
class ParamIds(enum.IntEnum): class ParamIds(enum.IntEnum):
@ -79,67 +102,54 @@ class ParamIds(enum.IntEnum):
def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str): def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str):
if op_code in OpCodes.SWITCH_ON: if op_code in OpCodes.SWITCH_ON:
tc_queue.appendleft((QueueCommands.PRINT, "Switching PL PCDU on")) pack_pl_pcdu_mode_cmd(
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.ON, submode=0) tc_queue=tc_queue, info=Info.SWITCH_ON, mode=Modes.ON, submode=0
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
) )
tc_queue.appendleft(mode_cmd.pack_command_tuple())
if op_code in OpCodes.SWITCH_OFF: if op_code in OpCodes.SWITCH_OFF:
tc_queue.appendleft((QueueCommands.PRINT, "Switching PL PCDU off")) pack_pl_pcdu_mode_cmd(
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.OFF, submode=0) tc_queue=tc_queue, info=Info.SWITCH_OFF, mode=Modes.OFF, submode=0
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
) )
tc_queue.appendleft(mode_cmd.pack_command_tuple()) if op_code in OpCodes.NORMAL_SSR:
if op_code in OpCodes.SWITCH_ADC_NORMAL: pack_pl_pcdu_mode_cmd(
tc_queue.appendleft(
(QueueCommands.PRINT, "Switching PL PCDU ADC module normal, submode ADC ON")
)
mode_data = pack_mode_data(
object_id=PL_PCDU_ID, mode=Modes.NORMAL, submode=Submodes.ADC_ON
)
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
)
tc_queue.appendleft(mode_cmd.pack_command_tuple())
if op_code in OpCodes.SWITCH_ALL_NORMAL:
tc_queue.appendleft(
(
QueueCommands.PRINT,
"Switching all PL PCDU modules normal, submode ALL ON",
)
)
mode_data = pack_mode_data(
object_id=PL_PCDU_ID, mode=Modes.NORMAL, submode=Submodes.ALL_ON
)
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
)
tc_queue.appendleft(mode_cmd.pack_command_tuple())
if op_code in OpCodes.UPDATE_DRO_TO_X8_WAIT:
pack_wait_time_cmd(
tc_queue=tc_queue, tc_queue=tc_queue,
param_id=ParamIds.DRO_TO_X8_WAIT_TIME, info=Info.NORMAL_SSR,
print_str="DRO to X8", mode=Modes.NORMAL,
submode=NormalSubmodes.SOLID_STATE_RELAYS_ADC_ON,
) )
if op_code in OpCodes.UPDATE_X8_TO_TX_WAIT_TIME: if op_code in OpCodes.NORMAL_DRO:
pack_wait_time_cmd( pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue, tc_queue=tc_queue,
param_id=ParamIds.X8_TO_TX_WAIT_TIME, info=Info.NORMAL_DRO,
print_str="X8 to TX", mode=Modes.NORMAL,
submode=NormalSubmodes.DRO_ON,
) )
if op_code in OpCodes.UPDATE_TX_TO_MPA_WAIT_TIME: if op_code in OpCodes.NORMAL_X8:
pack_wait_time_cmd( pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue, tc_queue=tc_queue,
param_id=ParamIds.TX_TO_MPA_WAIT_TIME, info=Info.NORMAL_X8,
print_str="TX to MPA", mode=Modes.NORMAL,
submode=NormalSubmodes.X8_ON,
) )
if op_code in OpCodes.UPDATE_MPA_TO_HPA_WAIT_TIME: if op_code in OpCodes.NORMAL_TX:
pack_wait_time_cmd( pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue, tc_queue=tc_queue,
param_id=ParamIds.MPA_TO_HPA_WAIT_TIME, info=Info.NORMAL_TX,
print_str="MPA to HPA", mode=Modes.NORMAL,
submode=NormalSubmodes.TX_ON,
)
if op_code in OpCodes.NORMAL_MPA:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_MPA,
mode=Modes.NORMAL,
submode=NormalSubmodes.MPA_ON,
)
if op_code in OpCodes.NORMAL_HPA:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_HPA,
mode=Modes.NORMAL,
submode=NormalSubmodes.HPA_ON,
) )
if op_code in OpCodes.INJECT_ALL_ON_FAILURE: if op_code in OpCodes.INJECT_ALL_ON_FAILURE:
pack_failure_injection_cmd( pack_failure_injection_cmd(
@ -189,3 +199,17 @@ def pack_failure_injection_cmd(tc_queue: TcQueueT, param_id: int, print_str: str
) )
cmd = pack_fsfw_load_param_cmd(ssc=0, app_data=param_data) cmd = pack_fsfw_load_param_cmd(ssc=0, app_data=param_data)
tc_queue.appendleft(cmd.pack_command_tuple()) tc_queue.appendleft(cmd.pack_command_tuple())
def pack_pl_pcdu_mode_cmd(tc_queue: TcQueueT, info: str, mode: int, submode: int):
tc_queue.appendleft(
(
QueueCommands.PRINT,
info,
)
)
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode)
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
)
tc_queue.appendleft(mode_cmd.pack_command_tuple())

@ -1 +1 @@
Subproject commit 86cf0bf9530f0d31784ff96b025f8b778d1732b1 Subproject commit 06aed2f309a105f8f0e183d359a432301eb6947d