Compare commits

...

57 Commits

Author SHA1 Message Date
986f88e290 tmtccmd update 2021-04-25 12:03:49 +02:00
3b5d92f908 pushed new version number 2021-04-25 11:15:02 +02:00
684ae7a9c4 Merge branch 'mueller/master' into develop 2021-04-25 11:11:12 +02:00
7cc06ef0e0 minor change 2021-04-24 23:17:36 +02:00
3c343f751f updated tmtccmd submodule 2021-04-24 22:50:35 +02:00
2e84c822c8 bumped version numbr 2021-04-10 23:01:01 +02:00
e993d2d2ea Merge branch 'mueller/master' into develop 2021-04-10 23:00:46 +02:00
8703aeea55 updated tmtc 2021-04-10 22:59:51 +02:00
754edffeae include fixes 2021-03-23 14:24:04 +01:00
74c61842c8 minor fixes 2021-03-23 13:44:33 +01:00
19f5ba62de submodule update 2021-03-23 13:43:31 +01:00
10fa7658d0 Merge branch 'develop' of https://egit.irs.uni-stuttgart.de/eive/eive_tmtc into develop 2021-03-23 13:12:56 +01:00
52428e4e03 tmtccmd update 2021-03-23 13:12:41 +01:00
146cb52b66 merged new tmtc 2021-03-23 13:08:51 +01:00
5f1803b663 updated to new tmtccmd 2021-03-20 15:00:36 +01:00
4d737bfc12 some renaming 2021-03-19 18:03:41 +01:00
6e3bef7ac5 fixes 2021-03-19 18:01:17 +01:00
dc999188c0 some fixes (some pep related) 2021-03-19 17:50:09 +01:00
04920feea5 newline 2021-03-19 17:43:18 +01:00
e06bb91f08 minor corrections 2021-03-19 17:42:36 +01:00
27c1da4f77 moved EIVE to new tmtc commander 2021-03-19 17:39:52 +01:00
4d9df1ba87 moved submodule to github 2021-03-19 16:00:57 +01:00
81cd88f521 syrlinks hk handler test 2021-03-01 12:14:04 +01:00
cf572c311c fixed merge conflicts 2021-02-27 13:42:41 +01:00
d1899ef181 updated hk handling 2021-02-27 13:13:12 +01:00
07381147f7 syrlinks hk test wip 2021-02-27 13:09:55 +01:00
90c45ea0c7 adapted object IDs 2021-02-23 22:07:21 +01:00
1d5fe4ebc7 deployment test 2021-02-16 15:33:26 +01:00
2b85ece071 core update 2021-02-14 12:19:17 +01:00
55cf05918c Merge remote-tracking branch 'origin/develop' into mueller/master 2021-02-14 12:18:46 +01:00
110ec9644a removed unused class in heater test 2021-02-14 10:14:24 +01:00
56f3f9c3f1 Merge branch 'develop' into meier/heaterhandler 2021-02-14 10:09:13 +01:00
e19883d80f heater selection input 2021-02-14 09:37:28 +01:00
c10fbe7fbe Merge remote-tracking branch 'origin/develop' into mueller/master 2021-02-13 19:00:08 +01:00
4433fb68ac heater print adaption 2021-02-11 08:18:42 +01:00
a0280c880b updated version tag for first release 2021-02-09 12:53:44 +01:00
d9fc5563ce updated core develop 2021-02-09 12:46:30 +01:00
cf5ca54fbe service 8 reply interpretation 2021-02-09 12:35:10 +01:00
b4868b78e1 resolved merge conflicts 2021-02-06 16:41:06 +01:00
8a071954af heater wip 2021-02-06 16:35:53 +01:00
361eb39aa1 added init file for gomspace folder 2021-02-03 14:18:00 +01:00
a84d125b4b updated to new tmtc core 2021-02-03 14:14:54 +01:00
be536aca49 core update 2021-02-03 13:58:55 +01:00
39e8ff7134 tmp1075 temperature sensor 2021-01-30 11:20:34 +01:00
7e64595d1a updated core 2021-01-13 22:41:44 +01:00
6b840eff43 tmp1075 tests 2021-01-10 11:42:06 +01:00
eb9deae5ab core update 2020-12-30 23:38:44 +01:00
4b0d457937 tmtc core update 2020-12-30 23:38:20 +01:00
b1dc449ccf Merge branch 'mueller/master' of https://egit.irs.uni-stuttgart.de/eive/eive_tmtc into mueller/master 2020-12-29 12:31:47 +01:00
97075080de Merge pull request 'integrated pcu, acu and p60 dock tests' (#1) from meier/pcduGomspace into develop
Reviewed-on: eive/eive_tmtc#1
2020-12-29 12:28:20 +01:00
0c334f615f integrated pcu, acu and p60 dock tests 2020-12-29 11:29:03 +01:00
9be8713fa0 core update 2020-12-27 17:04:07 +01:00
e0c896e62d core update 2020-12-17 19:38:00 +01:00
34760becb9 Merge branch 'mueller/master' 2020-12-17 18:20:16 +01:00
75b01009f1 core update 2020-12-17 18:11:08 +01:00
264a4a791e updated globals 2020-12-17 18:09:00 +01:00
673a3e8ba4 core update 2020-12-17 18:03:08 +01:00
50 changed files with 1377 additions and 439 deletions

6
.gitmodules vendored
View File

@@ -1,3 +1,3 @@
[submodule "tmtc_core"]
path = tmtc_core
url = https://git.ksat-stuttgart.de/irs/eive/tmtc_core.git
[submodule "tmtccmd"]
path = tmtccmd
url = https://github.com/rmspacefish/tmtccmd.git

View File

@@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcc Listener Mode UDP" type="PythonConfigurationType" factoryName="Python" folderName="UDP">
<configuration default="false" name="tmtcc Listener Mode UDP" type="PythonConfigurationType" factoryName="Python" folderName="PUS">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 1 -c 2 --hk" />
<option name="PARAMETERS" value="-m 2 -c 2 --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />

View File

@@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcc Service 17 UDP" type="PythonConfigurationType" factoryName="Python" folderName="UDP">
<configuration default="false" name="tmtcc Service 17 UDP" type="PythonConfigurationType" factoryName="Python" folderName="PUS">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s 17 -t 3" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s 17 -t 3" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcc Service 200 UDP" type="PythonConfigurationType" factoryName="Python" folderName="UDP">
<configuration default="false" name="tmtcc Service 200 UDP" type="PythonConfigurationType" factoryName="Python" folderName="PUS">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s 200 -t 4" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s 200 -t 4" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcc Service 2 UDP" type="PythonConfigurationType" factoryName="Python" folderName="UDP">
<configuration default="false" name="tmtcc Service 2 UDP" type="PythonConfigurationType" factoryName="Python" folderName="PUS">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s 2 -t 4" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s 2 -t 4" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />

View File

@@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcc Service 3 UDP" type="PythonConfigurationType" factoryName="Python" folderName="UDP">
<configuration default="false" name="tmtcc Service 3 UDP" type="PythonConfigurationType" factoryName="Python" folderName="PUS">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s 3 -p --hk" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s 3 -p --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcc Service 8 UDP" type="PythonConfigurationType" factoryName="Python" folderName="UDP">
<configuration default="false" name="tmtcc Service 8 UDP" type="PythonConfigurationType" factoryName="Python" folderName="PUS">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s 8 -t 8" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s 8 -t 8" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcc Single Command UDP" type="PythonConfigurationType" factoryName="Python" folderName="UDP">
<configuration default="false" name="tmtcc Single Command UDP" type="PythonConfigurationType" factoryName="Python" folderName="PUS">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_core/tmtcc_runner.py" />
<option name="PARAMETERS" value="-m 2 -c 2 --boardIP=127.0.0.1" />
<option name="PARAMETERS" value="-m listener -c udp" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />

View File

@@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient ACU Test UDP" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s acu --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient P60 Dock Test UDP" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s p60dock --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient PDU1 Test UDP" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s pdu1 --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient PDU2 Test UDP" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s pdu2 --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l -t 4" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@@ -3,7 +3,7 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtccmd.ecss.tc import PusTelecommand
def command_preparation_hook() -> PusTelecommand:

19
config/custom_mode_op.py Normal file
View File

@@ -0,0 +1,19 @@
"""
@brief This file transfers control of custom mode handling to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.utility.tmtcc_logger import get_logger
LOGGER = get_logger()
class CustomModeList(enum.IntEnum):
pass
def custom_mode_operation(tmtc_backend: TmTcHandler, mode: int):
pass

17
config/definitions.py Normal file
View File

@@ -0,0 +1,17 @@
"""
@brief This file transfers control of the custom definitions like modes to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
class CustomServiceList(enum.Enum):
P60DOCK = "p60dock"
PDU1 = "pdu1"
PDU2 = "pdu2"
ACU = "acu"
TMP1075_1 = "tmp1075_1"
TMP1075_2 = "tmp1075_2"
HEATER = "heater"

55
config/globals_config.py Normal file
View File

@@ -0,0 +1,55 @@
"""
@brief This file transfers definitions of global variables to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
import argparse
# All globals can be added here and will be part of a globals dictionary.
from config.definitions import CustomServiceList
from config.custom_mode_op import CustomModeList
from tmtccmd.core.definitions import CoreComInterfaces, CoreServiceList
from tmtccmd.defaults.globals_setup import set_default_globals_pre_args_parsing, \
set_default_globals_post_args_parsing, get_core_service_dict
from tmtccmd.core.globals_manager import update_global
from tmtccmd.core.definitions import CoreGlobalIds
from tmtccmd.utility.tmtcc_logger import get_logger
LOGGER = get_logger()
class CustomGlobalIds(enum.Enum):
from enum import auto
pass
def set_globals_pre_args_parsing(gui: bool = False):
set_default_globals_pre_args_parsing(gui=gui, apid=0x65, com_if_id=CoreComInterfaces.TCPIP_UDP)
servicelist = get_core_service_dict()
update_global(CoreGlobalIds.CURRENT_SERVICE, CoreServiceList.SERVICE_17)
update_global(CoreGlobalIds.SERVICE_DICT, servicelist)
def add_globals_post_args_parsing(args: argparse.Namespace):
set_default_globals_post_args_parsing(
args=args, custom_services_list=[CustomServiceList],
custom_modes_list=[CustomModeList],
custom_com_ifs_lists=None
)
def set_up_serial_cfg(com_if: CoreComInterfaces):
from tmtccmd.defaults.com_setup import default_serial_cfg_setup
default_serial_cfg_setup(com_if=com_if)
def set_up_ethernet_cfg():
from tmtccmd.defaults.com_setup import default_tcpip_udp_cfg_setup
default_tcpip_udp_cfg_setup()

View File

@@ -0,0 +1,73 @@
import argparse
from typing import Union, Dict, Tuple
from tmtccmd.pus_tm.service_3_base import Service3Base
from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.core.hook_base import TmTcHookBase
from tmtccmd.utility.tmtc_printer import TmTcPrinter
class EiveHookObject(TmTcHookBase):
def get_version(self) -> str:
from config.version import SW_NAME, VERSION_MAJOR, VERSION_MINOR, VERSION_SUBMINOR
return f"{SW_NAME} {VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_SUBMINOR}"
def add_globals_pre_args_parsing(self, gui: bool = False):
from config.globals_config import set_globals_pre_args_parsing
set_globals_pre_args_parsing(gui=gui)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from config.globals_config import add_globals_post_args_parsing
add_globals_post_args_parsing(args)
def assign_communication_interface(self, com_if: int, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
from tmtccmd.defaults.com_setup import create_communication_interface_default
return create_communication_interface_default(com_if=com_if, tmtc_printer=tmtc_printer)
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
from config.custom_mode_op import custom_mode_operation
custom_mode_operation(mode=mode, tmtc_backend=tmtc_backend)
def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from pus_tc.tc_packer_hook import pack_service_queue_user
pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue)
def tm_user_factory_hook(self, raw_tm_packet: bytearray) -> PusTelemetry:
from pus_tm.factory_hook import tm_user_factory_hook
return tm_user_factory_hook(raw_tm_packet=raw_tm_packet)
def set_object_ids(self) -> Dict[int, bytearray]:
from config.object_ids import set_object_ids
return set_object_ids()
def pack_total_service_queue(self) -> Union[None, TcQueueT]:
from pus_tc.tc_packer_hook import create_total_tc_queue_user
return create_total_tc_queue_user()
def command_preparation_hook(self) -> Union[None, PusTelecommand]:
from custom_hooks import command_preparation_hook
return command_preparation_hook()
@staticmethod
def handle_service_8_telemetry(
object_id: int, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
from pus_tm.service_8_hook import user_analyze_service_8_data
return user_analyze_service_8_data(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@staticmethod
def handle_service_3_housekeeping(
object_id: int, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
from pus_tm.hk_handling import handle_user_hk_packet
return handle_user_hk_packet(
object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet
)

51
config/object_ids.py Normal file
View File

@@ -0,0 +1,51 @@
"""
@brief This file transfers control of the object IDs to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
from typing import Dict
PUS_SERVICE_17_ID = bytearray([0x53, 0x00, 0x00, 0x17])
TEST_DEVICE_ID = bytearray([0x44, 0x00, 0xAF, 0xFE])
P60_DOCK_HANDLER = bytearray([0x44, 0x00, 0x00, 0x1])
PDU_1_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x2])
PDU_2_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x3])
ACU_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x4])
TMP_1075_1_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x5])
TMP_1075_2_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x6])
HEATER_ID = bytearray([0x54, 0x00, 0x00, 0x1])
PCDU_HANDLER_ID = bytearray([0x44, 0x00, 0x10, 0x00])
SOLAR_ARRAY_DEPLOYMENT_ID = bytearray([0x44, 0x00, 0x10, 0x01])
class ObjIdIds(enum.IntEnum):
PUS_SERVICE_17_ID = 0
TEST_DEVICE_ID = 1
P60DOCK_HANDLER_ID = 2
PDU1_HANDLER_ID = 3
PDU2_HANDLER_ID = 4
PCDU_HANDLER_ID = 5
ACU_HANDLER_ID = 6
TMP1075_1_HANDLER_ID = 7
TMP1075_2_HANDLER_ID = 8
HEATER_ID = 9
SOLAR_ARRAY_DEPLOYMENT_ID = 10
def set_object_ids() -> Dict[int, bytearray]:
object_id_dict = ({
ObjIdIds.PUS_SERVICE_17_ID: PUS_SERVICE_17_ID,
ObjIdIds.TEST_DEVICE_ID: TEST_DEVICE_ID,
ObjIdIds.P60DOCK_HANDLER_ID: P60_DOCK_HANDLER,
ObjIdIds.PDU1_HANDLER_ID: PDU_1_HANDLER_ID,
ObjIdIds.PDU2_HANDLER_ID: PDU_2_HANDLER_ID,
ObjIdIds.ACU_HANDLER_ID: ACU_HANDLER_ID,
ObjIdIds.TMP1075_1_HANDLER_ID: TMP_1075_1_HANDLER_ID,
ObjIdIds.TMP1075_2_HANDLER_ID: TMP_1075_2_HANDLER_ID,
ObjIdIds.HEATER_ID: HEATER_ID,
ObjIdIds.PCDU_HANDLER_ID: PCDU_HANDLER_ID,
ObjIdIds.SOLAR_ARRAY_DEPLOYMENT_ID: SOLAR_ARRAY_DEPLOYMENT_ID,
})
return object_id_dict

