merged new tmtc

This commit is contained in:
Jakob Meier 2021-03-23 13:08:51 +01:00
commit 146cb52b66
43 changed files with 419 additions and 561 deletions

6
.gitmodules vendored
View File

@ -1,3 +1,3 @@
[submodule "tmtc_core"]
path = tmtc_core
url = https://git.ksat-stuttgart.de/irs/eive/tmtc_core.git
[submodule "tmtccmd"]
path = tmtccmd
url = https://github.com/rmspacefish/tmtccmd.git

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 1 -c 2 --hk" />
<option name="PARAMETERS" value="-m 2 -c 2 --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s 17 -t 3" />
<option name="PARAMETERS" value="-m 1 -c 2 -s 17 -t 3" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s 200 -t 4" />
<option name="PARAMETERS" value="-m 1 -c 2 -s 200 -t 4" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s 2 -t 4" />
<option name="PARAMETERS" value="-m 1 -c 2 -s 2 -t 4" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s 3 -p --hk" />
<option name="PARAMETERS" value="-m 1 -c 2 -s 3 -p --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s 8 -t 8" />
<option name="PARAMETERS" value="-m 1 -c 2 -s 8 -t 8" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_core/tmtcc_runner.py" />
<option name="PARAMETERS" value="-m 2 -c 2 --boardIP=127.0.0.1" />
<option name="PARAMETERS" value="-m 0 -c 2 --boardIP=127.0.0.1" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s ACU --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="PARAMETERS" value="-m 1 -c 2 -s acu --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s P60DOCK --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="PARAMETERS" value="-m 1 -c 2 -s p60dock --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s PDU1 --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="PARAMETERS" value="-m 1 -c 2 -s pdu1 --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -c 2 -s PDU2 --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l -t 4" />
<option name="PARAMETERS" value="-m 1 -c 2 -s pdu2 --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l -t 4" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -3,7 +3,7 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtccmd.pus_tc.base import PusTelecommand
def command_preparation_hook() -> PusTelecommand:

19
config/custom_mode_op.py Normal file
View File

@ -0,0 +1,19 @@
"""
@brief This file transfers control of custom mode handling to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.utility.tmtcc_logger import get_logger
LOGGER = get_logger()
class CustomModeList(enum.IntEnum):
pass
def custom_mode_operation(tmtc_backend: TmTcHandler, mode: int):
pass

17
config/definitions.py Normal file
View File

@ -0,0 +1,17 @@
"""
@brief This file transfers control of the custom definitions like modes to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
class CustomServiceList(enum.Enum):
P60DOCK = "p60dock"
PDU1 = "pdu1"
PDU2 = "pdu2"
ACU = "acu"
TMP1075_1 = "tmp1075_1"
TMP1075_2 = "tmp1075_2"
HEATER = "heater"

54
config/globals_config.py Normal file
View File

