linter fixes, version bump

This commit is contained in:
Robin Müller 2023-06-10 14:42:28 +02:00
parent d390168829
commit 7b21070363
Signed by: muellerr
GPG Key ID: A649FB78196E3849
48 changed files with 179 additions and 215 deletions

View File

@ -7,6 +7,7 @@ exclude =
.git, .git,
__pycache__, __pycache__,
docs/conf.py, docs/conf.py,
deps
old, old,
build, build,
dist, dist,

View File

@ -1,12 +1,12 @@
__version__ = "3.1.1" __version__ = "4.0.0"
import logging import logging
from pathlib import Path from pathlib import Path
SW_NAME = "eive-tmtc" SW_NAME = "eive-tmtc"
VERSION_MAJOR = 3 VERSION_MAJOR = 4
VERSION_MINOR = 1 VERSION_MINOR = 0
VERSION_REVISION = 1 VERSION_REVISION = 0
EIVE_TMTC_ROOT = Path(__file__).parent EIVE_TMTC_ROOT = Path(__file__).parent
PACKAGE_ROOT = EIVE_TMTC_ROOT.parent PACKAGE_ROOT = EIVE_TMTC_ROOT.parent

View File

@ -9,7 +9,6 @@ import enum
from spacepackets import PacketType from spacepackets import PacketType
from spacepackets.ccsds import PacketId from spacepackets.ccsds import PacketId
from spacepackets.util import UnsignedByteField from spacepackets.util import UnsignedByteField
from pathlib import Path
PUS_APID = 0x65 PUS_APID = 0x65

View File

@ -8,7 +8,7 @@ import os.path
from typing import Dict from typing import Dict
from eive_tmtc import EIVE_TMTC_ROOT from eive_tmtc import EIVE_TMTC_ROOT
from tmtccmd.util.obj_id import ObjectIdDictT, ObjectIdU32 from tmtccmd.util.obj_id import ObjectIdU32
from tmtccmd.fsfw import parse_fsfw_objects_csv from tmtccmd.fsfw import parse_fsfw_objects_csv

View File

@ -74,7 +74,9 @@ def pack_get_param_command(
memory_address: Union[int, bytes], memory_address: Union[int, bytes],
parameter_size: int, parameter_size: int,
) -> PusTelecommand: ) -> PusTelecommand:
"""Function to generate a command to retrieve parameters like the temperature from a gomspace device. """Function to generate a command to retrieve parameters like the temperature from a
gomspace device.
@param object_id: The object id of the gomspace device handler. @param object_id: The object id of the gomspace device handler.
@param table_id: The table id of the gomspace device @param table_id: The table id of the gomspace device
@param memory_address: Address offset within table of the value to read. @param memory_address: Address offset within table of the value to read.
@ -218,9 +220,9 @@ def prompt_and_pack_set_integer_param_command(
def pack_ping_command(object_id: ObjectIdU32, data: bytearray) -> PusTelecommand: def pack_ping_command(object_id: ObjectIdU32, data: bytearray) -> PusTelecommand:
""" " Function to generate the command to ping a gomspace device """ " Function to generate the command to ping a gomspace device
@param object_id Object Id of the gomspace device handler. :param object_id: Object Id of the gomspace device handler.
@param data Bytearray containing the bytes to send to the gomspace device. For now the on board software :param data: Bytearray containing the bytes to send to the gomspace device. For now the on board
supports only the handling of up to 33 bytes. software supports only the handling of up to 33 bytes.
@note The ping request sends the specified data to a gompsace device. These @note The ping request sends the specified data to a gompsace device. These
data are simply copied by the device and then sent back. data are simply copied by the device and then sent back.
""" """

View File

@ -1,7 +1,5 @@
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry, CoreServiceList from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry, CoreServiceList
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
call_all_definitions_providers, call_all_definitions_providers,
) )
from tmtccmd.config.globals import get_default_tmtc_defs from tmtccmd.config.globals import get_default_tmtc_defs

View File

