added tmtc commander files

This commit is contained in:
Robin Müller 2021-06-11 12:53:35 +02:00
parent ba3b767f15
commit ab0a422292
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
27 changed files with 864 additions and 0 deletions

8
tmtc/README.md Normal file
View File

@ -0,0 +1,8 @@
### How to use this folder
This folder contains template files to set up the TMTC commander
for a new mission or project. These files are the adaption
point to customize the TMTC commander.
To do so, simply copy all folder inside the TMTC commander root. This
step is also required because the TMTC commander core will load some modules.

0
tmtc/config/__init__.py Normal file
View File

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,24 @@
"""
@brief This file transfers control of the custom definitions like modes to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
from enum import auto
class CustomServiceList(enum.IntEnum):
pass
class SerialConfig(enum.Enum):
SERIAL_PORT = auto()
SERIAL_BAUD_RATE = auto()
SERIAL_TIMEOUT = auto()
SERIAL_COMM_TYPE = auto()
class EthernetConfig(enum.Enum):
SEND_ADDRESS = auto()
RECV_ADDRESS = auto()

View File

@ -0,0 +1,3 @@

View File

@ -0,0 +1,19 @@
"""
@brief This file transfers control of custom mode handling to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import sys
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.utility.logger import get_logger
LOGGER = get_logger()
def perform_mode_operation_user(tmtc_backend: TmTcHandler, mode: int):
"""
Custom modes can be implemented here
"""
LOGGER.error(f"Unknown mode {mode}, Configuration error !")
sys.exit()

73
tmtc/config/hook_base.py Normal file
View File

