v1.11.0 #72

Merged
muellerr merged 72 commits from develop into master 2022-05-17 15:04:48 +02:00
5 changed files with 78 additions and 56 deletions
Showing only changes of commit 3d1a4044fe - Show all commits

View File

@ -627,18 +627,36 @@ def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT):
), ),
"41": ("PLOC Supervisor: CAN loopback test", {OpCodeDictKeys.TIMEOUT: 2.0}), "41": ("PLOC Supervisor: CAN loopback test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"42": ("PLOC Supervisor: Perform update", {OpCodeDictKeys.TIMEOUT: 2.0}), "42": ("PLOC Supervisor: Perform update", {OpCodeDictKeys.TIMEOUT: 2.0}),
"43": ("PLOC Supervisor: Terminate supervisor process", {OpCodeDictKeys.TIMEOUT: 2.0}), "43": (
"PLOC Supervisor: Terminate supervisor process",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"44": ("PLOC Supervisor: Start MPSoC quiet", {OpCodeDictKeys.TIMEOUT: 2.0}), "44": ("PLOC Supervisor: Start MPSoC quiet", {OpCodeDictKeys.TIMEOUT: 2.0}),
"45": ("PLOC Supervisor: Set shutdown timeout", {OpCodeDictKeys.TIMEOUT: 2.0}), "45": ("PLOC Supervisor: Set shutdown timeout", {OpCodeDictKeys.TIMEOUT: 2.0}),
"46": ("PLOC Supervisor: Factory flash", {OpCodeDictKeys.TIMEOUT: 2.0}), "46": ("PLOC Supervisor: Factory flash", {OpCodeDictKeys.TIMEOUT: 2.0}),
"47": ("PLOC Supervisor: Enable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}), "47": ("PLOC Supervisor: Enable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}),
"48": ("PLOC Supervisor: Disable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}), "48": ("PLOC Supervisor: Disable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}),
"49": ("PLOC Supervisor: Enable ADC monitor task", {OpCodeDictKeys.TIMEOUT: 2.0}), "49": (
"50": ("PLOC Supervisor: Disable ADC monitor task", {OpCodeDictKeys.TIMEOUT: 2.0}), "PLOC Supervisor: Enable ADC monitor task",
"51": ("PLOC Supervisor: Logging request event buffers", {OpCodeDictKeys.TIMEOUT: 2.0}), {OpCodeDictKeys.TIMEOUT: 2.0},
"52": ("PLOC Supervisor: Logging clear counters", {OpCodeDictKeys.TIMEOUT: 2.0}), ),
"50": (
"PLOC Supervisor: Disable ADC monitor task",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"51": (
"PLOC Supervisor: Logging request event buffers",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"52": (
"PLOC Supervisor: Logging clear counters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"53": ("PLOC Supervisor: Logging set topic", {OpCodeDictKeys.TIMEOUT: 2.0}), "53": ("PLOC Supervisor: Logging set topic", {OpCodeDictKeys.TIMEOUT: 2.0}),
"54": ("PLOC Supervisor: Logging request counters", {OpCodeDictKeys.TIMEOUT: 2.0}), "54": (
"PLOC Supervisor: Logging request counters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"55": ("PLOC Supervisor: Request ADC Report", {OpCodeDictKeys.TIMEOUT: 2.0}), "55": ("PLOC Supervisor: Request ADC Report", {OpCodeDictKeys.TIMEOUT: 2.0}),
} }
service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv) service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv)

View File

