Compare commits

...

11 Commits

23 changed files with 878 additions and 293 deletions

View File

@@ -0,0 +1,6 @@
SW_NAME = "fsfw-tmtc"
SW_VERSION = 1
SW_SUBVERSION = 4
SW_SUBSUBVERSION = 0
__version__ = "1.4.0"

View File

@@ -1 +1,2 @@
PUS_APID = 0xef
PUS_APID = 0xEF
TM_SP_IDS = ((0x08 << 8) | PUS_APID,)

View File

@@ -1,73 +1,52 @@
import argparse
from typing import Dict, Tuple, Optional
from typing import Optional, Union
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.config.definitions import ServiceOpCodeDictT
from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tm.service_3_base import Service3Base
from tmtccmd.ecss.conf import PusVersion
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.utility.obj_id import ObjectIdDictT
from common_tmtc.config.definitions import PUS_APID
from tmtccmd.utility.retval import RetvalDictT
from common_tmtc.config.definitions import TM_SP_IDS
from common_tmtc.pus_tc.cmd_definitions import common_fsfw_service_op_code_dict
class FsfwHookBase(TmTcHookBase):
class CommonFsfwHookBase(TmTcHookBase):
def pack_service_queue(
self, service: Union[int, str], op_code: str, service_queue: TcQueueT
):
from common_tmtc.pus_tc.tc_packing import common_service_queue_user
def get_json_config_file_path(self) -> str:
return "config/tmtc_config.json"
common_service_queue_user(
service=service, op_code=op_code, tc_queue=service_queue
)
def __init__(self, json_cfg_path: str):
super().__init__(json_cfg_path=json_cfg_path)
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import get_default_service_op_code_dict
return get_default_service_op_code_dict()
def add_globals_pre_args_parsing(self, gui: bool = False):
from tmtccmd.config.globals import set_default_globals_pre_args_parsing
set_default_globals_pre_args_parsing(
gui=gui, pus_tm_version=PusVersion.PUS_C, pus_tc_version=PusVersion.PUS_C, apid=PUS_APID
)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from tmtccmd.config.globals import set_default_globals_post_args_parsing
set_default_globals_post_args_parsing(
args=args, json_cfg_path=self.get_json_config_file_path()
)
return common_fsfw_service_op_code_dict()
def assign_communication_interface(
self, com_if_key: str, tmtc_printer: TmTcPrinter
self, com_if_key: str
) -> Optional[CommunicationInterface]:
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key, tmtc_printer=tmtc_printer,
json_cfg_path=self.get_json_config_file_path(), space_packet_id=0x08ef
com_if_key=com_if_key,
json_cfg_path=self.json_cfg_path,
space_packet_ids=TM_SP_IDS,
)
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
print("No custom mode operation implemented")
def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from common_tmtc.pus_tc.tc_packing import pack_service_queue_user
pack_service_queue_user(service=service, op_code=op_code, tc_queue=service_queue)
def get_object_ids(self) -> Dict[bytes, list]:
def get_object_ids(self) -> ObjectIdDictT:
from common_tmtc.config.object_ids import get_object_ids
return get_object_ids()
@staticmethod
def handle_service_8_telemetry(
object_id: int, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
from common_tmtc.pus_tm.service_8_handling import custom_service_8_handling
return custom_service_8_handling(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@staticmethod
def handle_service_3_housekeeping(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
from common_tmtc.pus_tm import service_3_hk_handling
return service_3_hk_handling(
object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet
)
def get_retval_dict(self) -> RetvalDictT:
return self.get_retval_dict()

View File

@@ -3,17 +3,29 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from typing import Dict
import os
from tmtccmd.fsfw import parse_fsfw_objects_csv
from tmtccmd.logging import get_console_logger
from tmtccmd.utility.obj_id import ObjectIdDictT
LOGGER = get_console_logger()
DEFAULT_OBJECTS_CSV_PATH = "config/objects.csv"
__OBJECT_ID_DICT = None
PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17])
TEST_DEVICE_0_ID = bytes([0x44, 0x01, 0xAF, 0xFE])
TEST_DEVICE_1_ID = bytes([0x44, 0x02, 0xAF, 0xFE])
ASSEMBLY_ID = bytes([0x41, 0x00, 0xCA, 0xFE])
def get_object_ids() -> Dict[bytes, list]:
object_id_dict = {
PUS_SERVICE_17_ID: ["PUS Service 17"],
TEST_DEVICE_0_ID: ["Test Device 0"],
TEST_DEVICE_1_ID: ["Test Device 1"]
}
return object_id_dict
def get_object_ids() -> ObjectIdDictT:
global __OBJECT_ID_DICT
if not os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
LOGGER.warning(f"No Objects CSV file found at {DEFAULT_OBJECTS_CSV_PATH}")
if __OBJECT_ID_DICT is None:
if os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
__OBJECT_ID_DICT = parse_fsfw_objects_csv(csv_file=DEFAULT_OBJECTS_CSV_PATH)
else:
__OBJECT_ID_DICT = dict()
return __OBJECT_ID_DICT

22
config/retvals.py Normal file
View File

@@ -0,0 +1,22 @@
import os
from tmtccmd.fsfw import parse_fsfw_returnvalues_csv, RetvalDictT
from tmtccmd.logging import get_console_logger
DEFAULT_RETVAL_CSV_NAME = "config/returnvalues.csv"
__RETVAL_DICT = None
LOGGER = get_console_logger()
def get_retval_dict() -> RetvalDictT:
global __RETVAL_DICT
if __RETVAL_DICT is None:
if os.path.exists(DEFAULT_RETVAL_CSV_NAME):
__RETVAL_DICT = parse_fsfw_returnvalues_csv(
csv_file=DEFAULT_RETVAL_CSV_NAME
)
else:
LOGGER.warning(
f"No Return Value CSV file found at {DEFAULT_RETVAL_CSV_NAME}"
)
__RETVAL_DICT = dict()
return __RETVAL_DICT

View File

@@ -1,4 +0,0 @@
SW_NAME = "fsfw-tmtc"
SW_VERSION = 1
SW_SUBVERSION = 4
SW_SUBSUBVERSION = 0

32
pus_tc/cmd_definitions.py Normal file
View File

@@ -0,0 +1,32 @@
from tmtccmd.config import (
ServiceOpCodeDictT,
add_op_code_entry,
add_service_op_code_entry,
generate_op_code_options,
OpCodeDictKeys,
)
from tmtccmd.config.definitions import OpCodeOptionsT
from tmtccmd.config.globals import get_default_service_op_code_dict
from common_tmtc.pus_tc.pus_11_tc_sched import OpCodes, add_tc_sched_cmds
def common_fsfw_service_op_code_dict() -> ServiceOpCodeDictT:
service_op_code_dict = get_default_service_op_code_dict()
op_code = dict()
add_op_code_entry(op_code_dict=op_code, keys="test", info="Mode CMD Test")
add_op_code_entry(
op_code_dict=op_code,
keys=["0", "asm_to_normal"],
info="Command test assembly to normal mode",
options={OpCodeDictKeys.TIMEOUT: 6.0},
)
add_service_op_code_entry(
srv_op_code_dict=service_op_code_dict,
name="200",
info="PUS Service 200 Mode MGMT",
op_code_entry=op_code,
)
add_tc_sched_cmds(service_op_code_dict)
return service_op_code_dict

214
pus_tc/pus_11_tc_sched.py Normal file
View File

@@ -0,0 +1,214 @@
import struct
import time
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config import (
QueueCommands,
ServiceOpCodeDictT,
add_op_code_entry,
generate_op_code_options,
add_service_op_code_entry,
)
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId
from tmtccmd.tc.pus_11_tc_sched import (
generate_enable_tc_sched_cmd,
generate_disable_tc_sched_cmd,
generate_reset_tc_sched_cmd,
pack_time_tagged_tc_app_data,
)
from tmtccmd.tc.definitions import TcQueueT
class OpCodes:
ENABLE = ["0", "enable"]
DISABLE = ["1", "disable"]
RESET = ["2", "reset"]
TEST_INSERT = ["12", "test-insert"]
TEST_DELETE = ["13", "test-del"]
TEST_RESET = ["14", "test-clear"]
class Info:
ENABLE = "Enable TC scheduling"
DISABLE = "Disable TC scheduling"
RESET = "Reset TC scheduling"
TEST_INSERT = "Test TC scheduling insertion"
TEST_DELETE = "Test TC scheduling deletion"
TEST_RESET = "Test TC scheduling reset command"
def add_tc_sched_cmds(cmd_dict: ServiceOpCodeDictT):
op_code = dict()
add_op_code_entry(
op_code_dict=op_code,
keys=OpCodes.ENABLE,
info=Info.ENABLE,
options=generate_op_code_options(custom_timeout=2.0),
)
add_op_code_entry(
op_code_dict=op_code,
keys=OpCodes.DISABLE,
info=Info.DISABLE,
options=generate_op_code_options(custom_timeout=2.0),
)
add_op_code_entry(op_code_dict=op_code, keys=OpCodes.RESET, info=Info.RESET)
add_op_code_entry(
op_code_dict=op_code,
keys=OpCodes.TEST_INSERT,
info=Info.TEST_INSERT,
options=generate_op_code_options(custom_timeout=0.2),
)
add_op_code_entry(
op_code_dict=op_code,
keys=OpCodes.TEST_DELETE,
info=Info.TEST_DELETE,
options=generate_op_code_options(custom_timeout=0.2),
)
add_op_code_entry(
op_code_dict=op_code,
keys=OpCodes.TEST_RESET,
info=Info.TEST_RESET,
options=generate_op_code_options(custom_timeout=0.2),
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name="11",
info="PUS Service 11 TC Scheduling",
op_code_entry=op_code,
)
def __generic_pack_three_time_tagged_cmds(tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing Time-Tagged Command insertion"))
tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple())
current_time = int(round(time.time()))
# these TC[17,1] (ping commands) shall be inserted
ping_tcs = [
pack_service_17_ping_command(1701),
pack_service_17_ping_command(1702),
pack_service_17_ping_command(1703),
]
for idx, tc in enumerate(ping_tcs):
release_time = current_time + (idx + 2) * 5
time_tagged_tc = PusTelecommand(
service=11,
subservice=Subservices.TC_INSERT,
app_data=pack_time_tagged_tc_app_data(struct.pack("!I", release_time), tc),
)
tc_queue.appendleft(time_tagged_tc.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 25))
def pack_service_11_commands(op_code: str, tc_queue: TcQueueT):
if op_code in OpCodes.TEST_INSERT:
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Time-Tagged Command deletion")
)
__generic_pack_three_time_tagged_cmds(tc_queue=tc_queue)
if op_code in OpCodes.TEST_DELETE:
current_time = int(round(time.time()))
tc_to_schedule = pack_service_17_ping_command(1703)
time_tagged_tc = PusTelecommand(
service=11,
subservice=Subservices.TC_INSERT,
app_data=pack_time_tagged_tc_app_data(
struct.pack("!I", current_time + 20), tc_to_schedule
),
)
tc_queue.appendleft(time_tagged_tc.pack_command_tuple())
del_time_tagged_tcs = PusTelecommand(
service=11,
subservice=Subservices.TC_DELETE,
app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule),
ssc=1105,
)
tc_queue.appendleft(del_time_tagged_tcs.pack_command_tuple())
if op_code in OpCodes.ENABLE:
tc_queue.appendleft((QueueCommands.PRINT, "Enabling TC scheduler"))
tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple())
if op_code in OpCodes.DISABLE:
tc_queue.appendleft((QueueCommands.PRINT, "Disabling TC scheduler"))
tc_queue.appendleft(generate_disable_tc_sched_cmd(ssc=0).pack_command_tuple())
if op_code in OpCodes.RESET:
tc_queue.appendleft((QueueCommands.PRINT, "Reset TC scheduler"))
tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple())
if op_code in OpCodes.TEST_RESET:
tc_queue.appendleft((QueueCommands.PRINT, "Testing Reset command"))
__generic_pack_three_time_tagged_cmds(tc_queue=tc_queue)
tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple())
# a TC[11,5] for 3rd inserted ping TC
# TODO: This should be an independent test
# a TC[11,6] for some other previously inserted TCs
# service_11_6_tc = build_filter_delete_tc(TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG,
# current_time+45, current_time+55)
# tc_queue.appendleft(service_11_6_tc.pack_command_tuple())
# TODO: This should be an independent test
# a TC[11,7] for another previously inserted TC
# service_11_7_tc = build_corresponding_timeshift_tc(30, ping_tc_4)
# tc_queue.appendleft(service_11_7_tc.pack_command_tuple())
# TODO: This should be an independent test
# a TC[11,8] with offset time of 20s
# service_11_8_tc = build_filter_timeshift_tc(
# 20,
# TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG,
# current_time + 45,
# current_time + 55,
# )
# tc_queue.appendleft((service_11_8_tc.pack_command_tuple()))
def pack_delete_corresponding_tc_app_data(tc: PusTelecommand) -> bytes:
return TcSchedReqId.build_from_tc(tc).pack()
def build_filter_delete_tc(
time_window_type: TypeOfTimeWindow, *timestamps: int
) -> PusTelecommand:
app_data = bytearray()
app_data.extend(struct.pack("!I", int(time_window_type)))
if time_window_type != TypeOfTimeWindow.SELECT_ALL:
for timestamp in timestamps:
app_data.extend(struct.pack("!I", timestamp))
return PusTelecommand(service=11, subservice=6, app_data=app_data, ssc=1161)
def pack_corresponding_timeshift_app_data(
time_delta: bytes, tc: PusTelecommand, ssc: int
) -> bytes:
req_id = TcSchedReqId.build_from_tc(tc)
app_data = bytearray()
app_data.extend(time_delta)
app_data.extend(req_id.pack())
return app_data
def build_filter_timeshift_tc(
time_offset: int, time_window_type: TypeOfTimeWindow, *timestamps: int
) -> PusTelecommand:
app_data = bytearray()
app_data.extend(struct.pack("!I", time_offset))
app_data.extend(struct.pack("!I", int(time_window_type)))
if time_window_type != TypeOfTimeWindow.SELECT_ALL:
for timestamp in timestamps:
app_data.extend(struct.pack("!I", timestamp))
return PusTelecommand(service=11, subservice=8, app_data=app_data, ssc=1181)
# waits for a specified amount of seconds and prints ". . ." for each second
def wait_seconds(t: int):
print("Waiting: ", end="")
for x in range(t):
time.sleep(1)
print(". ", end="")
print("")