@ -0,0 +1,54 @@
"""
@brief This file transfers definitions of global variables to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
import argparse
# All globals can be added here and will be part of a globals dictionary.
from config.definitions import CustomServiceList
from config.custom_mode_op import CustomModeList
from tmtccmd.core.definitions import CoreComInterfaces, CoreServiceList
from tmtccmd.defaults.globals_setup import set_default_globals_pre_args_parsing, \
set_default_globals_post_args_parsing, get_core_service_dict
from tmtccmd.core.globals_manager import update_global
from tmtccmd.core.definitions import CoreGlobalIds
from tmtccmd.utility.tmtcc_logger import get_logger
LOGGER = get_logger()
class CustomGlobalIds(enum.Enum):
from enum import auto
pass
def set_globals_pre_args_parsing(gui: bool = False):
set_default_globals_pre_args_parsing(gui=gui, apid=0x65, com_if_id=CoreComInterfaces.TCPIP_UDP)
servicelist = get_core_service_dict()
update_global(CoreGlobalIds.CURRENT_SERVICE, CoreServiceList.SERVICE_17)
update_global(CoreGlobalIds.SERVICE_DICT, servicelist)
def add_globals_post_args_parsing(args: argparse.Namespace):
set_default_globals_post_args_parsing(
args=args, custom_service_list=CustomServiceList, custom_mode_list=CustomModeList,
custom_com_if_list=None
)
def set_up_serial_cfg(com_if: CoreComInterfaces):
from tmtccmd.defaults.com_setup import default_serial_cfg_setup
default_serial_cfg_setup(com_if=com_if)
def set_up_ethernet_cfg():
from tmtccmd.defaults.com_setup import default_tcpip_udp_cfg_setup
default_tcpip_udp_cfg_setup()

View File

@ -0,0 +1,75 @@
import argparse
from typing import Union, Dict, Tuple
from tmtccmd.pus_tm.service_3_base import Service3Base
from tmtccmd.pus_tm.base import PusTelemetry
from tmtccmd.pus_tc.base import TcQueueT, PusTelecommand
from com_if.com_interface_base import CommunicationInterface
from core.backend import TmTcHandler
from tmtccmd.core.hook_base import TmTcHookBase
from tmtccmd.utility.tmtc_printer import TmTcPrinter
class EiveHookObject(TmTcHookBase):
def get_version(self) -> str:
from config.version import SW_NAME, VERSION_MAJOR, VERSION_MINOR, VERSION_SUBMINOR
return f"{SW_NAME} {VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_SUBMINOR}"
def add_globals_pre_args_parsing(self, gui: bool = False):
from config.globals_config import set_globals_pre_args_parsing
set_globals_pre_args_parsing(gui=gui)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from config.globals_config import add_globals_post_args_parsing
add_globals_post_args_parsing(args)
def assign_communication_interface(self, com_if: int, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
from tmtccmd.defaults.com_setup import create_communication_interface_default
return create_communication_interface_default(com_if=com_if, tmtc_printer=tmtc_printer)
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
from config.custom_mode_op import custom_mode_operation
custom_mode_operation(mode=mode, tmtc_backend=tmtc_backend)
def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from pus_tc.tc_packer_hook import pack_service_queue_user
pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue)
def tm_user_factory_hook(self, raw_tm_packet: bytearray) -> PusTelemetry:
from pus_tm.factory_hook import tm_user_factory_hook
return tm_user_factory_hook(raw_tm_packet=raw_tm_packet)
def set_object_ids(self) -> Dict[int, bytearray]:
from config.object_ids import set_object_ids
return set_object_ids()
def pack_total_service_queue(self) -> Union[None, TcQueueT]:
from pus_tc.tc_packer_hook import create_total_tc_queue_user
return create_total_tc_queue_user()
def command_preparation_hook(self) -> Union[None, PusTelecommand]:
from custom_hooks import command_preparation_hook
return command_preparation_hook()
@staticmethod
def handle_service_8_telemetry(
object_id: int, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
from pus_tm.service_8_hook import user_analyze_service_8_data
return user_analyze_service_8_data(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@staticmethod
def handle_service_3_housekeeping(
object_id: int, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
from pus_tm.hk_handling import handle_user_hk_packet
return handle_user_hk_packet(
object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet
)

51
config/object_ids.py Normal file
View File

@ -0,0 +1,51 @@
"""
@brief This file transfers control of the object IDs to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
from typing import Dict
PUS_SERVICE_17_ID = bytearray([0x53, 0x00, 0x00, 0x17])
TEST_DEVICE_ID = bytearray([0x44, 0x00, 0xAF, 0xFE])
P60_DOCK_HANDLER = bytearray([0x44, 0x00, 0x00, 0x1])
PDU_1_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x2])
PDU_2_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x3])
ACU_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x4])
TMP_1075_1_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x5])
TMP_1075_2_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x6])
HEATER_ID = bytearray([0x54, 0x00, 0x00, 0x1])
PCDU_HANDLER_ID = bytearray([0x44, 0x00, 0x10, 0x00])
SOLAR_ARRAY_DEPLOYMENT_ID = bytearray([0x44, 0x00, 0x10, 0x01])
class ObjIdIds(enum.IntEnum):
PUS_SERVICE_17_ID = 0
TEST_DEVICE_ID = 1
P60DOCK_HANDLER_ID = 2
PDU1_HANDLER_ID = 3
PDU2_HANDLER_ID = 4
PCDU_HANDLER_ID = 5
ACU_HANDLER_ID = 6
TMP1075_1_HANDLER_ID = 7
TMP1075_2_HANDLER_ID = 8
HEATER_ID = 9
SOLAR_ARRAY_DEPLOYMENT_ID = 10
def set_object_ids() -> Dict[int, bytearray]:
object_id_dict = ({
ObjIdIds.PUS_SERVICE_17_ID: PUS_SERVICE_17_ID,
ObjIdIds.TEST_DEVICE_ID: TEST_DEVICE_ID,
ObjIdIds.P60DOCK_HANDLER_ID: P60_DOCK_HANDLER,
ObjIdIds.PDU1_HANDLER_ID: PDU_1_HANDLER_ID,
ObjIdIds.PDU2_HANDLER_ID: PDU_2_HANDLER_ID,
ObjIdIds.ACU_HANDLER_ID: ACU_HANDLER_ID,
ObjIdIds.TMP1075_1_HANDLER_ID: TMP_1075_1_HANDLER_ID,
ObjIdIds.TMP1075_2_HANDLER_ID: TMP_1075_2_HANDLER_ID,
ObjIdIds.HEATER_ID: HEATER_ID,
ObjIdIds.PCDU_HANDLER_ID: PCDU_HANDLER_ID,
ObjIdIds.SOLAR_ARRAY_DEPLOYMENT_ID: SOLAR_ARRAY_DEPLOYMENT_ID,
})
return object_id_dict

View File

@ -1,18 +0,0 @@
"""
@brief This file transfers control of communication interface setup to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from typing import Union
from tmtc_core.com_if.tmtcc_com_interface_base import CommunicationInterface
from tmtc_core.core.tmtc_core_definitions import ComInterfaces
from tmtc_core.core.tmtcc_com_if_setup import create_communication_interface_default
from tmtc_core.utility.tmtcc_tmtc_printer import TmTcPrinter
def create_communication_interface_user(com_if: ComInterfaces, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
return create_communication_interface_default(com_if, tmtc_printer)

View File

@ -1,51 +0,0 @@
"""
@brief This file transfers control of the custom definitions like modes to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
from enum import auto
# Mode options, set by args parser
class ModeList(enum.Enum):
Idle = 0
ListenerMode = 1
SingleCommandMode = 2
ServiceTestMode = 3
SoftwareTestMode = 4
PromptMode = 32
class ServiceList(enum.Enum):
SERVICE_2 = 0
SERVICE_3 = auto()
SERVICE_5 = auto()
SERVICE_8 = auto()
SERVICE_9 = auto()
SERVICE_17 = auto()
SERVICE_20 = auto()
SERVICE_23 = auto()
SERVICE_200 = auto()
P60DOCK = auto()
PDU1 = auto()
PDU2 = auto()
ACU = auto()
TMP1075_1 = auto()
TMP1075_2 = auto()
HEATER = auto()
SA_DEPL = auto()
SYRLINKS = auto()
class SerialConfig(enum.Enum):
SERIAL_PORT = auto()
SERIAL_BAUD_RATE = auto()
SERIAL_TIMEOUT = auto()
SERIAL_COMM_TYPE = auto()
class EthernetConfig(enum.Enum):
SEND_ADDRESS = auto()
RECV_ADDRESS = auto()

