Merge pull request 'Major TMTC Update' (#52) from mueller/major-tmtc-update into develop

Reviewed-on: #52
This commit is contained in:
Jakob Meier 2022-04-07 11:39:45 +02:00
commit 25bab108df
34 changed files with 750 additions and 452 deletions

View File

@ -4,7 +4,7 @@
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/tmtccmd/src" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/tmtccmd/src/tests" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />

View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtccli-example" type="PythonConfigurationType" factoryName="Python" folderName="Example">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/tmtccmd/examples" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccmd/examples/tmtccli.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcgui-example" type="PythonConfigurationType" factoryName="Python" folderName="Example">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/tmtccmd/examples" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccmd/examples/tmtcgui.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -6,7 +6,7 @@
import enum
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()

View File

@ -12,11 +12,8 @@ import argparse
from config.definitions import CustomServiceList, PUS_APID
from config.custom_mode_op import CustomModeList
from tmtccmd.config.definitions import CoreComInterfaces
from tmtccmd.config.globals import (
set_default_globals_pre_args_parsing,
set_default_globals_post_args_parsing,
)
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.config.globals import set_default_globals_pre_args_parsing
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()
@ -34,12 +31,3 @@ def set_globals_pre_args_parsing(gui: bool = False):
tm_apid=PUS_APID,
com_if_id=CoreComInterfaces.TCPIP_UDP.value,
)
def add_globals_post_args_parsing(args: argparse.Namespace, json_cfg_path: str):
set_default_globals_post_args_parsing(
args=args,
custom_services_list=[CustomServiceList],
custom_modes_list=[CustomModeList],
json_cfg_path=json_cfg_path,
)

View File