View File

@@ -1,9 +1,14 @@
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_17_test import pack_service17_ping_command, pack_generic_service17_test
from tmtccmd.pus.pus_17_test import (
pack_service_17_ping_command,
pack_generic_service17_test,
)
def pack_service_17_commands(op_code: str, init_ssc: int, tc_queue: TcQueueT):
if op_code == "0":
tc_queue.appendleft(pack_service17_ping_command(ssc=init_ssc).pack_command_tuple())
tc_queue.appendleft(
pack_service_17_ping_command(ssc=init_ssc).pack_command_tuple()
)
else:
pack_generic_service17_test(tc_queue=tc_queue, init_ssc=init_ssc)

View File

@@ -1,21 +1,25 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_service_200_mode.py
@brief PUS Service 200: PUS custom service 200: Mode commanding
@author R. Mueller
@date 02.05.2020
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_200_mode import pack_mode_data
from spacepackets.ecss.tc import PusTelecommand
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, ASSEMBLY_ID
def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "0":
if op_code == "test":
pack_service_200_test_into(tc_queue=tc_queue, init_ssc=2000)
elif op_code == "asm_to_normal" or op_code == "0":
# Set Normal mode
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 200: Set Mode Normal")
)
# Command assembly to normal, submode 1 for dual mode,
mode_data = pack_mode_data(ASSEMBLY_ID, Modes.NORMAL, 1)
command = PusTelecommand(service=200, subservice=1, ssc=0, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
@@ -25,28 +29,26 @@ def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
object_id = TEST_DEVICE_0_ID
# Set On Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
mode_data = pack_mode_data(object_id, 1, 0)
mode_data = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# Set Normal mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal"))
mode_data = pack_mode_data(object_id, 2, 0)
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw"))
mode_data = pack_mode_data(object_id, 3, 0)
mode_data = pack_mode_data(object_id, Modes.RAW, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# Set Off Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off"))
mode_data = pack_mode_data(object_id, 0, 0)
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt"))
return new_ssc

View File

@@ -1,11 +1,12 @@
import struct
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_20_parameter import pack_type_and_matrix_data, pack_parameter_id
from tmtccmd.tc.service_200_mode import pack_mode_data
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.tc.pus_20_params import pack_type_and_matrix_data, pack_parameter_id
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.logging import get_console_logger
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
@@ -24,7 +25,7 @@ def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False
# set mode normal
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode"))
mode_data = pack_mode_data(object_id, 2, 0)
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
@@ -32,9 +33,6 @@ def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False
load_param_1_simple_test_commands(tc_queue=tc_queue)
load_param_2_simple_test_commands(tc_queue=tc_queue)
if called_externally is False:
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service20.txt"))
def load_param_0_simple_test_commands(tc_queue: TcQueueT):
object_id = TEST_DEVICE_0_ID
@@ -89,4 +87,3 @@ def load_param_2_simple_test_commands(tc_queue: TcQueueT):
# payload = object_id + parameter_id_2
# command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload)
# tc_queue.appendleft(command.pack_command_tuple())

View File

@@ -7,12 +7,13 @@
"""
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_200_mode import pack_mode_data
from spacepackets.ecss.tc import PusTelecommand
from common_tmtc import pus_tc as cmd_data
from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from common_tmtc.pus_tc import command_data as cmd_data
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
@@ -28,12 +29,14 @@ def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
object_id = TEST_DEVICE_0_ID # dummy device
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode"))
mode_data = pack_mode_data(object_id, 3, 0)
mode_data = pack_mode_data(object_id, Modes.RAW, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# toggle wiretapping raw
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw"))
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw")
)
wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1)
toggle_wiretapping_on_command = PusTelecommand(
service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data
@@ -44,34 +47,42 @@ def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Sending Raw Command"))
raw_command = cmd_data.TEST_COMMAND_0
raw_data = object_id + raw_command
raw_command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data)
raw_command = PusTelecommand(
service=2, subservice=128, ssc=new_ssc, app_data=raw_data
)
tc_queue.appendleft(raw_command.pack_command_tuple())
new_ssc += 1
# toggle wiretapping off
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off"))
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off")
)
wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0)
toggle_wiretapping_off_command = PusTelecommand(service=2, subservice=129, ssc=new_ssc,
app_data=wiretapping_toggle_data)
toggle_wiretapping_off_command = PusTelecommand(
service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data
)
tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple())
new_ssc += 1
# send raw command which should be returned via TM[2,130]
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Send second raw command"))
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Send second raw command")
)
command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# Set mode off
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Off Mode"))
mode_data = pack_mode_data(object_id, 0, 0)
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_2.txt"))
return new_ssc
# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW
def pack_wiretapping_mode(object_id, wiretapping_mode_):
wiretapping_mode = struct.pack(">B", wiretapping_mode_) # MODE_OFF : 0x00, MODE_RAW: 0x01
wiretapping_mode = struct.pack(
">B", wiretapping_mode_
) # MODE_OFF : 0x00, MODE_RAW: 0x01
wiretapping_toggle_data = object_id + wiretapping_mode
return wiretapping_toggle_data

View File

@@ -1,11 +1,15 @@
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.service_200_mode import pack_mode_data
from tmtccmd.tc.service_20_parameter import pack_boolean_parameter_command
from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command, \
Srv3Subservice
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.tc.pus_20_params import (
pack_boolean_parameter_app_data,
pack_fsfw_load_param_cmd,
)
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
import tmtccmd.tc.pus_3_fsfw_hk as srv3
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
@@ -22,7 +26,6 @@ PARAM_ACTIVATE_CHANGING_DATASETS = 4
def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
current_ssc = 3000
# TODO: Import this from config instead
device_idx = 0
if device_idx == 0:
object_id = TEST_DEVICE_0_ID
@@ -31,43 +34,67 @@ def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "0":
# This will pack all the tests
pack_service_3_test_info(tc_queue=tc_queue, init_ssc=current_ssc, object_id=object_id,
device_idx=device_idx)
pack_service_3_test_info(
tc_queue=tc_queue,
init_ssc=current_ssc,
object_id=object_id,
device_idx=device_idx,
)
elif op_code == "1":
# Extremely simple, generate one HK packet
pack_gen_one_hk_command(tc_queue=tc_queue, device_idx=device_idx, init_ssc=current_ssc,
object_id=object_id)
pack_gen_one_hk_command(
tc_queue=tc_queue,
device_idx=device_idx,
init_ssc=current_ssc,
object_id=object_id,
)
elif op_code == "2":
# Housekeeping basic test
pack_housekeeping_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc)
pack_housekeeping_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
elif op_code == "3":
# Notification demo
pack_notification_basic_test(tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc)
pack_notification_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
def pack_service_3_test_info(tc_queue: TcQueueT, device_idx: int, object_id: bytearray,
init_ssc: int):
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests"))
def pack_service_3_test_info(
tc_queue: TcQueueT, device_idx: int, object_id: bytearray, init_ssc: int
):
tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests")
)
current_ssc = init_ssc
current_ssc += pack_gen_one_hk_command(
tc_queue=tc_queue, device_idx=device_idx, object_id=object_id, init_ssc=current_ssc
tc_queue=tc_queue,
device_idx=device_idx,
object_id=object_id,
init_ssc=current_ssc,
)
current_ssc += pack_housekeeping_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
current_ssc += pack_notification_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc, enable_normal_mode=False
tc_queue=tc_queue,
object_id=object_id,
init_ssc=current_ssc,
enable_normal_mode=False,
)
def pack_gen_one_hk_command(
tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray
tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray
) -> int:
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
tc_queue.appendleft(
(QueueCommands.PRINT, f"Service 3 Test: Generate one test set packet for "
f"test device {device_idx}")
(
QueueCommands.PRINT,
f"Service 3 Test: Generate one test set packet for "
f"test device {device_idx}",
)
)
command = generate_one_hk_command(ssc=init_ssc, sid=test_sid)
init_ssc += 1
@@ -76,7 +103,10 @@ def pack_gen_one_hk_command(
def pack_housekeeping_basic_test(
tc_queue: TcQueueT, object_id: bytearray, init_ssc: int, enable_normal_mode: bool = True
tc_queue: TcQueueT,
object_id: bytearray,
init_ssc: int,
enable_normal_mode: bool = True,
) -> int:
"""
This basic test will request one HK packet, then it will enable periodic packets and listen
@@ -85,63 +115,92 @@ def pack_housekeeping_basic_test(
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
current_ssc = init_ssc
# Enable changing datasets via parameter service (Service 20)
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests"))
tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests")
)
if enable_normal_mode:
# Set mode normal so that sets are changed/read regularly
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data)
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(
service=200, subservice=1, ssc=current_ssc, app_data=mode_data
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "Enabling changing datasets"))
command = pack_boolean_parameter_command(
object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=True, ssc=current_ssc
app_data = pack_boolean_parameter_app_data(
object_id=object_id,
domain_id=0,
unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=True,
)
cmd = pack_fsfw_load_param_cmd(app_data=app_data, ssc=0)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(cmd.pack_command_tuple())
# Enable periodic reporting
tc_queue.appendleft((QueueCommands.PRINT,
"Enabling periodic thermal sensor packet generation: "))
command = PusTelecommand(service=3, subservice=Srv3Subservice.ENABLE_PERIODIC_HK_GEN.value,
ssc=current_ssc, app_data=test_sid)
tc_queue.appendleft(
(QueueCommands.PRINT, "Enabling periodic thermal sensor packet generation: ")
)
command = PusTelecommand(
service=3,
subservice=srv3.Subservices.TC_ENABLE_PERIODIC_HK_GEN,
ssc=current_ssc,
app_data=test_sid,
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 2.0))
# Disable periodic reporting
tc_queue.appendleft((QueueCommands.PRINT,
"Disabling periodic thermal sensor packet generation: "))
command = PusTelecommand(service=3, subservice=Srv3Subservice.DISABLE_PERIODIC_HK_GEN.value,
ssc=current_ssc, app_data=test_sid)
tc_queue.appendleft(
(QueueCommands.PRINT, "Disabling periodic thermal sensor packet generation: ")
)
command = PusTelecommand(
service=3,
subservice=srv3.Subservices.TC_DISABLE_PERIODIC_HK_GEN,
ssc=current_ssc,
app_data=test_sid,
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
# Disable changing datasets via parameter service (Service 20)
tc_queue.appendleft((QueueCommands.PRINT, "Disabling changing datasets"))
command = pack_boolean_parameter_command(
object_id=object_id, domain_id=0, unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=False, ssc=current_ssc
app_data = pack_boolean_parameter_app_data(
object_id=object_id,
domain_id=0,
unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=False,
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
cmd = pack_fsfw_load_param_cmd(app_data=app_data, ssc=0)
tc_queue.appendleft(cmd.pack_command_tuple())
return current_ssc
def pack_notification_basic_test(tc_queue: TcQueueT, object_id: bytearray, init_ssc: int,
enable_normal_mode: bool = True) -> int:
def pack_notification_basic_test(
tc_queue: TcQueueT,
object_id: bytearray,
init_ssc: int,
enable_normal_mode: bool = True,
) -> int:
current_ssc = init_ssc
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Performing notification tests"))
tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 Test: Performing notification tests")
)
if enable_normal_mode:
# Set mode normal so that sets are changed/read regularly
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data)
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(
service=200, subservice=1, ssc=current_ssc, app_data=mode_data
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())

View File

@@ -1,9 +1,10 @@
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_200_mode import pack_mode_data
from spacepackets.ecss.tc import PusTelecommand
from common_tmtc import pus_tc as cmd_data
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
import common_tmtc.pus_tc.command_data as cmd_data
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
@@ -22,21 +23,25 @@ def pack_generic_service_8_test_into(tc_queue: TcQueueT):
# set mode on
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode"))
mode_data = pack_mode_data(object_id, 1, 0)
mode_data = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# set mode normal
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Normal Mode"))
mode_data = pack_mode_data(object_id, 2, 0)
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Direct command which triggers completion reply
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply"))
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply")
)
action_id = cmd_data.TEST_COMMAND_0
direct_command = object_id + action_id
command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command)
command = PusTelecommand(
service=8, subservice=128, ssc=820, app_data=direct_command
)
tc_queue.appendleft(command.pack_command_tuple())
# Direct command which triggers _tm_data reply
@@ -45,13 +50,13 @@ def pack_generic_service_8_test_into(tc_queue: TcQueueT):
command_param1 = cmd_data.TEST_COMMAND_1_PARAM_1
command_param2 = cmd_data.TEST_COMMAND_1_PARAM_2
direct_command = object_id + action_id + command_param1 + command_param2
command = PusTelecommand(service=8, subservice=128, ssc=830, app_data=direct_command)
command = PusTelecommand(
service=8, subservice=128, ssc=830, app_data=direct_command
)
tc_queue.appendleft(command.pack_command_tuple())
# Set mode off
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Off Mode"))
mode_data = pack_mode_data(object_id, 0, 0)
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_8.txt"))