@ -29,7 +29,7 @@ from eive_tmtc.tmtc.acs.reaction_wheels import (
pack_rw_ass_cmds, pack_rw_ass_cmds,
) )
from eive_tmtc.tmtc.payload.ploc_memory_dumper import pack_ploc_memory_dumper_cmd from eive_tmtc.tmtc.payload.ploc_memory_dumper import pack_ploc_memory_dumper_cmd
from eive_tmtc.tmtc.com.ccsds_handler import pack_ccsds_handler_test from eive_tmtc.tmtc.com.ccsds_handler import pack_ccsds_handler_command
from eive_tmtc.tmtc.core import pack_core_commands from eive_tmtc.tmtc.core import pack_core_commands
from eive_tmtc.tmtc.acs.star_tracker import pack_star_tracker_commands from eive_tmtc.tmtc.acs.star_tracker import pack_star_tracker_commands
from eive_tmtc.tmtc.com.syrlinks_handler import pack_syrlinks_command from eive_tmtc.tmtc.com.syrlinks_handler import pack_syrlinks_command
@ -75,7 +75,7 @@ from tmtccmd.util import ObjectIdU32
from eive_tmtc.utility.input_helper import InputHelper from eive_tmtc.utility.input_helper import InputHelper
def handle_default_procedure( def handle_default_procedure( # noqa C901: Complexity okay here.
tc_base: TcHandlerBase, tc_base: TcHandlerBase,
info: DefaultProcedureInfo, info: DefaultProcedureInfo,
queue_helper: DefaultPusQueueHelper, queue_helper: DefaultPusQueueHelper,
@ -170,7 +170,7 @@ def handle_default_procedure(
) )
if service == CustomServiceList.CCSDS_HANDLER.value: if service == CustomServiceList.CCSDS_HANDLER.value:
object_id = cast(ObjectIdU32, obj_id_man.get(CCSDS_HANDLER_ID)) object_id = cast(ObjectIdU32, obj_id_man.get(CCSDS_HANDLER_ID))
return pack_ccsds_handler_test( return pack_ccsds_handler_command(
object_id=object_id, q=queue_helper, op_code=op_code object_id=object_id, q=queue_helper, op_code=op_code
) )
if service == CustomServiceList.PDEC_HANDLER.value: if service == CustomServiceList.PDEC_HANDLER.value:

View File

@ -10,14 +10,11 @@ from PyQt5.QtWidgets import (
from PyQt5 import QtCore from PyQt5 import QtCore
from tmtccmd.config import CoreModeList
from tmtccmd.core.globals_manager import get_global
class Parameter: class Parameter:
def __init__(self, name: str, defaultValue: str, widget: QLineEdit): def __init__(self, name: str, default_value: str, widget: QLineEdit):
self.name = name self.name = name
self.defaultValue = defaultValue self.defaultValue = default_value
self.widget = widget self.widget = widget
self.value = self.defaultValue self.value = self.defaultValue
self.widget.setPlaceholderText(self.defaultValue) self.widget.setPlaceholderText(self.defaultValue)
@ -34,9 +31,9 @@ class ParameterDialog(QDialog):
self.setWindowTitle("Enter Parameters") self.setWindowTitle("Enter Parameters")
Buttons = QDialogButtonBox.Ok | QDialogButtonBox.Cancel | QDialogButtonBox.Reset buttons = QDialogButtonBox.Ok | QDialogButtonBox.Cancel | QDialogButtonBox.Reset
self.buttonBox = QDialogButtonBox(Buttons) self.buttonBox = QDialogButtonBox(buttons)
self.buttonBox.accepted.connect(self.accept) self.buttonBox.accepted.connect(self.accept)
self.buttonBox.rejected.connect(self.reject) self.buttonBox.rejected.connect(self.reject)
self.buttonBox.button(QDialogButtonBox.Reset).clicked.connect(self._reset) self.buttonBox.button(QDialogButtonBox.Reset).clicked.connect(self._reset)
@ -54,14 +51,14 @@ class ParameterDialog(QDialog):
self.parameters = {} self.parameters = {}
def addParameter(self, name: str, defaultValue: str): def add_parameter(self, name: str, default_value: str):
row = self.groupLayout.rowCount() + 1 row = self.groupLayout.rowCount() + 1
description = QLabel(name) description = QLabel(name)
self.groupLayout.addWidget(description, row, 0) self.groupLayout.addWidget(description, row, 0)
valueWidget = QLineEdit() value_widget = QLineEdit()
self.groupLayout.addWidget(valueWidget, row, 1) self.groupLayout.addWidget(value_widget, row, 1)
parameter = Parameter(name, defaultValue, valueWidget) parameter = Parameter(name, default_value, value_widget)
self.parameters[name] = parameter self.parameters[name] = parameter
@ -69,10 +66,10 @@ class ParameterDialog(QDialog):
for value in self.parameters.values(): for value in self.parameters.values():
value.reset() value.reset()
def getParameters(self): def get_parameters(self):
output = {} output = {}
for key, parameter in self.parameters.items(): for key, parameter in self.parameters.items():
if parameter.widget != None: if parameter.widget is not None:
if parameter.widget.text() != "": if parameter.widget.text() != "":
parameter.value = parameter.widget.text() parameter.value = parameter.widget.text()
output[key] = parameter.value output[key] = parameter.value
@ -103,9 +100,9 @@ def prompt_parameters_cli(param_list) -> dict:
def _gui_prompt(param_list) -> dict: def _gui_prompt(param_list) -> dict:
dialog = ParameterDialog() dialog = ParameterDialog()
for parameter in param_list: for parameter in param_list:
dialog.addParameter(parameter["name"], parameter["defaultValue"]) dialog.add_parameter(parameter["name"], parameter["defaultValue"])
dialog.exec_() dialog.exec_()
return dialog.getParameters() return dialog.get_parameters()
def _cli_prompt(param_list) -> dict: def _cli_prompt(param_list) -> dict:

View File

@ -1 +0,0 @@

View File

@ -86,9 +86,8 @@ def pack_cmd_ctrl_to_nml(
def get_object_from_op_code(op_code: str): def get_object_from_op_code(op_code: str):
try: try:
return bytes.fromhex(op_code) return bytes.fromhex(op_code)
except: except ValueError:
pass pass
if op_code in OpCode.THERMAL_CONTROLLER: if op_code in OpCode.THERMAL_CONTROLLER:
return obj_ids.THERMAL_CONTROLLER_ID return obj_ids.THERMAL_CONTROLLER_ID
if op_code in OpCode.CORE_CONTROLLER: if op_code in OpCode.CORE_CONTROLLER:

View File

@ -1,10 +1,12 @@
from __future__ import annotations from __future__ import annotations
import struct
import time import time
from datetime import timedelta from datetime import timedelta
from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import get_object_ids from eive_tmtc.config.object_ids import get_object_ids
from spacepackets.ecss import PusTelecommand
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
from tmtccmd.config.tmtc import tmtc_definitions_provider from tmtccmd.config.tmtc import tmtc_definitions_provider
@ -14,7 +16,6 @@ from tmtccmd.pus.s11_tc_sched import (
create_enable_tc_sched_cmd, create_enable_tc_sched_cmd,
create_reset_tc_sched_cmd, create_reset_tc_sched_cmd,
) )
from tmtccmd.tc.pus_3_fsfw_hk import *
import eive_tmtc.config.object_ids as oids import eive_tmtc.config.object_ids as oids
from eive_tmtc.tmtc.tcs.brd_assy import OpCodeAssy as TcsOpCodes from eive_tmtc.tmtc.tcs.brd_assy import OpCodeAssy as TcsOpCodes
@ -49,6 +50,11 @@ from eive_tmtc.tmtc.acs.gyros import (
L3gGyroSetId as L3gGyroSetIds_1_3, L3gGyroSetId as L3gGyroSetIds_1_3,
) )
from eive_tmtc.tmtc.acs.gps import SetId as GpsSetIds from eive_tmtc.tmtc.acs.gps import SetId as GpsSetIds
from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid,
disable_periodic_hk_command,
create_enable_periodic_hk_command_with_interval,
)
class OpCode: class OpCode:
@ -154,7 +160,7 @@ def add_proc_cmds(defs: TmtcDefinitionWrapper):
) )
def pack_generic_hk_listening_cmds( def pack_generic_hk_listening_cmds( # noqa C901: Complexity okay here.
q: DefaultPusQueueHelper, q: DefaultPusQueueHelper,
proc_key: str, proc_key: str,
sid_list: list[bytearray], sid_list: list[bytearray],
@ -222,7 +228,9 @@ def pack_generic_hk_listening_cmds(
diag_list.clear() diag_list.clear()
def pack_proc_commands(q: DefaultPusQueueHelper, op_code: str): def pack_proc_commands( # noqa C901: Complexity is okay here.
q: DefaultPusQueueHelper, op_code: str
): # noqa C901: Complexity okay here.
sid_list = [] sid_list = []
obj_id_dict = get_object_ids() obj_id_dict = get_object_ids()
if op_code in OpCode.RESET_SCHED: if op_code in OpCode.RESET_SCHED:
@ -733,7 +741,7 @@ def enable_listen_to_hk_for_x_seconds(
interval_seconds: float, interval_seconds: float,
): ):
q.add_log_cmd(f"Enabling periodic HK for {device}") q.add_log_cmd(f"Enabling periodic HK for {device}")
cmd_tuple = enable_periodic_hk_command_with_interval( cmd_tuple = create_enable_periodic_hk_command_with_interval(
diag=diag, sid=sid, interval_seconds=interval_seconds diag=diag, sid=sid, interval_seconds=interval_seconds
) )
for cmd in cmd_tuple: for cmd in cmd_tuple:

View File

@ -1,5 +1,16 @@
import logging
import struct import struct
from eive_tmtc.config.object_ids import * from eive_tmtc.config.object_ids import (
ACU_HANDLER_ID,
PDU_1_HANDLER_ID,
PDU_2_HANDLER_ID,
IMTQ_HANDLER_ID,
PLOC_MPSOC_ID,
PLOC_SUPV_ID,
CORE_CONTROLLER_ID,
STAR_TRACKER_ID,
P60_DOCK_HANDLER,
)
from eive_tmtc.tmtc.acs.imtq import ImtqActionId from eive_tmtc.tmtc.acs.imtq import ImtqActionId
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.tmtc.core import handle_core_ctrl_action_replies from eive_tmtc.tmtc.core import handle_core_ctrl_action_replies
@ -10,6 +21,7 @@ from eive_tmtc.tmtc.power.tm import handle_get_param_data_reply
from tmtccmd.tm import Service8FsfwTm from tmtccmd.tm import Service8FsfwTm
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from spacepackets.ccsds.time import CdsShortTimestamp from spacepackets.ccsds.time import CdsShortTimestamp
from tmtccmd.util import ObjectIdDictT
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)

View File

@ -17,7 +17,9 @@ from spacepackets.ccsds.time import CdsShortTimestamp
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def handle_event_packet(raw_tm: bytes, pw: PrintWrapper): def handle_event_packet( # noqa C901: Complexity okay here
raw_tm: bytes, pw: PrintWrapper
): # noqa C901: Complexity okay here
tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty()) tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty())
event_dict = get_event_dict() event_dict = get_event_dict()
event_def = tm.event_definition event_def = tm.event_definition
@ -33,7 +35,10 @@ def handle_event_packet(raw_tm: bytes, pw: PrintWrapper):
obj_name = event_def.reporter_id.hex(sep=",") obj_name = event_def.reporter_id.hex(sep=",")
else: else:
obj_name = obj_id_obj.name obj_name = obj_id_obj.name
generic_event_string = f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x}) at {tm.time_provider.as_date_time()}" generic_event_string = (
f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x}) "
f"at {tm.time_provider.as_date_time()}"
)
_LOGGER.info(generic_event_string) _LOGGER.info(generic_event_string)
pw.file_logger.info( pw.file_logger.info(
f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}" f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
@ -48,8 +53,8 @@ def handle_event_packet(raw_tm: bytes, pw: PrintWrapper):
if info.name == "SUPV_UPDATE_PROGRESS" or info.name == "WRITE_MEMORY_FAILED": if info.name == "SUPV_UPDATE_PROGRESS" or info.name == "WRITE_MEMORY_FAILED":
additional_event_info = f"Additional info: {info.info}" additional_event_info = f"Additional info: {info.info}"
context = ( context = (
f"Progress Percent: {event_def.param1 >> 24 & 0xff} | Sequence Count: {event_def.param1 & 0xffff} " f"Progress Percent: {event_def.param1 >> 24 & 0xff} | "
f"| Bytes Written: {event_def.param2}" f"Sequence Count: {event_def.param1 & 0xffff} | Bytes Written: {event_def.param2}"
) )
pw.dlog(additional_event_info) pw.dlog(additional_event_info)
pw.dlog(context) pw.dlog(context)
@ -75,7 +80,8 @@ def handle_event_packet(raw_tm: bytes, pw: PrintWrapper):
elif event_def.param1 == Mode.RAW: elif event_def.param1 == Mode.RAW:
mode_name = "Raw" mode_name = "Raw"
pw.dlog( pw.dlog(
f"Mode Number {event_def.param1}, Mode Name {mode_name}, Submode: {event_def.param2}" f"Mode Number {event_def.param1}, Mode Name {mode_name}, "
f"Submode: {event_def.param2}"
) )
if info.name == "INDIVIDUAL_BOOT_COUNTS": if info.name == "INDIVIDUAL_BOOT_COUNTS":
boot_count_00 = (event_def.param1 >> 16) & 0xFFFF boot_count_00 = (event_def.param1 >> 16) & 0xFFFF
@ -126,7 +132,10 @@ def handle_event_packet(raw_tm: bytes, pw: PrintWrapper):
submode = event_def.param2 submode = event_def.param2
pw.dlog(f"Mode Number {mode}, Submode: {submode}") pw.dlog(f"Mode Number {mode}, Submode: {submode}")
if not specific_handler: if not specific_handler:
additional_event_info = f"Additional info: {info.info} | P1: {event_def.param1} | P2: {event_def.param2}" additional_event_info = (
f"Additional info: {info.info} | P1: {event_def.param1} | "
f"P2: {event_def.param2}"
)
pw.dlog(additional_event_info) pw.dlog(additional_event_info)
if not specific_handler: if not specific_handler:
# printer.handle_long_tm_print(packet_if=tm.pus_tm, info_if=tm.pus_tm) # printer.handle_long_tm_print(packet_if=tm.pus_tm, info_if=tm.pus_tm)

View File

@ -24,7 +24,7 @@ from .action_reply_handler import handle_action_reply
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def pus_factory_hook( def pus_factory_hook( # noqa C901 : Complexity okay here
packet: bytes, packet: bytes,
verif_wrapper: VerificationWrapper, verif_wrapper: VerificationWrapper,
printer: FsfwTmTcPrinter, printer: FsfwTmTcPrinter,
@ -78,7 +78,7 @@ def pus_factory_hook(
elif isinstance(scalar_param, float): elif isinstance(scalar_param, float):
pw.dlog(f"Scalar floating point parameter: {scalar_param}") pw.dlog(f"Scalar floating point parameter: {scalar_param}")
except ValueError as e: except ValueError as e:
pw.dlog("received {e} trying to parse scalar parameter") pw.dlog(f"received {e} trying to parse scalar parameter")
else: else:
# TODO: Could improve display further by actually displaying a matrix as a # TODO: Could improve display further by actually displaying a matrix as a
# matrix using row and column information # matrix using row and column information

View File

@ -1,5 +1,4 @@
"""HK Handling for EIVE OBSW""" """HK Handling for EIVE OBSW"""
import datetime
import logging import logging
# from pus_tm.tcp_server_objects import TCP_SEVER_SENSOR_TEMPERATURES # from pus_tm.tcp_server_objects import TCP_SEVER_SENSOR_TEMPERATURES
@ -85,7 +84,7 @@ def handle_hk_packet(
_LOGGER.warning("HK definitions printout not implemented yet") _LOGGER.warning("HK definitions printout not implemented yet")
def handle_regular_hk_print( def handle_regular_hk_print( # noqa C901: Complexity okay here
printer: FsfwTmTcPrinter, printer: FsfwTmTcPrinter,
object_id: ObjectIdU32, object_id: ObjectIdU32,
hk_packet: Service3Base, hk_packet: Service3Base,

View File

@ -1,13 +1,13 @@
import logging
import socket import socket
from typing import Optional from typing import Optional
import json import json
import base64 import base64
from tmtccmd.logging import get_console_logger
from tmtccmd.util.obj_id import ObjectIdU32 from tmtccmd.util.obj_id import ObjectIdU32
from dle_encoder import DleEncoder from dle_encoder import DleEncoder
LOGGER = get_console_logger() _LOGGER = logging.getLogger(__name__)
class TmTcpServer: class TmTcpServer:
@ -29,8 +29,8 @@ class TmTcpServer:
def __del__(self): def __del__(self):
try: try:
self.close() self.close()
except: except IOError:
LOGGER.warning("Could not close sockets!") _LOGGER.warning("Could not close sockets!")
def close(self): def close(self):
self.server_socket.close() self.server_socket.close()
@ -45,8 +45,8 @@ class TmTcpServer:
(self.client_connection, _) = self.server_socket.accept() (self.client_connection, _) = self.server_socket.accept()
self.client_connection.setblocking(False) self.client_connection.setblocking(False)
print("Client connected") print("Client connected")
except: except IOError:
# no client waiting
return return
data_json_bytes = json.dumps(dictionary).encode() data_json_bytes = json.dumps(dictionary).encode()
@ -57,7 +57,7 @@ class TmTcpServer:
try: try:
sent_length = self.client_connection.send(data_json_bytes) sent_length = self.client_connection.send(data_json_bytes)
except: except IOError:
self.client_connection = None self.client_connection = None
return return
if sent_length == 0: if sent_length == 0:

View File

@ -229,7 +229,7 @@ def acs_cmd_defs(defs: TmtcDefinitionWrapper):
@service_provider(CustomServiceList.ACS_CTRL.value) @service_provider(CustomServiceList.ACS_CTRL.value)
def pack_acs_ctrl_command(p: ServiceProviderParams): def pack_acs_ctrl_command(p: ServiceProviderParams): # noqa C901
op_code = p.op_code op_code = p.op_code
q = p.queue_helper q = p.queue_helper
if op_code in OpCodes.OFF: if op_code in OpCodes.OFF:
@ -507,7 +507,7 @@ def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper):
domain_id=sid, domain_id=sid,
unique_id=pid, unique_id=pid,
parameter=param, parameter=param,
).pack() )
) )
) )
case 1: case 1:
@ -519,7 +519,7 @@ def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper):
domain_id=sid, domain_id=sid,
unique_id=pid, unique_id=pid,
parameter=param, parameter=param,
).pack() )
) )
) )
case 2: case 2:
@ -531,7 +531,7 @@ def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper):
domain_id=sid, domain_id=sid,
unique_id=pid, unique_id=pid,
parameter=param, parameter=param,
).pack() )
) )
) )
case 3: case 3:
@ -543,7 +543,7 @@ def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper):
domain_id=sid, domain_id=sid,
unique_id=pid, unique_id=pid,
parameter=param, parameter=param,
).pack() )
) )
) )
case 4: case 4:
@ -555,7 +555,7 @@ def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper):
domain_id=sid, domain_id=sid,
unique_id=pid, unique_id=pid,
parameter=param, parameter=param,
).pack() )
) )
) )
@ -603,7 +603,7 @@ def set_acs_ctrl_param_vector(q: DefaultPusQueueHelper):
domain_id=sid, domain_id=sid,
unique_id=pid, unique_id=pid,
parameters=param, parameters=param,
).pack() )
) )
) )
else: else:
@ -636,7 +636,7 @@ def set_acs_ctrl_param_matrix(q: DefaultPusQueueHelper):
domain_id=sid, domain_id=sid,
unique_id=pid, unique_id=pid,
parameters=param, parameters=param,
).pack() )
) )
) )
else: else:
@ -663,7 +663,7 @@ def set_acs_ctrl_param_matrix(q: DefaultPusQueueHelper):
domain_id=sid, domain_id=sid,
unique_id=pid, unique_id=pid,
parameters=param, parameters=param,
).pack() )
) )
) )
else: else:
@ -1132,7 +1132,9 @@ def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
def perform_mgm_calibration(pw: PrintWrapper, mgm_tuple: Tuple): def perform_mgm_calibration( # noqa C901: Complexity okay
pw: PrintWrapper, mgm_tuple: Tuple
): # noqa C901: Complexity okay
global CALIBR_SOCKET, CALIBRATION_ADDR global CALIBR_SOCKET, CALIBRATION_ADDR
try: try:
declare_api_cmd = "declare_api_version 2" declare_api_cmd = "declare_api_version 2"
@ -1146,7 +1148,7 @@ def perform_mgm_calibration(pw: PrintWrapper, mgm_tuple: Tuple):
return return
else: else:
if str(reply[0]) == "0": if str(reply[0]) == "0":
pw.dlog(f"MGM calibration: API version 2 was not accepted") pw.dlog("MGM calibration: API version 2 was not accepted")
return return
if len(mgm_tuple) != 3: if len(mgm_tuple) != 3:
pw.dlog(f"MGM tuple has invalid length {len(mgm_tuple)}") pw.dlog(f"MGM tuple has invalid length {len(mgm_tuple)}")
@ -1164,7 +1166,7 @@ def perform_mgm_calibration(pw: PrintWrapper, mgm_tuple: Tuple):
return return
else: else:
if str(reply[0]) == "0": if str(reply[0]) == "0":
pw.dlog(f"MGM calibration: magnetmeter field format was not accepted") pw.dlog("MGM calibration: magnetmeter field format was not accepted")
return return
pw.dlog(f"Sent data {mgm_list} to Helmholtz Testbench successfully") pw.dlog(f"Sent data {mgm_list} to Helmholtz Testbench successfully")
except socket.timeout: except socket.timeout:
@ -1172,6 +1174,6 @@ def perform_mgm_calibration(pw: PrintWrapper, mgm_tuple: Tuple):
except BlockingIOError as e: except BlockingIOError as e:
pw.dlog(f"Error {e}") pw.dlog(f"Error {e}")
except ConnectionResetError as e: except ConnectionResetError as e:
pw.dlog("Socket was closed") pw.dlog(f"Socket was closed: {e}")
except ConnectionRefusedError or OSError: except ConnectionRefusedError or OSError:
pw.dlog("Connecting to Calibration Socket on addrss {} failed") pw.dlog("Connecting to Calibration Socket on addrss {} failed")