View File

@ -1,236 +0,0 @@
"""
@brief This file transfers definitions of global variables to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
import argparse
# All globals can be added here and will be part of a globals dictionary.
from config.tmtcc_definitions import ServiceList, ModeList
from tmtc_core.com_if.tmtcc_serial_com_if import SerialCommunicationType
from tmtc_core.com_if.tmtcc_serial_utilities import determine_com_port
from tmtc_core.core.tmtc_core_definitions import ComInterfaces
from tmtc_core.utility.tmtcc_logger import get_logger
class GlobalIds(enum.Enum):
from enum import auto
# Generic parameters
APID = auto()
MODE = auto()
SERVICE = auto()
SERVICELIST = auto()
COM_IF = auto()
OP_CODE = auto()
TM_TIMEOUT = auto()
# Miscellaneous
DISPLAY_MODE = auto()
USE_LISTENER_AFTER_OP = auto()
PRINT_HK = auto()
PRINT_TM = auto()
PRINT_RAW_TM = auto()
PRINT_TO_FILE = auto()
RESEND_TC = auto()
TC_SEND_TIMEOUT_FACTOR = auto()
# Config dictionaries
USE_SERIAL = auto()
SERIAL_CONFIG = auto()
USE_ETHERNET = auto()
ETHERNET_CONFIG = auto()
# Object handles
PRETTY_PRINTER = auto()
TM_LISTENER_HANDLE = auto()
COM_INTERFACE_HANDLE = auto()
TMTC_PRINTER_HANDLE = auto()
def add_globals_pre_args_parsing(gui: bool = False):
from tmtc_core.core.tmtcc_globals_manager import update_global
import pprint
update_global(GlobalIds.APID, 0x65)
update_global(GlobalIds.COM_IF, ComInterfaces.EthernetUDP)
update_global(GlobalIds.TC_SEND_TIMEOUT_FACTOR, 2)
update_global(GlobalIds.TM_TIMEOUT, 4)
update_global(GlobalIds.DISPLAY_MODE, "long")
update_global(GlobalIds.PRINT_TO_FILE, True)
update_global(GlobalIds.SERIAL_CONFIG, dict())
update_global(GlobalIds.ETHERNET_CONFIG, dict())
pp = pprint.PrettyPrinter()
update_global(GlobalIds.PRETTY_PRINTER, pp)
update_global(GlobalIds.TM_LISTENER_HANDLE, None)
update_global(GlobalIds.COM_INTERFACE_HANDLE, None)
update_global(GlobalIds.TMTC_PRINTER_HANDLE, None)
update_global(GlobalIds.PRINT_RAW_TM, False)
update_global(GlobalIds.RESEND_TC, False)
update_global(GlobalIds.OP_CODE, "0")
update_global(GlobalIds.MODE, ModeList.ListenerMode)
if gui:
set_up_ethernet_cfg()
servicelist = dict()
servicelist[ServiceList.SERVICE_2] = ["Service 2 Raw Commanding"]
servicelist[ServiceList.SERVICE_3] = ["Service 3 Housekeeping"]
servicelist[ServiceList.SERVICE_5] = ["Service 5 Event"]
servicelist[ServiceList.SERVICE_8] = ["Service 8 Functional Commanding"]
servicelist[ServiceList.SERVICE_9] = ["Service 9 Time"]
servicelist[ServiceList.SERVICE_17] = ["Service 17 Test"]
servicelist[ServiceList.SERVICE_20] = ["Service 20 Parameters"]
servicelist[ServiceList.SERVICE_23] = ["Service 23 File Management"]
servicelist[ServiceList.SERVICE_200] = ["Service 200 Mode Management"]
update_global(GlobalIds.SERVICE, ServiceList.SERVICE_17)
update_global(GlobalIds.SERVICELIST, servicelist)
def add_globals_post_args_parsing(args: argparse.Namespace):
from tmtc_core.core.tmtcc_globals_manager import update_global
from config.tmtcc_definitions import ModeList
logger = get_logger()
mode_param = ModeList.ListenerMode
if 0 <= args.mode <= 6:
if args.mode == 0:
mode_param = ModeList.GUIMode
elif args.mode == 1:
mode_param = ModeList.ListenerMode
elif args.mode == 2:
mode_param = ModeList.SingleCommandMode
elif args.mode == 3:
mode_param = ModeList.ServiceTestMode
elif args.mode == 4:
mode_param = ModeList.SoftwareTestMode
update_global(GlobalIds.MODE, mode_param)
if args.com_if == ComInterfaces.EthernetUDP.value:
com_if = ComInterfaces.EthernetUDP
elif args.com_if == ComInterfaces.Serial.value:
com_if = ComInterfaces.Serial
elif args.com_if == ComInterfaces.Dummy.value:
com_if = ComInterfaces.Dummy
elif args.com_if == ComInterfaces.QEMU.value:
com_if = ComInterfaces.QEMU
else:
com_if = ComInterfaces.Serial
update_global(GlobalIds.COM_IF, com_if)
if args.short_display_mode:
display_mode_param = "short"
else:
display_mode_param = "long"
update_global(GlobalIds.DISPLAY_MODE, display_mode_param)
service = str(args.service).lower()
if service == "2":
service = ServiceList.SERVICE_2
elif service == "3":
service = ServiceList.SERVICE_3
elif service == "5":
service = ServiceList.SERVICE_5
elif service == "8":
service = ServiceList.SERVICE_8
elif service == "9":
service = ServiceList.SERVICE_9
elif service == "17":
service = ServiceList.SERVICE_17
elif service == "20":
service = ServiceList.SERVICE_20
elif service == "23":
service = ServiceList.SERVICE_23
elif service == "p60dock":
service = ServiceList.P60DOCK
elif service == "pdu1":
service = ServiceList.PDU1
elif service == "pdu2":
service = ServiceList.PDU2
elif service == "acu":
service = ServiceList.ACU
elif service == "tmp1075_1":
service = ServiceList.TMP1075_1
elif service == "tmp1075_2":
service = ServiceList.TMP1075_2
elif service == "heater":
service = ServiceList.HEATER
elif service == "sa_depl":
service = ServiceList.SA_DEPL
elif service == "syrlinks":
service = ServiceList.SYRLINKS
else:
logger.warning("Service not known! Setting standard service 17")
service = ServiceList.SERVICE_17
update_global(GlobalIds.SERVICE, service)
if args.op_code is None:
op_code = 0
else:
op_code = str(args.op_code).lower()
update_global(GlobalIds.OP_CODE, op_code)
update_global(GlobalIds.USE_LISTENER_AFTER_OP, args.listener)
update_global(GlobalIds.TM_TIMEOUT, args.tm_timeout)
update_global(GlobalIds.PRINT_HK, args.print_hk)
update_global(GlobalIds.PRINT_TM, args.print_tm)
update_global(GlobalIds.PRINT_RAW_TM, args.raw_data_print)
update_global(GlobalIds.PRINT_TO_FILE, args.print_log)
update_global(GlobalIds.RESEND_TC, args.resend_tc)
update_global(GlobalIds.TC_SEND_TIMEOUT_FACTOR, 3)
use_serial_cfg = False
if com_if == ComInterfaces.Serial or com_if == ComInterfaces.QEMU:
use_serial_cfg = True
if use_serial_cfg:
set_up_serial_cfg(com_if)
use_ethernet_cfg = False
if com_if == ComInterfaces.EthernetUDP:
use_ethernet_cfg = True
if use_ethernet_cfg:
# TODO: Port and IP address can also be passed as CLI parameters. Use them here if applicable
set_up_ethernet_cfg()
def set_up_serial_cfg(com_if: ComInterfaces):
from tmtc_core.core.tmtcc_globals_manager import update_global
update_global(GlobalIds.USE_SERIAL, True)
from tmtc_core.core.tmtcc_globals_manager import get_global
from config.tmtcc_definitions import SerialConfig
serial_cfg_dict = get_global(GlobalIds.SERIAL_CONFIG)
if com_if == ComInterfaces.Serial:
com_port = determine_com_port()
else:
com_port = ""
serial_cfg_dict.update({SerialConfig.SERIAL_PORT: com_port})
serial_cfg_dict.update({SerialConfig.SERIAL_BAUD_RATE: 115200})
serial_cfg_dict.update({SerialConfig.SERIAL_TIMEOUT: 0.01})
serial_cfg_dict.update({SerialConfig.SERIAL_COMM_TYPE: SerialCommunicationType.DLE_ENCODING})
serial_cfg_dict.update({SerialConfig.SERIAL_FRAME_SIZE: 256})
serial_cfg_dict.update({SerialConfig.SERIAL_DLE_QUEUE_LEN: 25})
serial_cfg_dict.update({SerialConfig.SERIAL_DLE_MAX_FRAME_SIZE: 1024})
update_global(GlobalIds.SERIAL_CONFIG, serial_cfg_dict)
def set_up_ethernet_cfg():
from tmtc_core.core.tmtcc_globals_manager import update_global
update_global(GlobalIds.USE_ETHERNET, True)
from tmtc_core.core.tmtcc_globals_manager import get_global
from config.tmtcc_definitions import EthernetConfig
from tmtc_core.com_if.tmtcc_ethernet_utilities import determine_ip_addresses
ethernet_cfg_dict = get_global(GlobalIds.ETHERNET_CONFIG)
# This will either load the addresses from a JSON file or prompt them from the user.
send_addr, rcv_addr = determine_ip_addresses()
ethernet_cfg_dict.update({EthernetConfig.SEND_ADDRESS: send_addr})
ethernet_cfg_dict.update({EthernetConfig.RECV_ADDRESS: rcv_addr})
update_global(GlobalIds.ETHERNET_CONFIG, ethernet_cfg_dict)

View File

@ -1,45 +0,0 @@
"""
@brief This file transfers control of the object IDs to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
from typing import Dict
class ObjectIds(enum.Enum):
from enum import auto
INVALID = auto()
PUS_SERVICE_17 = auto()
TEST_DEVICE = auto()
P60DOCK_HANDLER_ID = auto()
PDU1_HANDLER_ID = auto()
PDU2_HANDLER_ID = auto()
PCDU_HANDLER = auto()
ACU_HANDLER_ID = auto()
TMP1075_1_HANDLER_ID = auto()
TMP1075_2_HANDLER_ID = auto()
HEATER = auto()
SOLAR_ARRAY_DEPLOYMENT = auto()
SYRLINKS_HK_HANDLER = auto()
def set_object_ids(object_id_dict: Dict[ObjectIds, bytearray]):
o_ids = ObjectIds
object_id_dict.update(
{o_ids.PUS_SERVICE_17: bytearray([0x53, 0x00, 0x00, 0x17]),
o_ids.TEST_DEVICE: bytearray([0x44, 0x00, 0xAF, 0xFE]),
o_ids.P60DOCK_HANDLER_ID: bytearray([0x44, 0x00, 0x00, 0x1]),
o_ids.PDU1_HANDLER_ID: bytearray([0x44, 0x00, 0x00, 0x2]),
o_ids.PDU2_HANDLER_ID: bytearray([0x44, 0x00, 0x00, 0x3]),
o_ids.ACU_HANDLER_ID: bytearray([0x44, 0x00, 0x00, 0x4]),
o_ids.TMP1075_1_HANDLER_ID: bytearray([0x44, 0x00, 0x00, 0x5]),
o_ids.TMP1075_2_HANDLER_ID: bytearray([0x44, 0x00, 0x00, 0x6]),
o_ids.HEATER: bytearray([0x54, 0x00, 0x00, 0x1]),
o_ids.PCDU_HANDLER: bytearray([0x44, 0x00, 0x10, 0x00]),
o_ids.SOLAR_ARRAY_DEPLOYMENT: bytearray([0x44, 0x00, 0x10, 0x01]),
o_ids.SYRLINKS_HK_HANDLER: bytearray([0x44, 0x00, 0x00, 0x9]),
o_ids.INVALID: bytearray([0xFF, 0xFF, 0xFF, 0xFF]),
}
)