@ -172,7 +172,11 @@ def pack_ploc_mpsoc_commands(
elif op_code == "16": elif op_code == "16":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Tc cam command send")) tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Tc cam command send"))
cam_cmd = input("Specify cam command string: ") cam_cmd = input("Specify cam command string: ")
command = object_id + struct.pack("!I", CommandIds.TC_CAM_CMD_SEND) + bytearray(cam_cmd, 'utf-8') command = (
object_id
+ struct.pack("!I", CommandIds.TC_CAM_CMD_SEND)
+ bytearray(cam_cmd, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())

View File

@ -384,69 +384,65 @@ def pack_ploc_supv_commands(
command = PusTelecommand(service=8, subservice=128, ssc=57, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=57, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "43": elif op_code == "43":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Terminate supervisor process")) tc_queue.appendleft(
command = object_id + struct.pack( (QueueCommands.PRINT, "PLOC Supervisor: Terminate supervisor process")
"!I", SupvActionIds.TERMINATE_SUPV_HELPER
) )
command = object_id + struct.pack("!I", SupvActionIds.TERMINATE_SUPV_HELPER)
command = PusTelecommand(service=8, subservice=128, ssc=58, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=58, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "44": elif op_code == "44":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Start MPSoC quiet")) tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Start MPSoC quiet"))
command = object_id + struct.pack( command = object_id + struct.pack("!I", SupvActionIds.START_MPSOC_QUIET)
"!I", SupvActionIds.START_MPSOC_QUIET
)
command = PusTelecommand(service=8, subservice=128, ssc=59, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=59, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "45": elif op_code == "45":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set shutdown timeout")) tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC Supervisor: Set shutdown timeout")
)
command = pack_set_shutdown_timeout_command(object_id) command = pack_set_shutdown_timeout_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=60, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=60, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "46": elif op_code == "46":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory flash")) tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory flash"))
command = object_id + struct.pack( command = object_id + struct.pack("!I", SupvActionIds.FACTORY_FLASH)
"!I", SupvActionIds.FACTORY_FLASH
)
command = PusTelecommand(service=8, subservice=128, ssc=61, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=61, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "47": elif op_code == "47":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Enable auto TM")) tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Enable auto TM"))
command = object_id + struct.pack( command = object_id + struct.pack("!I", SupvActionIds.ENABLE_AUTO_TM)
"!I", SupvActionIds.ENABLE_AUTO_TM
)
command = PusTelecommand(service=8, subservice=128, ssc=62, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=62, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "48": elif op_code == "48":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Disable auto TM")) tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Disable auto TM"))
command = object_id + struct.pack( command = object_id + struct.pack("!I", SupvActionIds.DISABLE_AUTO_TM)
"!I", SupvActionIds.DISABLE_AUTO_TM
)
command = PusTelecommand(service=8, subservice=128, ssc=63, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=63, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "49": elif op_code == "49":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Enable ADC monitor task")) tc_queue.appendleft(
command = object_id + struct.pack( (QueueCommands.PRINT, "PLOC Supervisor: Enable ADC monitor task")
"!I", SupvActionIds.ENABLE_ADC_MONITOR_TASK
) )
command = object_id + struct.pack("!I", SupvActionIds.ENABLE_ADC_MONITOR_TASK)
command = PusTelecommand(service=8, subservice=128, ssc=64, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=64, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "50": elif op_code == "50":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Disable ADC monitor task")) tc_queue.appendleft(
command = object_id + struct.pack( (QueueCommands.PRINT, "PLOC Supervisor: Disable ADC monitor task")
"!I", SupvActionIds.DISABLE_ADC_MONITOR_TASK
) )
command = object_id + struct.pack("!I", SupvActionIds.DISABLE_ADC_MONITOR_TASK)
command = PusTelecommand(service=8, subservice=128, ssc=65, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=65, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "51": elif op_code == "51":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Logging request event buffers")) tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC Supervisor: Logging request event buffers")
)
command = pack_logging_buffer_request(object_id) command = pack_logging_buffer_request(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=66, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=66, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "52": elif op_code == "52":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Logging clear counters")) tc_queue.appendleft(
command = object_id + struct.pack( (QueueCommands.PRINT, "PLOC Supervisor: Logging clear counters")
"!I", SupvActionIds.LOGGING_CLEAR_COUNTERS
) )
command = object_id + struct.pack("!I", SupvActionIds.LOGGING_CLEAR_COUNTERS)
command = PusTelecommand(service=8, subservice=128, ssc=67, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=67, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "53": elif op_code == "53":
@ -455,13 +451,17 @@ def pack_ploc_supv_commands(
command = PusTelecommand(service=8, subservice=128, ssc=68, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=68, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "54": elif op_code == "54":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Logging request counters")) tc_queue.appendleft(
command = object_id + struct.pack('!I', SupvActionIds.LOGGING_REQUEST_COUNTERS) (QueueCommands.PRINT, "PLOC Supervisor: Logging request counters")
)
command = object_id + struct.pack("!I", SupvActionIds.LOGGING_REQUEST_COUNTERS)
command = PusTelecommand(service=8, subservice=128, ssc=69, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=69, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "55": elif op_code == "55":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Request ADC report")) tc_queue.appendleft(
command = object_id + struct.pack('!I', SupvActionIds.REQUEST_ADC_REPORT) (QueueCommands.PRINT, "PLOC Supervisor: Request ADC report")
)
command = object_id + struct.pack("!I", SupvActionIds.REQUEST_ADC_REPORT)
command = PusTelecommand(service=8, subservice=128, ssc=70, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=70, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
@ -714,30 +714,30 @@ def pack_update_command(object_id: bytearray) -> bytearray:
start_address = int(input("Specify start address: 0x"), 16) start_address = int(input("Specify start address: 0x"), 16)
update_file = get_update_file() update_file = get_update_file()
command += object_id command += object_id
command += struct.pack('!I', SupvActionIds.PERFORM_UPDATE) command += struct.pack("!I", SupvActionIds.PERFORM_UPDATE)
command += bytearray(update_file, 'utf-8') command += bytearray(update_file, "utf-8")
# Adding null terminator # Adding null terminator
command += struct.pack('!B', 0) command += struct.pack("!B", 0)
command += struct.pack('!B', memory_id) command += struct.pack("!B", memory_id)
command += struct.pack('!I', start_address) command += struct.pack("!I", start_address)
return command return command
def pack_set_shutdown_timeout_command(object_id: bytearray) -> bytearray: def pack_set_shutdown_timeout_command(object_id: bytearray) -> bytearray:
command = bytearray() command = bytearray()
command += object_id command += object_id
command += struct.pack('!I', SupvActionIds.SET_SHUTDOWN_TIMEOUT) command += struct.pack("!I", SupvActionIds.SET_SHUTDOWN_TIMEOUT)
timeout = int(input("Specify shutdown timeout (ms): ")) timeout = int(input("Specify shutdown timeout (ms): "))
command += struct.pack('!I', timeout) command += struct.pack("!I", timeout)
return command return command
def pack_logging_buffer_request(object_id: bytearray) -> bytearray: def pack_logging_buffer_request(object_id: bytearray) -> bytearray:
command = bytearray() command = bytearray()
command += object_id command += object_id
command += struct.pack('!I', SupvActionIds.LOGGING_REQUEST_EVENT_BUFFERS) command += struct.pack("!I", SupvActionIds.LOGGING_REQUEST_EVENT_BUFFERS)
path = get_event_buffer_path() path = get_event_buffer_path()
command += bytearray(path, 'utf-8') command += bytearray(path, "utf-8")
return command return command
@ -789,9 +789,9 @@ def pack_read_gpio_cmd(object_id: bytearray) -> bytearray:
def pack_logging_set_topic(objetc_id: bytearray) -> bytearray: def pack_logging_set_topic(objetc_id: bytearray) -> bytearray:
command = objetc_id + struct.pack('!I', SupvActionIds.LOGGING_SET_TOPIC) command = objetc_id + struct.pack("!I", SupvActionIds.LOGGING_SET_TOPIC)
tpc = int(input("Specify logging topic: ")) tpc = int(input("Specify logging topic: "))
command += struct.pack('!B', tpc) command += struct.pack("!B", tpc)
return command return command

View File

@ -64,17 +64,17 @@ def pack_syrlinks_command(
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3": if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode standby")) tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode standby"))
command = object_id + struct.pack('!I', CommandIds.SET_TX_MODE_STANDBY) command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_STANDBY)
command = PusTelecommand(service=8, subservice=128, ssc=10, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "4": if op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode modulation")) tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode modulation"))
command = object_id + struct.pack('!I', CommandIds.SET_TX_MODE_MODULATION) command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_MODULATION)
command = PusTelecommand(service=8, subservice=128, ssc=11, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=11, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "5": if op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode CW")) tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode CW"))
command = object_id + struct.pack('!I', CommandIds.SET_TX_MODE_CW) command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_CW)
command = PusTelecommand(service=8, subservice=128, ssc=12, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=12, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "6": if op_code == "6":
@ -89,26 +89,26 @@ def pack_syrlinks_command(
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "8": if op_code == "8":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX status")) tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX status"))
command = object_id + struct.pack('!I', CommandIds.READ_TX_STATUS) command = object_id + struct.pack("!I", CommandIds.READ_TX_STATUS)
command = PusTelecommand(service=8, subservice=128, ssc=13, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=13, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "9": if op_code == "9":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX waveform")) tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX waveform"))
command = object_id + struct.pack('!I', CommandIds.READ_TX_WAVEFORM) command = object_id + struct.pack("!I", CommandIds.READ_TX_WAVEFORM)
command = PusTelecommand(service=8, subservice=128, ssc=14, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=14, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "10": if op_code == "10":
tc_queue.appendleft( tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Read TX AGC value high byte") (QueueCommands.PRINT, "Syrlinks: Read TX AGC value high byte")
) )
command = object_id + struct.pack('!I', CommandIds.READ_TX_AGC_VALUE_HIGH_BYTE) command = object_id + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_HIGH_BYTE)
command = PusTelecommand(service=8, subservice=128, ssc=15, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=15, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "11": if op_code == "11":
tc_queue.appendleft( tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Read TX AGC value low byte") (QueueCommands.PRINT, "Syrlinks: Read TX AGC value low byte")
) )
command = object_id + struct.pack('!I', CommandIds.READ_TX_AGC_VALUE_LOW_BYTE) command = object_id + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_LOW_BYTE)
command = PusTelecommand(service=8, subservice=128, ssc=16, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=16, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "12": if op_code == "12":

View File

@ -238,7 +238,7 @@ def pack_service_queue_user(
CustomServiceList.REACTION_WHEEL_1, CustomServiceList.REACTION_WHEEL_1,
CustomServiceList.REACTION_WHEEL_2, CustomServiceList.REACTION_WHEEL_2,
CustomServiceList.REACTION_WHEEL_3, CustomServiceList.REACTION_WHEEL_3,
CustomServiceList.REACTION_WHEEL_4 CustomServiceList.REACTION_WHEEL_4,
]: ]:
return pack_rw_cmds(op_code=op_code) return pack_rw_cmds(op_code=op_code)
LOGGER.warning("Invalid Service !") LOGGER.warning("Invalid Service !")