View File

@ -24,7 +24,6 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_diag_command, generate_one_diag_command,
generate_one_hk_command, generate_one_hk_command,
create_request_one_diag_command, create_request_one_diag_command,
create_enable_periodic_hk_command,
create_disable_periodic_hk_command, create_disable_periodic_hk_command,
create_enable_periodic_hk_command_with_interval, create_enable_periodic_hk_command_with_interval,
) )
@ -129,7 +128,9 @@ def add_imtq_cmds(defs: TmtcDefinitionWrapper):
defs.add_service(CustomServiceList.IMTQ.value, "IMQT Device", oce) defs.add_service(CustomServiceList.IMTQ.value, "IMQT Device", oce)
def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str): def pack_imtq_test_into( # noqa C901
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd( q.add_log_cmd(
f"Testing ISIS IMTQ handler with object id: {object_id.as_hex_string}" f"Testing ISIS IMTQ handler with object id: {object_id.as_hex_string}"
) )
@ -680,7 +681,7 @@ def handle_self_test_data(pw: PrintWrapper, hk_data: bytes):
init_coil_z_temperature, init_coil_z_temperature,
err, err,
raw_mag_x, raw_mag_x,
init_raw_mag_y, raw_mag_y,
raw_mag_z, raw_mag_z,
cal_mag_x, cal_mag_x,
cal_mag_y, cal_mag_y,

View File

@ -17,7 +17,6 @@ from tmtccmd.config.tmtc import tmtc_definitions_provider, TmtcDefinitionWrapper
from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command, Mode from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command, Mode
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
class OpCode: class OpCode:
@ -99,7 +98,7 @@ def handle_mgm_rm3100_hk_data(
object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
): ):
if set_id == MgmRm3100SetId.CORE_HK: if set_id == MgmRm3100SetId.CORE_HK:
fmt_str = f"!fff" fmt_str = "!fff"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)
(field_x, field_y, field_z) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len]) (field_x, field_y, field_z) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len])
pw.dlog(f"Received MGM RM3100 from object {object_id}") pw.dlog(f"Received MGM RM3100 from object {object_id}")