View File

@@ -1,18 +0,0 @@
"""
@brief This file transfers control of communication interface setup to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from typing import Union
from tmtc_core.com_if.tmtcc_com_interface_base import CommunicationInterface
from tmtc_core.core.tmtc_core_definitions import ComInterfaces
from tmtc_core.core.tmtcc_com_if_setup import create_communication_interface_default
from tmtc_core.utility.tmtcc_tmtc_printer import TmTcPrinter
def create_communication_interface_user(com_if: ComInterfaces, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
return create_communication_interface_default(com_if, tmtc_printer)

View File

@@ -1,42 +0,0 @@
"""
@brief This file transfers control of the custom definitions like modes to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
from enum import auto
# Mode options, set by args parser
class ModeList(enum.Enum):
Idle = 0
ListenerMode = 1
SingleCommandMode = 2
ServiceTestMode = 3
SoftwareTestMode = 4
PromptMode = 32
class ServiceList(enum.Enum):
SERVICE_2 = 0
SERVICE_3 = auto()
SERVICE_5 = auto()
SERVICE_8 = auto()
SERVICE_9 = auto()
SERVICE_17 = auto()
SERVICE_20 = auto()
SERVICE_23 = auto()
SERVICE_200 = auto()
class SerialConfig(enum.Enum):
SERIAL_PORT = auto()
SERIAL_BAUD_RATE = auto()
SERIAL_TIMEOUT = auto()
SERIAL_COMM_TYPE = auto()
class EthernetConfig(enum.Enum):
SEND_ADDRESS = auto()
RECV_ADDRESS = auto()

