black preview mode, flake8
All checks were successful
EIVE/-/pipeline/head This commit looks good

This commit is contained in:
Robin Müller 2023-09-12 13:48:38 +02:00
parent 2d08fc0bfa
commit 66db12796b
Signed by: muellerr
GPG Key ID: A649FB78196E3849
20 changed files with 95 additions and 83 deletions

View File

@ -42,7 +42,8 @@ def handle_event_packet( # noqa C901: Complexity okay here
) )
_LOGGER.info(generic_event_string) _LOGGER.info(generic_event_string)
pw.file_logger.info( pw.file_logger.info(
f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}" f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}:"
f" {generic_event_string}"
) )
specific_handler = False specific_handler = False
if info.name == "MODE_TRANSITION_FAILED": if info.name == "MODE_TRANSITION_FAILED":
@ -54,8 +55,8 @@ def handle_event_packet( # noqa C901: Complexity okay here
if info.name == "SUPV_UPDATE_PROGRESS" or info.name == "WRITE_MEMORY_FAILED": if info.name == "SUPV_UPDATE_PROGRESS" or info.name == "WRITE_MEMORY_FAILED":
additional_event_info = f"Additional info: {info.info}" additional_event_info = f"Additional info: {info.info}"
context = ( context = (
f"Progress Percent: {event_def.param1 >> 24 & 0xff} | " f"Progress Percent: {event_def.param1 >> 24 & 0xff} | Sequence Count:"
f"Sequence Count: {event_def.param1 & 0xffff} | Bytes Written: {event_def.param2}" f" {event_def.param1 & 0xffff} | Bytes Written: {event_def.param2}"
) )
pw.dlog(additional_event_info) pw.dlog(additional_event_info)
pw.dlog(context) pw.dlog(context)

View File

@ -188,5 +188,5 @@ def handle_regular_hk_print( # noqa C901: Complexity okay here
else: else:
_LOGGER.info( _LOGGER.info(
f"Service 3 TM: Parsing for object {object_id} and set ID {set_id} " f"Service 3 TM: Parsing for object {object_id} and set ID {set_id} "
f"has not been implemented." "has not been implemented."
) )

View File

@ -87,7 +87,8 @@ def pus_factory_hook( # noqa C901 : Complexity okay here
# TODO: Could improve display further by actually displaying a matrix as a # TODO: Could improve display further by actually displaying a matrix as a
# matrix using row and column information # matrix using row and column information
pw.dlog( pw.dlog(
f"Received vector or matrix data: {param.param_raw.hex(sep=',')}" "Received vector or matrix data:"
f" {param.param_raw.hex(sep=',')}"
) )
except ValueError as e: except ValueError as e:
pw.dlog(f"received {e} when trying to parse parameters") pw.dlog(f"received {e} when trying to parse parameters")

View File

@ -48,7 +48,7 @@ def generic_retval_printout(
if retval_info is None: if retval_info is None:
raw_err = retval raw_err = retval
return [ return [
f"No returnvalue information found for error code with " "No returnvalue information found for error code with "
f"subsystem ID {(raw_err >> 8) & 0xff} and unique ID {raw_err & 0xff}" f"subsystem ID {(raw_err >> 8) & 0xff} and unique ID {raw_err & 0xff}"
] ]
else: else:
@ -58,9 +58,9 @@ def generic_retval_printout(
) )
string_list = [retval_string] string_list = [retval_string]
if p1: if p1:
error_param_1_str = f"Error Parameter 1: hex {p1:#010x} " f"dec {p1} " error_param_1_str = f"Error Parameter 1: hex {p1:#010x} dec {p1} "
string_list.append(error_param_1_str) string_list.append(error_param_1_str)
if p2: if p2:
error_param_2_str = f"Error Parameter 2: hex {p2:#010x} " f"dec {p2}" error_param_2_str = f"Error Parameter 2: hex {p2:#010x} dec {p2}"
string_list.append(error_param_2_str) string_list.append(error_param_2_str)
return string_list return string_list

View File

@ -571,8 +571,8 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): # noqa C901
def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper): def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper):
pt = int( pt = int(
input( input(
'Specify parameter type to set {0: "uint8", 1: "uint16", 2: "int32", 3: "float", ' 'Specify parameter type to set {0: "uint8", 1: "uint16", 2: "int32", 3:'
'4: "double"}: ' ' "float", 4: "double"}: '
) )
) )
sid = int(input("Specify parameter struct ID to set: ")) sid = int(input("Specify parameter struct ID to set: "))
@ -808,8 +808,8 @@ def handle_acs_ctrl_sus_raw_data(pw: PrintWrapper, hk_data: bytes):
def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes): def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
if len(hk_data) < 3 * 4 * 12 + 3 * 8 * 3: if len(hk_data) < 3 * 4 * 12 + 3 * 8 * 3:
pw.dlog( pw.dlog(
f"SUS Processed dataset with size {len(hk_data)} does not have expected size" f"SUS Processed dataset with size {len(hk_data)} does not have expected"
f" of {3 * 4 * 12 + 3 * 8 * 3} bytes" f" size of {3 * 4 * 12 + 3 * 8 * 3} bytes"
) )
return return
current_idx = 0 current_idx = 0
@ -846,7 +846,8 @@ def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
if len(hk_data) < 61: if len(hk_data) < 61:
pw.dlog( pw.dlog(
f"ACS CTRL HK: MGM HK data with length {len(hk_data)} shorter than expected 61 bytes" f"ACS CTRL HK: MGM HK data with length {len(hk_data)} shorter than expected"
" 61 bytes"
) )
pw.dlog(f"Raw Data: {hk_data.hex(sep=',')}") pw.dlog(f"Raw Data: {hk_data.hex(sep=',')}")
return return
@ -1276,8 +1277,8 @@ def perform_mgm_calibration( # noqa C901: Complexity okay
reply = CALIBR_SOCKET.recv(1024) reply = CALIBR_SOCKET.recv(1024)
if len(reply) != 2: if len(reply) != 2:
pw.dlog( pw.dlog(
f"MGM calibration: Reply received command magnetometer_field has invalid " "MGM calibration: Reply received command magnetometer_field has"
f"length {len(reply)}" f" invalid length {len(reply)}"
) )
return return
else: else:

View File

@ -12,8 +12,7 @@ from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid, make_sid,
create_request_one_hk_command, create_request_one_hk_command,
create_enable_periodic_hk_command_with_interval, create_enable_periodic_hk_command_with_interval_with_diag,
create_disable_periodic_hk_command, create_enable_periodic_hk_command_with_interval_with_diag,
create_disable_periodic_hk_command_with_diag, create_disable_periodic_hk_command_with_diag,
) )
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter

View File

@ -24,8 +24,6 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_diag_command, generate_one_diag_command,
generate_one_hk_command, generate_one_hk_command,
create_request_one_diag_command, create_request_one_diag_command,
create_disable_periodic_hk_command,
create_enable_periodic_hk_command_with_interval,
create_enable_periodic_hk_command_with_interval_with_diag, create_enable_periodic_hk_command_with_interval_with_diag,
create_disable_periodic_hk_command_with_diag, create_disable_periodic_hk_command_with_diag,
) )
@ -229,12 +227,13 @@ def pack_imtq_test_into( # noqa C901
duration = int( duration = int(
input( input(
f"Specify torque duration [range [0, {pow(2, 16) - 1}, " f"Specify torque duration [range [0, {pow(2, 16) - 1}, "
f"0 for continuous generation until update]: " "0 for continuous generation until update]: "
) )
) )
dur_str = "infinite" if duration == 0 else str(duration) dur_str = "infinite" if duration == 0 else str(duration)
q.add_log_cmd( q.add_log_cmd(
f"IMTQ: Commanding dipole X={x_dipole}, Y={y_dipole}, Z={y_dipole}, duration={dur_str}" f"IMTQ: Commanding dipole X={x_dipole}, Y={y_dipole}, Z={y_dipole},"
f" duration={dur_str}"
) )
q.add_pus_tc( q.add_pus_tc(
pack_dipole_command( pack_dipole_command(
@ -390,7 +389,8 @@ def pack_dipole_command(
duration = int(round(duration)) duration = int(round(duration))
if duration < 0 or duration > pow(2, 16) - 1: if duration < 0 or duration > pow(2, 16) - 1:
raise ValueError( raise ValueError(
f"Duration in ms of {duration} smaller than 0 or larger than allowed {pow(2, 16) - 1}" f"Duration in ms of {duration} smaller than 0 or larger than allowed"
f" {pow(2, 16) - 1}"
) )
command += struct.pack("!h", x_dipole) command += struct.pack("!h", x_dipole)
command += struct.pack("!h", y_dipole) command += struct.pack("!h", y_dipole)
@ -402,7 +402,8 @@ def pack_dipole_command(
def raise_dipole_error(dipole_str: str, value: int): def raise_dipole_error(dipole_str: str, value: int):
raise ValueError( raise ValueError(
f"{dipole_str} {value} negative or larger than maximum allowed 2000 * 10^-4*Am^2" f"{dipole_str} {value} negative or larger than maximum allowed 2000 *"
" 10^-4*Am^2"
) )

View File

@ -261,14 +261,12 @@ def pack_set_speed_command(
if speed > 0: if speed > 0:
if speed < 1000 or speed > 65000: if speed < 1000 or speed > 65000:
raise ValueError( raise ValueError(
"Invalid RW speed specified. " "Invalid RW speed specified. Allowed range is [1000, 65000] 0.1 * RPM"
"Allowed range is [1000, 65000] 0.1 * RPM"
) )
elif speed < 0: elif speed < 0:
if speed < -65000 or speed > -1000: if speed < -65000 or speed > -1000:
raise ValueError( raise ValueError(
"Invalid RW speed specified. " "Invalid RW speed specified. Allowed range is [-65000, -1000] 0.1 * RPM"
"Allowed range is [-65000, -1000] 0.1 * RPM"
) )
else: else:
# Speed is 0 # Speed is 0
@ -304,15 +302,16 @@ def handle_rw_hk_data(
speed_rpm = speed / 10.0 speed_rpm = speed / 10.0
ref_speed_rpm = ref_speed / 10.0 ref_speed_rpm = ref_speed / 10.0
pw.dlog( pw.dlog(
f"Temperature {temp} C | Speed {speed_rpm} rpm | Reference Speed {ref_speed_rpm} rpm" f"Temperature {temp} C | Speed {speed_rpm} rpm | Reference Speed"
f" {ref_speed_rpm} rpm"
) )
pw.dlog( pw.dlog(
f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, " f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, "
f"4: Running, speed changing" "4: Running, speed changing"
) )
pw.dlog( pw.dlog(
f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), " f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
f"1: High Current Mode (0.6 A)" "1: High Current Mode (0.6 A)"
) )
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 5)) pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 5))
if set_id == RwSetId.LAST_RESET: if set_id == RwSetId.LAST_RESET:
@ -361,22 +360,24 @@ def handle_rw_hk_data(
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
pw.dlog( pw.dlog(
f"MCU Temperature {mcu_temp} | Pressure Sensore Temperature {pressure_sens_temp} C" f"MCU Temperature {mcu_temp} | Pressure Sensore Temperature"
f" {pressure_sens_temp} C"
) )
pw.dlog(f"Last Reset Status {last_reset_status}") pw.dlog(f"Last Reset Status {last_reset_status}")
pw.dlog( pw.dlog(
f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), " f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
f"1: High Current Mode (0.6 A)" "1: High Current Mode (0.6 A)"
) )
pw.dlog(f"Speed {current_speed} rpm | Reference Speed {ref_speed} rpm") pw.dlog(f"Speed {current_speed} rpm | Reference Speed {ref_speed} rpm")
pw.dlog( pw.dlog(
f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, " f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, "
f"4: Running, speed changing" "4: Running, speed changing"
) )
pw.dlog("Number Of Invalid Packets:") pw.dlog("Number Of Invalid Packets:")
pw.dlog("CRC | Length | CMD") pw.dlog("CRC | Length | CMD")
pw.dlog( pw.dlog(
f"{num_invalid_crc_packets} | {num_invalid_len_packets} | {num_invalid_cmd_packets}" f"{num_invalid_crc_packets} | {num_invalid_len_packets} |"
f" {num_invalid_cmd_packets}"
) )
pw.dlog( pw.dlog(
f"Num Of CMD Executed Requests {num_of_cmd_executed_requests} | " f"Num Of CMD Executed Requests {num_of_cmd_executed_requests} | "
@ -388,15 +389,16 @@ def handle_rw_hk_data(
"RegOverrunErrs | TotalErrs" "RegOverrunErrs | TotalErrs"
) )
pw.dlog( pw.dlog(
f"{uart_num_of_bytes_written} | {uart_num_of_bytes_read} | {uart_num_parity_errors} | " f"{uart_num_of_bytes_written} | {uart_num_of_bytes_read} |"
f"{uart_num_noise_errors} | {uart_num_frame_errors} | {uart_num_reg_overrun_errors} | " f" {uart_num_parity_errors} | {uart_num_noise_errors} |"
f" {uart_num_frame_errors} | {uart_num_reg_overrun_errors} |"
f" {uart_total_num_errors}" f" {uart_total_num_errors}"
) )
pw.dlog("SPI COM Info:") pw.dlog("SPI COM Info:")
pw.dlog("NumBytesWritten | NumBytesRead | RegOverrunErrs | TotalErrs") pw.dlog("NumBytesWritten | NumBytesRead | RegOverrunErrs | TotalErrs")
pw.dlog( pw.dlog(
f"{spi_num_bytes_written} | {spi_num_bytes_read} | {spi_num_reg_overrun_errors} | " f"{spi_num_bytes_written} | {spi_num_bytes_read} |"
f"{spi_total_num_errors}" f" {spi_num_reg_overrun_errors} | {spi_total_num_errors}"
) )
if current_idx > 0: if current_idx > 0:
pw.dlog( pw.dlog(

View File

@ -22,7 +22,6 @@ from tmtccmd.tc.pus_200_fsfw_mode import (
) )
from tmtccmd.tc.pus_20_fsfw_param import ( from tmtccmd.tc.pus_20_fsfw_param import (
create_load_param_cmd, create_load_param_cmd,
pack_scalar_u8_parameter_app_data,
) )
from tmtccmd.pus.s20_fsfw_param import create_scalar_u32_parameter from tmtccmd.pus.s20_fsfw_param import create_scalar_u32_parameter
@ -93,7 +92,6 @@ def build_com_subsystem_cmd(p: ServiceProviderParams): # noqa C901
0, 0,
ParameterId.DATARATE, ParameterId.DATARATE,
Datarate.LOW_RATE_MODULATION_BPSK, Datarate.LOW_RATE_MODULATION_BPSK,
) )
) )
) )