@ -13,7 +13,7 @@ from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.config.globals import OpCodeDictKeys
from config.definitions import CustomServiceList
@ -21,43 +21,24 @@ from config.retvals import get_retval_dict
class EiveHookObject(TmTcHookBase):
def __init__(self, json_cfg_path: str):
super().__init__(json_cfg_path=json_cfg_path)
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import get_default_service_op_code_dict
service_op_code_dict = get_default_service_op_code_dict()
get_eive_service_op_code_dict(service_op_code_dict=service_op_code_dict)
return service_op_code_dict
def get_json_config_file_path(self) -> str:
"""The user can specify a path and filename for the JSON configuration file by overriding
this function.
:return:
"""
return "config/tmtc_config.json"
def add_globals_pre_args_parsing(self, gui: bool = False):
from config.globals_config import set_globals_pre_args_parsing
set_globals_pre_args_parsing(gui=gui)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from config.globals_config import add_globals_post_args_parsing
add_globals_post_args_parsing(
args=args, json_cfg_path=self.get_json_config_file_path()
)
def assign_communication_interface(
self, com_if_key: str, tmtc_printer: TmTcPrinter
self, com_if_key: str
) -> Union[CommunicationInterface, None]:
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key,
tmtc_printer=tmtc_printer,
json_cfg_path=self.get_json_config_file_path(),
json_cfg_path=self.json_cfg_path,
space_packet_ids=(0x0865,),
)
@ -78,39 +59,6 @@ class EiveHookObject(TmTcHookBase):
return get_object_ids()
@staticmethod
def handle_service_8_telemetry(
object_id: bytes, action_id: int, custom_data: bytearray
) -> DataReplyUnpacked:
from pus_tm.service_8_hook import user_analyze_service_8_data
return user_analyze_service_8_data(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@staticmethod
def handle_service_3_housekeeping(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> HkReplyUnpacked:
from pus_tm.hk_handling import handle_user_hk_packet
return handle_user_hk_packet(
object_id=object_id,
set_id=set_id,
hk_data=hk_data,
service3_packet=service3_packet,
)
@staticmethod
def handle_service_5_event(
object_id: bytes, event_id: int, param_1: int, param_2: int
) -> str:
from pus_tm.event_handler import handle_event_packet
return handle_event_packet(
object_id=object_id, event_id=event_id, param_1=param_1, param_2=param_2
)
def get_retval_dict(self) -> RetvalDictT:
return get_retval_dict()

View File

@ -5,8 +5,9 @@
"""
import os.path
from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.utility.fsfw import parse_fsfw_objects_csv
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.fsfw import parse_fsfw_objects_csv
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()
DEFAULT_OBJECTS_CSV_PATH = "config/objects.csv"

View File

@ -1,6 +1,6 @@
import os
from tmtccmd.utility.fsfw import parse_fsfw_returnvalues_csv, RetvalDictT
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.fsfw import parse_fsfw_returnvalues_csv, RetvalDictT
from tmtccmd.logging import get_console_logger
DEFAULT_RETVAL_CSV_NAME = "config/returnvalues.csv"
__RETVAL_DICT = None

View File

@ -1,6 +1,6 @@
SW_NAME = "eive"
VERSION_MAJOR = 1
VERSION_MINOR = 8
VERSION_MINOR = 9
VERSION_SUBMINOR = 0
__version__ = "1.8.0"
__version__ = "1.9.0"

View File

@ -23,11 +23,11 @@ class GomspaceDeviceActionIds(enum.IntEnum):
PRINT_LATCHUPS = 33
class GomspaceOpCodes(enum.Enum):
class GomspaceOpCodes:
# Request HK
REQUEST_HK_ONCE = "128"
PRINT_SWITCH_V_I = "129"
PRINT_LATCHUPS = "130"
REQUEST_HK_ONCE = ["req-hk-once", "128"]
PRINT_SWITCH_V_I = ["print-switch-vi", "129"]
PRINT_LATCHUPS = ["print-latchups", "130"]
class SetIds:

View File

@ -180,47 +180,47 @@ def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes
from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes, Info
from pus_tc.devs.pdu1 import Pdu1OpCodes
from pus_tc.devs.pdu2 import Pdu2OpCodes
op_code_dict = dict()
add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="P60 Tests")
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_3V3_ON.value,
info="P60 Dock: Turn stack 3V3 on",
keys=P60OpCodes.STACK_3V3_ON,
info=Info.STACK_3V3_ON,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_3V3_OFF.value,
info="P60 Dock: Turn stack 3V3 off",
keys=P60OpCodes.STACK_3V3_OFF,
info=Info.STACK_3V3_OFF,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_5V_ON.value,
info="P60 Dock: Turn stack 5V on",
keys=P60OpCodes.STACK_5V_ON,
info=Info.STACK_5V_ON,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_5V_OFF.value,
info="P60 Dock: Turn stack 5V off",
keys=P60OpCodes.STACK_5V_OFF,
info=Info.STACK_5V_OFF,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_HK_ONCE.value,
keys=GomspaceOpCodes.REQUEST_HK_ONCE,
info="P60 Dock: Request HK once",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="P60 Dock: Print Switches, Voltages, Currents",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_LATCHUPS.value,
keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="P60 Dock: Print Latchups",
)
add_op_code_entry(op_code_dict=op_code_dict, keys=P60OpCodes.TEST, info="P60 Tests")
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.P60DOCK.value,
@ -311,17 +311,17 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_HK_ONCE.value,
keys=GomspaceOpCodes.REQUEST_HK_ONCE,
info="PDU1: Request HK once",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="PDU1: Print Switches, Voltages, Currents",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_LATCHUPS.value,
keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="PDU1: Print Latchups",
)
add_op_code_entry(
@ -418,18 +418,18 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_HK_ONCE.value,
keys=GomspaceOpCodes.REQUEST_HK_ONCE,
info="PDU2: Request HK once",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="PDU2: Print Switches, Voltages, Currents",
options={OpCodeDictKeys.TIMEOUT: 2.0},
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_LATCHUPS.value,
keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="PDU2: Print Latchups",
)
add_service_op_code_entry(

View File

@ -7,17 +7,32 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid
from tmtccmd.tc.service_3_housekeeping import (
generate_one_hk_command,
make_sid,
generate_one_diag_command,
)
from gomspace.gomspace_common import *
from config.object_ids import P60_DOCK_HANDLER
class P60OpCodes(enum.Enum):
TEST = "0"
STACK_3V3_ON = "1"
STACK_3V3_OFF = "2"
STACK_5V_ON = "3"
STACK_5V_OFF = "4"
HK_SET_ID = 0x3
class P60OpCodes:
STACK_3V3_ON = ["stack-3v3-on", "1"]
STACK_3V3_OFF = ["stack-3v3-off", "2"]
STACK_5V_ON = ["stack-5v-on", "3"]
STACK_5V_OFF = ["stack-5v-off", "4"]
TEST = ["test", "0"]
class Info:
PREFIX = "P60 Dock"
STACK_3V3_ON = f"{PREFIX}: Turn Stack 3V3 on"
STACK_3V3_OFF = f"{PREFIX}: Turn Stack 3V3 off"
STACK_5V_ON = f"{PREFIX}: Turn Stack 5V on"
STACK_5V_OFF = f"{PREFIX}: Turn Stack 5V off"
class P60DockTestProcedure:
@ -76,8 +91,8 @@ class P60DockHkTable:
def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
if op_code == P60OpCodes.STACK_3V3_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 on"))
if op_code in P60OpCodes.STACK_3V3_ON:
tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_3V3_ON))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_9.parameter_address,
@ -85,8 +100,8 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.on,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == P60OpCodes.STACK_3V3_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 off"))
if op_code in P60OpCodes.STACK_3V3_OFF:
tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_3V3_OFF))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_9.parameter_address,
@ -94,8 +109,8 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == P60OpCodes.STACK_5V_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 5V on"))
if op_code in P60OpCodes.STACK_5V_ON:
tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_5V_ON))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_10.parameter_address,
@ -103,8 +118,8 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.on,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == P60OpCodes.STACK_5V_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 5V off"))
if op_code in P60OpCodes.STACK_5V_OFF:
tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_5V_OFF))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_10.parameter_address,
@ -112,12 +127,12 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.REQUEST_HK_ONCE.value:
if op_code in GomspaceOpCodes.REQUEST_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Requesting HK Table Once"))
hk_sid = make_sid(object_id=P60_DOCK_HANDLER, set_id=SetIds.P60_DOCK)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_SWITCH_V_I.value:
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
tc_queue.appendleft(
(QueueCommands.PRINT, "P60 Dock: Print Switches, Voltages, Currents")
)
@ -125,7 +140,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_LATCHUPS.value:
if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Print Latchups"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS

View File

@ -210,12 +210,12 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.REQUEST_HK_ONCE.value:
if op_code in GomspaceOpCodes.REQUEST_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Requesting HK Table Once"))
hk_sid = make_sid(object_id=PDU_1_HANDLER_ID, set_id=SetIds.PDU_1)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_SWITCH_V_I.value:
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU1: Print Switches, Voltages, Currents")
)
@ -223,7 +223,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_LATCHUPS.value:
if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Print Latchups"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS

View File

@ -215,9 +215,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu2OpCodes.PL_CAMERA_ON.value:
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Turn payload camera on")
)
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn payload camera on"))
command = pack_set_param_command(
object_id,
PDUConfigTable.out_en_8.parameter_address,
@ -226,9 +224,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu2OpCodes.PL_CAMERA_OFF.value:
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Turn payload camera off")
)
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn payload camera off"))
command = pack_set_param_command(
object_id,
PDUConfigTable.out_en_8.parameter_address,
@ -236,12 +232,12 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.REQUEST_HK_ONCE.value:
if op_code in GomspaceOpCodes.REQUEST_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Requesting HK Table Once"))
hk_sid = make_sid(object_id=PDU_2_HANDLER_ID, set_id=SetIds.PDU_2)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_SWITCH_V_I.value:
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Print Switches, Currents, Voltahes")
)
@ -249,7 +245,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_LATCHUPS.value:
if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Print Latchups"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS

View File

@ -10,7 +10,7 @@ import struct
import enum
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from utility.input_helper import InputHelper
@ -70,16 +70,12 @@ def pack_ploc_mpsoc_commands(
)
if op_code == "0":
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC MPSoC: Set mode off")
)
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Set mode off"))
command = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1":
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC MPSoC: Set mode on")
)
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Set mode on"))
command = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
@ -142,8 +138,10 @@ def pack_ploc_mpsoc_commands(
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "12":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Reset OBSW sequence count"))
command = object_id + struct.pack('!I', CommandIds.OBSW_RESET_SEQ_COUNT)
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC MPSoC: Reset OBSW sequence count")
)
command = object_id + struct.pack("!I", CommandIds.OBSW_RESET_SEQ_COUNT)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "13":

View File

@ -11,7 +11,7 @@ import struct
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
LOGGER = get_console_logger()

View File

@ -9,7 +9,7 @@ from tmtccmd.tc.service_20_parameter import (
pack_fsfw_load_param_cmd,
pack_boolean_parameter_app_data,
)
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from spacepackets.ecss.tc import PusTelecommand
from config.object_ids import PL_PCDU_ID
@ -52,14 +52,13 @@ class Info:
NORMAL_HPA = f"{NORMAL}, HPA on"
class NormalSubmodes(enum.IntEnum):
ALL_OFF = 0
SOLID_STATE_RELAYS_ADC_ON = 1
DRO_ON = 2
X8_ON = 3
TX_ON = 4
MPA_ON = 5
HPA_ON = 6
class NormalSubmodesMask(enum.IntEnum):
SOLID_STATE_RELAYS_ADC_ON = 0
DRO_ON = 1
X8_ON = 2
TX_ON = 3
MPA_ON = 4
HPA_ON = 5
class ParamIds(enum.IntEnum):
@ -114,42 +113,67 @@ def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str):
tc_queue=tc_queue,
info=Info.NORMAL_SSR,
mode=Modes.NORMAL,
submode=NormalSubmodes.SOLID_STATE_RELAYS_ADC_ON,
submode=(1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON),
)
if op_code in OpCodes.NORMAL_DRO:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_DRO,
mode=Modes.NORMAL,
submode=NormalSubmodes.DRO_ON,
submode=(
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
),
)
if op_code in OpCodes.NORMAL_X8:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_X8,
mode=Modes.NORMAL,
submode=NormalSubmodes.X8_ON,
submode=(
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
),
)
if op_code in OpCodes.NORMAL_TX:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_TX,
mode=Modes.NORMAL,
submode=NormalSubmodes.TX_ON,
submode=(
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
| (1 << NormalSubmodesMask.TX_ON)
),
)
if op_code in OpCodes.NORMAL_MPA:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_MPA,
mode=Modes.NORMAL,
submode=NormalSubmodes.MPA_ON,
submode=(
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
| (1 << NormalSubmodesMask.TX_ON)
| (1 << NormalSubmodesMask.MPA_ON)
),
)
if op_code in OpCodes.NORMAL_HPA:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_HPA,
mode=Modes.NORMAL,
submode=NormalSubmodes.HPA_ON,
submode=(
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
| (1 << NormalSubmodesMask.TX_ON)
| (1 << NormalSubmodesMask.MPA_ON)
| (1 << NormalSubmodesMask.HPA_ON)
),
)
if op_code in OpCodes.INJECT_ALL_ON_FAILURE:
pack_failure_injection_cmd(

View File

@ -49,24 +49,24 @@ def pack_rad_sensor_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code:
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Start conversions"))
command = object_id + struct.pack('!I', CommandIds.START_CONVERSIONS)
command = object_id + struct.pack("!I", CommandIds.START_CONVERSIONS)
command = PusTelecommand(service=8, subservice=128, ssc=43, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Read conversions"))
command = object_id + struct.pack('!I', CommandIds.READ_CONVERSIONS)
command = object_id + struct.pack("!I", CommandIds.READ_CONVERSIONS)
command = PusTelecommand(service=8, subservice=128, ssc=44, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Enable debug output"))
command = object_id + struct.pack('!I', CommandIds.ENABLE_DEBUG_OUTPUT)
command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG_OUTPUT)
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "6":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Disable debug output"))
command = object_id + struct.pack('!I', CommandIds.DISABLE_DEBUG_OUTPUT)
command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG_OUTPUT)
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())

View File

@ -12,7 +12,7 @@ from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from utility.input_helper import InputHelper

View File

@ -48,16 +48,12 @@ def pack_syrlinks_command(
)
if op_code == "0":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Set mode off")
)
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set mode off"))
command = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Set mode on")
)
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set mode on"))
command = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
@ -116,61 +112,42 @@ def pack_syrlinks_command(
command = PusTelecommand(service=8, subservice=128, ssc=16, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "12":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Write LCL config")
)
command = object_id + struct.pack('!I', CommandIds.WRITE_LCL_CONFIG)
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Write LCL config"))
command = object_id + struct.pack("!I", CommandIds.WRITE_LCL_CONFIG)
command = PusTelecommand(service=8, subservice=128, ssc=17, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "13":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Read RX status registers")
)
command = object_id + struct.pack('!I', CommandIds.READ_RX_STATUS_REGISTERS)
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read RX status registers"))
command = object_id + struct.pack("!I", CommandIds.READ_RX_STATUS_REGISTERS)
command = PusTelecommand(service=8, subservice=128, ssc=18, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "14":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Read LCL config register")
)
command = object_id + struct.pack('!I', CommandIds.READ_LCL_CONFIG_REGISTER)
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read LCL config register"))
command = object_id + struct.pack("!I", CommandIds.READ_LCL_CONFIG_REGISTER)
command = PusTelecommand(service=8, subservice=128, ssc=19, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
<<<<<<< HEAD
if op_code == "15":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Set waveform OQPSK")
)
command = object_id + struct.pack('!I', CommandIds.SET_WAVEFORM_OQPSK)
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform OQPSK"))
command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_OQPSK)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "16":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Set waveform BPSK")
)
command = object_id + struct.pack('!I', CommandIds.SET_WAVEFORM_BPSK)
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform BPSK"))
command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_BPSK)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "17":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Set second config")
)
command = object_id + struct.pack('!I', CommandIds.SET_SECOND_CONFIG)
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set second config"))
command = object_id + struct.pack("!I", CommandIds.SET_SECOND_CONFIG)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "18":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Enable debug printout")
)
command = object_id + struct.pack('!I', CommandIds.ENABLE_DEBUG)
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Enable debug printout"))
command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "19":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Disable debug printout")
)
command = object_id + struct.pack('!I', CommandIds.DISABLE_DEBUG)
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Disable debug printout"))
command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
=======
>>>>>>> develop

View File

@ -3,7 +3,7 @@ import enum
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command
from config.object_ids import CORE_CONTROLLER_ID

View File

@ -1,14 +1,19 @@
"""Hook function which packs telecommands based on service and operation code string
"""
import logging
import os
from collections import deque
from typing import Union
from spacepackets.ecss import PusTelecommand
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.config.definitions import CoreServiceList
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from tmtccmd.logging.pus import log_raw_pus_tc
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus.service_17_test import pack_service_17_ping_command
from tmtccmd.logging import get_current_time_string
from pus_tc.service_200_mode import pack_service200_test_into
from pus_tc.devs.p60dock import pack_p60dock_test_into
@ -68,6 +73,21 @@ from config.object_ids import (
LOGGER = get_console_logger()
def pre_tc_send_cb(
packet: bytes,
com_if: CommunicationInterface,
pus_info: Union[PusTelecommand, any],
file_logger: logging.Logger,
):
log_raw_pus_tc(
packet=packet, srv_subservice=(pus_info.service, pus_info.subservice)
)
tc_info_string = f"Sent {pus_info}"
LOGGER.info(tc_info_string)
file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
com_if.send(data=packet)
def pack_service_queue_user(
service: Union[str, int], op_code: str, service_queue: TcQueueT
):

View File

@ -0,0 +1,107 @@
import struct
from config.object_ids import *
from pus_tc.devs.imtq import ImtqActionIds
from pus_tc.devs.ploc_mpsoc import PlocReplyIds
from pus_tc.devs.ploc_supervisor import SupvActionIds
from pus_tc.devs.star_tracker import StarTrackerActionIds
from tmtccmd.logging import get_console_logger
from tmtccmd.config.definitions import DataReplyUnpacked
from tmtccmd.tm import Service8FsfwTm
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
LOGGER = get_console_logger()
def handle_action_reply(
raw_tm: bytes, printer: FsfwTmTcPrinter, obj_id_dict: ObjectIdDictT
):
"""Core Action reply handler
:return:
"""
tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
object_id = obj_id_dict.get(tm_packet.source_object_id)
custom_data = tm_packet.custom_data
action_id = tm_packet.action_id
generic_print_str = printer.generic_action_packet_tm_print(
packet=tm_packet, obj_id=object_id
)
print(generic_print_str)
printer.file_logger.info(generic_print_str)
if object_id == IMTQ_HANDLER_ID:
return handle_imtq_replies(action_id, printer, custom_data)
elif object_id == PLOC_MPSOC_ID:
return handle_ploc_replies(action_id, printer, custom_data)
elif object_id == PLOC_SUPV_ID:
return handle_supervisor_replies(action_id, printer, custom_data)
elif object_id == STAR_TRACKER_ID:
return handle_startracker_replies(action_id, printer, custom_data)
def handle_imtq_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]:
header_list = [
"Commanded X-Dipole",
"Commanded Y-Dipole",
"Commanded Z-Dipole",
]
[x_dipole, y_dipole, z_dipole] = struct.unpack("!HHH", custom_data[0:6])
content_list = [x_dipole, y_dipole, z_dipole]
print(header_list)
print(content_list)
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
def handle_ploc_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
if action_id == PlocReplyIds.tm_mem_read_report:
header_list = [
"PLOC Memory Address",
"PLOC Mem Len",
"PLOC Read Memory Data",
]
content_list = [
"0x" + custom_data[:4].hex(),
struct.unpack("!H", custom_data[4:6])[0],
"0x" + custom_data[6:10].hex(),
]
print(header_list)
print(content_list)
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
def handle_supervisor_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
reply = DataReplyUnpacked()
if action_id == SupvActionIds.DUMP_MRAM:
header_list = ["MRAM Dump"]
content_list = [custom_data[: len(custom_data)]]
print(header_list)
print(content_list)
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
def handle_startracker_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
if action_id == StarTrackerActionIds.CHECKSUM:
if len(custom_data) != 5:
LOGGER.warning(
"Star tracker reply has invalid length {0}".format(len(custom_data))
)
return
header_list = ["Checksum", "Checksum valid"]
print(custom_data[4])
checksum_valid_flag = custom_data[4] >> 8
content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag]
print(header_list)
print(content_list)
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)

View File

@ -1,8 +1,13 @@
import logging
import os.path
from datetime import datetime
from config.object_ids import get_object_ids
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.utility.fsfw import parse_fsfw_events_csv, EventDictT, EventInfo
from tmtccmd.tm import Service5Tm
from tmtccmd.logging import get_console_logger
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.fsfw import parse_fsfw_events_csv, EventDictT, EventInfo
LOGGER = get_console_logger()
DEFAULT_EVENTS_CSV_PATH = "config/events.csv"
@ -21,25 +26,36 @@ def get_event_dict() -> EventDictT:
def handle_event_packet(
object_id: bytes, event_id: int, param_1: int, param_2: int
raw_tm: bytes, printer: FsfwTmTcPrinter, file_logger: logging.Logger
) -> str:
tm = Service5Tm.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm, info_if=tm)
additional_event_info = ""
event_dict = get_event_dict()
info = event_dict.get(event_id)
info = event_dict.get(tm.event_id)
if info is None:
LOGGER.warning(f"Event ID {event_id} has no information")
LOGGER.warning(f"Event ID {tm.event_id} has no information")
info = EventInfo()
info.name = "Unknown event"
obj_ids = get_object_ids()
obj_id_obj = obj_ids.get(bytes(object_id))
obj_id_obj = obj_ids.get(tm.reporter_id.as_bytes)
if obj_id_obj is None:
LOGGER.warning(f"Object ID 0x{object_id.hex()} has no name")
obj_name = object_id.hex()
LOGGER.warning(f"Object ID 0x{tm.reporter_id.as_string} has no name")
obj_name = tm.reporter_id.as_string
else:
obj_name = obj_id_obj.name
generic_event_string = f"Object {obj_name} generated Event {event_id} | {info.name}"
generic_event_string = (
f"Object {obj_name} generated Event {tm.event_id} | {info.name}"
)
if info.info != "":
additional_event_info = (
f" | Additional info: {info.info} | P1: {param_1} | P2: {param_2}"
f"Additional info: {info.info} | P1: {tm.param_1} | P2: {tm.param_2}"
)
return generic_event_string + additional_event_info
file_logger.info(
f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
)
LOGGER.info(generic_event_string)
if additional_event_info != "":
file_logger.info(additional_event_info)
print(additional_event_info)
return generic_event_string + " | " + additional_event_info

View File

@ -1,64 +1,80 @@
"""Core EIVE TM handler module
"""
@brief This file transfers control of TM parsing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from tmtccmd.tm.service_8_fsfw_functional_cmd import Service8FsfwTm
from spacepackets.ecss.tm import PusTelemetry
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from tmtccmd.logging.pus import (
log_raw_pus_tm,
log_raw_unknown_packet,
PacketTypes,
create_tmtc_logger,
)
from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.pus.service_17_test import Service17TMExtended
from tmtccmd.tm.service_3_fsfw_housekeeping import Service3FsfwTm
from tmtccmd.tm.service_20_fsfw_parameters import Service20FsfwTm
from tmtccmd.tm.service_5_event import Service5Tm
from tmtccmd.tm.service_200_fsfw_mode import Service200FsfwTm
from tmtccmd.utility.tmtc_printer import TmTcPrinter, PrintFormats
from tmtccmd.utility.tmtc_printer import PrintFormats, FsfwTmTcPrinter
from config.definitions import PUS_APID
from config.object_ids import get_object_ids
from .event_handler import handle_event_packet
from .verification_handler import handle_service_1_packet
from .hk_handling import handle_hk_packet
from .action_reply_handler import handle_action_reply
LOGGER = get_console_logger()
def ccsds_tm_handler(
apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter
) -> None:
FSFW_PRINTER = FsfwTmTcPrinter(file_logger=create_tmtc_logger())
def ccsds_tm_handler(apid: int, raw_tm_packet: bytes, _user_args: any) -> None:
if apid == PUS_APID:
pus_factory_hook(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer)
pus_factory_hook(raw_tm_packet=raw_tm_packet)
def pus_factory_hook(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter):
def pus_factory_hook(raw_tm_packet: bytes):
if len(raw_tm_packet) < 8:
LOGGER.warning("Detected packet shorter than 8 bytes!")
return
service_type = raw_tm_packet[7]
tm_packet = None
subservice_type = raw_tm_packet[8]
file_logger = FSFW_PRINTER.file_logger
obj_id_dict = get_object_ids()
try:
if service_type == 1:
tm_packet = Service1TMExtended.unpack(raw_telemetry=raw_tm_packet)
if service_type == 3:
tm_packet = Service3FsfwTm.unpack(
raw_telemetry=raw_tm_packet, custom_hk_handling=False
handle_service_1_packet(printer=FSFW_PRINTER, raw_tm=raw_tm_packet)
elif service_type == 3:
handle_hk_packet(
printer=FSFW_PRINTER, raw_tm=raw_tm_packet, obj_id_dict=obj_id_dict
)
if service_type == 5:
tm_packet = Service5Tm.unpack(raw_telemetry=raw_tm_packet)
if service_type == 8:
tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm_packet)
if service_type == 17:
elif service_type == 5:
handle_event_packet(
raw_tm=raw_tm_packet, printer=FSFW_PRINTER, file_logger=file_logger
)
elif service_type == 8:
handle_action_reply(
raw_tm=raw_tm_packet, printer=FSFW_PRINTER, obj_id_dict=obj_id_dict
)
elif service_type == 17:
tm_packet = Service17TMExtended.unpack(raw_telemetry=raw_tm_packet)
if service_type == 20:
FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
elif service_type == 20:
tm_packet = Service20FsfwTm.unpack(raw_telemetry=raw_tm_packet)
if service_type == 200:
FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
elif service_type == 200:
tm_packet = Service200FsfwTm.unpack(raw_telemetry=raw_tm_packet)
if tm_packet is None:
FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
else:
LOGGER.info(
f"The service {service_type} is not implemented in Telemetry Factory"
)
tm_packet = PusTelemetry.unpack(raw_telemetry=raw_tm_packet)
tm_packet.print_source_data(PrintFormats.HEX)
tmtc_printer.print_telemetry(
packet_if=tm_packet, info_if=tm_packet, print_raw_tm=False
log_raw_pus_tm(
packet=raw_tm_packet, srv_subservice=(service_type, subservice_type)
)
except ValueError:
# TODO: Log faulty packet
LOGGER.warning("Invalid packet format detected")
log_raw_unknown_packet(packet=raw_tm_packet, packet_type=PacketTypes.TM)

View File

@ -3,12 +3,18 @@ import struct
import os
import datetime
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.config.definitions import HkReplyUnpacked
from tmtccmd.tm.service_3_fsfw_housekeeping import Service3Base
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.tm.service_3_fsfw_housekeeping import (
Service3Base,
HkContentType,
Service3FsfwTm,
)
from tmtccmd.logging import get_console_logger
from pus_tc.devs.bpx_batt import BpxSetIds
from pus_tc.devs.syrlinks_hk_handler import SetIds
from pus_tc.devs.imtq import ImtqSetIds
from tmtccmd.pus.obj_id import ObjectId, ObjectIdDictT
from config.object_ids import (
SYRLINKS_HANDLER_ID,
IMTQ_HANDLER_ID,
@ -16,45 +22,81 @@ from config.object_ids import (
GPS_HANDLER_1_ID,
BPX_HANDLER_ID,
CORE_CONTROLLER_ID,
P60_DOCK_HANDLER,
PL_PCDU_ID
)
LOGGER = get_console_logger()
def handle_user_hk_packet(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> HkReplyUnpacked:
def handle_hk_packet(
raw_tm: bytes,
obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter,
):
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
if named_obj_id is None:
named_obj_id = tm_packet.object_id
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
hk_data = tm_packet.tm_data[8:]
printer.generic_hk_tm_print(
content_type=HkContentType.HK,
object_id=named_obj_id,
set_id=tm_packet.set_id,
hk_data=hk_data,
)
handle_regular_hk_print(
printer=printer,
object_id=named_obj_id,
hk_packet=tm_packet,
hk_data=hk_data,
)
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
LOGGER.warning("HK definitions printout not implemented yet")
def handle_regular_hk_print(
printer: FsfwTmTcPrinter,
object_id: ObjectId,
hk_packet: Service3Base,
hk_data: bytes,
):
object_id = object_id.as_bytes
set_id = hk_packet.set_id
"""This function is called when a Service 3 Housekeeping packet is received."""
if object_id == SYRLINKS_HANDLER_ID:
if set_id == SetIds.RX_REGISTERS_DATASET:
return handle_syrlinks_rx_registers_dataset(hk_data)
return handle_syrlinks_rx_registers_dataset(printer, hk_data)
elif set_id == SetIds.TX_REGISTERS_DATASET:
return handle_syrlinks_tx_registers_dataset(hk_data)
return handle_syrlinks_tx_registers_dataset(printer, hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
elif object_id == IMTQ_HANDLER_ID:
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
):
return handle_self_test_data(hk_data)
return handle_self_test_data(printer, hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
elif object_id == GPS_HANDLER_0_ID or object_id == GPS_HANDLER_1_ID:
return handle_gps_data(hk_data=hk_data)
handle_gps_data(printer=printer, hk_data=hk_data)
elif object_id == BPX_HANDLER_ID:
return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id)
handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
elif object_id == CORE_CONTROLLER_ID:
return handle_core_hk_data(hk_data=hk_data, set_id=set_id)
return handle_core_hk_data(printer=printer, hk_data=hk_data)
elif object_id == P60_DOCK_HANDLER:
handle_p60_hk_data(printer=printer, hk_data=hk_data)
elif object_id == PL_PCDU_ID:
log_to_both(printer, "Received PL PCDU HK data")
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return HkReplyUnpacked()
def handle_syrlinks_rx_registers_dataset(
hk_data: bytearray,
) -> HkReplyUnpacked:
def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes):
reply = HkReplyUnpacked()
reply.header_list = [
header_list = [
"RX Status",
"RX Sensitivity",
"RX Frequency Shift",
@ -72,7 +114,7 @@ def handle_syrlinks_rx_registers_dataset(
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
rx_data_rate = hk_data[21]
reply.content_list = [
content_list = [
rx_status,
rx_sensitivity,
rx_frequency_shift,
@ -82,28 +124,30 @@ def handle_syrlinks_rx_registers_dataset(
rx_demod_n0,
rx_data_rate,
]
reply.validity_buffer = hk_data[22:]
reply.num_of_vars = 8
return reply
validity_buffer = hk_data[22:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
def handle_syrlinks_tx_registers_dataset(
hk_data: bytearray,
) -> HkReplyUnpacked:
printer: FsfwTmTcPrinter,
hk_data: bytes,
):
reply = HkReplyUnpacked()
reply.header_list = ["TX Status", "TX Waveform", "TX AGC value"]
header_list = ["TX Status", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0]
tx_waveform = hk_data[1]
tx_agc_value = struct.unpack("!H", hk_data[2:4])
reply.content_list = [tx_status, tx_waveform, tx_agc_value]
reply.validity_buffer = hk_data[4:]
reply.num_of_vars = 3
return reply
content_list = [tx_status, tx_waveform, tx_agc_value]
validity_buffer = hk_data[4:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
reply = HkReplyUnpacked()
reply.hk_header = [
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
header_list = [
"Init Err",
"Init Raw Mag X [nT]",
"Init Raw Mag Y [nT]",
@ -189,8 +233,8 @@ def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
reply.validity_buffer = hk_data[129:]
reply.content_list = [
validity_buffer = hk_data[129:]
content_list = [
init_err,
init_raw_mag_x,
init_raw_mag_y,
@ -231,17 +275,17 @@ def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
fina_coil_y_temperature,
fina_coil_z_temperature,
]
reply.num_of_vars = len(reply.hk_header)
return reply
num_of_vars = len(header_list)
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
reply = HkReplyUnpacked()
var_index = 0
header_array = []
content_array = []
reply.header_list = [
header_list = [
"Latitude",
"Longitude",
"Altitude",
@ -263,7 +307,7 @@ def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
seconds = hk_data[32]
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
content_array = [
content_list = [
latitude,
longitude,
altitude,
@ -285,27 +329,29 @@ def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, "
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
)
reply.header_list = header_array
reply.content_list = content_array
reply.validity_buffer = hk_data[37:39]
return reply
validity_buffer = hk_data[37:39]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
LOGGER.info(f"Received BPX data, HK data length {len(hk_data)}")
reply = HkReplyUnpacked()
def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == BpxSetIds.GET_HK_SET:
charge_current = struct.unpack("!H", hk_data[0:2])[0]
discharge_current = struct.unpack("!H", hk_data[2:4])[0]
heater_current = struct.unpack("!H", hk_data[4:6])[0]
batt_voltage = struct.unpack("!H", hk_data[6:8])[0]
batt_temp_1 = struct.unpack("!h", hk_data[8:10])[0]
batt_temp_2 = struct.unpack("!h", hk_data[10:12])[0]
batt_temp_3 = struct.unpack("!h", hk_data[12:14])[0]
batt_temp_4 = struct.unpack("!h", hk_data[14:16])[0]
reboot_cntr = struct.unpack("!I", hk_data[16:20])[0]
boot_cause = hk_data[20]
reply.header_list = [
fmt_str = "!HHHHhhhhIB"
inc_len = struct.calcsize(fmt_str)
(
charge_current,
discharge_current,
heater_current,
batt_voltage,
batt_temp_1,
batt_temp_2,
batt_temp_3,
batt_temp_4,
reboot_cntr,
boot_cause,
) = struct.unpack(fmt_str, hk_data[0:inc_len])
header_list = [
"Charge Current",
"Discharge Current",
"Heater Current",
@ -317,7 +363,7 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
"Reboot Counter",
"Boot Cause",
]
reply.content_list = [
content_list = [
charge_current,
discharge_current,
heater_current,
@ -329,29 +375,189 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
reboot_cntr,
boot_cause,
]
reply.validity_buffer = hk_data[21:]
validity_buffer = hk_data[inc_len:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
elif set_id == BpxSetIds.GET_CFG_SET:
battheat_mode = hk_data[0]
battheat_low = struct.unpack("!b", hk_data[1:2])[0]
battheat_high = struct.unpack("!b", hk_data[2:3])[0]
reply.header_list = [
header_list = [
"Battery Heater Mode",
"Battery Heater Low Limit",
"Battery Heater High Limit",
]
reply.content_list = [battheat_mode, battheat_low, battheat_high]
reply.validity_buffer = hk_data[3:]
return reply
content_list = [battheat_mode, battheat_low, battheat_high]
validity_buffer = hk_data[3:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
def handle_core_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
reply = HkReplyUnpacked()
reply.header_list = ["Chip Temperature [°C]", "PS Voltage [mV]", "PL Voltage [mV]"]
temperature = struct.unpack("!f", hk_data[0:4])
ps_voltage = struct.unpack("!f", hk_data[4:8])
pl_voltage = struct.unpack("!f", hk_data[8:12])
tx_agc_value = struct.unpack("!H", hk_data[2:4])
reply.content_list = [temperature, ps_voltage, pl_voltage]
reply.validity_buffer = hk_data[12:]
reply.num_of_vars = 3
return reply
def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
fmt_str = "!fffH"
inc_len = struct.calcsize(fmt_str)
(temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
printout = (
f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}"
)
log_to_both(printer, printout)
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4)
P60_INDEX_LIST = [
"ACU VCC",
"PDU1 VCC",
"X3 IDLE VCC",
"PDU2 VCC",
"ACU VBAT",
"PDU1 VBAT",
"X3 IDLE VBAT",
"PDU2 VBAT",
"STACK VBAT",
"STACK 3V3",
"STACK 5V",
"GS3V3",
"GS5V",
]
WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]
def handle_p60_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
current_idx = 0
current_list = []
for idx in range(13):
current_list.append(
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
voltage_list = []
for idx in range(13):
voltage_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
out_enb_list = []
for idx in range(13):
out_enb_list.append(hk_data[current_idx])
current_idx += 1
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
print(header_str)
printer.file_logger.info(header_str)
for idx in range(13):
out_enb = f"{out_enb_list[idx]}".ljust(6)
content_line = (
f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | "
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
)
print(content_line)
printer.file_logger.info(content_line)
fmt_str = "!hhIIIhBBB"
inc_len = struct.calcsize(fmt_str)
(
temp0,
temp1,
boot_cause,
boot_count,
uptime,
reset_cause,
batt_mode,
heater_on,
conv_5v_on,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
util_info = (
f"Batt Mode {batt_mode} | Boot Count {boot_count} | Heater On {heater_on}"
)
util_info2 = (
f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime} | "
f"Conv 5V on {conv_5v_on}"
)
print(util_info)
print(util_info2)
printer.file_logger.info(util_info)
printer.file_logger.info(util_info2)
latchup_list = []
for idx in range(0, 13):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
fmt_str = "!HhhHhh"
inc_len = struct.calcsize(fmt_str)
(
dock_vbat,
dock_vcc_current,
batt_current,
batt_voltage,
batt_temp_0,
batt_temp_1,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
device_types = []
device_statuses = []
for idx in range(8):
device_types.append(hk_data[current_idx])
current_idx += 1
for idx in range(8):
device_statuses.append(hk_data[current_idx])
current_idx += 1
dearm_status = hk_data[current_idx]
current_idx += 1
wdt_reboots_list = []
for idx in range(5):
wdt_reboots_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4
time_pings_left_list = []
for idx in range(3):
time_pings_left_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4
for idx in range(2):
time_pings_left_list.append(hk_data[current_idx])
current_idx += 1
batt_charge_current = struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
current_idx += 2
batt_discharge_current = struct.unpack(
"!h", hk_data[current_idx : current_idx + 2]
)[0]
current_idx += 2
ant6_depl_status = struct.unpack("!b", hk_data[current_idx : current_idx + 1])[0]
current_idx += 1
ar6_depl_status = struct.unpack("!b", hk_data[current_idx : current_idx + 1])[0]
current_idx += 1
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
log_to_both(printer, wdt_info)
for idx in range(len(wdt_reboots_list)):
log_to_both(
printer,
f"{WDT_LIST[idx].ljust(5)} | "
f"{wdt_reboots_list[idx]:010} | {time_pings_left_list[idx]:010}",
)
temps = (
f"In C: Temp 0 {temp0 / 10.0} | Temp 1 {temp1 / 10.0} | "
f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0}"
)
batt_info = (
f"Batt: Current {batt_current} | Volt {batt_voltage} | "
f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}"
)
log_to_both(printer, temps)
log_to_both(printer, batt_info)
misc_info = f"Dearm {dearm_status} | ANT6 Depl {ant6_depl_status} | AR6 Deply {ar6_depl_status}"
log_to_both(printer, misc_info)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=36)
def log_to_both(printer: FsfwTmTcPrinter, string: str):
print(string)
printer.file_logger.info(string)

View File

@ -1,104 +0,0 @@
import struct
from config.object_ids import *
from pus_tc.devs.imtq import ImtqActionIds
from pus_tc.devs.ploc_mpsoc import PlocReplyIds
from pus_tc.devs.ploc_supervisor import SupvActionIds
from pus_tc.devs.star_tracker import StarTrackerActionIds
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.config.definitions import DataReplyUnpacked
LOGGER = get_console_logger()
def user_analyze_service_8_data(
object_id: bytes, action_id: int, custom_data: bytearray
) -> DataReplyUnpacked:
"""
This function is called by the TMTC core if a Service 8 data reply (subservice 130)
is received. The user can return a tuple of two lists, where the first list
is a list of header strings to print and the second list is a list of values to print.
The TMTC core will take care of printing both lists and logging them.
@param object_id:
@param action_id:
@param custom_data:
@return:
"""
if object_id == PDU_2_HANDLER_ID:
reply = DataReplyUnpacked()
reply.header_list = ["PDU2 Service 8 Reply"]
data_string = str()
for index in range(len(custom_data)):
data_string += str(hex(custom_data[index])) + " , "
data_string = data_string.rstrip()
data_string = data_string.rstrip(",")
data_string = data_string.rstrip()
reply.content_list = [data_string]
return reply
elif object_id == IMTQ_HANDLER_ID:
return handle_imtq_replies(action_id, custom_data)
elif object_id == PLOC_MPSOC_ID:
return handle_ploc_replies(action_id, custom_data)
elif object_id == PLOC_SUPV_ID:
return handle_supervisor_replies(action_id, custom_data)
elif object_id == STAR_TRACKER_ID:
return handle_startracker_replies(action_id, custom_data)
return DataReplyUnpacked()
def handle_imtq_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]:
reply.header_list = [
"Commanded X-Dipole",
"Commanded Y-Dipole",
"Commanded Z-Dipole",
]
x_dipole = struct.unpack("!H", custom_data[:2])
y_dipole = struct.unpack("!H", custom_data[2:4])
z_dipole = struct.unpack("!H", custom_data[4:6])
reply.content_list = [x_dipole[0], y_dipole[0], z_dipole[0]]
return reply
def handle_ploc_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == PlocReplyIds.tm_mem_read_report:
reply.header_list = [
"PLOC Memory Address",
"PLOC Mem Len",
"PLOC Read Memory Data",
]
reply.content_list = [
"0x" + custom_data[:4].hex(),
struct.unpack("!H", custom_data[4:6])[0],
"0x" + custom_data[6:10].hex(),
]
return reply
def handle_supervisor_replies(
action_id: int, custom_data: bytearray
) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == SupvActionIds.DUMP_MRAM:
reply.header_list = ["MRAM Dump"]
reply.content_list = [custom_data[: len(custom_data)]]
return reply
def handle_startracker_replies(
action_id: int, custom_data: bytearray
) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == StarTrackerActionIds.CHECKSUM:
if len(custom_data) != 5:
LOGGER.warning(
"Star tracker reply has invalid length {0}".format(len(custom_data))
)
return reply
reply.header_list = ["Checksum", "Checksum valid"]
print(custom_data[4])
checksum_valid_flag = custom_data[4] >> 8
reply.content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag]
return reply

View File

@ -0,0 +1,30 @@
import logging
from datetime import datetime
from typing import cast
from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.logging import get_console_logger
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from config.retvals import get_retval_dict
LOGGER = get_console_logger()
def handle_service_1_packet(printer: FsfwTmTcPrinter, raw_tm: bytes):
tm_packet = Service1TMExtended.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
srv1_packet = cast(Service1TMExtended, tm_packet)
retval_dict = get_retval_dict()
if srv1_packet.has_tc_error_code:
retval_info = retval_dict.get(srv1_packet.error_code)
if retval_info is None:
LOGGER.info(
f"No returnvalue information found for error code {srv1_packet.error_code}"
)
else:
retval_string = (
f"Error Code information for code {srv1_packet.error_code}| "
f"Name: {retval_info.name} | Info: {retval_info.info}"
)
LOGGER.info(retval_string)
printer.file_logger.info(retval_string)

View File

@ -1 +1 @@
tmtccmd>=1.13.0
tmtccmd>=2.0.1

@ -1 +1 @@
Subproject commit 19e8a588fa0723a5991f80bb2fd52dfc64f0ac64
Subproject commit 0501a26b6f347cf48e0875658ac4eaca8cc7d819

View File

@ -30,13 +30,16 @@ import sys
import traceback
try:
from tmtccmd.runner import (
initialize_tmtc_commander,
run_tmtc_commander,
add_ccsds_handler,
import tmtccmd.runner as tmtccmd
from tmtccmd.config import default_json_path, SetupArgs
from tmtccmd.config.args import (
create_default_args_parser,
add_default_tmtccmd_args,
parse_default_input_arguments,
)
from tmtccmd.ccsds.handler import CcsdsTmHandler
from tmtccmd.utility.logger import TMTC_LOGGER_NAME
from tmtccmd.ccsds.handler import CcsdsTmHandler, ApidHandler
from tmtccmd.logging import init_console_logger
from tmtccmd.logging.pus import create_tmtc_logger
except ImportError as error:
run_tmtc_commander = None
initialize_tmtc_commander = None
@ -47,7 +50,6 @@ except ImportError as error:
try:
import spacepackets
from spacepackets.log import set_custom_console_logger_name
except ImportError as error:
print(error)
print("Python spacepackets module could not be imported")
@ -59,23 +61,33 @@ except ImportError as error:
from config.hook_implementations import EiveHookObject
from config.version import __version__
from config.definitions import PUS_APID
from pus_tc.tc_packer_hook import pre_tc_send_cb
from pus_tm.factory_hook import ccsds_tm_handler
def main():
from pus_tm.event_handler import handle_event_packet
hook_obj = EiveHookObject()
print(f"-- eive tmtc version {__version__} --")
print(f"-- spacepackets version {spacepackets.__version__} --")
set_custom_console_logger_name(logger_name=TMTC_LOGGER_NAME)
initialize_tmtc_commander(hook_object=hook_obj)
ccsds_handler = CcsdsTmHandler()
ccsds_handler.add_tm_handler(
apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50
tmtccmd.init_printout(False)
tmtc_file_logger = create_tmtc_logger()
hook_obj = EiveHookObject(json_cfg_path=default_json_path())
arg_parser = create_default_args_parser()
add_default_tmtccmd_args(arg_parser)
args = parse_default_input_arguments(arg_parser, hook_obj)
setup_args = SetupArgs(
hook_obj=hook_obj, use_gui=False, apid=PUS_APID, cli_args=args
)
add_ccsds_handler(ccsds_handler)
run_tmtc_commander(False)
apid_handler = ApidHandler(cb=ccsds_tm_handler, queue_len=50, user_args=None)
ccsds_handler = CcsdsTmHandler()
ccsds_handler.add_tm_handler(apid=PUS_APID, handler=apid_handler)
tmtccmd.setup(setup_args=setup_args)
tmtccmd.add_ccsds_handler(ccsds_handler)
tmtc_backend = tmtccmd.create_default_tmtc_backend(
setup_args=setup_args,
tm_handler=ccsds_handler,
)
tmtc_backend.usr_send_wrapper = (pre_tc_send_cb, tmtc_file_logger)
tmtccmd.run(tmtc_backend=tmtc_backend)
if __name__ == "__main__":

@ -1 +1 @@
Subproject commit 06aed2f309a105f8f0e183d359a432301eb6947d
Subproject commit 48b6b8396eb3ea5ec4527ccb96f5909a29cd95f6

View File

@ -35,8 +35,8 @@ from pus_tm.factory_hook import ccsds_tm_handler
try:
from tmtccmd.runner import (
initialize_tmtc_commander,
run_tmtc_commander,
init_tmtccmd,
run_tmtccmd,
add_ccsds_handler,
)
from tmtccmd.ccsds.handler import CcsdsTmHandler
@ -56,13 +56,13 @@ def main():
hook_obj = EiveHookObject()
print(f"-- eive tmtc version {__version__}")
print(f"-- spacepackets version {spacepackets.__version__} --")
initialize_tmtc_commander(hook_object=hook_obj)
init_tmtccmd(hook_object=hook_obj)
ccsds_handler = CcsdsTmHandler()
ccsds_handler.add_tm_handler(
apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50
)
add_ccsds_handler(ccsds_handler)
run_tmtc_commander(use_gui=True)
run_tmtccmd(use_gui=True)
if __name__ == "__main__":

View File

@ -6,7 +6,7 @@
@date 13.02.2021
"""
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()