Compare commits

...

55 Commits

Author SHA1 Message Date
f7fd1e5bec Merge pull request 'v1.9.0' (#53) from develop into master
Reviewed-on: #53
2022-04-07 17:39:41 +02:00
25bab108df Merge pull request 'Major TMTC Update' (#52) from mueller/major-tmtc-update into develop
Reviewed-on: #52
2022-04-07 11:39:45 +02:00
Jakob Meier
f422d92553 fixed conflict 2022-04-07 11:39:27 +02:00
43be3fd370 Merge pull request 'Update PL PCDU Commands' (#50) from mueller/new-pl-pcdu-cmds into develop
Reviewed-on: #50
2022-04-07 11:36:01 +02:00
8f2ff3034f tmtccmd update 2022-04-07 10:05:53 +02:00
Robin Mueller
5d9f008320 small bugfix for PL PCDU commanding 2022-04-07 00:15:15 +02:00
Robin Mueller
61937a0272 bump tmtccmd requirement 2022-04-06 19:04:18 +02:00
Robin Mueller
1380460754 renamed function 2022-04-06 18:25:54 +02:00
Robin Mueller
ce7be3a3eb hk handing bpx fix 2022-04-06 17:35:04 +02:00
Robin Mueller
d4ebb1b8aa update hk handling 2022-04-06 17:01:01 +02:00
Robin Mueller
3fac640101 update action reply handling 2022-04-06 16:41:46 +02:00
Robin Mueller
50747d8a5e getter function renamed 2022-04-06 16:19:59 +02:00
Robin Mueller
49f88a5b3d pre send CB works properly now 2022-04-06 16:17:51 +02:00
Robin Mueller
fc2b50979e submodule updates 2022-04-06 15:54:20 +02:00
Robin Mueller
d32e427e0f pack HK command instead of diag 2022-04-06 12:13:28 +02:00
Robin Mueller
8917f303ec Merge remote-tracking branch 'origin/mueller/major-tmtc-update' into mueller/master 2022-04-06 12:12:14 +02:00
Robin Mueller
1c10575264 tmtccmd update 2022-04-06 12:08:18 +02:00
6088230a5f bump minor version 2022-04-06 11:44:40 +02:00
b2acb7fec5 update for changed tmtccmd API 2022-04-06 11:43:17 +02:00
3f5d77a5e5 request diag packet 2022-04-05 19:49:42 +02:00
5b2dfa55eb Merge remote-tracking branch 'origin/develop' into mueller/major-tmtc-update 2022-04-05 19:43:22 +02:00
38b21e4fc8 Merge branch 'develop' into mueller/new-pl-pcdu-cmds 2022-04-05 19:40:48 +02:00
2c8c0a97e2 Merge pull request 'meier/syrlinks' (#51) from meier/syrlinks into develop
Reviewed-on: #51
2022-04-05 19:40:31 +02:00
007adfd66a tmtccmd update 2022-04-05 19:40:03 +02:00
a038e4c175 finished p60 dock HK handling 2022-04-05 19:27:55 +02:00
2c58a77338 continuing printout handling refactoring 2022-04-05 17:39:51 +02:00
5896ecad07 continuing refactoring of printouts 2022-04-05 17:05:11 +02:00
3c302bd4e8 tmtccmd refactoring 2022-04-05 15:19:46 +02:00
Jakob Meier
28ca0aa363 fixed conflicts 2022-04-05 11:58:13 +02:00
Jakob Meier
f8d6fc4c2b disabel debug command id 2022-04-05 08:42:38 +02:00
Robin Mueller
e67eb6633e Merge branch 'mueller/master' of https://egit.irs.uni-stuttgart.de/eive/eive-tmtc into mueller/master 2022-04-05 00:55:17 +02:00
Robin Mueller
c94e3bd085 moved logger 2022-04-05 00:51:52 +02:00
2b7a75e32b temporary fix for tmtc printer overhaul 2022-04-04 22:32:15 +02:00
Robin Mueller
923929ca55 HKhandler p60dock 2022-04-04 18:46:52 +02:00
Robin Mueller
0fc8369bbc added p60dock hk handler 2022-04-04 17:15:12 +02:00
Jakob Meier
592f37544f fixed conflicts 2022-04-04 15:05:38 +02:00
Robin Mueller
8b5554d6fd updated pl pcdu commands 2022-04-04 13:53:26 +02:00
Robin Mueller
a39c7007c6 Merge remote-tracking branch 'origin/develop' into mueller/master 2022-04-04 12:00:29 +02:00
a14784666f Merge remote-tracking branch 'origin/develop' into mueller/new-pl-pcdu-cmds 2022-03-31 14:49:23 +02:00
c8308f47a8 remove wait time cmds 2022-03-31 14:47:38 +02:00
cd87968c3d Merge remote-tracking branch 'origin/develop' into mueller/new-pl-pcdu-cmds 2022-03-31 14:46:32 +02:00
4600aed2be Merge pull request 'meier/develop' (#49) from meier/develop into develop
Reviewed-on: #49
2022-03-31 14:46:00 +02:00
57f810f9de Merge remote-tracking branch 'origin/develop' into mueller/master 2022-03-31 14:45:45 +02:00
f2f61e4fda Merge branch 'develop' into meier/develop 2022-03-31 14:45:22 +02:00
dd411bc42b Merge pull request 'payload improvements' (#48) from meier/ploc into develop
Reviewed-on: #48
2022-03-31 14:44:57 +02:00
5ac8912dd2 device commands 2022-03-31 11:36:50 +02:00
f7a3ad9981 rad sensor enable debug output 2022-03-30 18:12:51 +02:00
5c43f638ac form change 2022-03-30 17:12:48 +02:00
eb4dc95160 write lcl config register 2022-03-30 16:31:52 +02:00
b5a9dac8f6 new commands for PL PCDU mode commanding 2022-03-30 12:08:50 +02:00
Jakob Meier
c1a8170945 syrlinks print text adaption 2022-03-30 09:22:10 +02:00
Jakob Meier
ed85e1706f syrlinks mode commands 2022-03-29 07:57:39 +02:00
Jakob Meier
3e1dcdf430 Merge branch 'meier/ploc' into meier/syrlinks 2022-03-29 07:03:19 +02:00
Jakob Meier
b21850d4f9 syrlinks mode commands wip 2022-03-29 07:03:13 +02:00
e37430423e tmtccmd update 2022-03-28 12:30:48 +02:00
39 changed files with 1092 additions and 544 deletions

View File

@@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccli.py" />
<option name="PARAMETERS" value="-s pdu1 -l -t 6" />
<option name="PARAMETERS" value="-s pdu1 -t 6" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccli.py" />
<option name="PARAMETERS" value="-s ploc_mpsoc -t 6 -l" />
<option name="PARAMETERS" value="-s ploc_mpsoc -t 6" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@@ -6,14 +6,14 @@
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="C:\Users\EIVE_Reinraumrechner\AppData\Local\Programs\Python\Python39\python.exe" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="false" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-s syrlinks -l -t 6" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccli.py" />
<option name="PARAMETERS" value="-s syrlinks -t 6" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@@ -4,7 +4,7 @@
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/tmtccmd/src" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/tmtccmd/src/tests" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />

View File

@@ -6,16 +6,16 @@
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="C:\Users\jakob\AppData\Local\Programs\Python\Python39\python.exe" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="false" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccli.py" />
<option name="PARAMETERS" value="-s ccsds_handler -l -t 8 --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />

View File

@@ -6,9 +6,9 @@
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="C:\Users\jakob\AppData\Local\Programs\Python\Python39\python.exe" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="false" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />

View File

@@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtccli-example" type="PythonConfigurationType" factoryName="Python" folderName="Example">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/tmtccmd/examples" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccmd/examples/tmtccli.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcgui-example" type="PythonConfigurationType" factoryName="Python" folderName="Example">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/tmtccmd/examples" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccmd/examples/tmtcgui.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@@ -6,7 +6,7 @@
import enum
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()

View File

@@ -12,11 +12,8 @@ import argparse
from config.definitions import CustomServiceList, PUS_APID
from config.custom_mode_op import CustomModeList
from tmtccmd.config.definitions import CoreComInterfaces
from tmtccmd.config.globals import (
set_default_globals_pre_args_parsing,
set_default_globals_post_args_parsing,
)
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.config.globals import set_default_globals_pre_args_parsing
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()
@@ -34,12 +31,3 @@ def set_globals_pre_args_parsing(gui: bool = False):
tm_apid=PUS_APID,
com_if_id=CoreComInterfaces.TCPIP_UDP.value,
)
def add_globals_post_args_parsing(args: argparse.Namespace, json_cfg_path: str):
set_default_globals_post_args_parsing(
args=args,
custom_services_list=[CustomServiceList],
custom_modes_list=[CustomModeList],
json_cfg_path=json_cfg_path,
)

View File

@@ -13,7 +13,7 @@ from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.config.globals import OpCodeDictKeys
from config.definitions import CustomServiceList
@@ -21,43 +21,24 @@ from config.retvals import get_retval_dict
class EiveHookObject(TmTcHookBase):
def __init__(self, json_cfg_path: str):
super().__init__(json_cfg_path=json_cfg_path)
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import get_default_service_op_code_dict
service_op_code_dict = get_default_service_op_code_dict()
get_eive_service_op_code_dict(service_op_code_dict=service_op_code_dict)
return service_op_code_dict
def get_json_config_file_path(self) -> str:
"""The user can specify a path and filename for the JSON configuration file by overriding
this function.
:return:
"""
return "config/tmtc_config.json"
def add_globals_pre_args_parsing(self, gui: bool = False):
from config.globals_config import set_globals_pre_args_parsing
set_globals_pre_args_parsing(gui=gui)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from config.globals_config import add_globals_post_args_parsing
add_globals_post_args_parsing(
args=args, json_cfg_path=self.get_json_config_file_path()
)
def assign_communication_interface(
self, com_if_key: str, tmtc_printer: TmTcPrinter
self, com_if_key: str
) -> Union[CommunicationInterface, None]:
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key,
tmtc_printer=tmtc_printer,
json_cfg_path=self.get_json_config_file_path(),
json_cfg_path=self.json_cfg_path,
space_packet_ids=(0x0865,),
)
@@ -78,39 +59,6 @@ class EiveHookObject(TmTcHookBase):
return get_object_ids()
@staticmethod
def handle_service_8_telemetry(
object_id: bytes, action_id: int, custom_data: bytearray
) -> DataReplyUnpacked:
from pus_tm.service_8_hook import user_analyze_service_8_data
return user_analyze_service_8_data(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@staticmethod
def handle_service_3_housekeeping(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> HkReplyUnpacked:
from pus_tm.hk_handling import handle_user_hk_packet
return handle_user_hk_packet(
object_id=object_id,
set_id=set_id,
hk_data=hk_data,
service3_packet=service3_packet,
)
@staticmethod
def handle_service_5_event(
object_id: bytes, event_id: int, param_1: int, param_2: int
) -> str:
from pus_tm.event_handler import handle_event_packet
return handle_event_packet(
object_id=object_id, event_id=event_id, param_1=param_1, param_2=param_2
)
def get_retval_dict(self) -> RetvalDictT:
return get_retval_dict()
@@ -334,19 +282,54 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
service_pdec_handler_tuple = ("PDEC Handler", op_code_dict_srv_pdec_handler)
op_code_dict_srv_syrlinks_handler = {
"0": ("Syrlinks Handler: Set TX standby", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("Syrlinks Handler: Set TX modulation", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Syrlinks Handler: Set TX carrier wave", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Syrlinks Handler: Read TX status", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("Syrlinks Handler: Read TX waveform", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": (
"0": ("Syrlinks Handler: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("Syrlinks Handler: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Syrlinks Handler: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Syrlinks Handler: Set TX standby", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("Syrlinks Handler: Set TX modulation", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Syrlinks Handler: Set TX carrier wave", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("Syrlinks Handler: Read TX status", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": ("Syrlinks Handler: Read TX waveform", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": (
"Syrlinks Handler: Read TX AGC value high byte ",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"8": (
"9": (
"Syrlinks Handler: Read TX AGC value low byte ",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"12": (
"Syrlinks Handler: Write LCL config",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"13": (
"Syrlinks Handler: Read RX status registers",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"14": (
"Syrlinks Handler: Read LCL config register",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"15": (
"Syrlinks Handler: Set waveform OQPSK",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"16": (
"Syrlinks Handler: Set waveform BPSK",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"17": (
"Syrlinks Handler: Set second config",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"18": (
"Syrlinks Handler: Enable debug output",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"19": (
"Syrlinks Handler: Disable debug output",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
}
service_syrlinks_handler_tuple = (
"Syrlinks Handler",

View File

@@ -5,8 +5,9 @@
"""
import os.path
from tmtccmd.pus.obj_id import ObjectIdDictT
from tmtccmd.utility.fsfw import parse_fsfw_objects_csv
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.fsfw import parse_fsfw_objects_csv
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()
DEFAULT_OBJECTS_CSV_PATH = "config/objects.csv"

View File

@@ -1,6 +1,6 @@
import os
from tmtccmd.utility.fsfw import parse_fsfw_returnvalues_csv, RetvalDictT
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.fsfw import parse_fsfw_returnvalues_csv, RetvalDictT
from tmtccmd.logging import get_console_logger
DEFAULT_RETVAL_CSV_NAME = "config/returnvalues.csv"
__RETVAL_DICT = None

View File

@@ -1,6 +1,6 @@
SW_NAME = "eive"
VERSION_MAJOR = 1
VERSION_MINOR = 8
VERSION_MINOR = 9
VERSION_SUBMINOR = 0
__version__ = "1.8.0"
__version__ = "1.9.0"

View File

@@ -23,11 +23,11 @@ class GomspaceDeviceActionIds(enum.IntEnum):
PRINT_LATCHUPS = 33
class GomspaceOpCodes(enum.Enum):
class GomspaceOpCodes:
# Request HK
REQUEST_HK_ONCE = "128"
PRINT_SWITCH_V_I = "129"
PRINT_LATCHUPS = "130"
REQUEST_HK_ONCE = ["req-hk-once", "128"]
PRINT_SWITCH_V_I = ["print-switch-vi", "129"]
PRINT_LATCHUPS = ["print-latchups", "130"]
class SetIds:

View File

@@ -97,25 +97,44 @@ def add_core_controller_definitions(cmd_dict: ServiceOpCodeDictT):
def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
from pus_tc.devs.plpcdu import OpCodes
from pus_tc.devs.plpcdu import OpCodes, Info
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info="Switch PL PCDU on"
op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.SWITCH_ADC_NORMAL,
info="Switch PL PCDU ADC normal, submode ADC ON",
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info="Switch PL PCDU off"
keys=OpCodes.NORMAL_SSR,
info=Info.NORMAL_SSR,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.SWITCH_ALL_NORMAL,
info="Switch all PL PCDU modules normal, submode ALL ON",
options=generate_op_code_options(enter_listener_mode=True),
keys=OpCodes.NORMAL_DRO,
info=Info.NORMAL_DRO,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_X8,
info=Info.NORMAL_X8,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_TX,
info=Info.NORMAL_TX,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_MPA,
info=Info.NORMAL_MPA,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_HPA,
info=Info.NORMAL_HPA,
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF
)
add_op_code_entry(
op_code_dict=op_code_dict,
@@ -161,47 +180,47 @@ def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes
from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes, Info
from pus_tc.devs.pdu1 import Pdu1OpCodes
from pus_tc.devs.pdu2 import Pdu2OpCodes
op_code_dict = dict()
add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="P60 Tests")
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_3V3_ON.value,
info="P60 Dock: Turn stack 3V3 on",
keys=P60OpCodes.STACK_3V3_ON,
info=Info.STACK_3V3_ON,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_3V3_OFF.value,
info="P60 Dock: Turn stack 3V3 off",
keys=P60OpCodes.STACK_3V3_OFF,
info=Info.STACK_3V3_OFF,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_5V_ON.value,
info="P60 Dock: Turn stack 5V on",
keys=P60OpCodes.STACK_5V_ON,
info=Info.STACK_5V_ON,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_5V_OFF.value,
info="P60 Dock: Turn stack 5V off",
keys=P60OpCodes.STACK_5V_OFF,
info=Info.STACK_5V_OFF,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_HK_ONCE.value,
keys=GomspaceOpCodes.REQUEST_HK_ONCE,
info="P60 Dock: Request HK once",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="P60 Dock: Print Switches, Voltages, Currents",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_LATCHUPS.value,
keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="P60 Dock: Print Latchups",
)
add_op_code_entry(op_code_dict=op_code_dict, keys=P60OpCodes.TEST, info="P60 Tests")
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.P60DOCK.value,
@@ -292,17 +311,17 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_HK_ONCE.value,
keys=GomspaceOpCodes.REQUEST_HK_ONCE,
info="PDU1: Request HK once",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="PDU1: Print Switches, Voltages, Currents",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_LATCHUPS.value,
keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="PDU1: Print Latchups",
)
add_op_code_entry(
@@ -399,18 +418,18 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_HK_ONCE.value,
keys=GomspaceOpCodes.REQUEST_HK_ONCE,
info="PDU2: Request HK once",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I.value,
keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="PDU2: Print Switches, Voltages, Currents",
options={OpCodeDictKeys.TIMEOUT: 2.0},
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_LATCHUPS.value,
keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="PDU2: Print Latchups",
)
add_service_op_code_entry(
@@ -461,6 +480,10 @@ def add_rad_sens_cmds(cmd_dict: ServiceOpCodeDictT):
"0": ("Radiation Sensor: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("Radiation Sensor: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Radiation Sensor: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Radiation Sensor: Start conversions", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("Radiation Sensor: Read conversions", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Radiation Sensor: Enable debug output", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("Radiation Sensor: Disable debug putput", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_rad_sensor_tuple = ("Radiation Sensor", op_code_dict_srv_rad_sensor)
cmd_dict[CustomServiceList.RAD_SENSOR.value] = service_rad_sensor_tuple
@@ -468,18 +491,21 @@ def add_rad_sens_cmds(cmd_dict: ServiceOpCodeDictT):
def add_ploc_mpsoc_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_ploc_mpsoc = {
"0": ("Ploc MPSoC: Memory write", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("Ploc MPSoC: Memory read", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Ploc MPSoC: Flash write", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Ploc MPSoC: Flash delete", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("Ploc MPSoC: Replay start", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Ploc MPSoC: Replay stop", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("Ploc MPSoC: Downlink pwr on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": ("Ploc MPSoC: Downlink pwr off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": ("Ploc MPSoC: Replay write sequence", {OpCodeDictKeys.TIMEOUT: 2.0}),
"9": ("Ploc MPSoC: OBSW reset sequence count", {OpCodeDictKeys.TIMEOUT: 2.0}),
"10": ("Ploc MPSoC: Read DEADBEEF address", {OpCodeDictKeys.TIMEOUT: 2.0}),
"11": ("Ploc MPSoC: Mode replay", {OpCodeDictKeys.TIMEOUT: 2.0}),
"0": ("Ploc MPSoC: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("Ploc MPSoC: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Ploc MPSoC: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Ploc MPSoC: Memory write", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("Ploc MPSoC: Memory read", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Ploc MPSoC: Flash write", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("Ploc MPSoC: Flash delete", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": ("Ploc MPSoC: Replay start", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": ("Ploc MPSoC: Replay stop", {OpCodeDictKeys.TIMEOUT: 2.0}),
"9": ("Ploc MPSoC: Downlink pwr on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"10": ("Ploc MPSoC: Downlink pwr off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"11": ("Ploc MPSoC: Replay write sequence", {OpCodeDictKeys.TIMEOUT: 2.0}),
"12": ("Ploc MPSoC: OBSW reset sequence count", {OpCodeDictKeys.TIMEOUT: 2.0}),
"13": ("Ploc MPSoC: Read DEADBEEF address", {OpCodeDictKeys.TIMEOUT: 2.0}),
"14": ("Ploc MPSoC: Mode replay", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_ploc_mpsoc_tuple = ("Ploc MPSoC", op_code_dict_srv_ploc_mpsoc)
cmd_dict[CustomServiceList.PLOC_MPSOC.value] = service_ploc_mpsoc_tuple

View File

@@ -7,17 +7,32 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid
from tmtccmd.tc.service_3_housekeeping import (
generate_one_hk_command,
make_sid,
generate_one_diag_command,
)
from gomspace.gomspace_common import *
from config.object_ids import P60_DOCK_HANDLER
class P60OpCodes(enum.Enum):
TEST = "0"
STACK_3V3_ON = "1"
STACK_3V3_OFF = "2"
STACK_5V_ON = "3"
STACK_5V_OFF = "4"
HK_SET_ID = 0x3
class P60OpCodes:
STACK_3V3_ON = ["stack-3v3-on", "1"]
STACK_3V3_OFF = ["stack-3v3-off", "2"]
STACK_5V_ON = ["stack-5v-on", "3"]
STACK_5V_OFF = ["stack-5v-off", "4"]
TEST = ["test", "0"]
class Info:
PREFIX = "P60 Dock"
STACK_3V3_ON = f"{PREFIX}: Turn Stack 3V3 on"
STACK_3V3_OFF = f"{PREFIX}: Turn Stack 3V3 off"
STACK_5V_ON = f"{PREFIX}: Turn Stack 5V on"
STACK_5V_OFF = f"{PREFIX}: Turn Stack 5V off"
class P60DockTestProcedure:
@@ -76,8 +91,8 @@ class P60DockHkTable:
def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
if op_code == P60OpCodes.STACK_3V3_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 on"))
if op_code in P60OpCodes.STACK_3V3_ON:
tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_3V3_ON))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_9.parameter_address,
@@ -85,8 +100,8 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.on,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == P60OpCodes.STACK_3V3_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 off"))
if op_code in P60OpCodes.STACK_3V3_OFF:
tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_3V3_OFF))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_9.parameter_address,
@@ -94,8 +109,8 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == P60OpCodes.STACK_5V_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 5V on"))
if op_code in P60OpCodes.STACK_5V_ON:
tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_5V_ON))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_10.parameter_address,
@@ -103,8 +118,8 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.on,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == P60OpCodes.STACK_5V_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 5V off"))
if op_code in P60OpCodes.STACK_5V_OFF:
tc_queue.appendleft((QueueCommands.PRINT, Info.STACK_5V_OFF))
command = pack_set_param_command(
object_id,
P60DockConfigTable.out_en_10.parameter_address,
@@ -112,12 +127,12 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.REQUEST_HK_ONCE.value:
if op_code in GomspaceOpCodes.REQUEST_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Requesting HK Table Once"))
hk_sid = make_sid(object_id=P60_DOCK_HANDLER, set_id=SetIds.P60_DOCK)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_SWITCH_V_I.value:
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
tc_queue.appendleft(
(QueueCommands.PRINT, "P60 Dock: Print Switches, Voltages, Currents")
)
@@ -125,7 +140,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_LATCHUPS.value:
if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Print Latchups"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS

View File

@@ -210,12 +210,12 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.REQUEST_HK_ONCE.value:
if op_code in GomspaceOpCodes.REQUEST_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Requesting HK Table Once"))
hk_sid = make_sid(object_id=PDU_1_HANDLER_ID, set_id=SetIds.PDU_1)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_SWITCH_V_I.value:
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU1: Print Switches, Voltages, Currents")
)
@@ -223,7 +223,7 @@ def pack_pdu1_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_LATCHUPS.value:
if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Print Latchups"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS

View File

@@ -215,9 +215,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu2OpCodes.PL_CAMERA_ON.value:
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Turn payload camera on")
)
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn payload camera on"))
command = pack_set_param_command(
object_id,
PDUConfigTable.out_en_8.parameter_address,
@@ -226,9 +224,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu2OpCodes.PL_CAMERA_OFF.value:
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Turn payload camera off")
)
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn payload camera off"))
command = pack_set_param_command(
object_id,
PDUConfigTable.out_en_8.parameter_address,
@@ -236,12 +232,12 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
Channel.off,
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.REQUEST_HK_ONCE.value:
if op_code in GomspaceOpCodes.REQUEST_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Requesting HK Table Once"))
hk_sid = make_sid(object_id=PDU_2_HANDLER_ID, set_id=SetIds.PDU_2)
command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_SWITCH_V_I.value:
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Print Switches, Currents, Voltahes")
)
@@ -249,7 +245,7 @@ def pack_pdu2_commands(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == GomspaceOpCodes.PRINT_LATCHUPS.value:
if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Print Latchups"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS

View File

@@ -10,10 +10,11 @@ import struct
import enum
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from utility.input_helper import InputHelper
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
LOGGER = get_console_logger()
@@ -59,7 +60,7 @@ class PlocReplyIds:
def pack_ploc_mpsoc_commands(
object_id: bytearray, tc_queue: TcQueueT, op_code: str
object_id: bytearray, tc_queue: TcQueueT, op_code: str
) -> TcQueueT:
tc_queue.appendleft(
(
@@ -69,8 +70,25 @@ def pack_ploc_mpsoc_commands(
)
if op_code == "0":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Set mode off"))
command = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Set mode on"))
command = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "2":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Mode Normal"))
command = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: TC mem write test"))
memory_address = int(input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16)
memory_address = int(
input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16
)
memory_data = int(input("PLOC MPSoC: Tc Mem Write: Type memory data: 0x"), 16)
# TODO: implement variable length mem write command
mem_len = 1 # 1 32-bit word
@@ -79,61 +97,67 @@ def pack_ploc_mpsoc_commands(
)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "1":
elif op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: TC mem read test"))
command = prepare_mem_read_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "2":
elif op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Flash write"))
command = prepare_flash_write_cmd(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "3":
elif op_code == "6":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Flash delete"))
command = prepare_flash_delete_cmd(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "4":
elif op_code == "7":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay start"))
command = prepare_replay_start_cmd(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "5":
elif op_code == "8":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay stop"))
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_STOP)
command = object_id + struct.pack("!I", CommandIds.TC_REPLAY_STOP)
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "6":
elif op_code == "9":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr on"))
command = prepare_downlink_pwr_on_cmd(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "7":
elif op_code == "10":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Downlink pwr off"))
command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_OFF)
command = object_id + struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_OFF)
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "8":
elif op_code == "11":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Replay write sequence"))
command = prepare_replay_write_sequence_cmd(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "9":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Reset OBSW sequence count"))
command = object_id + struct.pack('!I', CommandIds.OBSW_RESET_SEQ_COUNT)
elif op_code == "12":
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC MPSoC: Reset OBSW sequence count")
)
command = object_id + struct.pack("!I", CommandIds.OBSW_RESET_SEQ_COUNT)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "10":
elif op_code == "13":
num_words = 1
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Read DEADBEEF address"))
command = object_id + struct.pack('!I', CommandIds.TC_MEM_READ) + struct.pack("!I", MemAddresses.DEADBEEF) + \
struct.pack('!H', num_words)
command = (
object_id
+ struct.pack("!I", CommandIds.TC_MEM_READ)
+ struct.pack("!I", MemAddresses.DEADBEEF)
+ struct.pack("!H", num_words)
)
command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "11":
elif op_code == "14":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Tc mode replay"))
command = object_id + struct.pack('!I', CommandIds.TC_MODE_REPLAY)
command = object_id + struct.pack("!I", CommandIds.TC_MODE_REPLAY)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
@@ -141,7 +165,7 @@ def pack_ploc_mpsoc_commands(
def generate_write_mem_command(
object_id: bytearray, memory_address: int, memory_data: int, mem_len: int
object_id: bytearray, memory_address: int, memory_data: int, mem_len: int
) -> bytearray:
"""This function generates the command to write to a memory address within the PLOC
@param object_id The object id of the PlocHandler
@@ -149,11 +173,11 @@ def generate_write_mem_command(
@param memory_data The data to write to the memory address specified by the bytearray memory_address.
"""
command = (
object_id
+ struct.pack('!I', CommandIds.TC_MEM_WRITE)
+ struct.pack('!I', memory_address)
+ struct.pack('!H', mem_len)
+ struct.pack("!I", memory_data)
object_id
+ struct.pack("!I", CommandIds.TC_MEM_WRITE)
+ struct.pack("!I", memory_address)
+ struct.pack("!H", mem_len)
+ struct.pack("!I", memory_data)
)
return command
@@ -162,8 +186,10 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray:
memory_address = int(input("PLOC MPSoC Tc Mem Read: Type memory address: 0x"), 16)
num_words = int(input("PLOC MPSoC specify number of words (32-bit) to read: "))
command = (
object_id + struct.pack('!I', CommandIds.TC_MEM_READ) + struct.pack("!I", memory_address) + struct.pack(
'!H', num_words)
object_id
+ struct.pack("!I", CommandIds.TC_MEM_READ)
+ struct.pack("!I", memory_address)
+ struct.pack("!H", num_words)
)
return command
@@ -171,28 +197,44 @@ def prepare_mem_read_command(object_id: bytearray) -> bytearray:
def prepare_flash_write_cmd(object_id: bytearray) -> bytearray:
obcFile = get_obc_file()
mpsocFile = get_mpsoc_file()
command = object_id + struct.pack('!I', CommandIds.FLASH_WRITE) + bytearray(obcFile, 'utf-8') + bytearray(mpsocFile,
'utf-8')
command = (
object_id
+ struct.pack("!I", CommandIds.FLASH_WRITE)
+ bytearray(obcFile, "utf-8")
+ bytearray(mpsocFile, "utf-8")
)
return command
def prepare_flash_delete_cmd(object_id: bytearray) -> bytearray:
file = get_mpsoc_file()
command = object_id + struct.pack('!I', CommandIds.TC_FLASH_DELETE) + bytearray(file, 'utf-8')
command = (
object_id
+ struct.pack("!I", CommandIds.TC_FLASH_DELETE)
+ bytearray(file, "utf-8")
)
return command
def prepare_replay_start_cmd(object_id: bytearray) -> bytearray:
replay = int(input("Specify replay mode (0 - once, 1 - repeated): "))
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_START) + struct.pack('!B', replay)
command = (
object_id
+ struct.pack("!I", CommandIds.TC_REPLAY_START)
+ struct.pack("!B", replay)
)
return command
def prepare_downlink_pwr_on_cmd(object_id: bytearray) -> bytearray:
mode = int(input("Specify JESD mode (0 - 5): "))
lane_rate = int(input("Specify lane rate (0 - 9): "))
command = object_id + struct.pack('!I', CommandIds.TC_DOWNLINK_PWR_ON) + struct.pack('!B', mode) \
+ struct.pack('!B', lane_rate)
command = (
object_id
+ struct.pack("!I", CommandIds.TC_DOWNLINK_PWR_ON)
+ struct.pack("!B", mode)
+ struct.pack("!B", lane_rate)
)
return command
@@ -200,8 +242,12 @@ def prepare_replay_write_sequence_cmd(object_id: bytearray) -> bytearray:
null_terminator = 0
use_decoding = int(input("Use decoding (set to 1): "))
file = get_sequence_file()
command = object_id + struct.pack('!I', CommandIds.TC_REPLAY_WRITE_SEQUENCE) + struct.pack('!B', use_decoding) + \
bytearray(file, 'utf-8')
command = (
object_id
+ struct.pack("!I", CommandIds.TC_REPLAY_WRITE_SEQUENCE)
+ struct.pack("!B", use_decoding)
+ bytearray(file, "utf-8")
)
return command

View File

@@ -11,7 +11,7 @@ import struct
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
LOGGER = get_console_logger()
@@ -86,16 +86,12 @@ def pack_ploc_supv_commands(
)
)
if op_code == "0":
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC Supervisor: Set mode off")
)
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set mode off"))
command = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1":
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC Supervisor: Set mode on")
)
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set mode on"))
command = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())

View File

@@ -9,7 +9,7 @@ from tmtccmd.tc.service_20_parameter import (
pack_fsfw_load_param_cmd,
pack_boolean_parameter_app_data,
)
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from spacepackets.ecss.tc import PusTelecommand
from config.object_ids import PL_PCDU_ID
@@ -18,13 +18,13 @@ LOGGER = get_console_logger()
class OpCodes:
SWITCH_ON = ["0", "on"]
SWITCH_ADC_NORMAL = ["1", "adc-normal"]
SWITCH_ALL_NORMAL = ["2", "all-normal"]
SWITCH_OFF = ["3", "off"]
UPDATE_DRO_TO_X8_WAIT = ["6", "dro-to-x8-wait"]
UPDATE_X8_TO_TX_WAIT_TIME = ["7", "x8-to-tx-wait"]
UPDATE_TX_TO_MPA_WAIT_TIME = ["8", "tx-to-mpa-wait"]
UPDATE_MPA_TO_HPA_WAIT_TIME = ["9", "mpa-to-hpa-wait"]
SWITCH_OFF = ["1", "off"]
NORMAL_SSR = ["2", "nml-ssr"]
NORMAL_DRO = ["3", "nml-dro"]
NORMAL_X8 = ["4", "nml-x8"]
NORMAL_TX = ["5", "nml-tx"]
NORMAL_MPA = ["6", "nml-mpa"]
NORMAL_HPA = ["7", "nml-hpa"]
INJECT_SSR_TO_DRO_FAILURE = ["10", "inject-ssr-dro-fault"]
INJECT_DRO_TO_X8_FAILURE = ["11", "inject-dro-x8-fault"]
@@ -33,10 +33,32 @@ class OpCodes:
INJECT_MPA_TO_HPA_FAILURE = ["14", "inject-mpa-hpa-fault"]
INJECT_ALL_ON_FAILURE = ["15", "inject-all-on-fault"]
# The following commands might become deprecated in the future
UPDATE_DRO_TO_X8_WAIT = ["128", "dro-to-x8-wait"]
UPDATE_X8_TO_TX_WAIT_TIME = ["129", "x8-to-tx-wait"]
UPDATE_TX_TO_MPA_WAIT_TIME = ["130", "tx-to-mpa-wait"]
UPDATE_MPA_TO_HPA_WAIT_TIME = ["131", "mpa-to-hpa-wait"]
class Submodes(enum.IntEnum):
ADC_ON = 0
ALL_ON = 1
class Info:
NORMAL = "PL PCDU ADC modules normal"
SWITCH_ON = "Switching PL PCDU on"
SWITCH_OFF = "Switching PL PCDU off"
NORMAL_SSR = f"{NORMAL}, SSR on"
NORMAL_DRO = f"{NORMAL},DRO on"
NORMAL_X8 = f"{NORMAL}, X8 on"
NORMAL_TX = f"{NORMAL}, TX on"
NORMAL_MPA = f"{NORMAL}, MPA on"
NORMAL_HPA = f"{NORMAL}, HPA on"
class NormalSubmodesMask(enum.IntEnum):
SOLID_STATE_RELAYS_ADC_ON = 0
DRO_ON = 1
X8_ON = 2
TX_ON = 3
MPA_ON = 4
HPA_ON = 5
class ParamIds(enum.IntEnum):
@@ -79,67 +101,79 @@ class ParamIds(enum.IntEnum):
def pack_pl_pcdu_commands(tc_queue: TcQueueT, op_code: str):
if op_code in OpCodes.SWITCH_ON:
tc_queue.appendleft((QueueCommands.PRINT, "Switching PL PCDU on"))
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.ON, submode=0)
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue, info=Info.SWITCH_ON, mode=Modes.ON, submode=0
)
tc_queue.appendleft(mode_cmd.pack_command_tuple())
if op_code in OpCodes.SWITCH_OFF:
tc_queue.appendleft((QueueCommands.PRINT, "Switching PL PCDU off"))
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=Modes.OFF, submode=0)
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue, info=Info.SWITCH_OFF, mode=Modes.OFF, submode=0
)
tc_queue.appendleft(mode_cmd.pack_command_tuple())
if op_code in OpCodes.SWITCH_ADC_NORMAL:
tc_queue.appendleft(
(QueueCommands.PRINT, "Switching PL PCDU ADC module normal, submode ADC ON")
)
mode_data = pack_mode_data(
object_id=PL_PCDU_ID, mode=Modes.NORMAL, submode=Submodes.ADC_ON
)
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
)
tc_queue.appendleft(mode_cmd.pack_command_tuple())
if op_code in OpCodes.SWITCH_ALL_NORMAL:
tc_queue.appendleft(
(
QueueCommands.PRINT,
"Switching all PL PCDU modules normal, submode ALL ON",
)
)
mode_data = pack_mode_data(
object_id=PL_PCDU_ID, mode=Modes.NORMAL, submode=Submodes.ALL_ON
)
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
)
tc_queue.appendleft(mode_cmd.pack_command_tuple())
if op_code in OpCodes.UPDATE_DRO_TO_X8_WAIT:
pack_wait_time_cmd(
if op_code in OpCodes.NORMAL_SSR:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
param_id=ParamIds.DRO_TO_X8_WAIT_TIME,
print_str="DRO to X8",
info=Info.NORMAL_SSR,
mode=Modes.NORMAL,
submode=(1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON),
)
if op_code in OpCodes.UPDATE_X8_TO_TX_WAIT_TIME:
pack_wait_time_cmd(
if op_code in OpCodes.NORMAL_DRO:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
param_id=ParamIds.X8_TO_TX_WAIT_TIME,
print_str="X8 to TX",
info=Info.NORMAL_DRO,
mode=Modes.NORMAL,
submode=(
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
),
)
if op_code in OpCodes.UPDATE_TX_TO_MPA_WAIT_TIME:
pack_wait_time_cmd(
if op_code in OpCodes.NORMAL_X8:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
param_id=ParamIds.TX_TO_MPA_WAIT_TIME,
print_str="TX to MPA",
info=Info.NORMAL_X8,
mode=Modes.NORMAL,
submode=(
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
),
)
if op_code in OpCodes.UPDATE_MPA_TO_HPA_WAIT_TIME:
pack_wait_time_cmd(
if op_code in OpCodes.NORMAL_TX:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
param_id=ParamIds.MPA_TO_HPA_WAIT_TIME,
print_str="MPA to HPA",
info=Info.NORMAL_TX,
mode=Modes.NORMAL,
submode=(
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
| (1 << NormalSubmodesMask.TX_ON)
),
)
if op_code in OpCodes.NORMAL_MPA:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_MPA,
mode=Modes.NORMAL,
submode=(
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
| (1 << NormalSubmodesMask.TX_ON)
| (1 << NormalSubmodesMask.MPA_ON)
),
)
if op_code in OpCodes.NORMAL_HPA:
pack_pl_pcdu_mode_cmd(
tc_queue=tc_queue,
info=Info.NORMAL_HPA,
mode=Modes.NORMAL,
submode=(
1 << NormalSubmodesMask.DRO_ON
| (1 << NormalSubmodesMask.SOLID_STATE_RELAYS_ADC_ON)
| (1 << NormalSubmodesMask.X8_ON)
| (1 << NormalSubmodesMask.TX_ON)
| (1 << NormalSubmodesMask.MPA_ON)
| (1 << NormalSubmodesMask.HPA_ON)
),
)
if op_code in OpCodes.INJECT_ALL_ON_FAILURE:
pack_failure_injection_cmd(
@@ -189,3 +223,17 @@ def pack_failure_injection_cmd(tc_queue: TcQueueT, param_id: int, print_str: str
)
cmd = pack_fsfw_load_param_cmd(ssc=0, app_data=param_data)
tc_queue.appendleft(cmd.pack_command_tuple())
def pack_pl_pcdu_mode_cmd(tc_queue: TcQueueT, info: str, mode: int, submode: int):
tc_queue.appendleft(
(
QueueCommands.PRINT,
info,
)
)
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode)
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
)
tc_queue.appendleft(mode_cmd.pack_command_tuple())

View File

@@ -5,6 +5,8 @@
@author J. Meier
@date 01.07.2021
"""
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
@@ -12,6 +14,13 @@ from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data
class CommandIds:
START_CONVERSIONS = 2
READ_CONVERSIONS = 3
ENABLE_DEBUG_OUTPUT = 4
DISABLE_DEBUG_OUTPUT = 5
def pack_rad_sensor_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
tc_queue.appendleft(
(
@@ -37,3 +46,27 @@ def pack_rad_sensor_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code:
mode_data = pack_mode_data(object_id, 0, 0)
command = PusTelecommand(service=200, subservice=1, ssc=42, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Start conversions"))
command = object_id + struct.pack("!I", CommandIds.START_CONVERSIONS)
command = PusTelecommand(service=8, subservice=128, ssc=43, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Read conversions"))
command = object_id + struct.pack("!I", CommandIds.READ_CONVERSIONS)
command = PusTelecommand(service=8, subservice=128, ssc=44, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Enable debug output"))
command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG_OUTPUT)
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "6":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Disable debug output"))
command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG_OUTPUT)
command = PusTelecommand(service=8, subservice=128, ssc=45, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())

View File

@@ -12,7 +12,7 @@ from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from utility.input_helper import InputHelper

View File

@@ -10,6 +10,8 @@ from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
import struct
class SetIds:
@@ -18,13 +20,21 @@ class SetIds:
class CommandIds:
SET_TX_MODE_STANDBY = bytearray([0x0, 0x0, 0x0, 0x3])
SET_TX_MODE_MODULATION = bytearray([0x0, 0x0, 0x0, 0x4])
SET_TX_MODE_CW = bytearray([0x0, 0x0, 0x0, 0x5])
READ_TX_STATUS = bytearray([0x0, 0x0, 0x0, 0x7])
READ_TX_WAVEFORM = bytearray([0x0, 0x0, 0x0, 0x8])
READ_TX_AGC_VALUE_HIGH_BYTE = bytearray([0x0, 0x0, 0x0, 0x9])
READ_TX_AGC_VALUE_LOW_BYTE = bytearray([0x0, 0x0, 0x0, 0x9])
READ_RX_STATUS_REGISTERS = 2
SET_TX_MODE_STANDBY = 3
SET_TX_MODE_MODULATION = 4
SET_TX_MODE_CW = 5
READ_TX_STATUS = 7
READ_TX_WAVEFORM = 8
READ_TX_AGC_VALUE_HIGH_BYTE = 9
READ_TX_AGC_VALUE_LOW_BYTE = 10
WRITE_LCL_CONFIG = 11
READ_LCL_CONFIG_REGISTER = 12
SET_WAVEFORM_OQPSK = 17
SET_WAVEFORM_BPSK = 18
SET_SECOND_CONFIG = 19
ENABLE_DEBUG = 20
DISABLE_DEBUG = 21
def pack_syrlinks_command(
@@ -33,56 +43,111 @@ def pack_syrlinks_command(
tc_queue.appendleft(
(
QueueCommands.PRINT,
"Testing PLOC memory dumper with object id: 0x" + object_id.hex(),
"Testing Syrlinks with object id: 0x" + object_id.hex(),
)
)
if op_code == "0":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set mode off"))
command = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=9, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set mode on"))
command = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "2":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Mode Normal"))
command = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode standby"))
command = object_id + CommandIds.SET_TX_MODE_STANDBY
command = PusTelecommand(service=8, subservice=128, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1":
if op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode modulation"))
command = object_id + CommandIds.SET_TX_MODE_MODULATION
command = PusTelecommand(service=8, subservice=128, ssc=11, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "2":
if op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode CW"))
command = object_id + CommandIds.SET_TX_MODE_CW
command = PusTelecommand(service=8, subservice=128, ssc=12, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3":
if op_code == "6":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Get RX Registers"))
sid = make_sid(object_id, SetIds.RX_REGISTERS_DATASET)
command = generate_one_hk_command(sid, 200)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "4":
if op_code == "7":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Get TX Registers"))
sid = make_sid(object_id, SetIds.TX_REGISTERS_DATASET)
command = generate_one_hk_command(sid, 201)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "5":
if op_code == "8":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX status"))
command = object_id + CommandIds.READ_TX_STATUS
command = PusTelecommand(service=8, subservice=128, ssc=13, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "6":
if op_code == "9":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX waveform"))
command = object_id + CommandIds.READ_TX_WAVEFORM
command = PusTelecommand(service=8, subservice=128, ssc=14, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "7":
if op_code == "10":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Read TX AGC value high byte")
)
command = object_id + CommandIds.READ_TX_AGC_VALUE_HIGH_BYTE
command = PusTelecommand(service=8, subservice=128, ssc=15, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "8":
if op_code == "11":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Read TX AGC value low byte")
)
command = object_id + CommandIds.READ_TX_AGC_VALUE_LOW_BYTE
command = PusTelecommand(service=8, subservice=128, ssc=16, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "12":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Write LCL config"))
command = object_id + struct.pack("!I", CommandIds.WRITE_LCL_CONFIG)
command = PusTelecommand(service=8, subservice=128, ssc=17, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "13":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read RX status registers"))
command = object_id + struct.pack("!I", CommandIds.READ_RX_STATUS_REGISTERS)
command = PusTelecommand(service=8, subservice=128, ssc=18, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "14":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read LCL config register"))
command = object_id + struct.pack("!I", CommandIds.READ_LCL_CONFIG_REGISTER)
command = PusTelecommand(service=8, subservice=128, ssc=19, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "15":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform OQPSK"))
command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_OQPSK)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "16":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform BPSK"))
command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_BPSK)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "17":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set second config"))
command = object_id + struct.pack("!I", CommandIds.SET_SECOND_CONFIG)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "18":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Enable debug printout"))
command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "19":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Disable debug printout"))
command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())

View File

@@ -3,7 +3,7 @@ import enum
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command
from config.object_ids import CORE_CONTROLLER_ID

View File

@@ -1,14 +1,19 @@
"""Hook function which packs telecommands based on service and operation code string
"""
import logging
import os
from collections import deque
from typing import Union
from spacepackets.ecss import PusTelecommand
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.config.definitions import CoreServiceList
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from tmtccmd.logging.pus import log_raw_pus_tc
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus.service_17_test import pack_service_17_ping_command
from tmtccmd.logging import get_current_time_string
from pus_tc.service_200_mode import pack_service200_test_into
from pus_tc.devs.p60dock import pack_p60dock_test_into
@@ -68,6 +73,21 @@ from config.object_ids import (
LOGGER = get_console_logger()
def pre_tc_send_cb(
packet: bytes,
com_if: CommunicationInterface,
pus_info: Union[PusTelecommand, any],
file_logger: logging.Logger,
):
log_raw_pus_tc(
packet=packet, srv_subservice=(pus_info.service, pus_info.subservice)
)
tc_info_string = f"Sent {pus_info}"
LOGGER.info(tc_info_string)
file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
com_if.send(data=packet)
def pack_service_queue_user(
service: Union[str, int], op_code: str, service_queue: TcQueueT
):

View File

@@ -0,0 +1,107 @@
import struct
from config.object_ids import *
from pus_tc.devs.imtq import ImtqActionIds
from pus_tc.devs.ploc_mpsoc import PlocReplyIds
from pus_tc.devs.ploc_supervisor import SupvActionIds
from pus_tc.devs.star_tracker import StarTrackerActionIds
from tmtccmd.logging import get_console_logger
from tmtccmd.config.definitions import DataReplyUnpacked
from tmtccmd.tm import Service8FsfwTm
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
LOGGER = get_console_logger()
def handle_action_reply(
raw_tm: bytes, printer: FsfwTmTcPrinter, obj_id_dict: ObjectIdDictT
):
"""Core Action reply handler
:return:
"""
tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
object_id = obj_id_dict.get(tm_packet.source_object_id)
custom_data = tm_packet.custom_data
action_id = tm_packet.action_id
generic_print_str = printer.generic_action_packet_tm_print(
packet=tm_packet, obj_id=object_id
)
print(generic_print_str)
printer.file_logger.info(generic_print_str)
if object_id == IMTQ_HANDLER_ID:
return handle_imtq_replies(action_id, printer, custom_data)
elif object_id == PLOC_MPSOC_ID:
return handle_ploc_replies(action_id, printer, custom_data)
elif object_id == PLOC_SUPV_ID:
return handle_supervisor_replies(action_id, printer, custom_data)
elif object_id == STAR_TRACKER_ID:
return handle_startracker_replies(action_id, printer, custom_data)
def handle_imtq_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]:
header_list = [
"Commanded X-Dipole",
"Commanded Y-Dipole",
"Commanded Z-Dipole",
]
[x_dipole, y_dipole, z_dipole] = struct.unpack("!HHH", custom_data[0:6])
content_list = [x_dipole, y_dipole, z_dipole]
print(header_list)
print(content_list)
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
def handle_ploc_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
if action_id == PlocReplyIds.tm_mem_read_report:
header_list = [
"PLOC Memory Address",
"PLOC Mem Len",
"PLOC Read Memory Data",
]
content_list = [
"0x" + custom_data[:4].hex(),
struct.unpack("!H", custom_data[4:6])[0],
"0x" + custom_data[6:10].hex(),
]
print(header_list)
print(content_list)
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
def handle_supervisor_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
reply = DataReplyUnpacked()
if action_id == SupvActionIds.DUMP_MRAM:
header_list = ["MRAM Dump"]
content_list = [custom_data[: len(custom_data)]]
print(header_list)
print(content_list)
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)
def handle_startracker_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
if action_id == StarTrackerActionIds.CHECKSUM:
if len(custom_data) != 5:
LOGGER.warning(
"Star tracker reply has invalid length {0}".format(len(custom_data))
)
return
header_list = ["Checksum", "Checksum valid"]
print(custom_data[4])
checksum_valid_flag = custom_data[4] >> 8
content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag]
print(header_list)
print(content_list)
printer.file_logger.info(header_list)
printer.file_logger.info(content_list)

View File

@@ -1,8 +1,13 @@
import logging
import os.path
from datetime import datetime
from config.object_ids import get_object_ids
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.utility.fsfw import parse_fsfw_events_csv, EventDictT, EventInfo
from tmtccmd.tm import Service5Tm
from tmtccmd.logging import get_console_logger
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.fsfw import parse_fsfw_events_csv, EventDictT, EventInfo
LOGGER = get_console_logger()
DEFAULT_EVENTS_CSV_PATH = "config/events.csv"
@@ -21,25 +26,36 @@ def get_event_dict() -> EventDictT:
def handle_event_packet(
object_id: bytes, event_id: int, param_1: int, param_2: int
raw_tm: bytes, printer: FsfwTmTcPrinter, file_logger: logging.Logger
) -> str:
tm = Service5Tm.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm, info_if=tm)
additional_event_info = ""
event_dict = get_event_dict()
info = event_dict.get(event_id)
info = event_dict.get(tm.event_id)
if info is None:
LOGGER.warning(f"Event ID {event_id} has no information")
LOGGER.warning(f"Event ID {tm.event_id} has no information")
info = EventInfo()
info.name = "Unknown event"
obj_ids = get_object_ids()
obj_id_obj = obj_ids.get(bytes(object_id))
obj_id_obj = obj_ids.get(tm.reporter_id.as_bytes)
if obj_id_obj is None:
LOGGER.warning(f"Object ID 0x{object_id.hex()} has no name")
obj_name = object_id.hex()
LOGGER.warning(f"Object ID 0x{tm.reporter_id.as_string} has no name")
obj_name = tm.reporter_id.as_string
else:
obj_name = obj_id_obj.name
generic_event_string = f"Object {obj_name} generated Event {event_id} | {info.name}"
generic_event_string = (
f"Object {obj_name} generated Event {tm.event_id} | {info.name}"
)
if info.info != "":
additional_event_info = (
f" | Additional info: {info.info} | P1: {param_1} | P2: {param_2}"
f"Additional info: {info.info} | P1: {tm.param_1} | P2: {tm.param_2}"
)
return generic_event_string + additional_event_info
file_logger.info(
f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
)
LOGGER.info(generic_event_string)
if additional_event_info != "":
file_logger.info(additional_event_info)
print(additional_event_info)
return generic_event_string + " | " + additional_event_info

View File

@@ -1,64 +1,80 @@
"""Core EIVE TM handler module
"""
@brief This file transfers control of TM parsing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from tmtccmd.tm.service_8_fsfw_functional_cmd import Service8FsfwTm
from spacepackets.ecss.tm import PusTelemetry
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
from tmtccmd.logging.pus import (
log_raw_pus_tm,
log_raw_unknown_packet,
PacketTypes,
create_tmtc_logger,
)
from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.pus.service_17_test import Service17TMExtended
from tmtccmd.tm.service_3_fsfw_housekeeping import Service3FsfwTm
from tmtccmd.tm.service_20_fsfw_parameters import Service20FsfwTm
from tmtccmd.tm.service_5_event import Service5Tm
from tmtccmd.tm.service_200_fsfw_mode import Service200FsfwTm
from tmtccmd.utility.tmtc_printer import TmTcPrinter, PrintFormats
from tmtccmd.utility.tmtc_printer import PrintFormats, FsfwTmTcPrinter
from config.definitions import PUS_APID
from config.object_ids import get_object_ids
from .event_handler import handle_event_packet
from .verification_handler import handle_service_1_packet
from .hk_handling import handle_hk_packet
from .action_reply_handler import handle_action_reply
LOGGER = get_console_logger()
def ccsds_tm_handler(
apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter
) -> None:
FSFW_PRINTER = FsfwTmTcPrinter(file_logger=create_tmtc_logger())
def ccsds_tm_handler(apid: int, raw_tm_packet: bytes, _user_args: any) -> None:
if apid == PUS_APID:
pus_factory_hook(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer)
pus_factory_hook(raw_tm_packet=raw_tm_packet)
def pus_factory_hook(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter):
def pus_factory_hook(raw_tm_packet: bytes):
if len(raw_tm_packet) < 8:
LOGGER.warning("Detected packet shorter than 8 bytes!")
return
service_type = raw_tm_packet[7]
tm_packet = None
subservice_type = raw_tm_packet[8]
file_logger = FSFW_PRINTER.file_logger
obj_id_dict = get_object_ids()
try:
if service_type == 1:
tm_packet = Service1TMExtended.unpack(raw_telemetry=raw_tm_packet)
if service_type == 3:
tm_packet = Service3FsfwTm.unpack(
raw_telemetry=raw_tm_packet, custom_hk_handling=False
handle_service_1_packet(printer=FSFW_PRINTER, raw_tm=raw_tm_packet)
elif service_type == 3:
handle_hk_packet(
printer=FSFW_PRINTER, raw_tm=raw_tm_packet, obj_id_dict=obj_id_dict
)
if service_type == 5:
tm_packet = Service5Tm.unpack(raw_telemetry=raw_tm_packet)
if service_type == 8:
tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm_packet)
if service_type == 17:
elif service_type == 5:
handle_event_packet(
raw_tm=raw_tm_packet, printer=FSFW_PRINTER, file_logger=file_logger
)
elif service_type == 8:
handle_action_reply(
raw_tm=raw_tm_packet, printer=FSFW_PRINTER, obj_id_dict=obj_id_dict
)
elif service_type == 17:
tm_packet = Service17TMExtended.unpack(raw_telemetry=raw_tm_packet)
if service_type == 20:
FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
elif service_type == 20:
tm_packet = Service20FsfwTm.unpack(raw_telemetry=raw_tm_packet)
if service_type == 200:
FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
elif service_type == 200:
tm_packet = Service200FsfwTm.unpack(raw_telemetry=raw_tm_packet)
if tm_packet is None:
FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
else:
LOGGER.info(
f"The service {service_type} is not implemented in Telemetry Factory"
)
tm_packet = PusTelemetry.unpack(raw_telemetry=raw_tm_packet)
tm_packet.print_source_data(PrintFormats.HEX)
tmtc_printer.print_telemetry(
packet_if=tm_packet, info_if=tm_packet, print_raw_tm=False
log_raw_pus_tm(
packet=raw_tm_packet, srv_subservice=(service_type, subservice_type)
)
except ValueError:
# TODO: Log faulty packet
LOGGER.warning("Invalid packet format detected")
log_raw_unknown_packet(packet=raw_tm_packet, packet_type=PacketTypes.TM)

View File

@@ -3,12 +3,18 @@ import struct
import os
import datetime
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.config.definitions import HkReplyUnpacked
from tmtccmd.tm.service_3_fsfw_housekeeping import Service3Base
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.tm.service_3_fsfw_housekeeping import (
Service3Base,
HkContentType,
Service3FsfwTm,
)
from tmtccmd.logging import get_console_logger
from pus_tc.devs.bpx_batt import BpxSetIds
from pus_tc.devs.syrlinks_hk_handler import SetIds
from pus_tc.devs.imtq import ImtqSetIds
from tmtccmd.pus.obj_id import ObjectId, ObjectIdDictT
from config.object_ids import (
SYRLINKS_HANDLER_ID,
IMTQ_HANDLER_ID,
@@ -16,45 +22,81 @@ from config.object_ids import (
GPS_HANDLER_1_ID,
BPX_HANDLER_ID,
CORE_CONTROLLER_ID,
P60_DOCK_HANDLER,
PL_PCDU_ID
)
LOGGER = get_console_logger()
def handle_user_hk_packet(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> HkReplyUnpacked:
def handle_hk_packet(
raw_tm: bytes,
obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter,
):
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
if named_obj_id is None:
named_obj_id = tm_packet.object_id
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
hk_data = tm_packet.tm_data[8:]
printer.generic_hk_tm_print(
content_type=HkContentType.HK,
object_id=named_obj_id,
set_id=tm_packet.set_id,
hk_data=hk_data,
)
handle_regular_hk_print(
printer=printer,
object_id=named_obj_id,
hk_packet=tm_packet,
hk_data=hk_data,
)
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
LOGGER.warning("HK definitions printout not implemented yet")
def handle_regular_hk_print(
printer: FsfwTmTcPrinter,
object_id: ObjectId,
hk_packet: Service3Base,
hk_data: bytes,
):
object_id = object_id.as_bytes
set_id = hk_packet.set_id
"""This function is called when a Service 3 Housekeeping packet is received."""
if object_id == SYRLINKS_HANDLER_ID:
if set_id == SetIds.RX_REGISTERS_DATASET:
return handle_syrlinks_rx_registers_dataset(hk_data)
return handle_syrlinks_rx_registers_dataset(printer, hk_data)
elif set_id == SetIds.TX_REGISTERS_DATASET:
return handle_syrlinks_tx_registers_dataset(hk_data)
return handle_syrlinks_tx_registers_dataset(printer, hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
elif object_id == IMTQ_HANDLER_ID:
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
):
return handle_self_test_data(hk_data)
return handle_self_test_data(printer, hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
elif object_id == GPS_HANDLER_0_ID or object_id == GPS_HANDLER_1_ID:
return handle_gps_data(hk_data=hk_data)
handle_gps_data(printer=printer, hk_data=hk_data)
elif object_id == BPX_HANDLER_ID:
return handle_bpx_hk_data(hk_data=hk_data, set_id=set_id)
handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
elif object_id == CORE_CONTROLLER_ID:
return handle_core_hk_data(hk_data=hk_data, set_id=set_id)
return handle_core_hk_data(printer=printer, hk_data=hk_data)
elif object_id == P60_DOCK_HANDLER:
handle_p60_hk_data(printer=printer, hk_data=hk_data)
elif object_id == PL_PCDU_ID:
log_to_both(printer, "Received PL PCDU HK data")
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return HkReplyUnpacked()
def handle_syrlinks_rx_registers_dataset(
hk_data: bytearray,
) -> HkReplyUnpacked:
def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes):
reply = HkReplyUnpacked()
reply.header_list = [
header_list = [
"RX Status",
"RX Sensitivity",
"RX Frequency Shift",
@@ -72,7 +114,7 @@ def handle_syrlinks_rx_registers_dataset(
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
rx_data_rate = hk_data[21]
reply.content_list = [
content_list = [
rx_status,
rx_sensitivity,
rx_frequency_shift,
@@ -82,28 +124,30 @@ def handle_syrlinks_rx_registers_dataset(
rx_demod_n0,
rx_data_rate,
]
reply.validity_buffer = hk_data[22:]
reply.num_of_vars = 8
return reply
validity_buffer = hk_data[22:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
def handle_syrlinks_tx_registers_dataset(
hk_data: bytearray,
) -> HkReplyUnpacked:
printer: FsfwTmTcPrinter,
hk_data: bytes,
):
reply = HkReplyUnpacked()
reply.header_list = ["TX Status", "TX Waveform", "TX AGC value"]
header_list = ["TX Status", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0]
tx_waveform = hk_data[1]
tx_agc_value = struct.unpack("!H", hk_data[2:4])
reply.content_list = [tx_status, tx_waveform, tx_agc_value]
reply.validity_buffer = hk_data[4:]
reply.num_of_vars = 3
return reply
content_list = [tx_status, tx_waveform, tx_agc_value]
validity_buffer = hk_data[4:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
reply = HkReplyUnpacked()
reply.hk_header = [
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
header_list = [
"Init Err",
"Init Raw Mag X [nT]",
"Init Raw Mag Y [nT]",
@@ -189,8 +233,8 @@ def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
reply.validity_buffer = hk_data[129:]
reply.content_list = [
validity_buffer = hk_data[129:]
content_list = [
init_err,
init_raw_mag_x,
init_raw_mag_y,
@@ -231,17 +275,17 @@ def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
fina_coil_y_temperature,
fina_coil_z_temperature,
]
reply.num_of_vars = len(reply.hk_header)
return reply
num_of_vars = len(header_list)
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
reply = HkReplyUnpacked()
var_index = 0
header_array = []
content_array = []
reply.header_list = [
header_list = [
"Latitude",
"Longitude",
"Altitude",
@@ -263,7 +307,7 @@ def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
seconds = hk_data[32]
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
content_array = [
content_list = [
latitude,
longitude,
altitude,
@@ -285,27 +329,29 @@ def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, "
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
)
reply.header_list = header_array
reply.content_list = content_array
reply.validity_buffer = hk_data[37:39]
return reply
validity_buffer = hk_data[37:39]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
LOGGER.info(f"Received BPX data, HK data length {len(hk_data)}")
reply = HkReplyUnpacked()
def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == BpxSetIds.GET_HK_SET:
charge_current = struct.unpack("!H", hk_data[0:2])[0]
discharge_current = struct.unpack("!H", hk_data[2:4])[0]
heater_current = struct.unpack("!H", hk_data[4:6])[0]
batt_voltage = struct.unpack("!H", hk_data[6:8])[0]
batt_temp_1 = struct.unpack("!h", hk_data[8:10])[0]
batt_temp_2 = struct.unpack("!h", hk_data[10:12])[0]
batt_temp_3 = struct.unpack("!h", hk_data[12:14])[0]
batt_temp_4 = struct.unpack("!h", hk_data[14:16])[0]
reboot_cntr = struct.unpack("!I", hk_data[16:20])[0]
boot_cause = hk_data[20]
reply.header_list = [
fmt_str = "!HHHHhhhhIB"
inc_len = struct.calcsize(fmt_str)
(
charge_current,
discharge_current,
heater_current,
batt_voltage,
batt_temp_1,
batt_temp_2,
batt_temp_3,
batt_temp_4,
reboot_cntr,
boot_cause,
) = struct.unpack(fmt_str, hk_data[0:inc_len])
header_list = [
"Charge Current",
"Discharge Current",
"Heater Current",
@@ -317,7 +363,7 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
"Reboot Counter",
"Boot Cause",
]
reply.content_list = [
content_list = [
charge_current,
discharge_current,
heater_current,
@@ -329,29 +375,189 @@ def handle_bpx_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
reboot_cntr,
boot_cause,
]
reply.validity_buffer = hk_data[21:]
validity_buffer = hk_data[inc_len:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
elif set_id == BpxSetIds.GET_CFG_SET:
battheat_mode = hk_data[0]
battheat_low = struct.unpack("!b", hk_data[1:2])[0]
battheat_high = struct.unpack("!b", hk_data[2:3])[0]
reply.header_list = [
header_list = [
"Battery Heater Mode",
"Battery Heater Low Limit",
"Battery Heater High Limit",
]
reply.content_list = [battheat_mode, battheat_low, battheat_high]
reply.validity_buffer = hk_data[3:]
return reply
content_list = [battheat_mode, battheat_low, battheat_high]
validity_buffer = hk_data[3:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
def handle_core_hk_data(hk_data: bytes, set_id: int) -> HkReplyUnpacked:
reply = HkReplyUnpacked()
reply.header_list = ["Chip Temperature [°C]", "PS Voltage [mV]", "PL Voltage [mV]"]
temperature = struct.unpack("!f", hk_data[0:4])
ps_voltage = struct.unpack("!f", hk_data[4:8])
pl_voltage = struct.unpack("!f", hk_data[8:12])
tx_agc_value = struct.unpack("!H", hk_data[2:4])
reply.content_list = [temperature, ps_voltage, pl_voltage]
reply.validity_buffer = hk_data[12:]
reply.num_of_vars = 3
return reply
def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
fmt_str = "!fffH"
inc_len = struct.calcsize(fmt_str)
(temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
printout = (
f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}"
)
log_to_both(printer, printout)
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4)
P60_INDEX_LIST = [
"ACU VCC",
"PDU1 VCC",
"X3 IDLE VCC",
"PDU2 VCC",
"ACU VBAT",
"PDU1 VBAT",
"X3 IDLE VBAT",
"PDU2 VBAT",
"STACK VBAT",
"STACK 3V3",
"STACK 5V",
"GS3V3",
"GS5V",
]
WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]
def handle_p60_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
current_idx = 0
current_list = []
for idx in range(13):
current_list.append(
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
voltage_list = []
for idx in range(13):
voltage_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
out_enb_list = []
for idx in range(13):
out_enb_list.append(hk_data[current_idx])
current_idx += 1
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
print(header_str)
printer.file_logger.info(header_str)
for idx in range(13):
out_enb = f"{out_enb_list[idx]}".ljust(6)
content_line = (
f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | "
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
)
print(content_line)
printer.file_logger.info(content_line)
fmt_str = "!hhIIIhBBB"
inc_len = struct.calcsize(fmt_str)
(
temp0,
temp1,
boot_cause,
boot_count,
uptime,
reset_cause,
batt_mode,
heater_on,
conv_5v_on,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
util_info = (
f"Batt Mode {batt_mode} | Boot Count {boot_count} | Heater On {heater_on}"
)
util_info2 = (
f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime} | "
f"Conv 5V on {conv_5v_on}"
)
print(util_info)
print(util_info2)
printer.file_logger.info(util_info)
printer.file_logger.info(util_info2)
latchup_list = []
for idx in range(0, 13):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
fmt_str = "!HhhHhh"
inc_len = struct.calcsize(fmt_str)
(
dock_vbat,
dock_vcc_current,
batt_current,
batt_voltage,
batt_temp_0,
batt_temp_1,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
device_types = []
device_statuses = []
for idx in range(8):
device_types.append(hk_data[current_idx])
current_idx += 1
for idx in range(8):
device_statuses.append(hk_data[current_idx])
current_idx += 1
dearm_status = hk_data[current_idx]
current_idx += 1
wdt_reboots_list = []
for idx in range(5):
wdt_reboots_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4
time_pings_left_list = []
for idx in range(3):
time_pings_left_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4
for idx in range(2):
time_pings_left_list.append(hk_data[current_idx])
current_idx += 1
batt_charge_current = struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
current_idx += 2
batt_discharge_current = struct.unpack(
"!h", hk_data[current_idx : current_idx + 2]
)[0]
current_idx += 2
ant6_depl_status = struct.unpack("!b", hk_data[current_idx : current_idx + 1])[0]
current_idx += 1
ar6_depl_status = struct.unpack("!b", hk_data[current_idx : current_idx + 1])[0]
current_idx += 1
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
log_to_both(printer, wdt_info)
for idx in range(len(wdt_reboots_list)):
log_to_both(
printer,
f"{WDT_LIST[idx].ljust(5)} | "
f"{wdt_reboots_list[idx]:010} | {time_pings_left_list[idx]:010}",
)
temps = (
f"In C: Temp 0 {temp0 / 10.0} | Temp 1 {temp1 / 10.0} | "
f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0}"
)
batt_info = (
f"Batt: Current {batt_current} | Volt {batt_voltage} | "
f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}"
)
log_to_both(printer, temps)
log_to_both(printer, batt_info)
misc_info = f"Dearm {dearm_status} | ANT6 Depl {ant6_depl_status} | AR6 Deply {ar6_depl_status}"
log_to_both(printer, misc_info)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=36)
def log_to_both(printer: FsfwTmTcPrinter, string: str):
print(string)
printer.file_logger.info(string)

View File

@@ -1,104 +0,0 @@
import struct
from config.object_ids import *
from pus_tc.devs.imtq import ImtqActionIds
from pus_tc.devs.ploc_mpsoc import PlocReplyIds
from pus_tc.devs.ploc_supervisor import SupvActionIds
from pus_tc.devs.star_tracker import StarTrackerActionIds
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.config.definitions import DataReplyUnpacked
LOGGER = get_console_logger()
def user_analyze_service_8_data(
object_id: bytes, action_id: int, custom_data: bytearray
) -> DataReplyUnpacked:
"""
This function is called by the TMTC core if a Service 8 data reply (subservice 130)
is received. The user can return a tuple of two lists, where the first list
is a list of header strings to print and the second list is a list of values to print.
The TMTC core will take care of printing both lists and logging them.
@param object_id:
@param action_id:
@param custom_data:
@return:
"""
if object_id == PDU_2_HANDLER_ID:
reply = DataReplyUnpacked()
reply.header_list = ["PDU2 Service 8 Reply"]
data_string = str()
for index in range(len(custom_data)):
data_string += str(hex(custom_data[index])) + " , "
data_string = data_string.rstrip()
data_string = data_string.rstrip(",")
data_string = data_string.rstrip()
reply.content_list = [data_string]
return reply
elif object_id == IMTQ_HANDLER_ID:
return handle_imtq_replies(action_id, custom_data)
elif object_id == PLOC_MPSOC_ID:
return handle_ploc_replies(action_id, custom_data)
elif object_id == PLOC_SUPV_ID:
return handle_supervisor_replies(action_id, custom_data)
elif object_id == STAR_TRACKER_ID:
return handle_startracker_replies(action_id, custom_data)
return DataReplyUnpacked()
def handle_imtq_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]:
reply.header_list = [
"Commanded X-Dipole",
"Commanded Y-Dipole",
"Commanded Z-Dipole",
]
x_dipole = struct.unpack("!H", custom_data[:2])
y_dipole = struct.unpack("!H", custom_data[2:4])
z_dipole = struct.unpack("!H", custom_data[4:6])
reply.content_list = [x_dipole[0], y_dipole[0], z_dipole[0]]
return reply
def handle_ploc_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == PlocReplyIds.tm_mem_read_report:
reply.header_list = [
"PLOC Memory Address",
"PLOC Mem Len",
"PLOC Read Memory Data",
]
reply.content_list = [
"0x" + custom_data[:4].hex(),
struct.unpack("!H", custom_data[4:6])[0],
"0x" + custom_data[6:10].hex(),
]
return reply
def handle_supervisor_replies(
action_id: int, custom_data: bytearray
) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == SupvActionIds.DUMP_MRAM:
reply.header_list = ["MRAM Dump"]
reply.content_list = [custom_data[: len(custom_data)]]
return reply
def handle_startracker_replies(
action_id: int, custom_data: bytearray
) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == StarTrackerActionIds.CHECKSUM:
if len(custom_data) != 5:
LOGGER.warning(
"Star tracker reply has invalid length {0}".format(len(custom_data))
)
return reply
reply.header_list = ["Checksum", "Checksum valid"]
print(custom_data[4])
checksum_valid_flag = custom_data[4] >> 8
reply.content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag]
return reply

View File

@@ -0,0 +1,30 @@
import logging
from datetime import datetime
from typing import cast
from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.logging import get_console_logger
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from config.retvals import get_retval_dict
LOGGER = get_console_logger()
def handle_service_1_packet(printer: FsfwTmTcPrinter, raw_tm: bytes):
tm_packet = Service1TMExtended.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
srv1_packet = cast(Service1TMExtended, tm_packet)
retval_dict = get_retval_dict()
if srv1_packet.has_tc_error_code:
retval_info = retval_dict.get(srv1_packet.error_code)
if retval_info is None:
LOGGER.info(
f"No returnvalue information found for error code {srv1_packet.error_code}"
)
else:
retval_string = (
f"Error Code information for code {srv1_packet.error_code}| "
f"Name: {retval_info.name} | Info: {retval_info.info}"
)
LOGGER.info(retval_string)
printer.file_logger.info(retval_string)

View File

@@ -1 +1 @@
tmtccmd>=1.13.0
tmtccmd>=2.0.1

View File

@@ -30,13 +30,16 @@ import sys
import traceback
try:
from tmtccmd.runner import (
initialize_tmtc_commander,
run_tmtc_commander,
add_ccsds_handler,
import tmtccmd.runner as tmtccmd
from tmtccmd.config import default_json_path, SetupArgs
from tmtccmd.config.args import (
create_default_args_parser,
add_default_tmtccmd_args,
parse_default_input_arguments,
)
from tmtccmd.ccsds.handler import CcsdsTmHandler
from tmtccmd.utility.logger import TMTC_LOGGER_NAME
from tmtccmd.ccsds.handler import CcsdsTmHandler, ApidHandler
from tmtccmd.logging import init_console_logger
from tmtccmd.logging.pus import create_tmtc_logger
except ImportError as error:
run_tmtc_commander = None
initialize_tmtc_commander = None
@@ -47,7 +50,6 @@ except ImportError as error:
try:
import spacepackets
from spacepackets.log import set_custom_console_logger_name
except ImportError as error:
print(error)
print("Python spacepackets module could not be imported")
@@ -59,23 +61,33 @@ except ImportError as error:
from config.hook_implementations import EiveHookObject
from config.version import __version__
from config.definitions import PUS_APID
from pus_tc.tc_packer_hook import pre_tc_send_cb
from pus_tm.factory_hook import ccsds_tm_handler
def main():
from pus_tm.event_handler import handle_event_packet
hook_obj = EiveHookObject()
print(f"-- eive tmtc version {__version__} --")
print(f"-- spacepackets version {spacepackets.__version__} --")
set_custom_console_logger_name(logger_name=TMTC_LOGGER_NAME)
initialize_tmtc_commander(hook_object=hook_obj)
ccsds_handler = CcsdsTmHandler()
ccsds_handler.add_tm_handler(
apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50
tmtccmd.init_printout(False)
tmtc_file_logger = create_tmtc_logger()
hook_obj = EiveHookObject(json_cfg_path=default_json_path())
arg_parser = create_default_args_parser()
add_default_tmtccmd_args(arg_parser)
args = parse_default_input_arguments(arg_parser, hook_obj)
setup_args = SetupArgs(
hook_obj=hook_obj, use_gui=False, apid=PUS_APID, cli_args=args
)
add_ccsds_handler(ccsds_handler)
run_tmtc_commander(False)
apid_handler = ApidHandler(cb=ccsds_tm_handler, queue_len=50, user_args=None)
ccsds_handler = CcsdsTmHandler()
ccsds_handler.add_tm_handler(apid=PUS_APID, handler=apid_handler)
tmtccmd.setup(setup_args=setup_args)
tmtccmd.add_ccsds_handler(ccsds_handler)
tmtc_backend = tmtccmd.create_default_tmtc_backend(
setup_args=setup_args,
tm_handler=ccsds_handler,
)
tmtc_backend.usr_send_wrapper = (pre_tc_send_cb, tmtc_file_logger)
tmtccmd.run(tmtc_backend=tmtc_backend)
if __name__ == "__main__":

Submodule tmtccmd updated: 86cf0bf953...48b6b8396e

View File

@@ -35,8 +35,8 @@ from pus_tm.factory_hook import ccsds_tm_handler
try:
from tmtccmd.runner import (
initialize_tmtc_commander,
run_tmtc_commander,
init_tmtccmd,
run_tmtccmd,
add_ccsds_handler,
)
from tmtccmd.ccsds.handler import CcsdsTmHandler
@@ -56,13 +56,13 @@ def main():
hook_obj = EiveHookObject()
print(f"-- eive tmtc version {__version__}")
print(f"-- spacepackets version {spacepackets.__version__} --")
initialize_tmtc_commander(hook_object=hook_obj)
init_tmtccmd(hook_object=hook_obj)
ccsds_handler = CcsdsTmHandler()
ccsds_handler.add_tm_handler(
apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50
)
add_ccsds_handler(ccsds_handler)
run_tmtc_commander(use_gui=True)
run_tmtccmd(use_gui=True)
if __name__ == "__main__":

View File

@@ -6,7 +6,7 @@
@date 13.02.2021
"""
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()