View File

@ -21,9 +21,8 @@ from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid, make_sid,
create_request_one_diag_command, create_request_one_diag_command,
create_enable_periodic_hk_command_with_interval, create_request_one_hk_command,
create_disable_periodic_hk_command, create_enable_periodic_hk_command_with_interval_with_diag,
create_request_one_hk_command, create_enable_periodic_hk_command_with_interval_with_diag,
create_disable_periodic_hk_command_with_diag, create_disable_periodic_hk_command_with_diag,
) )
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
@ -183,7 +182,9 @@ def pack_syrlinks_command( # noqa C901: Complexity okay here.
q.add_log_cmd(f"{prefix}: {Info.ENABLE_HK_RX_REGS}") q.add_log_cmd(f"{prefix}: {Info.ENABLE_HK_RX_REGS}")
sid = make_sid(obyt, SetId.RX_REGISTERS_DATASET) sid = make_sid(obyt, SetId.RX_REGISTERS_DATASET)
interval = float(input("HK interval in floating point seconds")) interval = float(input("HK interval in floating point seconds"))
cmds = create_enable_periodic_hk_command_with_interval_with_diag(True, sid, interval) cmds = create_enable_periodic_hk_command_with_interval_with_diag(
True, sid, interval
)
for cmd in cmds: for cmd in cmds:
q.add_pus_tc(cmd) q.add_pus_tc(cmd)
if op_code in OpCode.DISABLE_HK_RX_REGS: if op_code in OpCode.DISABLE_HK_RX_REGS:
@ -194,7 +195,9 @@ def pack_syrlinks_command( # noqa C901: Complexity okay here.
q.add_log_cmd(f"{prefix}: {Info.ENABLE_HK_TX_REGS}") q.add_log_cmd(f"{prefix}: {Info.ENABLE_HK_TX_REGS}")
sid = make_sid(obyt, SetId.TX_REGISTERS_DATASET) sid = make_sid(obyt, SetId.TX_REGISTERS_DATASET)
interval = float(input("HK interval in floating point seconds")) interval = float(input("HK interval in floating point seconds"))
cmds = create_enable_periodic_hk_command_with_interval_with_diag(True, sid, interval) cmds = create_enable_periodic_hk_command_with_interval_with_diag(
True, sid, interval
)
for cmd in cmds: for cmd in cmds:
q.add_pus_tc(cmd) q.add_pus_tc(cmd)
if op_code in OpCode.DISABLE_HK_TX_REGS: if op_code in OpCode.DISABLE_HK_TX_REGS:

View File

@ -570,12 +570,12 @@ def handle_ploc_mpsoc_hk_data(pw: PrintWrapper, hk_data: bytes, set_id: int):
pw.dlog(f"CAM SoC Temperature: {cam_soc_temp}") pw.dlog(f"CAM SoC Temperature: {cam_soc_temp}")
pw.dlog(f"System Monitor Temperature: {sysmon_temp}") pw.dlog(f"System Monitor Temperature: {sysmon_temp}")
pw.dlog( pw.dlog(
f"SYSMON VCC INT {sysmon_vcc_int:.3f} | SYSMON VCC AUX {sysmon_vcc_aux:.3f} | " f"SYSMON VCC INT {sysmon_vcc_int:.3f} | SYSMON VCC AUX"
f"SYSMON VCC BRAM {sysmon_vcc_bram:.3f}" f" {sysmon_vcc_aux:.3f} | SYSMON VCC BRAM {sysmon_vcc_bram:.3f}"
) )
pw.dlog( pw.dlog(
f"SYSMON VCC PAUX {sysmon_vcc_paux:.3f} | SYSMON VCC PINT {sysmon_vcc_pint:.3f} | " f"SYSMON VCC PAUX {sysmon_vcc_paux:.3f} | SYSMON VCC PINT"
f"SYSMON VCC PDRO {sysmon_vcc_pdro:.3f}" f" {sysmon_vcc_pint:.3f} | SYSMON VCC PDRO {sysmon_vcc_pdro:.3f}"
) )
fmt_str = "!fffffffffffff" fmt_str = "!fffffffffffff"
@ -603,7 +603,8 @@ def handle_ploc_mpsoc_hk_data(pw: PrintWrapper, hk_data: bytes, set_id: int):
) )
pw.dlog( pw.dlog(
f"SYSMON VCC 12V {sysmon_vcc_12v:.3f} | SYSMON VCC 5V {sysmon_vcc_5v:.3f} |" f"SYSMON VCC 12V {sysmon_vcc_12v:.3f} | SYSMON VCC 5V {sysmon_vcc_5v:.3f} |"
f"SYSMON VCC 3V3 {sysmon_vcc_3v3:.3f} | SYSMON VCC 3V3VA {sysmon_vcc_3v3va}" f" SYSMON VCC 3V3 {sysmon_vcc_3v3:.3f} | SYSMON VCC 3V3VA"
f" {sysmon_vcc_3v3va}"
) )
pw.dlog( pw.dlog(
f"SYSMON VCC 2V5DDR {sysmon_vcc_2v5ddr:.3f} | " f"SYSMON VCC 2V5DDR {sysmon_vcc_2v5ddr:.3f} | "

View File

@ -471,8 +471,8 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901
custom_data.extend(struct.pack("!B", memory_id)) custom_data.extend(struct.pack("!B", memory_id))
custom_data.extend(struct.pack("!I", start_address)) custom_data.extend(struct.pack("!I", start_address))
q.add_log_cmd( q.add_log_cmd(
f"{prefix}: {Info.MEM_CHECK} for file {update_file} at memory ID {memory_id} at start " f"{prefix}: {Info.MEM_CHECK} for file {update_file} at memory ID"
f"address {start_address}" f" {memory_id} at start address {start_address}"
) )
command = create_action_cmd( command = create_action_cmd(
object_id.as_bytes, SupvActionId.MEM_CHECK, custom_data object_id.as_bytes, SupvActionId.MEM_CHECK, custom_data
@ -775,7 +775,8 @@ def handle_supv_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper):
) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len]) ) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len])
current_idx += inc_len current_idx += inc_len
pw.dlog( pw.dlog(
f"SoC state (0:off, 1:booting, 2:update, 3:operating, 4:shutdown, 5:reset): {soc_state}" "SoC state (0:off, 1:booting, 2:update, 3:operating, 4:shutdown, 5:reset):"
f" {soc_state}"
) )
pw.dlog(f"Power Cycles {power_cycles}") pw.dlog(f"Power Cycles {power_cycles}")
pw.dlog(f"Boot after {boot_after_ms} ms | Boot timeout {boot_timeout_ms} ms") pw.dlog(f"Boot after {boot_after_ms} ms | Boot timeout {boot_timeout_ms} ms")