View File

@@ -1,223 +0,0 @@
"""
@brief This file transfers definitions of global variables to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
import argparse
# All globals can be added here and will be part of a globals dictionary.
from config.tmtcc_definitions import ServiceList, ModeList
from tmtc_core.com_if.tmtcc_serial_com_if import SerialCommunicationType
from tmtc_core.com_if.tmtcc_serial_utilities import determine_com_port
from tmtc_core.core.tmtc_core_definitions import ComInterfaces
from tmtc_core.utility.tmtcc_logger import get_logger
class GlobalIds(enum.Enum):
from enum import auto
# Generic parameters
APID = auto()
MODE = auto()
SERVICE = auto()
SERVICELIST = auto()
COM_IF = auto()
OP_CODE = auto()
TM_TIMEOUT = auto()
# Miscellaneous
DISPLAY_MODE = auto()
USE_LISTENER_AFTER_OP = auto()
PRINT_HK = auto()
PRINT_TM = auto()
PRINT_RAW_TM = auto()
PRINT_TO_FILE = auto()
RESEND_TC = auto()
TC_SEND_TIMEOUT_FACTOR = auto()
# Config dictionaries
USE_SERIAL = auto()
SERIAL_CONFIG = auto()
USE_ETHERNET = auto()
ETHERNET_CONFIG = auto()
# Object handles
PRETTY_PRINTER = auto()
TM_LISTENER_HANDLE = auto()
COM_INTERFACE_HANDLE = auto()
TMTC_PRINTER_HANDLE = auto()
def add_globals_pre_args_parsing(gui: bool = False):
from tmtc_core.core.tmtcc_globals_manager import update_global
import pprint
update_global(GlobalIds.APID, 0xef)
update_global(GlobalIds.COM_IF, ComInterfaces.EthernetUDP)
update_global(GlobalIds.TC_SEND_TIMEOUT_FACTOR, 2)
update_global(GlobalIds.TM_TIMEOUT, 4)
update_global(GlobalIds.DISPLAY_MODE, "long")
update_global(GlobalIds.PRINT_TO_FILE, True)
update_global(GlobalIds.SERIAL_CONFIG, dict())
update_global(GlobalIds.ETHERNET_CONFIG, dict())
pp = pprint.PrettyPrinter()
update_global(GlobalIds.PRETTY_PRINTER, pp)
update_global(GlobalIds.TM_LISTENER_HANDLE, None)
update_global(GlobalIds.COM_INTERFACE_HANDLE, None)
update_global(GlobalIds.TMTC_PRINTER_HANDLE, None)
update_global(GlobalIds.PRINT_RAW_TM, False)
update_global(GlobalIds.RESEND_TC, False)
update_global(GlobalIds.OP_CODE, "0")
update_global(GlobalIds.MODE, ModeList.ListenerMode)
if gui:
set_up_ethernet_cfg()
servicelist = dict()
servicelist[ServiceList.SERVICE_2] = ["Service 2 Raw Commanding"]
servicelist[ServiceList.SERVICE_3] = ["Service 3 Housekeeping"]
servicelist[ServiceList.SERVICE_5] = ["Service 5 Event"]
servicelist[ServiceList.SERVICE_8] = ["Service 8 Functional Commanding"]
servicelist[ServiceList.SERVICE_9] = ["Service 9 Time"]
servicelist[ServiceList.SERVICE_17] = ["Service 17 Test"]
servicelist[ServiceList.SERVICE_20] = ["Service 20 Parameters"]
servicelist[ServiceList.SERVICE_23] = ["Service 23 File Management"]
servicelist[ServiceList.SERVICE_200] = ["Service 200 Mode Management"]
update_global(GlobalIds.SERVICE, ServiceList.SERVICE_17)
update_global(GlobalIds.SERVICELIST, servicelist)
def add_globals_post_args_parsing(args: argparse.Namespace):
from tmtc_core.core.tmtcc_globals_manager import update_global
from config.tmtcc_definitions import ModeList
logger = get_logger()
mode_param = ModeList.ListenerMode
if 0 <= args.mode <= 6:
if args.mode == 0:
mode_param = ModeList.GUIMode
elif args.mode == 1:
mode_param = ModeList.ListenerMode
elif args.mode == 2:
mode_param = ModeList.SingleCommandMode
elif args.mode == 3:
mode_param = ModeList.ServiceTestMode
elif args.mode == 4:
mode_param = ModeList.SoftwareTestMode
update_global(GlobalIds.MODE, mode_param)
if args.com_if == ComInterfaces.EthernetUDP.value:
com_if = ComInterfaces.EthernetUDP
elif args.com_if == ComInterfaces.Serial.value:
com_if = ComInterfaces.Serial
elif args.com_if == ComInterfaces.Dummy.value:
com_if = ComInterfaces.Dummy
elif args.com_if == ComInterfaces.QEMU.value:
com_if = ComInterfaces.QEMU
else:
com_if = ComInterfaces.Serial
update_global(GlobalIds.COM_IF, com_if)
if args.short_display_mode:
display_mode_param = "short"
else:
display_mode_param = "long"
update_global(GlobalIds.DISPLAY_MODE, display_mode_param)
service = str(args.service).lower()
if service == "2":
service = ServiceList.SERVICE_2
elif service == "3":
service = ServiceList.SERVICE_3
elif service == "5":
service = ServiceList.SERVICE_5
elif service == "8":
service = ServiceList.SERVICE_8
elif service == "9":
service = ServiceList.SERVICE_9
elif service == "17":
service = ServiceList.SERVICE_17
elif service == "20":
service = ServiceList.SERVICE_20
elif service == "23":
service = ServiceList.SERVICE_23
else:
logger.warning("Service not known! Setting standard service 17")
service = ServiceList.SERVICE_17
update_global(GlobalIds.SERVICE, service)
if args.op_code is None:
op_code = 0
else:
op_code = str(args.op_code).lower()
update_global(GlobalIds.OP_CODE, op_code)
update_global(GlobalIds.USE_LISTENER_AFTER_OP, args.listener)
update_global(GlobalIds.TM_TIMEOUT, args.tm_timeout)
update_global(GlobalIds.PRINT_HK, args.print_hk)
update_global(GlobalIds.PRINT_TM, args.print_tm)
update_global(GlobalIds.PRINT_RAW_TM, args.raw_data_print)
update_global(GlobalIds.PRINT_TO_FILE, args.print_log)
update_global(GlobalIds.RESEND_TC, args.resend_tc)
update_global(GlobalIds.TC_SEND_TIMEOUT_FACTOR, 3)
use_serial_cfg = False
if com_if == ComInterfaces.Serial or com_if == ComInterfaces.QEMU:
use_serial_cfg = True
if use_serial_cfg:
set_up_serial_cfg(com_if)
use_ethernet_cfg = False
if com_if == ComInterfaces.EthernetUDP:
use_ethernet_cfg = True
if use_ethernet_cfg:
# TODO: Port and IP address can also be passed as CLI parameters. Use them here if applicable
set_up_ethernet_cfg()
def set_up_serial_cfg(com_if: ComInterfaces):
from tmtc_core.core.tmtcc_globals_manager import update_global
update_global(GlobalIds.USE_SERIAL, True)
from tmtc_core.core.tmtcc_globals_manager import get_global
from config.tmtcc_definitions import SerialConfig
serial_cfg_dict = get_global(GlobalIds.SERIAL_CONFIG)
if com_if == ComInterfaces.Serial:
com_port = determine_com_port()
else:
com_port = ""
serial_cfg_dict.update({SerialConfig.SERIAL_PORT: com_port})
serial_cfg_dict.update({SerialConfig.SERIAL_BAUD_RATE: 115200})
serial_cfg_dict.update({SerialConfig.SERIAL_TIMEOUT: 0.01})
serial_cfg_dict.update({SerialConfig.SERIAL_COMM_TYPE: SerialCommunicationType.DLE_ENCODING})
serial_cfg_dict.update({SerialConfig.SERIAL_FRAME_SIZE: 256})
serial_cfg_dict.update({SerialConfig.SERIAL_DLE_QUEUE_LEN: 25})
serial_cfg_dict.update({SerialConfig.SERIAL_DLE_MAX_FRAME_SIZE: 1024})
update_global(GlobalIds.SERIAL_CONFIG, serial_cfg_dict)
def set_up_ethernet_cfg():
from tmtc_core.core.tmtcc_globals_manager import update_global
update_global(GlobalIds.USE_ETHERNET, True)
from tmtc_core.core.tmtcc_globals_manager import get_global
from config.tmtcc_definitions import EthernetConfig
ethernet_cfg_dict = get_global(GlobalIds.ETHERNET_CONFIG)
# Local host and unused port as default config
default_send_ip = "127.0.0.1"
default_send_port = 7301
send_address = (default_send_ip, default_send_port)
ethernet_cfg_dict.update({EthernetConfig.SEND_ADDRESS: send_address})
# Bind to all interfaces (might be insecure!)
default_rcv_ip = ''
default_rcv_port = 7302
recv_address = (default_rcv_ip, default_rcv_port)
ethernet_cfg_dict.update({EthernetConfig.RECV_ADDRESS: recv_address})
update_global(GlobalIds.ETHERNET_CONFIG, ethernet_cfg_dict)

View File

@@ -1,23 +0,0 @@
"""
@brief This file transfers control of the object IDs to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
from typing import Dict
class ObjectIds(enum.Enum):
from enum import auto
PUS_SERVICE_17 = auto()
TEST_DEVICE = auto()
def set_object_ids(object_id_dict: Dict[ObjectIds, bytearray]):
o_ids = ObjectIds
object_id_dict.update(
{o_ids.PUS_SERVICE_17: bytearray([0x53, 0x00, 0x00, 0x17]),
o_ids.TEST_DEVICE: bytearray([0x44, 0x00, 0xAF, 0xFE]),
}
)