View File

@ -144,7 +144,7 @@ def add_rw_cmds(defs: TmtcDefinitionWrapper):
) )
def pack_single_rw_test_into( def pack_single_rw_test_into( # noqa C901: Complexity is okay here.
object_id: bytes, rw_idx: int, q: DefaultPusQueueHelper, op_code: str object_id: bytes, rw_idx: int, q: DefaultPusQueueHelper, op_code: str
): ):
if op_code == OpCodesDev.SPEED: if op_code == OpCodesDev.SPEED:
@ -374,7 +374,7 @@ def handle_rw_hk_data(
f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, " f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, "
f"4: Running, speed changing" f"4: Running, speed changing"
) )
pw.dlog(f"Number Of Invalid Packets:") pw.dlog("Number Of Invalid Packets:")
pw.dlog("CRC | Length | CMD") pw.dlog("CRC | Length | CMD")
pw.dlog( pw.dlog(
f"{num_invalid_crc_packets} | {num_invalid_len_packets} | {num_invalid_cmd_packets}" f"{num_invalid_crc_packets} | {num_invalid_len_packets} | {num_invalid_cmd_packets}"
@ -385,8 +385,8 @@ def handle_rw_hk_data(
) )
pw.dlog("UART COM information:") pw.dlog("UART COM information:")
pw.dlog( pw.dlog(
f"NumBytesWritten | NumBytesRead | ParityErrs | NoiseErrs | FrameErrs | " "NumBytesWritten | NumBytesRead | ParityErrs | NoiseErrs | FrameErrs | "
f"RegOverrunErrs | TotalErrs" "RegOverrunErrs | TotalErrs"
) )
pw.dlog( pw.dlog(
f"{uart_num_of_bytes_written} | {uart_num_of_bytes_read} | {uart_num_parity_errors} | " f"{uart_num_of_bytes_written} | {uart_num_of_bytes_read} | {uart_num_parity_errors} | "
@ -394,7 +394,7 @@ def handle_rw_hk_data(
f"{uart_total_num_errors}" f"{uart_total_num_errors}"
) )
pw.dlog("SPI COM Info:") pw.dlog("SPI COM Info:")
pw.dlog(f"NumBytesWritten | NumBytesRead | RegOverrunErrs | TotalErrs") pw.dlog("NumBytesWritten | NumBytesRead | RegOverrunErrs | TotalErrs")
pw.dlog( pw.dlog(
f"{spi_num_bytes_written} | {spi_num_bytes_read} | {spi_num_reg_overrun_errors} | " f"{spi_num_bytes_written} | {spi_num_bytes_read} | {spi_num_reg_overrun_errors} | "
f"{spi_total_num_errors}" f"{spi_total_num_errors}"

View File

@ -202,7 +202,7 @@ def prompt_object_id_mode_cmd() -> bytes:
return STAR_TRACKER_ID return STAR_TRACKER_ID
def pack_star_tracker_commands( def pack_star_tracker_commands( # noqa C901
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
): ):
q.add_log_cmd( q.add_log_cmd(

View File

@ -4,7 +4,8 @@
@brief Commanding of the star tracker image helper object which is responsible for uploading @brief Commanding of the star tracker image helper object which is responsible for uploading
and downloading images to/from the star tracker. and downloading images to/from the star tracker.
@details Images to uplaod must be previously transferred to the OBC with the CFDP protocol. @details Images to uplaod must be previously transferred to the OBC with the CFDP protocol.
Also downloaded images will be stored on the filesystem of the OBC and can be transferred via CFDP. Also downloaded images will be stored on the filesystem of the OBC and can be
transferred via CFDP.
@author J. Meier @author J. Meier
@date 29.11.2021 @date 29.11.2021
""" """

View File

@ -23,7 +23,7 @@ def handle_sus_hk(
channels.append(struct.unpack("!H", hk_data[current_idx : current_idx + 2])) channels.append(struct.unpack("!H", hk_data[current_idx : current_idx + 2]))
current_idx += 2 current_idx += 2
pw.dlog(f"Temperature: {temperature} C") pw.dlog(f"Temperature: {temperature} C")
pw.dlog(f"AIN Channel | Raw Value (hex) | Raw Value (dec)") pw.dlog("AIN Channel | Raw Value (hex) | Raw Value (dec)")
for idx, val in enumerate(channels): for idx, val in enumerate(channels):
pw.dlog(f"{idx} | {val[0]:#06x} |" + str(val[0]).ljust(5)) pw.dlog(f"{idx} | {val[0]:#06x} |" + str(val[0]).ljust(5))
FsfwTmTcPrinter.get_validity_buffer( FsfwTmTcPrinter.get_validity_buffer(

View File

@ -18,7 +18,6 @@ from tmtccmd.config.tmtc import (
from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command, Mode from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command, Mode
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from eive_tmtc.config.object_ids import CCSDS_HANDLER_ID
class ActionId(enum.IntEnum): class ActionId(enum.IntEnum):
@ -67,7 +66,7 @@ class Info:
DISABLE_ACTION = "Disable TX (legacy)" DISABLE_ACTION = "Disable TX (legacy)"
def pack_ccsds_handler_test( def pack_ccsds_handler_command( # noqa C901
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
): ):
obyt = object_id.as_bytes obyt = object_id.as_bytes

View File

@ -61,7 +61,7 @@ class Info:
@service_provider(CustomServiceList.COM_SS) @service_provider(CustomServiceList.COM_SS)
def build_com_subsystem_cmd(p: ServiceProviderParams): def build_com_subsystem_cmd(p: ServiceProviderParams): # noqa C901
q = p.queue_helper q = p.queue_helper
o = p.op_code o = p.op_code
prefix = "COM Subsystem" prefix = "COM Subsystem"

View File

@ -142,7 +142,7 @@ def normal_mode_cmd(q: DefaultPusQueueHelper, info: str, submode: int):
q.add_pus_tc(create_mode_command(SYRLINKS_HANDLER_ID, Mode.NORMAL, submode)) q.add_pus_tc(create_mode_command(SYRLINKS_HANDLER_ID, Mode.NORMAL, submode))
def pack_syrlinks_command( def pack_syrlinks_command( # noqa C901: Complexity okay here.
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
): ):
obyt = object_id.as_bytes obyt = object_id.as_bytes

View File

@ -212,7 +212,9 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce) defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
def pack_core_commands(q: DefaultPusQueueHelper, op_code: str): def pack_core_commands( # noqa C901
q: DefaultPusQueueHelper, op_code: str
): # noqa: C901 , complexity okay here
if op_code == OpCode.ANNOUNCE_VERSION: if op_code == OpCode.ANNOUNCE_VERSION:
q.add_log_cmd(f"{Info.ANNOUNCE_VERSION}") q.add_log_cmd(f"{Info.ANNOUNCE_VERSION}")
q.add_pus_tc(create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_VERSION)) q.add_pus_tc(create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_VERSION))

View File

@ -1,5 +1,3 @@
import struct
from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.tmtc.obj_prompt import prompt_object from eive_tmtc.tmtc.obj_prompt import prompt_object
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand

View File

@ -1,9 +1,7 @@
from eive_tmtc.config.object_ids import ( from eive_tmtc.config.object_ids import (
ACS_SUBSYSTEM_ID, ACS_SUBSYSTEM_ID,
ACS_CONTROLLER,
IMTQ_HANDLER_ID, IMTQ_HANDLER_ID,
GPS_0_HEALTH_DEV, GPS_0_HEALTH_DEV,
GPS_1_HEALTH_DEV,
GYRO_0_ADIS_HANDLER_ID, GYRO_0_ADIS_HANDLER_ID,
GYRO_1_L3G_HANDLER_ID, GYRO_1_L3G_HANDLER_ID,
ACS_BOARD_ASS_ID, ACS_BOARD_ASS_ID,

View File

@ -1,8 +1,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@file ploc_memory_dumper.py @file ploc_memory_dumper.py
@brief This file implements the command to dump memory sectors of the PLOC. Memories of the PLOC which can be dumped @brief This file implements the command to dump memory sectors of the PLOC. Memories of the PLOC
are one MRAM, two flash memories and the SRAM. which can be dumped are one MRAM, two flash memories and the SRAM.
@author J. Meier @author J. Meier
@date 31.08.2021 @date 31.08.2021
""" """

View File

@ -175,7 +175,9 @@ def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper):
@service_provider(CustomServiceList.PLOC_MPSOC) @service_provider(CustomServiceList.PLOC_MPSOC)
def pack_ploc_mpsoc_commands(p: ServiceProviderParams): def pack_ploc_mpsoc_commands( # noqa C901
p: ServiceProviderParams,
): # noqa C901: Complexity okay here.
object_id = get_object_ids().get(PLOC_MPSOC_ID) object_id = get_object_ids().get(PLOC_MPSOC_ID)
q = p.queue_helper q = p.queue_helper
prefix = "PLOC MPSoC" prefix = "PLOC MPSoC"
@ -422,7 +424,7 @@ def prepare_replay_write_sequence_cmd(object_id: bytes) -> bytearray:
def prepare_cam_take_pic_cmd(object_id: bytes) -> bytearray: def prepare_cam_take_pic_cmd(object_id: bytes) -> bytearray:
selection = input("Use default parameter? (Y/N): ") selection = input("Use default parameter? (Y/N): ")
if selection is "Y" or selection is "y": if selection.lower() in ["y", "1", "yes"]:
filename = "0:/test" filename = "0:/test"
encoder_setting_y = 7 encoder_setting_y = 7
quantization_y = 0 quantization_y = 0

View File

@ -223,7 +223,7 @@ def add_ploc_supv_cmds(defs: TmtcDefinitionWrapper):
@service_provider(CustomServiceList.PLOC_SUPV) @service_provider(CustomServiceList.PLOC_SUPV)
def pack_ploc_supv_commands(p: ServiceProviderParams): def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901
q = p.queue_helper q = p.queue_helper
op_code = p.op_code op_code = p.op_code
object_id = get_object_ids().get(PLOC_SUPV_ID) object_id = get_object_ids().get(PLOC_SUPV_ID)
@ -484,7 +484,9 @@ def pack_ploc_supv_commands(p: ServiceProviderParams):
def pack_sel_boot_image_cmd( def pack_sel_boot_image_cmd(
object_id: bytes, mem: int, bp0: int, bp1: int, bp2: int object_id: bytes, mem: int, bp0: int, bp1: int, bp2: int
) -> bytearray: ) -> bytearray:
"""This function can be used to generate the command to select the image from which the MPSoC will boot """This function can be used to generate the command to select the image from which the MPSoC
will boot.
@param object_id The object id of the PLOC supervisor handler. @param object_id The object id of the PLOC supervisor handler.
@param mem The memory from which the MPSoC shall boot (NVM0 - 0, NVM1 - 1) @param mem The memory from which the MPSoC shall boot (NVM0 - 0, NVM1 - 1)
@param bp0 Partition pin 0 @param bp0 Partition pin 0

View File

@ -111,7 +111,7 @@ def handle_rad_sensor_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
) )
ain_dict = {0: ain0, 1: ain1, 4: ain4, 5: ain5, 6: ain6, 7: ain7} ain_dict = {0: ain0, 1: ain1, 4: ain4, 5: ain5, 6: ain6, 7: ain7}
pw.dlog(f"Temperature: {temp} C") pw.dlog(f"Temperature: {temp} C")
pw.dlog(f"AIN Channel | Raw Value (hex) | Raw Value (dec)") pw.dlog("AIN Channel | Raw Value (hex) | Raw Value (dec)")
for idx, val in ain_dict.items(): for idx, val in ain_dict.items():
pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}") pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}")
current_idx += inc_len current_idx += inc_len

View File

@ -75,7 +75,7 @@ def add_scex_cmds(defs: TmtcDefinitionWrapper):
@service_provider(CustomServiceList.SCEX.value) @service_provider(CustomServiceList.SCEX.value)
def pack_scex_cmds(p: ServiceProviderParams): def pack_scex_cmds(p: ServiceProviderParams): # noqa C901
op_code = p.op_code op_code = p.op_code
q = p.queue_helper q = p.queue_helper
if op_code in OpCode.SWITCH_ON: if op_code in OpCode.SWITCH_ON:

View File

@ -140,7 +140,7 @@ def pack_common_power_cmds(
q.add_pus_tc(disable_periodic_hk_command(True, make_sid(objb, SetId.CORE))) q.add_pus_tc(disable_periodic_hk_command(True, make_sid(objb, SetId.CORE)))
def pack_common_gomspace_cmds( def pack_common_gomspace_cmds( # noqa C901: Complexity is okay here.
prefix: str, object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str prefix: str, object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
): ):
objb = object_id.as_bytes objb = object_id.as_bytes

View File

@ -98,7 +98,9 @@ class P60DockHkTable:
wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size) wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size)
def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str): def pack_p60dock_cmds( # noqa C901: Complexity okay here.
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
objb = object_id.as_bytes objb = object_id.as_bytes
pack_common_power_cmds("P60 Dock", object_id, q, op_code) pack_common_power_cmds("P60 Dock", object_id, q, op_code)
pack_common_gomspace_cmds("P60 Dock", object_id, q, op_code) pack_common_gomspace_cmds("P60 Dock", object_id, q, op_code)

View File

@ -3,8 +3,16 @@
@author J. Meier @author J. Meier
@date 17.12.2020 @date 17.12.2020
""" """
import enum
from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import PDU_1_HANDLER_ID from eive_tmtc.config.object_ids import PDU_1_HANDLER_ID
from eive_tmtc.gomspace.gomspace_common import (
pack_ping_command,
TableIds,
pack_get_param_command,
)
from eive_tmtc.gomspace.gomspace_pdu_definitions import PduHkTable
from eive_tmtc.tmtc.power.common_power import ( from eive_tmtc.tmtc.power.common_power import (
pack_common_gomspace_cmds, pack_common_gomspace_cmds,
req_hk_cmds, req_hk_cmds,
@ -17,15 +25,15 @@ from eive_tmtc.tmtc.power.common_power import (
pack_common_power_cmds, pack_common_power_cmds,
GomspaceOpCode, GomspaceOpCode,
GsInfo, GsInfo,
PowerInfo,
add_common_power_defs, add_common_power_defs,
SetId, SetId,
) )
from spacepackets.ecss import PusTelecommand
from eive_tmtc.gomspace.gomspace_common import *
from eive_tmtc.gomspace.gomspace_pdu_definitions import *
from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper
from tmtccmd.config.tmtc import tmtc_definitions_provider from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32
class Pdu1InfoBase: class Pdu1InfoBase:
@ -102,7 +110,9 @@ def info_off_pdu1(base: str) -> str:
return "PDU1: " + base + " off" return "PDU1: " + base + " off"
def pdu1_switch_cmds(q: DefaultPusQueueHelper, op_code: str): def pdu1_switch_cmds( # noqa C901: Complexity is okay here.
q: DefaultPusQueueHelper, op_code: str
): # noqa C901: Complexity okay here
if op_code in PowerOpCodes.TCS_ON: if op_code in PowerOpCodes.TCS_ON:
tcs_on_cmd(q) tcs_on_cmd(q)
elif op_code in PowerOpCodes.TCS_OFF: elif op_code in PowerOpCodes.TCS_OFF:

View File

@ -6,7 +6,18 @@
@author J. Meier @author J. Meier
@date 17.12.2020 @date 17.12.2020
""" """
import enum
from eive_tmtc.config.object_ids import PDU_2_HANDLER_ID from eive_tmtc.config.object_ids import PDU_2_HANDLER_ID
from eive_tmtc.gomspace.gomspace_common import (
pack_reboot_command,
pack_ping_command,
pack_gnd_wdt_reset_command,
pack_get_param_command,
TableIds,
pack_request_full_hk_table_command,
)
from eive_tmtc.gomspace.gomspace_pdu_definitions import PduHkTable, PduConfigTable
from eive_tmtc.tmtc.power.common_power import ( from eive_tmtc.tmtc.power.common_power import (
pack_common_gomspace_cmds, pack_common_gomspace_cmds,
req_hk_cmds, req_hk_cmds,
@ -20,10 +31,11 @@ from eive_tmtc.tmtc.power.common_power import (
SetId, SetId,
add_common_power_defs, add_common_power_defs,
) )
from eive_tmtc.gomspace.gomspace_common import * from spacepackets.ecss import PusTelecommand
from eive_tmtc.gomspace.gomspace_pdu_definitions import *
from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper
from tmtccmd.config.tmtc import tmtc_definitions_provider from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32
class Pdu2InfoBase: class Pdu2InfoBase:
@ -159,7 +171,7 @@ def add_pdu2_cmds(defs: TmtcDefinitionWrapper):
) )
def pdu2_switch_cmds(q: DefaultPusQueueHelper, op_code: str): def pdu2_switch_cmds(q: DefaultPusQueueHelper, op_code: str): # noqa C901
if op_code in PowerOpCodes.PL_PCDU_VBAT_NOM_ON: if op_code in PowerOpCodes.PL_PCDU_VBAT_NOM_ON:
pl_pcdu_bat_nom_on_cmd(q) pl_pcdu_bat_nom_on_cmd(q)
elif op_code in PowerOpCodes.PL_PCDU_VBAT_NOM_OFF: elif op_code in PowerOpCodes.PL_PCDU_VBAT_NOM_OFF:

View File

@ -161,7 +161,9 @@ def add_pl_pcdu_cmds(defs: TmtcDefinitionWrapper):
defs.add_service(CustomServiceList.PL_PCDU.value, "PL PCDU", oce) defs.add_service(CustomServiceList.PL_PCDU.value, "PL PCDU", oce)
def pack_pl_pcdu_commands(q: DefaultPusQueueHelper, op_code: str): def pack_pl_pcdu_commands( # noqa C901: Complexity is okay here.
q: DefaultPusQueueHelper, op_code: str
): # noqa C901: Complexity is okay here.
if op_code in OpCode.SWITCH_ON: if op_code in OpCode.SWITCH_ON:
pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_ON, mode=Mode.ON, submode=0) pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_ON, mode=Mode.ON, submode=0)
if op_code in OpCode.SWITCH_OFF: if op_code in OpCode.SWITCH_OFF:

View File

@ -35,7 +35,6 @@ from eive_tmtc.tmtc.power.acu import add_acu_cmds, acu_req_hk_cmds
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
create_request_one_diag_command, create_request_one_diag_command,
make_sid, make_sid,
create_request_one_hk_command,
) )
from tmtccmd.config.tmtc import tmtc_definitions_provider from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc import DefaultPusQueueHelper