View File

@ -1,20 +0,0 @@
"""
@brief This file transfers control of custom mode handling to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import sys
from config.tmtcc_definitions import ModeList
from tmtc_core.core.tmtcc_backend import TmTcHandler
from tmtc_core.utility.tmtcc_logger import get_logger
LOGGER = get_logger()
def perform_mode_operation_user(tmtc_backend: TmTcHandler, mode: ModeList):
"""
Custom modes can be implemented here
"""
LOGGER.error(f"Unknown mode {mode}, Configuration error !")
sys.exit()

View File

@ -1,8 +0,0 @@
"""
@brief This file transfers control of versioning to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
SW_NAME = "eive"
SW_VERSION = 1
SW_SUBVERSION = 1

4
config/version.py Normal file
View File

@ -0,0 +1,4 @@
SW_NAME = "eive"
VERSION_MAJOR = 1
VERSION_MINOR = 2
VERSION_SUBMINOR = 0

View File

@ -6,13 +6,13 @@
@date 21.12.2020
"""
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.pus_tc.base import PusTelecommand
from tmtccmd.core.definitions import QueueCommands
from gomspace.gomspace_common import *
from pus_tc.tmtcc_tc_p60dock import P60DockConfigTable
from config.tmtcc_object_ids import ObjectIds
from tmtc_core.core.tmtcc_object_id_manager import get_object_id
from pus_tc.p60dock import P60DockConfigTable
from config.object_ids import ObjIdIds
from tmtccmd.core.object_id_manager import get_object_id
class ACUTestProcedure:
@ -57,7 +57,7 @@ def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing ACU"))
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Enabling ACU connected to X1 slot (channel 0)"))
p60dock_object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID)
p60dock_object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address,
P60DockConfigTable.out_en_0.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)

View File

@ -5,9 +5,9 @@
@author J. Meier
@date 30.01.2021
"""
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.pus_tc.base import PusTelecommand
class SwitchNumbers:
@ -26,7 +26,7 @@ class ActionIds:
SWITCH_HEATER = bytearray([0x0, 0x0, 0x0, 0x0])
def pack_heater_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
def pack_heater_test_into(object_id: bytearray, tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing Heater Switching"))
heater_number = int(input("Type number of heater to switch: "))

View File

@ -5,9 +5,9 @@
@author J. Meier
@date 13.12.2020
"""
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.pus_tc.base import PusTelecommand
from gomspace.gomspace_common import *

View File

@ -5,11 +5,12 @@
@author J. Meier
@date 17.12.2020
"""
from tmtccmd.core.definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.pus_tc.base import PusTelecommand
from gomspace.gomspace_common import *
from pus_tc.tmtcc_tc_p60dock import P60DockConfigTable
from pus_tc.p60dock import P60DockConfigTable
from gomspace.gomspace_pdu_definitions import *
@ -25,24 +26,30 @@ class PDU1TestProcedure:
read_temperature = False
def pack_pdu1_test_into(pdu1_object_id: bytearray, p60dock_object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft(("print", "Testing PDU1"))
def pack_pdu1_test_into(
pdu1_object_id: bytearray, p60dock_object_id: bytearray, tc_queue: TcQueueT
):
tc_queue.appendleft((QueueCommands.PRINT, "Testing PDU1"))
tc_queue.appendleft(("print", "P60 Dock: Enabling PDU1"))
command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_1.parameter_address,
P60DockConfigTable.out_en_1.parameter_size, Channel.on)
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Enabling PDU1"))
command = pack_set_param_command(
p60dock_object_id, P60DockConfigTable.out_en_1.parameter_address,
P60DockConfigTable.out_en_1.parameter_size, Channel.on
)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.ping:
tc_queue.appendleft(("print", "PDU1: Ping Test"))
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(pdu1_object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.read_temperature:
tc_queue.appendleft(("print", "PDU1: Testing temperature reading"))
command = pack_get_param_command(pdu1_object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
PDUHkTable.temperature.parameter_size)
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Testing temperature reading"))
command = pack_get_param_command(
pdu1_object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
PDUHkTable.temperature.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())

View File

@ -5,12 +5,12 @@
@author J. Meier
@date 17.12.2020
"""
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.pus_tc.base import PusTelecommand
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
from pus_tc.tmtcc_tc_p60dock import P60DockConfigTable
from pus_tc.p60dock import P60DockConfigTable
class PDU2TestProcedure:
@ -27,7 +27,7 @@ class PDU2TestProcedure:
channel_2_off = False # Reaction wheels 5V
read_temperature = True
read_channel_2_state = False # Reaction wheels 5V
read_cur_lu_lim_0 = False # OBC
read_cur_lu_lim_0 = False # OBC
channel_2_on = False # Reaction wheels 5V
invalid_table_id_test = False # Test to check if software properly handles invalid table ids
invalid_address_test = False # Test to check if software properly handles invalid addresses

View File

@ -5,49 +5,38 @@
@author R. Mueller
@date 02.05.2020
"""
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.core.tmtcc_object_id_manager import get_object_id
from config.tmtcc_object_ids import ObjectIds
import struct
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.pus_tc.base import PusTelecommand
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.pus_tc.service_200_mode import pack_mode_data
from config.object_ids import TEST_DEVICE_ID
TEST_DEVICE_ID = get_object_id(ObjectIds.TEST_DEVICE)
TEST_DEVICE_OBJ_ID = TEST_DEVICE_ID
def pack_service200_test_into(tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200"))
# Object ID: Dummy Device
object_id = TEST_DEVICE_ID
obj_id = TEST_DEVICE_OBJ_ID
# Set On Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
mode_data = pack_mode_data(object_id, 1, 0)
mode_data = pack_mode_data(obj_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Normal mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal"))
mode_data = pack_mode_data(object_id, 2, 0)
mode_data = pack_mode_data(obj_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2010, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw"))
mode_data = pack_mode_data(object_id, 3, 0)
mode_data = pack_mode_data(obj_id, 3, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2020, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Off Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off"))
mode_data = pack_mode_data(object_id, 0, 0)
mode_data = pack_mode_data(obj_id, 0, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2030, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt"))
return tc_queue
# Mode 0: Off, Mode 1: Mode On, Mode 2: Mode Normal, Mode 3: Mode Raw
def pack_mode_data(object_id: bytearray, mode_: int, submode_: int) -> bytearray:
# Normal mode
mode = struct.pack(">I", mode_)
# Submode default
submode = struct.pack('B', submode_)
mode_data = object_id + mode + submode
return mode_data

68
pus_tc/tc_packer_hook.py Normal file
View File

@ -0,0 +1,68 @@
"""
@brief This file transfers control of TC packing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import os
from collections import deque
from typing import Union
from tmtccmd.core.definitions import CoreServiceList
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.pus_tc.base import TcQueueT
from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus_tc.service_17_test import pack_service17_ping_command
from tmtccmd.core.object_id_manager import get_object_id
from pus_tc.p60dock import pack_p60dock_test_into
from pus_tc.pdu2 import pack_pdu2_test_into
from pus_tc.pdu1 import pack_pdu1_test_into
from pus_tc.acu import pack_acu_test_into
from pus_tc.tmp1075 import pack_tmp1075_test_into
from pus_tc.heater import pack_heater_test_into
from config.definitions import CustomServiceList
from config.object_ids import ObjIdIds
LOGGER = get_logger()
def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT):
if service == CoreServiceList.SERVICE_5:
return pack_generic_service5_test_into(service_queue)
if service == CoreServiceList.SERVICE_17:
return service_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
if service == CustomServiceList.P60DOCK.value:
object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
return pack_p60dock_test_into(object_id, service_queue)
if service == CustomServiceList.PDU1.value:
pdu1_object_id = get_object_id(ObjIdIds.PDU1_HANDLER_ID)
p60dock_object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
return pack_pdu1_test_into(pdu1_object_id, p60dock_object_id, service_queue)
if service == CustomServiceList.PDU2.value:
pdu2_object_id = get_object_id(ObjIdIds.PDU2_HANDLER_ID)
p60dock_object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
return pack_pdu2_test_into(pdu2_object_id, p60dock_object_id, service_queue)
if service == CustomServiceList.ACU.value:
object_id = get_object_id(ObjIdIds.ACU_HANDLER_ID)
return pack_acu_test_into(object_id, service_queue)
if service == CustomServiceList.TMP1075_1.value:
object_id = get_object_id(ObjIdIds.TMP1075_1_HANDLER_ID)
return pack_tmp1075_test_into(object_id, service_queue)
if service == CustomServiceList.TMP1075_2.value:
object_id = get_object_id(ObjIdIds.TMP1075_2_HANDLER_ID)
return pack_tmp1075_test_into(object_id, service_queue)
if service == CustomServiceList.HEATER.value:
object_id = get_object_id(ObjIdIds.HEATER)
return pack_heater_test_into(object_id, service_queue)
LOGGER.warning("Invalid Service !")
def create_total_tc_queue_user() -> TcQueueT:
if not os.path.exists("log"):
os.mkdir("log")
tc_queue = deque()
pack_generic_service5_test_into(tc_queue)
tc_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
return tc_queue

View File

@ -5,10 +5,11 @@
@author J. Meier
@date 06.01.2021
"""
from tmtccmd.core.definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from pus_tc.tmtcc_tc_service200_mode import pack_mode_data
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.pus_tc.base import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data
class Tmp1075TestProcedure:
@ -30,29 +31,31 @@ class Tmp1075ActionIds:
def pack_tmp1075_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft(("print", "Testing Tmp1075 Temperature Sensor Handler with object id: 0x" +
object_id.hex()))
tc_queue.appendleft(
(QueueCommands.PRINT,
"Testing Tmp1075 Temperature Sensor Handler with object id: 0x" + object_id.hex())
)
if Tmp1075TestProcedure.all or Tmp1075TestProcedure.start_adc_conversion:
tc_queue.appendleft(("print", "TMP1075: Starting new temperature conversion"))
tc_queue.appendleft((QueueCommands.PRINT, "TMP1075: Starting new temperature conversion"))
command = object_id + Tmp1075ActionIds.start_adc_conversion
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if Tmp1075TestProcedure.all or Tmp1075TestProcedure.get_temp:
tc_queue.appendleft(("print", "TMP1075: Read temperature"))
tc_queue.appendleft((QueueCommands.PRINT, "TMP1075: Read temperature"))
command = object_id + Tmp1075ActionIds.get_temp
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if Tmp1075TestProcedure.set_mode_normal:
tc_queue.appendleft(("print", "TMP1075: Set Mode Normal"))
tc_queue.appendleft((QueueCommands.PRINT, "TMP1075: Set Mode Normal"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=220, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
if Tmp1075TestProcedure.set_mode_on:
tc_queue.appendleft(("print", "TMP1075: Set Mode On"))
tc_queue.appendleft((QueueCommands.PRINT, "TMP1075: Set Mode On"))
mode_data = pack_mode_data(object_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=221, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())

View File

@ -1,72 +0,0 @@
"""
@brief This file transfers control of TC packing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import os
from collections import deque
from config.tmtcc_definitions import ServiceList
from tmtc_core.utility.tmtcc_logger import get_logger
from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT
from tmtc_core.pus_tc.tmtcc_tc_service_5_event import pack_generic_service5_test_into
from tmtc_core.pus_tc.tmtcc_tc_service_17_test import pack_service17_ping_command
from pus_tc.tmtcc_tc_p60dock import pack_p60dock_test_into
from pus_tc.tmtcc_tc_pdu2 import pack_pdu2_test_into
from pus_tc.tmtcc_tc_pdu1 import pack_pdu1_test_into
from pus_tc.tmtcc_tc_acu import pack_acu_test_into
from tmtc_core.core.tmtcc_object_id_manager import get_object_id
from config.tmtcc_object_ids import ObjectIds
from pus_tc.tmtcc_tc_tmp1075 import pack_tmp1075_test_into
from pus_tc.tmtcc_tc_heater import pack_heater_test_into
from pus_tc.tmtcc_tc_solar_array_deployment import pack_solar_array_deployment_test_into
from pus_tc.tmtcc_tc_syrlinks_hk_handler import pack_syrlinks_hk_handler_test_into
LOGGER = get_logger()
def pack_service_queue_user(service: ServiceList, op_code: str, service_queue: TcQueueT):
if service == ServiceList.SERVICE_5:
return pack_generic_service5_test_into(service_queue)
if service == ServiceList.SERVICE_17:
return service_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
if service == ServiceList.P60DOCK:
object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID)
return pack_p60dock_test_into(object_id, service_queue)
if service == ServiceList.PDU1:
pdu1_object_id = get_object_id(ObjectIds.PDU1_HANDLER_ID)
p60dock_object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID)
return pack_pdu1_test_into(pdu1_object_id, p60dock_object_id, service_queue)
if service == ServiceList.PDU2:
pdu2_object_id = get_object_id(ObjectIds.PDU2_HANDLER_ID)
p60dock_object_id = get_object_id(ObjectIds.P60DOCK_HANDLER_ID)
return pack_pdu2_test_into(pdu2_object_id, p60dock_object_id, service_queue)
if service == ServiceList.ACU:
object_id = get_object_id(ObjectIds.ACU_HANDLER_ID)
return pack_acu_test_into(object_id, service_queue)
if service == ServiceList.TMP1075_1:
object_id = get_object_id(ObjectIds.TMP1075_1_HANDLER_ID)
return pack_tmp1075_test_into(object_id, service_queue)
if service == ServiceList.TMP1075_2:
object_id = get_object_id(ObjectIds.TMP1075_2_HANDLER_ID)
return pack_tmp1075_test_into(object_id, service_queue)
if service == ServiceList.HEATER:
object_id = get_object_id(ObjectIds.HEATER)
return pack_heater_test_into(object_id, service_queue)
if service == ServiceList.SA_DEPL:
object_id = get_object_id(ObjectIds.SOLAR_ARRAY_DEPLOYMENT)
return pack_solar_array_deployment_test_into(object_id, service_queue)
if service == ServiceList.SYRLINKS:
object_id = get_object_id(ObjectIds.SYRLINKS_HK_HANDLER)
return pack_syrlinks_hk_handler_test_into(object_id, service_queue)
LOGGER.warning("Invalid Service !")
def create_total_tc_queue_user() -> TcQueueT:
if not os.path.exists("log"):
os.mkdir("log")
tc_queue = deque()
pack_generic_service5_test_into(tc_queue)
tc_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
return tc_queue

View File

@ -3,14 +3,14 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from tmtc_core.pus_tm.tmtcc_pus_service_3 import Service3TM
from tmtc_core.pus_tm.tmtcc_pus_service_8 import Service8TM
from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from tmtc_core.utility.tmtcc_logger import get_logger
from tmtccmd.pus_tm.service_8_functional_cmd import Service8TM
from tmtccmd.pus_tm.base import PusTelemetry
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtc_core.pus_tm.tmtcc_tm_service1 import Service1TM
from tmtc_core.pus_tm.tmtcc_tm_service5 import Service5TM
from tmtc_core.pus_tm.tmtcc_tm_service17 import Service17TM
from tmtccmd.pus_tm.service_1_verification import Service1TM
from tmtccmd.pus_tm.service_3_base import Service3Base
from tmtccmd.pus_tm.service_5_event import Service5TM
from tmtccmd.pus_tm.service_17_test import Service17TM
LOGGER = get_logger()
@ -20,11 +20,11 @@ def tm_user_factory_hook(raw_tm_packet: bytearray) -> PusTelemetry:
if service_type == 1:
return Service1TM(raw_tm_packet)
if service_type == 3:
return Service3TM(raw_tm_packet)
return Service3Base(raw_tm_packet)
if service_type == 5:
return Service5TM(raw_tm_packet)
if service_type == 8:
service8tm = Service8TM(raw_tm_packet)
service_8_tm = Service8TM(raw_tm_packet)
return Service8TM(raw_tm_packet)
if service_type == 17:
return Service17TM(raw_tm_packet)

30
pus_tm/hk_handling.py Normal file
View File

@ -0,0 +1,30 @@
"""
@brief This file transfers control of housekeeping handling (PUS service 3) to the
developer
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from typing import Tuple
from tmtccmd.pus_tm.service_3_base import Service3Base
from tmtccmd.utility.tmtcc_logger import get_logger
LOGGER = get_logger()
def handle_user_hk_packet(object_id: int, set_id: int, hk_data: bytearray,
service3_packet: Service3Base) -> Tuple[list, list, bytearray, int]:
"""
This function is called when a Service 3 Housekeeping packet is received.
@param object_id:
@param set_id:
@param hk_data:
@param service3_packet:
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
The first list contains the header columns, the second list the list with
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
"""
LOGGER.info("Service3TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0

View File

@ -1,28 +1,21 @@
from typing import Tuple
from config.tmtcc_object_ids import ObjectIds
from config.object_ids import ObjIdIds
def user_analyze_service_8_data(
object_id: ObjectIds, action_id: int, custom_data: bytearray) -> Tuple[list, list]:
object_id: int, action_id: int, custom_data: bytearray) -> Tuple[list, list]:
"""
This function is called by the TMTC core if a Service 8 data reply (subservice 130)
is received. The user can return a tuple of two lists, where the first list
is a list of header strings to print and the second list is a list of values to print.
The TMTC core will take care of printing both lists and logging them.
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param action_id:
@param custom_data:
@return:
"""
if object_id.value == ObjectIds.PDU2_HANDLER_ID.value:
if object_id == ObjIdIds.PDU2_HANDLER_ID.value:
header_list = ['PDU2 Service 8 Reply']
data_string = str()

View File

@ -1,4 +1 @@
crcmod>=1.7
PyQt5>=5.15.1
PyQt5-stubs>=5.14.2.2
pyserial>=3.4
tmtccmd>=1.0.0

View File

@ -26,11 +26,14 @@ limitations under the License.
@author R. Mueller
"""
from tmtc_core.tmtcc_runner import run_tmtc_client
from tmtccmd.runner import initialize_tmtc_commander, run_tmtc_commander
from config.hook_implementations import EiveHookObject
def main():
run_tmtc_client(False)
hook_obj = EiveHookObject()
initialize_tmtc_commander(hook_object=hook_obj)
run_tmtc_commander(False)
if __name__ == "__main__":

View File

@ -26,11 +26,14 @@ limitations under the License.
@author R. Mueller
"""
from tmtc_core.tmtcc_runner import run_tmtc_client
from config.hook_implementations import EiveHookObject
from tmtccmd.runner import initialize_tmtc_commander, run_tmtc_commander
def main():
run_tmtc_client(True)
hook_obj = EiveHookObject()
initialize_tmtc_commander(hook_object=hook_obj)
run_tmtc_commander(True)
if __name__ == "__main__":

@ -1 +0,0 @@
Subproject commit dd623a5ef72c46cdb99334bc7fd166e45bb2dd56

1
tmtccmd Submodule

@ -0,0 +1 @@
Subproject commit b52f70179255695c47a4023f71745b4a67a9f286

View File

@ -1,8 +1,3 @@
"""
@brief This file transfers control of the command line argument parsing to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
def parse_input_arguments_user(print_known_args: bool = False, print_unknown_args: bool = False):
@ -10,6 +5,6 @@ def parse_input_arguments_user(print_known_args: bool = False, print_unknown_arg
This function by default will build the default argument parser. A custom CLI parser can be
built in this function if required.
"""
from tmtc_core.utility.tmtcc_core_args_parser import parse_default_input_arguments
from tmtccmd.defaults.args_parser import parse_default_input_arguments
parse_default_input_arguments(print_known_args=print_known_args,
print_unknown_args=print_unknown_args)