View File

@ -131,8 +131,7 @@ def pack_scex_cmds(p: ServiceProviderParams): # noqa C901
cell_select = int(cell_select) cell_select = int(cell_select)
if cell_select < 1 or cell_select > 10: if cell_select < 1 or cell_select > 10:
print( print(
f"Invalid cell number {cell_select}, " f"Invalid cell number {cell_select}, Please enter a valid number: "
f"Please enter a valid number: "
) )
continue continue
cn = cell_select - 1 cn = cell_select - 1

View File

@ -200,7 +200,8 @@ def pack_common_gomspace_cmds( # noqa C901: Complexity is okay here.
if op_code in GomspaceOpCode.SAVE_TABLE_DEFAULT: if op_code in GomspaceOpCode.SAVE_TABLE_DEFAULT:
source_table = int( source_table = int(
input( input(
"Source table [0: Board Config, 1: Module Config, 2: Calibration Parameter]: " "Source table [0: Board Config, 1: Module Config, 2: Calibration"
" Parameter]: "
) )
) )
if source_table not in [0, 1, 2]: if source_table not in [0, 1, 2]:
@ -215,8 +216,8 @@ def pack_common_gomspace_cmds( # noqa C901: Complexity is okay here.
if op_code in GomspaceOpCode.LOAD_TABLE: if op_code in GomspaceOpCode.LOAD_TABLE:
target_table = int( target_table = int(
input( input(
"Target table ID [0: Board Config, 1: Module Config, 2: Calibration Parameter, " "Target table ID [0: Board Config, 1: Module Config, 2: Calibration"
"4: HK TM]: " " Parameter, 4: HK TM]: "
) )
) )
if target_table not in [0, 1, 2, 4]: if target_table not in [0, 1, 2, 4]:

View File

@ -284,7 +284,7 @@ def hpa_on_procedure(q: DefaultPusQueueHelper):
if delay_dro_to_x8 is None: if delay_dro_to_x8 is None:
delay_dro_to_x8 = 900 delay_dro_to_x8 = 900
q.add_log_cmd( q.add_log_cmd(
f"Starting procedure to switch on PL PCDU HPA with DRO to X8 " "Starting procedure to switch on PL PCDU HPA with DRO to X8 "
f"delay of {delay_dro_to_x8} seconds" f"delay of {delay_dro_to_x8} seconds"
) )
pl_pcdu_on = PusTelecommand( pl_pcdu_on = PusTelecommand(

View File

@ -75,8 +75,8 @@ class WdtInfo:
self.pw.dlog(wdt_info) self.pw.dlog(wdt_info)
for idx in range(len(self.wdt_reboots_list)): for idx in range(len(self.wdt_reboots_list)):
self.pw.dlog( self.pw.dlog(
f"{WDT_LIST[idx].ljust(5)} | " f"{WDT_LIST[idx].ljust(5)} | {self.wdt_reboots_list[idx]:010} |"
f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}", f" {self.time_pings_left_list[idx]:010}",
) )
def parse(self, wdt_data: bytes, current_idx: int) -> int: def parse(self, wdt_data: bytes, current_idx: int) -> int:
@ -328,7 +328,8 @@ def handle_p60_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
pw.dlog(misc_info) pw.dlog(misc_info)
batt_info = ( batt_info = (
f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} |" f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} |"
f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}" f" Charge Current {batt_charge_current} | Discharge Current"
f" {batt_discharge_current}"
) )
pw.dlog(batt_info) pw.dlog(batt_info)
pw.dlog( pw.dlog(
@ -419,12 +420,12 @@ def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
) )
pw.dlog(f"Boot Cause {boot_cause} | Reset Cause {reset_cause}") pw.dlog(f"Boot Cause {boot_cause} | Reset Cause {reset_cause}")
pw.dlog( pw.dlog(
f"Ground WDT: Reboot Count {wdt_cnt_gnd} | Time Left {wdt_gnd_time_left} sec" f"Ground WDT: Reboot Count {wdt_cnt_gnd} | Time Left"
f" {wdt_gnd_time_left} sec"
) )
pw.dlog( pw.dlog(
"ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|" "ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|5:DAC|6:TempSens|7:Reserved"
"5:DAC|6:TempSens|7:Reserved"
) )
dev_parser.print(pw=pw) dev_parser.print(pw=pw)
FsfwTmTcPrinter.get_validity_buffer( FsfwTmTcPrinter.get_validity_buffer(
@ -474,7 +475,8 @@ def pdu_config_table_handler(
pw.dlog("[tcs, syrlinks, str, mgt, sus-n, scex, ploc, acs-a, unused]") pw.dlog("[tcs, syrlinks, str, mgt, sus-n, scex, ploc, acs-a, unused]")
elif obj_id.as_bytes == PDU_2_HANDLER_ID: elif obj_id.as_bytes == PDU_2_HANDLER_ID:
pw.dlog( pw.dlog(
"[obc, pl-pcdu-bat-nom, rw, heaters, sus-r, sa-depl, pl-pcdu-bat-red, acs-b, pl-cam]" "[obc, pl-pcdu-bat-nom, rw, heaters, sus-r, sa-depl, pl-pcdu-bat-red,"
" acs-b, pl-cam]"
) )
out_on_cnt = unpack_array_in_data(custom_data, 0x52, 2, 9, "H") out_on_cnt = unpack_array_in_data(custom_data, 0x52, 2, 9, "H")
out_off_cnt = unpack_array_in_data(custom_data, 0x64, 2, 9, "H") out_off_cnt = unpack_array_in_data(custom_data, 0x64, 2, 9, "H")

View File

@ -12,8 +12,8 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid, make_sid,
generate_one_hk_command, generate_one_hk_command,
create_request_one_diag_command, create_request_one_diag_command,
create_enable_periodic_hk_command_with_interval, create_request_one_hk_command,
create_request_one_hk_command, create_enable_periodic_hk_command_with_interval_with_diag, create_enable_periodic_hk_command_with_interval_with_diag,
) )

View File

@ -148,8 +148,8 @@ def time_prompt_offset_from_now() -> datetime.datetime:
seconds_offset = math.floor( seconds_offset = math.floor(
float( float(
input( input(
"Please enter the time as a offset from now in seconds. Negative offset is " "Please enter the time as a offset from now in seconds. Negative offset"
"allowed: " " is allowed: "
) )
) )
) )

View File

@ -32,8 +32,8 @@ def main():
if not os.path.exists("setup.cfg"): if not os.path.exists("setup.cfg"):
additional_flags_second_step += " --max-line-length=100" additional_flags_second_step += " --max-line-length=100"
flake8_second_step_cmd = ( flake8_second_step_cmd = (
f"{python_exe} flake8 . {additional_flags_both_steps} {additional_flags_second_step}" f"{python_exe} flake8 . {additional_flags_both_steps} "
f" {exclude_dirs_flag}" f" {additional_flags_second_step} {exclude_dirs_flag}"
) )
os.system(flake8_second_step_cmd) os.system(flake8_second_step_cmd)

View File

@ -192,7 +192,8 @@ class CfdpInCcsdsWrapper(SpecificApidHandlerBase):
_LOGGER.info("Received Finished PDU TM") _LOGGER.info("Received Finished PDU TM")
else: else:
_LOGGER.info( _LOGGER.info(
f"Received File Directive PDU with type {pdu_base.directive_type!r} TM" "Received File Directive PDU with type"
f" {pdu_base.directive_type!r} TM"
) )
self.handler.pass_pdu_packet(pdu_base) self.handler.pass_pdu_packet(pdu_base)
@ -292,7 +293,7 @@ class TcHandler(TcHandlerBase):
if pdu.pdu_directive_type == DirectiveType.METADATA_PDU: if pdu.pdu_directive_type == DirectiveType.METADATA_PDU:
metadata = pdu.to_metadata_pdu() metadata = pdu.to_metadata_pdu()
self.queue_helper.add_log_cmd( self.queue_helper.add_log_cmd(
f"CFDP Source: Sending Metadata PDU for file with size " "CFDP Source: Sending Metadata PDU for file with size "
f"{metadata.file_size}" f"{metadata.file_size}"
) )
elif pdu.pdu_directive_type == DirectiveType.EOF_PDU: elif pdu.pdu_directive_type == DirectiveType.EOF_PDU:
@ -302,7 +303,7 @@ class TcHandler(TcHandlerBase):
else: else:
fd_pdu = pdu.to_file_data_pdu() fd_pdu = pdu.to_file_data_pdu()
self.queue_helper.add_log_cmd( self.queue_helper.add_log_cmd(
f"CFDP Source: Sending File Data PDU for segment at offset " "CFDP Source: Sending File Data PDU for segment at offset "
f"{fd_pdu.offset} with length {len(fd_pdu.file_data)}" f"{fd_pdu.offset} with length {len(fd_pdu.file_data)}"
) )
self.queue_helper.add_ccsds_tc(sp) self.queue_helper.add_ccsds_tc(sp)
@ -314,7 +315,8 @@ class TcHandler(TcHandlerBase):
if info.proc_type == TcQueueEntryType.PUS_TC: if info.proc_type == TcQueueEntryType.PUS_TC:
def_proc = info.to_def_procedure() def_proc = info.to_def_procedure()
_LOGGER.info( _LOGGER.info(
f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}" f"Finished queue for service {def_proc.service} and op code"
f" {def_proc.op_code}"
) )
elif info.proc_type == TcProcedureType.CFDP: elif info.proc_type == TcProcedureType.CFDP:
_LOGGER.info("Finished CFDP queue") _LOGGER.info("Finished CFDP queue")