@ -0,0 +1,73 @@
import argparse
from typing import Dict, Union, Tuple
from tmtccmd.config.definitions import ServiceOpCodeDictT
from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.pus_tm.service_3_base import Service3Base
class FsfwHookBase(TmTcHookBase):
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.utility.tmtc_printer import TmTcPrinter
def get_version(self) -> str:
from config.version import SW_NAME, SW_VERSION, SW_SUBVERSION, SW_SUBSUBVERSION
return f"{SW_NAME} {SW_VERSION}.{SW_SUBVERSION}.{SW_SUBSUBVERSION}"
def get_json_config_file_path(self) -> str:
return "config/tmtc_config.json"
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import get_default_service_op_code_dict
return get_default_service_op_code_dict()
def add_globals_pre_args_parsing(self, gui: bool = False):
from tmtccmd.config.globals import set_default_globals_pre_args_parsing
set_default_globals_pre_args_parsing(gui=gui, apid=0xef)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from tmtccmd.config.globals import set_default_globals_post_args_parsing
set_default_globals_post_args_parsing(args=args, json_cfg_path=self.get_json_config_file_path())
def assign_communication_interface(self, com_if_key: str, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key, tmtc_printer=tmtc_printer, json_cfg_path=self.get_json_config_file_path()
)
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
print("No custom mode operation implemented")
def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from pus_tc.tc_packing import pack_service_queue_user
pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue)
def tm_user_factory_hook(self, raw_tm_packet: bytearray) -> Union[None, PusTelemetry]:
from pus_tm.factory_hook import tm_user_factory_hook
return tm_user_factory_hook(raw_tm_packet=raw_tm_packet)
def get_object_ids(self) -> Dict[bytes, list]:
from config.object_ids import get_object_ids
return get_object_ids()
@staticmethod
def handle_service_8_telemetry(
object_id: int, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
from pus_tm.service_8_handling import custom_service_8_handling
return custom_service_8_handling(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@staticmethod
def handle_service_3_housekeeping(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
from pus_tm.service_3_hk_handling import service_3_hk_handling
return service_3_hk_handling(
object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet
)

19
tmtc/config/object_ids.py Normal file
View File

@ -0,0 +1,19 @@
"""
@brief This file transfers control of the object IDs to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from typing import Dict
PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17])
TEST_DEVICE_0_ID = bytes([0x44, 0x01, 0xAF, 0xFE])
TEST_DEVICE_1_ID = bytes([0x44, 0x02, 0xAF, 0xFE])
def get_object_ids() -> Dict[bytes, list]:
object_id_dict = {
PUS_SERVICE_17_ID: ["PUS Service 17"],
TEST_DEVICE_0_ID: ["Test Device 0"],
TEST_DEVICE_1_ID: ["Test Device 1"]
}
return object_id_dict

View File

@ -0,0 +1,6 @@
{
"COM_IF_KEY": "udp",
"TCPIP_UDP_RECV_MAX_SIZE": 1500,
"TCPIP_UDP_DEST_IP_ADDRESS": "127.0.0.1",
"TCPIP_UDP_DEST_PORT": 7301
}

4
tmtc/config/version.py Normal file
View File

@ -0,0 +1,4 @@
SW_NAME = "fsfw-tmtc"
SW_VERSION = 1
SW_SUBVERSION = 2
SW_SUBSUBVERSION = 0

0
tmtc/log/tmtc_error.log Normal file
View File

BIN
tmtc/logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

0
tmtc/pus_tc/__init__.py Normal file
View File

View File

@ -0,0 +1,11 @@
import struct
# Commands
TEST_COMMAND_0 = struct.pack("!I", 1)
TEST_COMMAND_1 = struct.pack("!I", 2)
TEST_COMMAND_1_PARAM_1 = bytearray([0xBA, 0xB0])
TEST_COMMAND_1_PARAM_2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E])
DUMMY_COMMAND_3 = bytearray([0xBA, 0xDE, 0xAF, 0xFE])

View File

@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_service_200_mode.py
@brief PUS Service 200: PUS custom service 200: Mode commanding
@author R. Mueller
@date 02.05.2020
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.pus_tc.service_200_mode import pack_mode_data
from config.object_ids import TEST_DEVICE_0_ID
def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "0":
pack_service_200_test_into(tc_queue=tc_queue, init_ssc=2000)
def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
new_ssc = init_ssc
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200"))
# Object ID: DUMMY Device
object_id = TEST_DEVICE_0_ID
# Set On Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
mode_data = pack_mode_data(object_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# Set Normal mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw"))
mode_data = pack_mode_data(object_id, 3, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# Set Off Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off"))
mode_data = pack_mode_data(object_id, 0, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt"))
return new_ssc

View File

@ -0,0 +1,93 @@
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.pus_tc.service_20_parameter import pack_type_and_matrix_data, \
pack_parameter_id
from tmtccmd.pus_tc.service_200_mode import pack_mode_data
from tmtccmd.utility.logger import get_logger
from config.object_ids import TEST_DEVICE_0_ID
LOGGER = get_logger()
def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "0":
pack_service20_test_into(tc_queue=tc_queue)
def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False):
if called_externally is False:
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20"))
object_id = TEST_DEVICE_0_ID
# set mode normal
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
load_param_0_simple_test_commands(tc_queue=tc_queue)
load_param_1_simple_test_commands(tc_queue=tc_queue)
load_param_2_simple_test_commands(tc_queue=tc_queue)
if called_externally is False:
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service20.txt"))
def load_param_0_simple_test_commands(tc_queue: TcQueueT):
object_id = TEST_DEVICE_0_ID
parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0)
# test checking Load for uint32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load uint32_t"))
type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1)
parameter_data = struct.pack("!I", 42)
payload = object_id + parameter_id_0 + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
# test checking Dump for uint32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump uint32_t"))
payload = object_id + parameter_id_0
command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
def load_param_1_simple_test_commands(tc_queue: TcQueueT):
object_id = TEST_DEVICE_0_ID
parameter_id_1 = pack_parameter_id(domain_id=0, unique_id=1, linear_index=0)
# test checking Load for int32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load int32_t"))
type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1)
parameter_data = struct.pack("!i", -42)
payload = object_id + parameter_id_1 + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
# test checking Dump for int32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump int32_t"))
payload = object_id + parameter_id_1
command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
def load_param_2_simple_test_commands(tc_queue: TcQueueT):
object_id = TEST_DEVICE_0_ID
parameter_id_2 = pack_parameter_id(domain_id=0, unique_id=2, linear_index=0)
# test checking Load for float
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load float"))
type_and_matrix_data = pack_type_and_matrix_data(ptc=5, pfc=1, rows=1, columns=3)
parameter_data = struct.pack("!fff", 4.2, -4.2, 49)
payload = object_id + parameter_id_2 + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
# test checking Dump for float
# Skip dump for now, still not properly implemented
# tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump float"))
# payload = object_id + parameter_id_2
# command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload)
# tc_queue.appendleft(command.pack_command_tuple())

View File

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_service_2_raw_cmd.py
@brief PUS Service 2: Device Access, native low-level commanding
@author R. Mueller
@date 01.11.2019
"""
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.definitions import TcQueueT
from pus_tc.service_200_mode import pack_mode_data
import pus_tc.command_data as cmd_data
from config.object_ids import TEST_DEVICE_0_ID
def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "0":
pack_generic_service_2_test_into(0, tc_queue)
else:
print(f"pack_service_2_test: Operation code {op_code} unknown!")
def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
new_ssc = init_ssc
object_id = TEST_DEVICE_0_ID # dummy device
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode"))
mode_data = pack_mode_data(object_id, 3, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# toggle wiretapping raw
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw"))
wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1)
toggle_wiretapping_on_command = PusTelecommand(
service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data
)
tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple())
new_ssc += 1
# send raw command, wiretapping should be returned via TM[2,130] and TC[2,131]
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Sending Raw Command"))
raw_command = cmd_data.TEST_COMMAND_0
raw_data = object_id + raw_command
raw_command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data)
tc_queue.appendleft(raw_command.pack_command_tuple())
new_ssc += 1
# toggle wiretapping off
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off"))
wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0)
toggle_wiretapping_off_command = PusTelecommand(service=2, subservice=129, ssc=new_ssc,
app_data=wiretapping_toggle_data)
tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple())
new_ssc += 1
# send raw command which should be returned via TM[2,130]
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Send second raw command"))
command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# Set mode off
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Off Mode"))
mode_data = pack_mode_data(object_id, 0, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_2.txt"))
return new_ssc
# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW
def pack_wiretapping_mode(object_id, wiretapping_mode_):
wiretapping_mode = struct.pack(">B", wiretapping_mode_) # MODE_OFF : 0x00, MODE_RAW: 0x01
wiretapping_toggle_data = object_id + wiretapping_mode
return wiretapping_toggle_data

View File

@ -0,0 +1,154 @@
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.service_200_mode import pack_mode_data
from tmtccmd.pus_tc.service_20_parameter import pack_boolean_parameter_command
from tmtccmd.pus_tc.service_3_housekeeping import make_sid, generate_one_hk_command, \
Srv3Subservice
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.pus_tc.service_8_functional_cmd import generate_action_command
from config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
# Set IDs
TEST_SET_ID = 0
# Action IDs
TEST_NOTIFICATION_ACTION_ID = 3
# Parameters
PARAM_ACTIVATE_CHANGING_DATASETS = 4
def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
current_ssc = 3000
# TODO: Import this from config instead
device_idx = 0
if device_idx == 0:
object_id = TEST_DEVICE_0_ID
else:
object_id = TEST_DEVICE_1_ID
if op_code == "0":
# This will pack all the tests
pack_service_3_test_info(tc_queue=tc_queue, init_ssc=current_ssc, object_id=object_id,
device_idx=device_idx)
elif op_code == "1":
# Extremely simple, generate one HK packet
pack_gen_one_hk_command(tc_queue=tc_queue, device_idx=device_idx, init_ssc=current_ssc,
object_id=object_id)
elif op_code == "2":
# Housekeeping basic test
pack_housekeeping_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc)
elif op_code == "3":
# Notification demo
pack_notification_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc)
def pack_service_3_test_info(tc_queue: TcQueueT, device_idx: int, object_id: bytearray,
init_ssc: int):
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests"))
current_ssc = init_ssc
current_ssc += pack_gen_one_hk_command(
tc_queue=tc_queue, device_idx=device_idx, object_id=object_id, init_ssc=current_ssc
)
current_ssc += pack_housekeeping_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
current_ssc += pack_notification_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc, enable_normal_mode=False
)
def pack_gen_one_hk_command(
tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray
) -> int:
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
tc_queue.appendleft(
(QueueCommands.PRINT, f"Service 3 Test: Generate one test set packet for "
f"test device {device_idx}")
)
command = generate_one_hk_command(ssc=init_ssc, sid=test_sid)
init_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
return init_ssc
def pack_housekeeping_basic_test(
tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, enable_normal_mode: bool = True
) -> int:
"""
This basic test will request one HK packet, then it will enable periodic packets and listen
to the periodic packets for a few seconds. After that, HK packets will be disabled again.
"""
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
current_ssc = init_ssc
# Enable changing datasets via parameter service (Service 20)
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests"))
if enable_normal_mode:
# Set mode normal so that sets are changed/read regularly
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "Enabling changing datasets"))
command = pack_boolean_parameter_command(
object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=True, ssc=current_ssc
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
# Enable periodic reporting
tc_queue.appendleft((QueueCommands.PRINT,
"Enabling periodic thermal sensor packet generation: "))
command = PusTelecommand(service=3, subservice=Srv3Subservice.ENABLE_PERIODIC_HK_GEN.value,
ssc=current_ssc, app_data=test_sid)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 2.0))
# Disable periodic reporting
tc_queue.appendleft((QueueCommands.PRINT,
"Disabling periodic thermal sensor packet generation: "))
command = PusTelecommand(service=3, subservice=Srv3Subservice.DISABLE_PERIODIC_HK_GEN.value,
ssc=current_ssc, app_data=test_sid)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
# Disable changing datasets via parameter service (Service 20)
tc_queue.appendleft((QueueCommands.PRINT, "Disabling changing datasets"))
command = pack_boolean_parameter_command(
object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=False, ssc=current_ssc
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
return current_ssc
def pack_notification_basic_test(tc_queue: TcQueueT, object_id: bytearray, init_ssc: int,
enable_normal_mode: bool = True) -> int:
current_ssc = init_ssc
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing notification tests"))
if enable_normal_mode:
# Set mode normal so that sets are changed/read regularly
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "Triggering notification"))
command = generate_action_command(
object_id=object_id, action_id=TEST_NOTIFICATION_ACTION_ID, ssc=current_ssc
)
tc_queue.appendleft(command.pack_command_tuple())
current_ssc += 1
return current_ssc