View File

@@ -1,22 +0,0 @@
"""
@brief This file transfers control of custom mode handling to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import sys
from config.tmtcc_definitions import ModeList
from core.tmtc_backend import TmTcHandler
from test.obsw_pus_service_test import run_selected_pus_tests
from tmtc_core.utility.tmtcc_logger import get_logger
from utility.tmtcc_binary_uploader import BinaryFileUploader
LOGGER = get_logger()
def perform_mode_operation_user(tmtc_backend: TmTcHandler, mode: ModeList):
"""
Custom modes can be implemented here
"""
LOGGER.error(f"Unknown mode {mode}, Configuration error !")
sys.exit()

View File

@@ -1,10 +0,0 @@
"""
@brief This file transfers control of versioning to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import tmtc_core.core.tmtcc_version as core
SW_NAME = core.SW_NAME
SW_VERSION = core.SW_VERSION
SW_SUBVERSION = core.SW_SUBVERSION

4
config/version.py Normal file
View File

@@ -0,0 +1,4 @@
SW_NAME = "eive"
VERSION_MAJOR = 1
VERSION_MINOR = 4
VERSION_SUBMINOR = 0

0
gomspace/__init__.py Normal file
View File

132
gomspace/gomspace_common.py Normal file
View File

@@ -0,0 +1,132 @@
# -*- coding: utf-8 -*-
"""
@file gomspace_common.py
@brief PDU2 tests
@details All functions and classes common for all gomspace devices are defined in this file.
@author J. Meier
@date 17.12.2020
"""
class GomspaceDeviceActions:
PING = bytearray([0x0, 0x0, 0x0, 0x1])
REBOOT = bytearray([0x0, 0x0, 0x0, 0x4])
PARAM_GET = bytearray([0x0, 0x0, 0x0, 0x00])
PARAM_SET = bytearray([0x0, 0x0, 0x0, 0xFF])
WDT_RESET = bytearray([0x0, 0x0, 0x0, 0x9])
REQUEST_HK_TABLE = bytearray([0x0, 0x0, 0x0, 0x10])
class TableIds:
config = 1
hk = 4
class TableEntry:
uint8_size = 1
uint16_size = 2
uint32_size = 4
def __init__(self, parameter_address: bytearray, parameter_size):
self.parameter_address = parameter_address
self.parameter_size = parameter_size
class Channel:
on = 1
off = 0
def pack_get_param_command(object_id: bytearray, table_id: int, memory_address: bytearray,
parameter_size: int) -> bytearray:
""" Function to generate a command to retrieve parameters like the temperature from a gomspace device.
@param object_id: The object id of the gomspace device handler.
@param table_id: The table id of the gomspace device
@param memory_address: Address offset within table of the value to read.
@param parameter_size: Size of the value to read. E.g. temperature is uint16_t and thus parameter_size is 2
@return: The command as bytearray.
"""
action_id = GomspaceDeviceActions.PARAM_GET
command = bytearray()
command = command + object_id + action_id
command.append(table_id)
command = command + memory_address
command.append(parameter_size)
return command
def pack_set_param_command(object_id: bytearray, memory_address: bytearray, parameter_size: int,
parameter: int) -> bytearray:
""" Function to generate a command to set a parameter
@param object_id: The object id of the gomspace device handler.
@param memory_address: Address offset within table of the value to set.
@param parameter: The parameter value to set.
@param parameter_size: Size of the value to set. There are uint8_t, uint16_t and uint32_t in the device tables.
@return: The command as bytearray.
"""
action_id = GomspaceDeviceActions.PARAM_SET
command = bytearray()
command = command + object_id + action_id
command = command + memory_address
command.append(parameter_size)
if parameter_size == 1:
command.append(parameter)
elif parameter_size == 2:
byte_one = 0xFF00 & parameter >> 8
byte_two = 0xFF & parameter
command.append(byte_one)
command.append(byte_two)
elif parameter_size == 4:
byte_one = 0xFF000000 & parameter >> 24
byte_two = 0xFF0000 & parameter >> 16
byte_three = 0xFF00 & parameter >> 8
byte_four = 0xFF & parameter
command.append(byte_one)
command.append(byte_two)
command.append(byte_three)
command.append(byte_four)
return command
def pack_ping_command(object_id: bytearray, data: bytearray) -> bytearray:
"""" Function to generate the command to ping a gomspace device
@param object_id Object Id of the gomspace device handler.
@param data Bytearray containing the bytes to send to the gomspace device. For now the on board software
supports only the handling of up to 33 bytes.
@note The ping request sends the specified data to a gompsace device. These
data are simply copied by the device and then sent back.
"""
action_id = GomspaceDeviceActions.PING
command = object_id + action_id + data
return command
def pack_gnd_wdt_reset_command(object_id: bytearray) -> bytearray:
"""" Function to generate the command to reset the watchdog of a gomspace device.
@param object_id Object Id of the gomspace device handler.
"""
action_id = GomspaceDeviceActions.WDT_RESET
command = bytearray()
command += object_id + action_id
return command
def pack_reboot_command(object_id: bytearray) -> bytearray:
""" Function to generate the command which triggers a reboot of a gomspace device
@param object_id The object id of the gomspace device handler.
"""
action_id = GomspaceDeviceActions.REBOOT
command = bytearray()
command += object_id + action_id
return command
def pack_request_full_hk_table_command(object_id: bytearray) -> bytearray:
""" Function to generate the command to request the full housekeeping table from a gomspace
device.
@param object_id The object id of the gomspace device handler.
"""
action_id = GomspaceDeviceActions.REQUEST_HK_TABLE
command = bytearray()
command = object_id + action_id
return command