View File

@ -119,7 +119,7 @@ class DevicesInfoParser:
return current_idx return current_idx
def print(self, pw: PrintWrapper): def print(self, pw: PrintWrapper):
pw.dlog(f"Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)") pw.dlog("Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)")
for i in range(len(self.dev_types)): for i in range(len(self.dev_types)):
pw.dlog( pw.dlog(
f"{self.map_idx_to_type(self.dev_types[i])} | {self.dev_statuses[i]}" f"{self.map_idx_to_type(self.dev_types[i])} | {self.dev_statuses[i]}"
@ -181,7 +181,7 @@ def handle_pdu_data(pw: PrintWrapper, pdu_idx: int, set_id: int, hk_data: bytes)
wdt = WdtInfo(pw=pw) wdt = WdtInfo(pw=pw)
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx) current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
wdt.print() wdt.print()
pw.dlog(f"PDU Device Types: 0:FRAM|1:ADC|2:ADC|3:TempSens|4,5,6,7:Reserved") pw.dlog("PDU Device Types: 0:FRAM|1:ADC|2:ADC|3:TempSens|4,5,6,7:Reserved")
dev_parser.print(pw=pw) dev_parser.print(pw=pw)
if set_id == SetId.CORE or set_id == SetId.CORE: if set_id == SetId.CORE or set_id == SetId.CORE:
pw.dlog(f"Received PDU HK from PDU {pdu_idx}") pw.dlog(f"Received PDU HK from PDU {pdu_idx}")
@ -377,9 +377,7 @@ def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
current_idx += inc_len current_idx += inc_len
pw.dlog("Received ACU Core HK. Voltages in mV, currents in mA") pw.dlog("Received ACU Core HK. Voltages in mV, currents in mA")
pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV | MPPT Mode {mppt_mode}") pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV | MPPT Mode {mppt_mode}")
header_str = ( header_str = "Channel | Input U [mV] | Input I [mA] | U Boost [mV] | Power [mW]"
f"Channel | Input U [mV] | Input I [mA] | U Boost [mV] | Power [mW]"
)
pw.dlog(header_str) pw.dlog(header_str)
for i in range(6): for i in range(6):
pw.dlog( pw.dlog(
@ -416,7 +414,8 @@ def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
pw.dlog("Received ACU Aux HK. Voltages in mV, currents in mA") pw.dlog("Received ACU Aux HK. Voltages in mV, currents in mA")
pw.dlog( pw.dlog(
f"DAC Enable States: DAC 0 {dac_enb_str[0]} | DAC 1 {dac_enb_str[1]} | DAC 2 {dac_enb_str[2]}" f"DAC Enable States: DAC 0 {dac_enb_str[0]} | DAC 1 {dac_enb_str[1]} | "
f"DAC 2 {dac_enb_str[2]}"
) )
pw.dlog(f"Boot Cause {boot_cause} | Reset Cause {reset_cause}") pw.dlog(f"Boot Cause {boot_cause} | Reset Cause {reset_cause}")
pw.dlog( pw.dlog(
@ -424,8 +423,8 @@ def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
) )
pw.dlog( pw.dlog(
f"ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|" "ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|"
f"5:DAC|6:TempSens|7:Reserved" "5:DAC|6:TempSens|7:Reserved"
) )
dev_parser.print(pw=pw) dev_parser.print(pw=pw)
FsfwTmTcPrinter.get_validity_buffer( FsfwTmTcPrinter.get_validity_buffer(

View File

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@file solar_array_deployment.py @file solar_array_deployment.py
@brief The test function in this file simply returns a command which triggers the solar array deployment. @brief The test function in this file simply returns a command which triggers the solar array
deployment.
@author J. Meier @author J. Meier
@date 15.02.2021 @date 15.02.2021
""" """

View File

@ -11,7 +11,6 @@ from tmtccmd.tc import service_provider
from eive_tmtc.config.object_ids import EIVE_SYSTEM_ID from eive_tmtc.config.object_ids import EIVE_SYSTEM_ID
from tmtccmd.tc.pus_200_fsfw_mode import ( from tmtccmd.tc.pus_200_fsfw_mode import (
create_mode_command, create_mode_command,
Mode,
create_announce_mode_recursive_command, create_announce_mode_recursive_command,
) )
from tmtccmd.tc.pus_8_fsfw_funccmd import create_action_cmd from tmtccmd.tc.pus_8_fsfw_funccmd import create_action_cmd

View File

@ -1,11 +1,9 @@
import logging import logging
import pprint
import struct import struct
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.fsfw import validity_buffer_list from tmtccmd.fsfw import validity_buffer_list
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from .defs import CtrlSetId from .defs import CtrlSetId
from .heater import HEATER_LOCATION from .heater import HEATER_LOCATION

View File

@ -148,7 +148,8 @@ def time_prompt_offset_from_now() -> datetime.datetime:
seconds_offset = math.floor( seconds_offset = math.floor(
float( float(
input( input(
"Please enter the time as a offset from now in seconds. Negative offset is allowed: " "Please enter the time as a offset from now in seconds. Negative offset is "
"allowed: "
) )
) )
) )

View File

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@file input_helper.py @file input_helper.py
@brief This class can be used to get user input. A dictionary must be provided which describes the input options. @brief This class can be used to get user input. A dictionary must be provided which describes the
input options.
@author J. Meier @author J. Meier
@date 13.02.2021 @date 13.02.2021
""" """

View File

@ -45,7 +45,7 @@ except ImportError as error:
try: try:
import tmtccmd import tmtccmd
except ImportError as error: except ImportError:
run_tmtc_commander = None run_tmtc_commander = None
initialize_tmtc_commander = None initialize_tmtc_commander = None
tb = traceback.format_exc() tb = traceback.format_exc()
@ -204,7 +204,7 @@ class CfdpInCcsdsWrapper(SpecificApidHandlerBase):
_LOGGER.info("Received File Data PDU TM") _LOGGER.info("Received File Data PDU TM")
else: else:
if pdu_base.directive_type == DirectiveType.FINISHED_PDU: if pdu_base.directive_type == DirectiveType.FINISHED_PDU:
_LOGGER.info(f"Received Finished PDU TM") _LOGGER.info("Received Finished PDU TM")
else: else:
_LOGGER.info( _LOGGER.info(
f"Received File Directive PDU with type {pdu_base.directive_type!r} TM" f"Received File Directive PDU with type {pdu_base.directive_type!r} TM"
@ -312,7 +312,7 @@ class TcHandler(TcHandlerBase):
) )
elif pdu.pdu_directive_type == DirectiveType.EOF_PDU: elif pdu.pdu_directive_type == DirectiveType.EOF_PDU:
self.queue_helper.add_log_cmd( self.queue_helper.add_log_cmd(
f"CFDP Source: Sending EOF PDU" "CFDP Source: Sending EOF PDU"
) )
else: else:
fd_pdu = pdu.to_file_data_pdu() fd_pdu = pdu.to_file_data_pdu()
@ -332,7 +332,7 @@ class TcHandler(TcHandlerBase):
f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}" f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}"
) )
elif info.proc_type == TcProcedureType.CFDP: elif info.proc_type == TcProcedureType.CFDP:
_LOGGER.info(f"Finished CFDP queue") _LOGGER.info("Finished CFDP queue")
def setup_params() -> SetupWrapper: def setup_params() -> SetupWrapper:
@ -437,7 +437,7 @@ def setup_backend(
return tmtc_backend return tmtc_backend
def main(): def main(): # noqa C901: Complexity okay here.
print(f"-- eive tmtc v{__version__} --") print(f"-- eive tmtc v{__version__} --")
print(f"-- spacepackets v{spacepackets.__version__} --") print(f"-- spacepackets v{spacepackets.__version__} --")
add_colorlog_console_logger(_LOGGER) add_colorlog_console_logger(_LOGGER)

View File

@ -1,87 +0,0 @@
#!/usr/bin/env python3
"""EIVE TMTC Commander"""
import sys
import traceback
try:
import tmtccmd.runner as tmtccmd
from tmtccmd.config import default_json_path, SetupArgs, CoreGlobalIds
from tmtccmd.config.definitions import CoreModeList
from tmtccmd.config.args import (
create_default_args_parser,
add_default_tmtccmd_args,
parse_default_input_arguments,
handle_unspecified_args,
)
from tmtccmd.ccsds.handler import CcsdsTmHandler, ApidHandler
from tmtccmd.logging import get_console_logger
from tmtccmd.logging.pus import create_tmtc_logger
except ImportError as error:
run_tmtc_commander = None
initialize_tmtc_commander = None
tb = traceback.format_exc()
print(tb)
print("Python tmtccmd submodule could not be imported")
sys.exit(1)
try:
import spacepackets
except ImportError as error:
print(error)
print("Python spacepackets module could not be imported")
print(
'Install with "cd spacepackets && python3 -m pip intall -e ." for interative installation'
)
sys.exit(1)
from config.definitions import PUS_APID
from pus_tc.procedure_packer import pre_tc_send_cb
from pus_tm.factory_hook import ccsds_tm_handler
from tmtcc import tmtcc_pre_args
def main():
hook_obj = tmtcc_pre_args()
arg_parser = create_default_args_parser()
add_default_tmtccmd_args(arg_parser)
args = parse_default_input_arguments(arg_parser, hook_obj)
setup_args = SetupArgs(
hook_obj=hook_obj, use_gui=False, apid=PUS_APID, cli_args=args
)
apid_handler = ApidHandler(cb=ccsds_tm_handler, queue_len=50, user_args=None)
ccsds_handler = CcsdsTmHandler()
ccsds_handler.add_tm_handler(apid=PUS_APID, handler=apid_handler)
tmtccmd.setup(setup_args=setup_args)
tmtccmd.add_ccsds_handler(ccsds_handler)
tmtc_backend = tmtccmd.create_default_tmtc_backend(
setup_args=setup_args,
tm_handler=ccsds_handler,
)
tmtc_file_logger = create_tmtc_logger()
tmtc_backend.usr_send_wrapper = (pre_tc_send_cb, tmtc_file_logger)
tmtc_backend.set_mode(CoreModeList.CONTINUOUS_MODE)
# get_console_logger().info("Disabling console logger for continuous operation")
# get_console_logger().setLevel("ERROR")
tmtccmd.init_and_start_daemons(tmtc_backend=tmtc_backend)
tmtc_backend.perform_operation()
# remove cmdline args so that we can reuse code
sys.argv = sys.argv[:1]
while True:
args.service = None
args.op_code = None
handle_unspecified_args(args, hook_obj.get_service_op_code_dictionary())
tmtc_backend.set_service(args.service)
tmtc_backend.set_opcode(args.op_code)
tmtc_backend.set_mode(CoreModeList.CONTINUOUS_MODE)
tmtc_backend.perform_operation()
if __name__ == "__main__":
main()