View File

@ -0,0 +1,56 @@
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.definitions import TcQueueT
import pus_tc.command_data as cmd_data
from pus_tc.service_200_mode import pack_mode_data
from config.object_ids import TEST_DEVICE_0_ID
def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "0":
pack_generic_service_8_test_into(tc_queue=tc_queue)
else:
print(f"pack_service_8_test: Operation code {op_code} unknown!")
def pack_generic_service_8_test_into(tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8"))
object_id = TEST_DEVICE_0_ID
# set mode on
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode"))
mode_data = pack_mode_data(object_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# set mode normal
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Normal Mode"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Direct command which triggers completion reply
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply"))
action_id = cmd_data.TEST_COMMAND_0
direct_command = object_id + action_id
command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command)
tc_queue.appendleft(command.pack_command_tuple())
# Direct command which triggers _tm_data reply
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Data Reply"))
action_id = cmd_data.TEST_COMMAND_1
command_param1 = cmd_data.TEST_COMMAND_1_PARAM_1
command_param2 = cmd_data.TEST_COMMAND_1_PARAM_2
direct_command = object_id + action_id + command_param1 + command_param2
command = PusTelecommand(service=8, subservice=128, ssc=830, app_data=direct_command)
tc_queue.appendleft(command.pack_command_tuple())
# Set mode off
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Off Mode"))
mode_data = pack_mode_data(object_id, 0, 0)
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_8.txt"))

49
tmtc/pus_tc/tc_packing.py Normal file
View File

@ -0,0 +1,49 @@
"""
@brief This file transfers control of TC packing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import os
from collections import deque
from typing import Union
from pus_tc.service_20_parameters import pack_service20_commands_into
from pus_tc.service_2_raw_cmd import pack_service_2_commands_into
from pus_tc.service_3_housekeeping import pack_service_3_commands_into
from pus_tc.service_8_func_cmd import pack_service_8_commands_into
from tmtccmd.utility.logger import get_logger
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.config.definitions import CoreServiceList
from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus_tc.service_17_test import pack_generic_service17_test
from pus_tc.service_200_mode import pack_service_200_commands_into
LOGGER = get_logger()
def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT):
if service == CoreServiceList.SERVICE_2.value:
return pack_service_2_commands_into(op_code=op_code, tc_queue=service_queue)
if service == CoreServiceList.SERVICE_3.value:
return pack_service_3_commands_into(op_code=op_code, tc_queue=service_queue)
if service == CoreServiceList.SERVICE_5.value:
return pack_generic_service5_test_into(tc_queue=service_queue)
if service == CoreServiceList.SERVICE_8.value:
return pack_service_8_commands_into(op_code=op_code, tc_queue=service_queue)
if service == CoreServiceList.SERVICE_17.value:
return pack_generic_service17_test(init_ssc=1700, tc_queue=service_queue)
if service == CoreServiceList.SERVICE_20.value:
return pack_service20_commands_into(tc_queue=service_queue, op_code=op_code)
if service == CoreServiceList.SERVICE_200.value:
return pack_service_200_commands_into(tc_queue=service_queue, op_code=op_code)
LOGGER.warning("Invalid Service !")
def create_total_tc_queue_user() -> TcQueueT:
if not os.path.exists("log"):
os.mkdir("log")
tc_queue = deque()
pack_service_2_commands_into(op_code="0", tc_queue=tc_queue)
pack_generic_service17_test(init_ssc=1700, tc_queue=tc_queue)
return tc_queue

0
tmtc/pus_tm/__init__.py Normal file
View File

View File

@ -0,0 +1,40 @@
"""
@brief This file transfers control of TM parsing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.utility.logger import get_logger
from tmtccmd.pus_tm.service_1_verification import Service1TM
from tmtccmd.pus_tm.service_2_raw_cmd import Service2TM
from tmtccmd.pus_tm.service_3_housekeeping import Service3TM
from tmtccmd.pus_tm.service_5_event import Service5TM
from tmtccmd.pus_tm.service_8_functional_cmd import Service8TM
from tmtccmd.pus_tm.service_17_test import Service17TM
from tmtccmd.pus_tm.service_20_parameters import Service20TM
from tmtccmd.pus_tm.service_200_mode import Service200TM
LOGGER = get_logger()
def tm_user_factory_hook(raw_tm_packet: bytearray) -> PusTelemetry:
service_type = raw_tm_packet[7]
if service_type == 1:
return Service1TM(raw_tm_packet)
if service_type == 2:
return Service2TM(raw_tm_packet)
if service_type == 3:
return Service3TM(raw_tm_packet)
if service_type == 8:
return Service8TM(raw_tm_packet)
if service_type == 5:
return Service5TM(raw_tm_packet)
if service_type == 17:
return Service17TM(raw_tm_packet)
if service_type == 20:
return Service20TM(raw_tm_packet)
if service_type == 200:
return Service200TM(raw_tm_packet)
LOGGER.info("The service " + str(service_type) + " is not implemented in Telemetry Factory")
return PusTelemetry(raw_tm_packet)

View File

@ -0,0 +1,70 @@
"""
@brief This file transfers control of housekeeping handling (PUS service 3) to the
developer
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import struct
from typing import Tuple
from tmtccmd.pus_tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_logger
from config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
LOGGER = get_logger()
def service_3_hk_handling(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
"""
This function is called when a Service 3 Housekeeping packet is received.
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param set_id:
@param hk_data:
@param service3_packet:
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
The first list contains the header columns, the second list the list with
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
"""
if object_id == TEST_DEVICE_0_ID or object_id == TEST_DEVICE_1_ID:
return handle_test_set_deserialization(hk_data=hk_data)
else:
LOGGER.info("Service3TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0
def handle_test_set_deserialization(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
header_list = []
content_list = []
validity_buffer = bytearray()
# uint8 (1) + uint32_t (4) + float vector with 3 entries (12) + validity buffer (1)
if len(hk_data) < 18:
LOGGER.warning("Invalid HK data format for test set reply!")
return header_list, content_list, validity_buffer, 0
uint8_value = struct.unpack('!B', hk_data[0:1])[0]
uint32_value = struct.unpack('!I', hk_data[1:5])[0]
float_value_1 = struct.unpack('!f', hk_data[5:9])[0]
float_value_2 = struct.unpack('!f', hk_data[9:13])[0]
float_value_3 = struct.unpack('!f', hk_data[13:17])[0]
validity_buffer.append(hk_data[17])
header_list.append("uint8 value")
header_list.append("uint32 value")
header_list.append("float vec value 1")
header_list.append("float vec value 2")
header_list.append("float vec value 3")
content_list.append(uint8_value)
content_list.append(uint32_value)
content_list.append(float_value_1)
content_list.append(float_value_2)
content_list.append(float_value_3)
return header_list, content_list, validity_buffer, 3

View File

@ -0,0 +1,19 @@
from typing import Tuple
def custom_service_8_handling(
object_id: int, action_id: int, custom_data: bytearray) -> Tuple[list, list]:
"""
This function is called by the TMTC core if a Service 8 data reply (subservice 130)
is received. The user can return a tuple of two lists, where the first list
is a list of header strings to print and the second list is a list of values to print.
The TMTC core will take care of printing both lists and logging them.
@param object_id:
@param action_id:
@param custom_data:
@return:
"""
header_list = []
content_list = []
return header_list, content_list

46
tmtc/tmtc_client_cli.py Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/env python3
"""
@brief TMTC Commander entry point for command line mode.
@details
This client was developed by KSat for the SOURCE project to test the on-board software but
has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand)
handling and testing via different communication interfaces. Currently, only the PUS standard is
implemented as a packet standard.
Run this file with the -h flag to display options.
@license
Copyright 2020 KSat e.V. Stuttgart
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@author R. Mueller
"""
from config.hook_base import FsfwHookBase
try:
from tmtccmd.runner import run_tmtc_commander, initialize_tmtc_commander
except ImportError:
run_tmtc_commander = None
initialize_tmtc_commander = None
print("Python tmtccmd submodule not installed")
print("Install with \"cd tmtccmd && python3 -m pip install -e .\" for interactive installation")
def main():
hook_obj = FsfwHookBase()
initialize_tmtc_commander(hook_object=hook_obj)
run_tmtc_commander(use_gui=False, app_name="TMTC Commander FSFW")
if __name__ == "__main__":
main()

40
tmtc/tmtc_client_gui.py Normal file
View File

@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
@brief TMTC Commander entry point for GUI mode.
@details
This client was developed by KSat for the SOURCE project to test the on-board software but
has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand)
handling and testing via different communication interfaces. Currently, only the PUS standard is
implemented as a packet standard.
Run this file with the -h flag to display options.
@license
Copyright 2020 KSat e.V. Stuttgart
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@author R. Mueller
"""
from config.hook_base import FsfwHookBase
from tmtccmd.runner import run_tmtc_commander, initialize_tmtc_commander
def main():
hook_obj = FsfwHookBase()
initialize_tmtc_commander(hook_object=hook_obj)
run_tmtc_commander(use_gui=True, app_name="TMTC Commander FSFW")
if __name__ == "__main__":
main()

0
tmtc/utility/__init__.py Normal file
View File