View File

@@ -0,0 +1,16 @@
from gomspace.gomspace_common import TableEntry
class PDUConfigTable:
out_en_0 = TableEntry(bytearray([0x00, 0x48]), TableEntry.uint8_size)
out_en_1 = TableEntry(bytearray([0x00, 0x49]), TableEntry.uint8_size)
out_en_2 = TableEntry(bytearray([0x00, 0x4A]), TableEntry.uint8_size)
out_en_3 = TableEntry(bytearray([0x00, 0x4B]), TableEntry.uint8_size)
# When channel consumes more than cur_lu_lim, channel is turned of immediately
cur_lu_lim_0 = TableEntry(bytearray([0x00, 0xB8]), TableEntry.uint16_size)
class PDUHkTable:
temperature = TableEntry(bytearray([0x00, 0x28]), TableEntry.uint16_size)
# Ground WDT value (remaining seconds until reboot)
wdt_gnd_left = TableEntry(bytearray([0x00, 0x80]), TableEntry.uint32_size)

125
pus_tc/acu.py Normal file
View File

@@ -0,0 +1,125 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_acu.py
@brief ACU tests
@author J. Meier
@date 21.12.2020
"""
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.core.definitions import QueueCommands
from gomspace.gomspace_common import *
from pus_tc.p60dock import P60DockConfigTable
from config.object_ids import ObjIdIds
from tmtccmd.core.object_id_manager import get_object_id
class ACUTestProcedure:
"""
@brief Use this class to define the tests to perform for the ACU.
@details Setting all to True will run all tests.
Setting all to False will only run the tests set to True.
"""
all = False
reboot = False
read_gnd_wdt = False
gnd_wdt_reset = False
ping = False
read_temperature1 = False
read_temperature2 = False
read_temperature3 = True
read_mppt_mode = True
read_vboost = True
read_vbat_max_hi = True
read_vbat_max_lo = True
read_ov_mode = True
class ACUConfigTable:
mppt_mode = TableEntry(bytearray([0x00, 0x00]), TableEntry.uint8_size)
mppt_d_mode = TableEntry(bytearray([0x00, 0x01]), TableEntry.uint8_size)
vboost = TableEntry(bytearray([0x00, 0x02]), TableEntry.uint16_size)
vbat_max_hi = TableEntry(bytearray([0x00, 0x10]), TableEntry.uint16_size)
vbat_max_lo = TableEntry(bytearray([0x00, 0x12]), TableEntry.uint16_size)
ov_mode = TableEntry(bytearray([0x00, 0x1A]), TableEntry.uint8_size)
class ACUHkTable:
temperature1 = TableEntry(bytearray([0x00, 0x1C]), TableEntry.uint16_size)
temperature2 = TableEntry(bytearray([0x00, 0x1D]), TableEntry.uint16_size)
temperature3 = TableEntry(bytearray([0x00, 0x1E]), TableEntry.uint16_size)
# Ground WDT value (remaining seconds until reboot)
wdt_gnd_left = TableEntry(bytearray([0x00, 0x74]), TableEntry.uint32_size)
def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing ACU"))
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Enabling ACU connected to X1 slot (channel 0)"))
p60dock_object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address,
P60DockConfigTable.out_en_0.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.reboot:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reboot"))
command = pack_reboot_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_gnd_wdt:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ground watchdog timer value"))
command = pack_get_param_command(object_id, TableIds.hk, ACUHkTable.wdt_gnd_left.parameter_address,
ACUHkTable.wdt_gnd_left.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.gnd_wdt_reset:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Testing ground watchdog reset"))
command = pack_gnd_wdt_reset_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_temperature3:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading temperature 3"))
command = pack_get_param_command(object_id, TableIds.hk, ACUHkTable.temperature3.parameter_address,
ACUHkTable.temperature3.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_vboost:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vboost value"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vboost.parameter_address,
ACUConfigTable.vboost.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_hi:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_hi"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_hi.parameter_address,
ACUConfigTable.vbat_max_hi.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_lo:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_lo"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_lo.parameter_address,
ACUConfigTable.vbat_max_lo.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_ov_mode:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ov_mode"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.ov_mode.parameter_address,
ACUConfigTable.ov_mode.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning off ACU"))
command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address,
P60DockConfigTable.out_en_0.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue

56
pus_tc/heater.py Normal file
View File

@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_heater.py
@brief Command sequence to test the HeaterHandler
@author J. Meier
@date 30.01.2021
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
class SwitchNumbers:
HEATER_0 = 0
HEATER_1 = 1
HEATER_2 = 2
HEATER_3 = 3
HEATER_4 = 4
HEATER_5 = 5
HEATER_6 = 6
HEATER_7 = 7
NUMBER_OF_SWITCHES = 8
class ActionIds:
SWITCH_HEATER = bytearray([0x0, 0x0, 0x0, 0x0])
def pack_heater_test_into(object_id: bytearray, tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing Heater Switching"))
heater_number = int(input("Type number of heater to switch: "))
if heater_number >= SwitchNumbers.NUMBER_OF_SWITCHES:
print("Invalid heater switch number")
return
action = int(input("Turn switch on or off? (0 - off, 1 - on): "))
if action != 0 and action != 1:
print("Invalid action defined. Must be 0 (off) or 1 (on")
debug_string = "Switching heater " + str(heater_number)
tc_queue.appendleft((QueueCommands.PRINT, debug_string))
command = pack_switch_heater_command(object_id, heater_number, action)
command = PusTelecommand(service=8, subservice=128, ssc=300, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
def pack_switch_heater_command(object_id: bytearray, switch_nr: int, switch_action: int) -> bytearray:
""" Function to generate the command switch a heater
@param object_id The object id of the HeaterHandler object.
@param switch_nr The switch number identifying the heater to switch
@param switch_action Action to perform. 0 - Sets switch off, 1 - Sets switch on.
"""
action_id = ActionIds.SWITCH_HEATER
command = object_id + action_id
command.append(switch_nr)
command.append(switch_action)
return command

161
pus_tc/p60dock.py Normal file
View File

@@ -0,0 +1,161 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_p60dock.py
@brief P60 Dock tests
@author J. Meier
@date 13.12.2020
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from gomspace.gomspace_common import *
class P60DockTestProcedure:
"""
@brief Use this class to define the tests to perform for the P60Dock.
@details Setting all to True will run all tests.
Setting all to False will only run the tests set to True.
"""
all = False
reboot = False
read_gnd_wdt = False
gnd_wdt_reset = False
ping = False
channel_3_off = False # pdu2
read_temperature1 = True
read_channel_3_state = False # pdu2
read_cur_lu_lim_0 = False
channel_3_on = False # pdu2
invalid_table_id_test = False # Test to check if software properly handles invalid table ids
invalid_address_test = False # Test to check if software properly handles invalid addresses
invalid_parameter_size_test = False
class P60DockConfigTable:
out_en_0 = TableEntry(bytearray([0x00, 0x68]), TableEntry.uint8_size) # ACU
out_en_1 = TableEntry(bytearray([0x00, 0x69]), TableEntry.uint8_size) # PDU1
out_en_2 = TableEntry(bytearray([0x00, 0x6A]), TableEntry.uint8_size)
out_en_3 = TableEntry(bytearray([0x00, 0x6B]), TableEntry.uint8_size) # PDU2
# When channel consumes more than cur_lu_lim, channel is turned of immediately
cur_lu_lim_0 = TableEntry(bytearray([0x00, 0xF8]), TableEntry.uint16_size)
class P60DockHkTable:
temperature1 = TableEntry(bytearray([0x00, 0x44]), TableEntry.uint16_size)
temperature2 = TableEntry(bytearray([0x00, 0x46]), TableEntry.uint16_size)
# Ground WDT value (remaining seconds until reboot)
wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size)
def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
if P60DockTestProcedure.all or P60DockTestProcedure.reboot:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Reboot"))
command = pack_reboot_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.read_gnd_wdt:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Reading ground watchdog timer value"))
command = pack_get_param_command(
object_id, TableIds.hk, P60DockHkTable.wdt_gnd_left.parameter_address,
P60DockHkTable.wdt_gnd_left.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.gnd_wdt_reset:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing ground watchdog reset"))
command = pack_gnd_wdt_reset_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Ping"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_off:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing setting output channel 3 off"))
parameter = 0 # set channel off
command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address,
P60DockConfigTable.out_en_3.parameter_size, parameter)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.read_temperature1:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing temperature reading"))
command = pack_get_param_command(
object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address,
P60DockHkTable.temperature1.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on:
tc_queue.appendleft((QueueCommands.PRINT,
"P60 Dock: Testing Output Channel 3 state (PDU2)"))
command = pack_get_param_command(object_id, TableIds.config, P60DockConfigTable.out_en_3.parameter_address,
P60DockConfigTable.out_en_3.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.read_cur_lu_lim_0:
tc_queue.appendleft((QueueCommands.PRINT,
"P60 Dock: Reading current limit value of output channel 0"))
command = pack_get_param_command(
object_id, TableIds.config, P60DockConfigTable.cur_lu_lim_0.parameter_address,
P60DockConfigTable.cur_lu_lim_0.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing setting output channel 3 on"))
parameter = 1 # set channel on
command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address,
P60DockConfigTable.out_en_3.parameter_size, parameter)
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.invalid_table_id_test:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing invalid table id handling"))
table_id_invalid = 5
command = pack_get_param_command(
object_id, table_id_invalid, P60DockHkTable.temperature1.parameter_address,
P60DockHkTable.temperature1.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.invalid_address_test:
tc_queue.appendleft((QueueCommands.PRINT,
"P60 Dock: Testing invalid address handling in get param command"))
invalid_address = bytearray([0x01, 0xF4])
command = pack_get_param_command(object_id, TableIds.hk, invalid_address,
P60DockHkTable.temperature1.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT,
"P60 Dock: Testing invalid address handling in set param command"))
invalid_address = bytearray([0x01, 0xF4])
parameter_size = 2
parameter = 1
command = pack_set_param_command(object_id, invalid_address, parameter_size, parameter)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.invalid_parameter_size_test:
tc_queue.appendleft(
(QueueCommands.PRINT,
"P60 Dock: Testing handling of invalid parameter sizes in get-param command")
)
invalid_size = 5
command = pack_get_param_command(
object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address, invalid_size
)
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(
(QueueCommands.PRINT,
"P60 Dock: Testing handling of invalid parameter size in set-param command")
)
parameter = 1
command = pack_set_param_command(
object_id, P60DockConfigTable.out_en_3.parameter_address, invalid_size, parameter
)
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue

55
pus_tc/pdu1.py Normal file
View File

@@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_pdu1.py
@brief PDU2 tests
@author J. Meier
@date 17.12.2020
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from gomspace.gomspace_common import *
from pus_tc.p60dock import P60DockConfigTable
from gomspace.gomspace_pdu_definitions import *
class PDU1TestProcedure:
"""
@brief Use this class to define the tests to perform for the PDU2.
@details Setting all to True will run all tests.
Setting all to False will only run the tests set to True.
"""
all = False
reboot = False
ping = False
read_temperature = False
def pack_pdu1_test_into(
pdu1_object_id: bytearray, p60dock_object_id: bytearray, tc_queue: TcQueueT
):
tc_queue.appendleft((QueueCommands.PRINT, "Testing PDU1"))
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Enabling PDU1"))
command = pack_set_param_command(
p60dock_object_id, P60DockConfigTable.out_en_1.parameter_address,
P60DockConfigTable.out_en_1.parameter_size, Channel.on
)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(pdu1_object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.read_temperature:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Testing temperature reading"))
command = pack_get_param_command(
pdu1_object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
PDUHkTable.temperature.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())

104
pus_tc/pdu2.py Normal file
View File

@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_pdu2.py
@brief PDU2 tests
@author J. Meier
@date 17.12.2020
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
from pus_tc.p60dock import P60DockConfigTable
class PDU2TestProcedure:
"""
@brief Use this class to define the tests to perform for the PDU2.
@details Setting all to True will run all tests.
Setting all to False will only run the tests set to True.
"""
all = False
reboot = False
read_gnd_wdt = False
gnd_wdt_reset = False
ping = False
channel_2_off = False # Reaction wheels 5V
read_temperature = True
read_channel_2_state = False # Reaction wheels 5V
read_cur_lu_lim_0 = False # OBC
channel_2_on = False # Reaction wheels 5V
invalid_table_id_test = False # Test to check if software properly handles invalid table ids
invalid_address_test = False # Test to check if software properly handles invalid addresses
invalid_parameter_size_test = False
request_hk_table = False
def pack_pdu2_test_into(pdu2_object_id: bytearray, p60dock_object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing PDU2"))
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Enabling PDU2"))
command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_3.parameter_address,
P60DockConfigTable.out_en_3.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.reboot:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reboot"))
command = pack_reboot_command(pdu2_object_id)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_gnd_wdt:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading ground watchdog timer value"))
command = pack_get_param_command(pdu2_object_id, TableIds.hk, PDUHkTable.wdt_gnd_left.parameter_address,
PDUHkTable.wdt_gnd_left.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.gnd_wdt_reset:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing ground watchdog reset"))
command = pack_gnd_wdt_reset_command(pdu2_object_id)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(pdu2_object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing setting output channel 2 on (TCS Heater)"))
command = pack_set_param_command(pdu2_object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_temperature:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing temperature reading"))
command = pack_get_param_command(pdu2_object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
PDUHkTable.temperature.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_channel_2_state:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading output channel 2 state (TCS Heater)"))
command = pack_get_param_command(pdu2_object_id, TableIds.config, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_cur_lu_lim_0:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading current limit value of output channel 0 (OBC)"))
command = pack_get_param_command(pdu2_object_id, TableIds.config, PDUConfigTable.cur_lu_lim_0.parameter_address,
PDUConfigTable.cur_lu_lim_0.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing setting output channel 2 off"))
command = pack_set_param_command(pdu2_object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.request_hk_table:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Requesting housekeeping table"))
command = pack_request_full_hk_table_command(pdu2_object_id)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue

View File

@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_service200_mode.py
@brief PUS Service 200: PUS custom service 200: Mode commanding
@author R. Mueller
@date 02.05.2020
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.pus_tc.service_200_mode import pack_mode_data
from config.object_ids import TEST_DEVICE_ID
TEST_DEVICE_OBJ_ID = TEST_DEVICE_ID
def pack_service200_test_into(tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200"))
# Object ID: Dummy Device
obj_id = TEST_DEVICE_OBJ_ID
# Set On Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
mode_data = pack_mode_data(obj_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Normal mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal"))
mode_data = pack_mode_data(obj_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2010, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw"))
mode_data = pack_mode_data(obj_id, 3, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2020, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Off Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off"))
mode_data = pack_mode_data(obj_id, 0, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2030, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt"))
return tc_queue

68
pus_tc/tc_packer_hook.py Normal file
View File

@@ -0,0 +1,68 @@
"""
@brief This file transfers control of TC packing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import os
from collections import deque
from typing import Union
from tmtccmd.core.definitions import CoreServiceList
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus_tc.service_17_test import pack_service17_ping_command
from tmtccmd.core.object_id_manager import get_object_id
from pus_tc.p60dock import pack_p60dock_test_into
from pus_tc.pdu2 import pack_pdu2_test_into
from pus_tc.pdu1 import pack_pdu1_test_into
from pus_tc.acu import pack_acu_test_into
from pus_tc.tmp1075 import pack_tmp1075_test_into
from pus_tc.heater import pack_heater_test_into
from config.definitions import CustomServiceList
from config.object_ids import ObjIdIds
LOGGER = get_logger()
def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT):
if service == CoreServiceList.SERVICE_5:
return pack_generic_service5_test_into(service_queue)
if service == CoreServiceList.SERVICE_17:
return service_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
if service == CustomServiceList.P60DOCK.value:
object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
return pack_p60dock_test_into(object_id, service_queue)
if service == CustomServiceList.PDU1.value:
pdu1_object_id = get_object_id(ObjIdIds.PDU1_HANDLER_ID)
p60dock_object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
return pack_pdu1_test_into(pdu1_object_id, p60dock_object_id, service_queue)
if service == CustomServiceList.PDU2.value:
pdu2_object_id = get_object_id(ObjIdIds.PDU2_HANDLER_ID)
p60dock_object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
return pack_pdu2_test_into(pdu2_object_id, p60dock_object_id, service_queue)
if service == CustomServiceList.ACU.value:
object_id = get_object_id(ObjIdIds.ACU_HANDLER_ID)
return pack_acu_test_into(object_id, service_queue)
if service == CustomServiceList.TMP1075_1.value:
object_id = get_object_id(ObjIdIds.TMP1075_1_HANDLER_ID)
return pack_tmp1075_test_into(object_id, service_queue)
if service == CustomServiceList.TMP1075_2.value:
object_id = get_object_id(ObjIdIds.TMP1075_2_HANDLER_ID)
return pack_tmp1075_test_into(object_id, service_queue)
if service == CustomServiceList.HEATER.value:
object_id = get_object_id(ObjIdIds.HEATER)
return pack_heater_test_into(object_id, service_queue)
LOGGER.warning("Invalid Service !")
def create_total_tc_queue_user() -> TcQueueT:
if not os.path.exists("log"):
os.mkdir("log")
tc_queue = deque()
pack_generic_service5_test_into(tc_queue)
tc_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
return tc_queue