View File

@@ -3,27 +3,55 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import logging
import os
from collections import deque
from typing import Union
from spacepackets.ecss.tc import PusTelecommand
from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.logging import get_console_logger, get_current_time_string
from tmtccmd.logging.pus import log_raw_pus_tc
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.config.definitions import CoreServiceList, QueueCommands
from tmtccmd.tc.pus_5_event import pack_generic_service5_test_into
from tmtccmd.pus.pus_17_test import pack_generic_service17_test
from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into
from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into
from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into
from common_tmtc.pus_tc.service_17_test import pack_service_17_commands
from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands
from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.config.definitions import CoreServiceList
from tmtccmd.tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.tc.service_17_test import pack_generic_service17_test
from common_tmtc.pus_tc.service_200_mode import pack_service_200_commands_into
from common_tmtc.pus_tc.pus_200_mode import pack_service_200_commands_into
LOGGER = get_console_logger()
def pack_service_queue_user(service: Union[str, int], op_code: str, tc_queue: TcQueueT):
def pre_tc_send_cb(
queue_entry: Union[bytes, QueueCommands],
com_if: CommunicationInterface,
queue_info: Union[PusTelecommand, any],
file_logger: logging.Logger,
):
if isinstance(queue_entry, bytes) or isinstance(queue_entry, bytearray):
log_raw_pus_tc(
packet=queue_entry,
srv_subservice=(queue_info.service, queue_info.subservice),
)
tc_info_string = f"Sent {queue_info}"
LOGGER.info(tc_info_string)
file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
com_if.send(data=queue_entry)
elif isinstance(queue_entry, QueueCommands):
if queue_entry == QueueCommands.PRINT:
file_logger.info(queue_info)
def common_service_queue_user(
service: Union[str, int], op_code: str, tc_queue: TcQueueT
):
if service == CoreServiceList.SERVICE_2.value:
return pack_service_2_commands_into(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_3.value:
@@ -32,6 +60,8 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, tc_queue: Tc
return pack_generic_service5_test_into(tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_8.value:
return pack_service_8_commands_into(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_11.value:
return pack_service_11_commands(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_17.value:
return pack_service_17_commands(op_code=op_code, tc_queue=tc_queue, init_ssc=0)
if service == CoreServiceList.SERVICE_20.value:

View File

@@ -0,0 +1,24 @@
from tmtccmd.logging import get_console_logger
from tmtccmd.tm import Service8FsfwTm
from tmtccmd.utility.obj_id import ObjectIdDictT
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
LOGGER = get_console_logger()
def handle_action_reply(
raw_tm: bytes, printer: FsfwTmTcPrinter, obj_id_dict: ObjectIdDictT
):
"""Core Action reply handler
:return:
"""
tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
object_id = obj_id_dict.get(tm_packet.source_object_id_as_bytes)
custom_data = tm_packet.custom_data
action_id = tm_packet.action_id
generic_print_str = printer.generic_action_packet_tm_print(
packet=tm_packet, obj_id=object_id
)
print(generic_print_str)
printer.file_logger.info(generic_print_str)

61
pus_tm/event_handler.py Normal file
View File

@@ -0,0 +1,61 @@
import logging
import os.path
from datetime import datetime
from common_tmtc.config.object_ids import get_object_ids
from tmtccmd.tm import Service5Tm
from tmtccmd.logging import get_console_logger
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.fsfw import parse_fsfw_events_csv, EventDictT, EventInfo
LOGGER = get_console_logger()
DEFAULT_EVENTS_CSV_PATH = "config/events.csv"
__EVENT_DICT = None
def get_event_dict() -> EventDictT:
global __EVENT_DICT
if __EVENT_DICT is None:
if os.path.exists(DEFAULT_EVENTS_CSV_PATH):
__EVENT_DICT = parse_fsfw_events_csv(DEFAULT_EVENTS_CSV_PATH)
else:
LOGGER.warning(f"No Event CSV file found at {DEFAULT_EVENTS_CSV_PATH}")
__EVENT_DICT = dict()
return __EVENT_DICT
def handle_event_packet(
raw_tm: bytes, printer: FsfwTmTcPrinter, file_logger: logging.Logger
) -> str:
tm = Service5Tm.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm, info_if=tm)
additional_event_info = ""
event_dict = get_event_dict()
info = event_dict.get(tm.event_id)
if info is None:
LOGGER.warning(f"Event ID {tm.event_id} has no information")
info = EventInfo()
info.name = "Unknown event"
obj_ids = get_object_ids()
obj_id_obj = obj_ids.get(tm.reporter_id.as_bytes)
if obj_id_obj is None:
LOGGER.warning(f"Object ID 0x{tm.reporter_id.as_string} has no name")
obj_name = tm.reporter_id.as_string
else:
obj_name = obj_id_obj.name
generic_event_string = (
f"Object {obj_name} generated Event {tm.event_id} | {info.name}"
)
if info.info != "":
additional_event_info = (
f"Additional info: {info.info} | P1: {tm.param_1} | P2: {tm.param_2}"
)
file_logger.info(
f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
)
LOGGER.info(generic_event_string)
if additional_event_info != "":
file_logger.info(additional_event_info)
print(additional_event_info)
return generic_event_string + " | " + additional_event_info

View File

@@ -3,50 +3,90 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from spacepackets.ecss.tm import PusTelemetry
from spacepackets.util import PrintFormats
from spacepackets.ccsds.spacepacket import PacketTypes
from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.tm.service_1_verification import Service1TM
from tmtccmd.tm.service_2_raw_cmd import Service2TM
from tmtccmd.tm.service_3_housekeeping import Service3TM
from tmtccmd.tm.service_5_event import Service5TM
from tmtccmd.tm.service_8_functional_cmd import Service8TM
from tmtccmd.tm.service_17_test import Service17TM
from tmtccmd.tm.service_20_parameters import Service20TM
from tmtccmd.tm.service_200_mode import Service200TM
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.tm.pus_2_rawcmd import Service2Tm
from tmtccmd.tm.pus_17_test import Service17TMExtended
from tmtccmd.tm.pus_20_fsfw_parameters import Service20FsfwTm
from tmtccmd.tm.pus_200_fsfw_modes import Service200FsfwTm
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.logging import get_console_logger
from tmtccmd.logging.pus import (
create_tmtc_logger,
log_raw_pus_tm,
log_raw_unknown_packet,
)
from common_tmtc.config.object_ids import get_object_ids
from common_tmtc.pus_tm.action_reply_handling import handle_action_reply
from common_tmtc.pus_tm.event_handler import handle_event_packet
from common_tmtc.pus_tm.verification_handler import handle_service_1_packet
from common_tmtc.pus_tm.hk_handling import handle_hk_packet
from common_tmtc.config.definitions import PUS_APID
LOGGER = get_console_logger()
def ccsds_tm_handler(apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter) -> None:
FSFW_PRINTER = FsfwTmTcPrinter(file_logger=create_tmtc_logger())
def ccsds_tm_handler(apid: int, raw_tm_packet: bytes, _user_args: any) -> None:
if apid == PUS_APID:
pus_packet_factory(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer)
pus_factory_hook(raw_tm_packet=raw_tm_packet)
def pus_packet_factory(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter):
def pus_factory_hook(raw_tm_packet: bytes):
if len(raw_tm_packet) < 8:
LOGGER.warning("Detected packet shorter than 8 bytes!")
return
service_type = raw_tm_packet[7]
tm_packet = None
if service_type == 1:
tm_packet = Service1TM.unpack(raw_tm_packet)
if service_type == 2:
tm_packet = Service2TM.unpack(raw_tm_packet)
if service_type == 3:
tm_packet = Service3TM.unpack(raw_tm_packet, custom_hk_handling=False)
if service_type == 8:
tm_packet = Service8TM.unpack(raw_tm_packet)
if service_type == 5:
tm_packet = Service5TM.unpack(raw_tm_packet)
if service_type == 17:
tm_packet = Service17TM.unpack(raw_tm_packet)
if service_type == 20:
tm_packet = Service20TM.unpack(raw_tm_packet)
if service_type == 200:
tm_packet = Service200TM.unpack(raw_tm_packet)
if tm_packet is None:
LOGGER.info("The service " + str(service_type) + " is not implemented in Telemetry Factory")
tm_packet = PusTelemetry.unpack(raw_tm_packet)
tmtc_printer.print_telemetry(packet_if=tm_packet, info_if=tm_packet)
subservice_type = raw_tm_packet[8]
file_logger = FSFW_PRINTER.file_logger
obj_id_dict = get_object_ids()
dedicated_handler = True
try:
tm_packet = None
if service_type == 1:
handle_service_1_packet(printer=FSFW_PRINTER, raw_tm=raw_tm_packet)
elif service_type == 2:
tm_packet = Service2Tm.unpack(raw_tm_packet)
dedicated_handler = False
elif service_type == 3:
handle_hk_packet(
printer=FSFW_PRINTER, raw_tm=raw_tm_packet, obj_id_dict=obj_id_dict
)
elif service_type == 8:
handle_action_reply(
raw_tm=raw_tm_packet, printer=FSFW_PRINTER, obj_id_dict=obj_id_dict
)
elif service_type == 5:
handle_event_packet(
raw_tm=raw_tm_packet, printer=FSFW_PRINTER, file_logger=file_logger
)
elif service_type == 17:
tm_packet = Service17TMExtended.unpack(raw_telemetry=raw_tm_packet)
dedicated_handler = False
elif service_type == 20:
tm_packet = Service20FsfwTm.unpack(raw_telemetry=raw_tm_packet)
dedicated_handler = False
elif service_type == 200:
tm_packet = Service200FsfwTm.unpack(raw_telemetry=raw_tm_packet)
dedicated_handler = False
else:
LOGGER.info(
f"The service {service_type} is not implemented in Telemetry Factory"
)
tm_packet = PusTelemetry.unpack(raw_telemetry=raw_tm_packet)
tm_packet.print_source_data(PrintFormats.HEX)
dedicated_handler = True
if not dedicated_handler and tm_packet is not None:
FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
log_raw_pus_tm(
packet=raw_tm_packet, srv_subservice=(service_type, subservice_type)
)
except ValueError:
LOGGER.warning("Invalid packet format detected")
log_raw_unknown_packet(packet=raw_tm_packet, packet_type=PacketTypes.TM)

81
pus_tm/hk_handling.py Normal file
View File

@@ -0,0 +1,81 @@
"""This file transfers control of housekeeping handling (PUS service 3) to the developer
"""
import struct
from typing import Tuple
from tmtccmd.tm import Service3FsfwTm
from tmtccmd.tm.pus_3_hk_base import HkContentType, Service3Base
from tmtccmd.utility.obj_id import ObjectIdDictT, ObjectId
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.logging import get_console_logger
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
LOGGER = get_console_logger()
def handle_hk_packet(
raw_tm: bytes,
obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter,
):
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
if named_obj_id is None:
named_obj_id = tm_packet.object_id
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
hk_data = tm_packet.tm_data[8:]
printer.generic_hk_tm_print(
content_type=HkContentType.HK,
object_id=named_obj_id,
set_id=tm_packet.set_id,
hk_data=hk_data,
)
handle_regular_hk_print(
printer=printer,
object_id=named_obj_id,
hk_packet=tm_packet,
hk_data=hk_data,
)
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
LOGGER.warning("HK definitions printout not implemented yet")
def handle_regular_hk_print(
printer: FsfwTmTcPrinter,
object_id: ObjectId,
hk_packet: Service3Base,
hk_data: bytes,
):
if object_id == TEST_DEVICE_0_ID or object_id == TEST_DEVICE_1_ID:
handle_test_set_deserialization(hk_data=hk_data)
def handle_test_set_deserialization(
hk_data: bytes,
) -> Tuple[list, list, bytearray, int]:
header_list = []
content_list = []
validity_buffer = bytearray()
# uint8 (1) + uint32_t (4) + float vector with 3 entries (12) + validity buffer (1)
if len(hk_data) < 18:
LOGGER.warning("Invalid HK data format for test set reply!")
return header_list, content_list, validity_buffer, 0
uint8_value = struct.unpack("!B", hk_data[0:1])[0]
uint32_value = struct.unpack("!I", hk_data[1:5])[0]
float_value_1 = struct.unpack("!f", hk_data[5:9])[0]
float_value_2 = struct.unpack("!f", hk_data[9:13])[0]
float_value_3 = struct.unpack("!f", hk_data[13:17])[0]
validity_buffer.append(hk_data[17])
header_list.append("uint8 value")
header_list.append("uint32 value")
header_list.append("float vec value 1")
header_list.append("float vec value 2")
header_list.append("float vec value 3")
content_list.append(uint8_value)
content_list.append(uint32_value)
content_list.append(float_value_1)
content_list.append(float_value_2)
content_list.append(float_value_3)
return header_list, content_list, validity_buffer, 3

View File

@@ -1,65 +0,0 @@
"""This file transfers control of housekeeping handling (PUS service 3) to the developer
"""
import struct
from typing import Tuple
from tmtccmd.tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_console_logger
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
LOGGER = get_console_logger()
def service_3_hk_handling(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
"""This function is called when a Service 3 Housekeeping packet is received.
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param set_id:
@param hk_data:
@param service3_packet:
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
The first list contains the header columns, the second list the list with
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
"""
if object_id == TEST_DEVICE_0_ID or object_id == TEST_DEVICE_1_ID:
return handle_test_set_deserialization(hk_data=hk_data)
else:
LOGGER.info("Service3TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0
def handle_test_set_deserialization(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
header_list = []
content_list = []
validity_buffer = bytearray()
# uint8 (1) + uint32_t (4) + float vector with 3 entries (12) + validity buffer (1)
if len(hk_data) < 18:
LOGGER.warning("Invalid HK data format for test set reply!")
return header_list, content_list, validity_buffer, 0
uint8_value = struct.unpack('!B', hk_data[0:1])[0]
uint32_value = struct.unpack('!I', hk_data[1:5])[0]
float_value_1 = struct.unpack('!f', hk_data[5:9])[0]
float_value_2 = struct.unpack('!f', hk_data[9:13])[0]
float_value_3 = struct.unpack('!f', hk_data[13:17])[0]
validity_buffer.append(hk_data[17])
header_list.append("uint8 value")
header_list.append("uint32 value")
header_list.append("float vec value 1")
header_list.append("float vec value 2")
header_list.append("float vec value 3")
content_list.append(uint8_value)
content_list.append(uint32_value)
content_list.append(float_value_1)
content_list.append(float_value_2)
content_list.append(float_value_3)
return header_list, content_list, validity_buffer, 3

View File

@@ -1,19 +0,0 @@
from typing import Tuple
def custom_service_8_handling(
object_id: int, action_id: int, custom_data: bytearray) -> Tuple[list, list]:
"""
This function is called by the TMTC core if a Service 8 data reply (subservice 130)
is received. The user can return a tuple of two lists, where the first list
is a list of header strings to print and the second list is a list of values to print.
The TMTC core will take care of printing both lists and logging them.
@param object_id:
@param action_id:
@param custom_data:
@return:
"""
header_list = []
content_list = []
return header_list, content_list

View File

@@ -0,0 +1,28 @@
from typing import cast
from tmtccmd.tm.pus_1_verification import Service1TMExtended
from tmtccmd.logging import get_console_logger
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from common_tmtc.config.retvals import get_retval_dict
LOGGER = get_console_logger()
def handle_service_1_packet(printer: FsfwTmTcPrinter, raw_tm: bytes):
tm_packet = Service1TMExtended.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
srv1_packet = cast(Service1TMExtended, tm_packet)
retval_dict = get_retval_dict()
if srv1_packet.has_tc_error_code:
retval_info = retval_dict.get(srv1_packet.error_code)
if retval_info is None:
LOGGER.info(
f"No returnvalue information found for error code {srv1_packet.error_code}"
)
else:
retval_string = (
f"Error Code information for code {srv1_packet.error_code} | "
f"Name: {retval_info.name} | Info: {retval_info.info}"
)
LOGGER.info(retval_string)
printer.file_logger.info(retval_string)

64
tmtcc.py Normal file
View File

@@ -0,0 +1,64 @@
import argparse
import sys
import traceback
from typing import Optional
try:
import spacepackets
except ImportError as error:
print(error)
print("Python spacepackets module could not be imported")
print(
'Install with "cd spacepackets && python3 -m pip intall -e ." for interative installation'
)
sys.exit(1)
try:
import tmtccmd.runner as tmtccmd
from tmtccmd.logging.pus import create_tmtc_logger
from tmtccmd.ccsds.handler import ApidHandler, CcsdsTmHandler
from tmtccmd.config import SetupArgs, default_json_path
from tmtccmd.config.args import (
create_default_args_parser,
add_default_tmtccmd_args,
parse_default_input_arguments,
)
except ImportError as error:
run_tmtc_commander = None
initialize_tmtc_commander = None
tb = traceback.format_exc()
print(tb)
print("Python tmtccmd submodule could not be imported")
sys.exit(1)
from common_tmtc.config import __version__
from common_tmtc.config.definitions import PUS_APID
from common_tmtc.config.hook_implementation import CommonFsfwHookBase
from common_tmtc.pus_tm.factory_hook import ccsds_tm_handler
from common_tmtc.pus_tc.tc_packing import pre_tc_send_cb
def tmtcc_pre_args():
print(f"-- eive tmtc v{__version__} --")
print(f"-- spacepackets v{spacepackets.__version__} --")
tmtccmd.init_printout(False)
def tmtcc_post_args(
hook_obj: CommonFsfwHookBase, use_gui: bool, args: Optional[argparse.Namespace]
):
setup_args = SetupArgs(
hook_obj=hook_obj, use_gui=use_gui, apid=PUS_APID, cli_args=args
)
tmtc_file_logger = create_tmtc_logger()
apid_handler = ApidHandler(cb=ccsds_tm_handler, queue_len=50, user_args=None)
ccsds_handler = CcsdsTmHandler()
ccsds_handler.add_tm_handler(apid=PUS_APID, handler=apid_handler)
tmtccmd.setup(setup_args=setup_args)
tmtccmd.add_ccsds_handler(ccsds_handler)
tmtc_backend = tmtccmd.create_default_tmtc_backend(
setup_args=setup_args,
tm_handler=ccsds_handler,
)
tmtc_backend.usr_send_wrapper = (pre_tc_send_cb, tmtc_file_logger)
tmtccmd.run(tmtc_backend=tmtc_backend)