63
pus_tc/tmp1075.py Normal file
View File

@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_tmp1075.py
@brief TMP1075 tests
@author J. Meier
@date 06.01.2021
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data
class Tmp1075TestProcedure:
"""
@brief Use this class to define the tests to perform for the Tmp1075.
@details Setting all to True will run all tests.
Setting all to False will only run the tests set to True.
"""
all = False
start_adc_conversion = False
get_temp = False
set_mode_normal = True # Setting mode to normal starts continuous temperature reading
set_mode_on = False # If mode is MODE_ON, temperature will only be read on command
class Tmp1075ActionIds:
get_temp = bytearray([0x0, 0x0, 0x0, 0x01])
start_adc_conversion = bytearray([0x0, 0x0, 0x0, 0x02])
def pack_tmp1075_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft(
(QueueCommands.PRINT,
"Testing Tmp1075 Temperature Sensor Handler with object id: 0x" + object_id.hex())
)
if Tmp1075TestProcedure.all or Tmp1075TestProcedure.start_adc_conversion:
tc_queue.appendleft((QueueCommands.PRINT, "TMP1075: Starting new temperature conversion"))
command = object_id + Tmp1075ActionIds.start_adc_conversion
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if Tmp1075TestProcedure.all or Tmp1075TestProcedure.get_temp:
tc_queue.appendleft((QueueCommands.PRINT, "TMP1075: Read temperature"))
command = object_id + Tmp1075ActionIds.get_temp
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if Tmp1075TestProcedure.set_mode_normal:
tc_queue.appendleft((QueueCommands.PRINT, "TMP1075: Set Mode Normal"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=220, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
if Tmp1075TestProcedure.set_mode_on:
tc_queue.appendleft((QueueCommands.PRINT, "TMP1075: Set Mode On"))
mode_data = pack_mode_data(object_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=221, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue

View File

@@ -1,34 +0,0 @@
"""
@brief This file transfers control of TC packing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import os
from collections import deque
from typing import Union
from config.tmtcc_definitions import ServiceList
from tmtc_core.utility.tmtcc_logger import get_logger
from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT
from tmtc_core.pus_tc.tmtcc_tc_service5_event import pack_service5_test_into
from tmtc_core.pus_tc.tmtcc_tc_service17_test import pack_service17_ping_command
LOGGER = get_logger()
def pack_service_queue_user(service: Union[int, str], op_code: int, service_queue: TcQueueT):
if service == ServiceList.SERVICE_5:
return pack_service5_test_into(service_queue)
if service == ServiceList.SERVICE_17:
return service_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
LOGGER.warning("Invalid Service !")
def create_total_tc_queue_user() -> TcQueueT:
if not os.path.exists("log"):
os.mkdir("log")
tc_queue = deque()
pack_service5_test_into(tc_queue)
tc_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
return tc_queue

View File

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_solar_array_deployment.py
@brief The test function in this file simply returns a command which triggers the solar array deployment.
@author J. Meier
@date 15.02.2021
"""
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
class ActionIds:
DEPLOY_SOLAR_ARRAYS = bytearray([0x0, 0x0, 0x0, 0x5])
def pack_solar_array_deployment_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing S/A Deployment"))
command = object_id + ActionIds.DEPLOY_SOLAR_ARRAYS
command = PusTelecommand(service=8, subservice=128, ssc=200, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())

View File

@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_syrlinks_hk_handler.py
@brief Syrlinks Hk Handler tests
@author J. Meier
@date 13.12.2020
"""
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.pus_tc.tmtcc_tc_service_3_housekeeping import *
class SetIds:
RX_REGISTERS_DATASET = 1
TX_REGISTERS_DATASET = 2
def pack_syrlinks_hk_handler_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Get RX Registers"))
sid = make_sid(object_id, SetIds.RX_REGISTERS_DATASET)
command = generate_one_hk_command(sid, 200)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Get TX Registers"))
sid = make_sid(object_id, SetIds.TX_REGISTERS_DATASET)
command = generate_one_hk_command(sid, 201)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue

View File

@@ -3,13 +3,14 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from tmtccmd.pus_tm.service_8_functional_cmd import Service8TM
from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from tmtc_core.utility.tmtcc_logger import get_logger
from tmtc_core.pus_tm.tmtcc_tm_service1 import Service1TM
from tmtc_core.pus_tm.tmtcc_tm_service5 import Service5TM
from tmtc_core.pus_tm.tmtcc_tm_service17 import Service17TM
from tmtccmd.pus_tm.service_1_verification import Service1TM
from tmtccmd.pus_tm.service_3_base import Service3Base
from tmtccmd.pus_tm.service_5_event import Service5TM
from tmtccmd.pus_tm.service_17_test import Service17TM
LOGGER = get_logger()
@@ -18,8 +19,13 @@ def tm_user_factory_hook(raw_tm_packet: bytearray) -> PusTelemetry:
service_type = raw_tm_packet[7]
if service_type == 1:
return Service1TM(raw_tm_packet)
if service_type == 3:
return Service3Base(raw_tm_packet)
if service_type == 5:
return Service5TM(raw_tm_packet)
if service_type == 8:
service_8_tm = Service8TM(raw_tm_packet)
return Service8TM(raw_tm_packet)
if service_type == 17:
return Service17TM(raw_tm_packet)
LOGGER.info("The service " + str(service_type) + " is not implemented in Telemetry Factory")

30
pus_tm/hk_handling.py Normal file
View File

@@ -0,0 +1,30 @@
"""
@brief This file transfers control of housekeeping handling (PUS service 3) to the
developer
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from typing import Tuple
from tmtccmd.pus_tm.service_3_base import Service3Base
from tmtccmd.utility.tmtcc_logger import get_logger
LOGGER = get_logger()
def handle_user_hk_packet(object_id: int, set_id: int, hk_data: bytearray,
service3_packet: Service3Base) -> Tuple[list, list, bytearray, int]:
"""
This function is called when a Service 3 Housekeeping packet is received.
@param object_id:
@param set_id:
@param hk_data:
@param service3_packet:
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
The first list contains the header columns, the second list the list with
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
"""
LOGGER.info("Service3TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0

31
pus_tm/service_8_hook.py Normal file
View File

@@ -0,0 +1,31 @@
from typing import Tuple
from config.object_ids import ObjIdIds
def user_analyze_service_8_data(
object_id: int, action_id: int, custom_data: bytearray) -> Tuple[list, list]:
"""
This function is called by the TMTC core if a Service 8 data reply (subservice 130)
is received. The user can return a tuple of two lists, where the first list
is a list of header strings to print and the second list is a list of values to print.
The TMTC core will take care of printing both lists and logging them.
@param object_id:
@param action_id:
@param custom_data:
@return:
"""
if object_id == ObjIdIds.PDU2_HANDLER_ID.value:
header_list = ['PDU2 Service 8 Reply']
data_string = str()
for index in range(len(custom_data)):
data_string += str(hex(custom_data[index])) + " , "
data_string = data_string.rstrip()
data_string = data_string.rstrip(',')
data_string = data_string.rstrip()
content_list = [data_string]
else:
header_list = []
content_list = []
return header_list, content_list

View File

@@ -1,26 +0,0 @@
"""
@brief This file transfers control of housekeeping handling (PUS service 3) to the
developer
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from typing import Tuple
from tmtc_core.pus_tm.obsw_tm_service_3 import Service3Base
from tmtc_core.utility.tmtcc_logger import get_logger
LOGGER = get_logger()
def handle_user_hk_packet(
object_id: bytearray, hk_data: bytearray,
service3_packet: Service3Base) -> Tuple[list, list, bytearray]:
"""
This function is called when a Service 3 Housekeeping packet is received.
@param object_id:
@param hk_data:
@param service3_packet:
@return:
"""
LOGGER.info("Service3TM: Parsing for this SID has not been implemented.")
return [], [], bytearray()

View File

@@ -0,0 +1,81 @@
"""
@brief This file transfers control of housekeeping handling (PUS service 3) to the
developer
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import struct
from typing import Tuple
from config.tmtcc_object_ids import ObjectIds
from tmtc_core.pus_tm.tmtcc_tm_service3_base import Service3Base
from tmtc_core.utility.tmtcc_logger import get_logger
from config.tmtcc_object_ids import ObjectIds
from pus_tc.tmtcc_tc_syrlinks_hk_handler import SetIds
LOGGER = get_logger()
def handle_user_hk_packet(object_id: ObjectIds, set_id: int, hk_data: bytearray,
service3_packet: Service3Base) -> Tuple[list, list, bytearray, int]:
"""
This function is called when a Service 3 Housekeeping packet is received.
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param set_id:
@param hk_data:
@param service3_packet:
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
The first list contains the header columns, the second list the list with
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
"""
if object_id == ObjectIds.SYRLINKS_HK_HANDLER:
if set_id == SetIds.RX_REGISTERS_DATASET:
return handle_syrlinks_rx_registers_dataset(hk_data)
elif set_id == SetIds.TX_REGISTERS_DATASET:
return handle_syrlinks_tx_registers_dataset(hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
return [], [], bytearray(), 0
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0
def handle_syrlinks_rx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = ["RX Status", "RX Sensitivity", "RX Frequency Shift", "RX IQ Power", "RX AGC Value", "RX Demod Eb",
"RX Demod N0", "RX Datarate"]
rx_status = hk_data[0]
rx_sensitivity = struct.unpack('!I', hk_data[1:5])
rx_frequency_shift = struct.unpack('!I', hk_data[5:9])
rx_iq_power = struct.unpack('!H', hk_data[9:11])
rx_agc_value = struct.unpack('!H', hk_data[11:13])
rx_demod_eb = struct.unpack('!I', hk_data[13:17])
rx_demod_n0 = struct.unpack('!I', hk_data[17:21])
rx_data_rate = hk_data[21]
hk_content = [rx_status, rx_sensitivity, rx_frequency_shift, rx_iq_power, rx_agc_value, rx_demod_eb, rx_demod_n0,
rx_data_rate]
return hk_header, hk_content, validity_buffer, 8
def handle_syrlinks_tx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = ["TX Status", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0]
tx_waveform = hk_data[1]
tx_agc_value = struct.unpack('!H', hk_data[2:4])
hk_content = [tx_status, tx_waveform, tx_agc_value]
return hk_header, hk_content, validity_buffer, 3

View File

@@ -1,4 +1 @@
crcmod>=1.7
PyQt5>=5.15.1
PyQt5-stubs>=5.14.2.2
pyserial>=3.4
tmtccmd>=1.0.0

View File

@@ -16,7 +16,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,11 +26,14 @@ limitations under the License.
@author R. Mueller
"""
from tmtc_core.tmtcc_runner import run_tmtc_client
from tmtccmd.runner import initialize_tmtc_commander, run_tmtc_commander
from config.hook_implementations import EiveHookObject
def main():
run_tmtc_client(False)
hook_obj = EiveHookObject()
initialize_tmtc_commander(hook_object=hook_obj)
run_tmtc_commander(False)
if __name__ == "__main__":

View File

@@ -16,7 +16,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,11 +26,14 @@ limitations under the License.
@author R. Mueller
"""
from tmtc_core.tmtcc_runner import run_tmtc_client
from config.hook_implementations import EiveHookObject
from tmtccmd.runner import initialize_tmtc_commander, run_tmtc_commander
def main():
run_tmtc_client(True)
hook_obj = EiveHookObject()
initialize_tmtc_commander(hook_object=hook_obj)
run_tmtc_commander(True)
if __name__ == "__main__":

Submodule tmtc_core deleted from bdb8d5533a

1
tmtccmd Submodule

Submodule tmtccmd added at 1773f62856

View File

@@ -1,8 +1,3 @@
"""
@brief This file transfers control of the command line argument parsing to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
def parse_input_arguments_user(print_known_args: bool = False, print_unknown_args: bool = False):
@@ -10,6 +5,6 @@ def parse_input_arguments_user(print_known_args: bool = False, print_unknown_arg
This function by default will build the default argument parser. A custom CLI parser can be
built in this function if required.
"""
from tmtc_core.utility.tmtcc_core_args_parser import parse_default_input_arguments
from tmtccmd.defaults.args_parser import parse_default_input_arguments
parse_default_input_arguments(print_known_args=print_known_args,
print_unknown_args=print_unknown_args)