Compare commits

..

52 Commits

Author SHA1 Message Date
c9ae13c16f some fixes for common tmtc code 2023-01-13 11:07:08 +01:00
8ad775e5be minor improvements 2022-10-19 11:55:58 +02:00
9abfba1e0e basic printout for received PDU TMs 2022-10-19 11:49:44 +02:00
34203461fa some changes for CFDP done logic 2022-10-17 12:26:00 +02:00
73ba9f5d90 use singular enum names 2022-09-15 18:46:28 +02:00
7b0c8fa25a separate function for cfdp setup 2022-09-15 16:28:58 +02:00
4a9ed5aad0 more generic PDU packing approach 2022-09-15 14:31:13 +02:00
51871a887f moved a lot of code to tmtccmd 2022-09-14 19:01:47 +02:00
3787164cce this works 2022-09-14 18:30:52 +02:00
c4dcf0df76 getting complicated 2022-09-14 18:19:56 +02:00
061d62579b info printout for finished CFDP queue 2022-09-14 17:54:57 +02:00
8237591c4c more useful printout 2022-09-14 17:48:15 +02:00
e4e9b3d5d3 all indications working 2022-09-14 17:40:39 +02:00
0faccfb456 better printout 2022-09-14 17:35:33 +02:00
0e1d451651 send eof packet to finish transaction 2022-09-14 14:00:37 +02:00
6ee3fa67dc send wrapped CFDP packets and clean up printout 2022-09-13 13:54:47 +02:00
a3d1808457 building infastructure to build queue 2022-09-13 11:59:29 +02:00
e9aaa5502d printout tweaks 2022-09-12 10:08:03 +02:00
845ee041d4 correct code is now reached 2022-09-12 09:39:37 +02:00
99ccd04b2a some adaptions 2022-09-12 00:53:15 +02:00
27c1002d25 now only arg param to proc conversion missing 2022-09-09 17:51:41 +02:00
0e27436284 add back ping TC 2022-09-09 17:43:13 +02:00
d1f3898be8 continue cfdp setup 2022-09-09 17:41:44 +02:00
0d9b6b7086 create CFDP components 2022-09-09 16:55:04 +02:00
aca667248c now only args to put req conversion missing 2022-09-09 15:24:23 +02:00
d737ba90f9 simplifications 2022-09-09 15:21:14 +02:00
6a7a8bac47 some documentation 2022-09-09 15:06:58 +02:00
4f9e246ecd some architectural improvements 2022-09-09 14:58:34 +02:00
64ade84d51 integrate basic cfdp proc support 2022-09-09 14:46:41 +02:00
af8dccb56b clean up 2022-09-09 14:30:15 +02:00
b1b19f9cba added cfdp module to common 2022-09-09 14:23:18 +02:00
1c0905292e use cfdp args parsing now 2022-09-08 17:52:09 +02:00
c83e00a67b bump dependencies 2022-08-24 15:31:06 +02:00
81a3abe850 update verif handler 2022-08-18 18:34:26 +02:00
865ac0574f name change 2022-08-17 17:23:02 +02:00
8554ab5742 small renaming 2022-07-28 16:49:19 +02:00
6de31ccd3f update for new default pus queue helper 2022-07-28 16:37:01 +02:00
57aeba3de0 smaller fixes 2022-07-28 15:44:57 +02:00
2593105d65 re-test param service 2022-07-28 15:37:19 +02:00
9dedcd4e4f log in file logger as well 2022-07-28 15:26:37 +02:00
99165298d4 delete obsolete module, add log printout 2022-07-28 15:25:06 +02:00
6e89b61bf8 basic PUS11 support 2022-07-27 20:43:52 +02:00
4a90841282 update to newer tmtccmd 2022-07-27 14:40:25 +02:00
1f3e04bde4 reduce init printout 2022-07-04 10:41:31 +02:00
d87e2971b4 update code to new tmtccmd version 2022-07-03 20:58:32 +02:00
4cc02d287d rename file 2022-07-03 19:29:00 +02:00
3cca54f66f moved some helpers into framework 2022-05-24 14:50:37 +02:00
cdbd40363d added more tc sched commands 2022-05-24 14:27:29 +02:00
0064faf72d hate forgetting to push so much 2022-05-24 13:24:02 +02:00
7d197dbe4b cotinued sched service test 2022-05-20 11:08:46 +02:00
756b300c31 if elif fix 2022-05-20 00:19:50 +02:00
d308899920 update common_tmtc 2022-05-18 23:40:13 +02:00
24 changed files with 1157 additions and 559 deletions

427
common.py Normal file
View File

@ -0,0 +1,427 @@
import logging
import sys
from pathlib import Path
from typing import cast
from spacepackets import SpacePacket, SpacePacketHeader
from spacepackets.ccsds import SPACE_PACKET_HEADER_SIZE
from spacepackets.cfdp import (
TransmissionMode,
ChecksumType,
ConditionCode,
PduHolder,
DirectiveType,
PduFactory,
PduType,
)
from spacepackets.cfdp.pdu import MetadataPdu, FileDataPdu
from tmtccmd.cfdp import (
RemoteEntityCfg,
LocalEntityCfg,
CfdpUserBase,
IndicationCfg,
TransactionId,
)
from tmtccmd.cfdp.defs import CfdpRequestType
from tmtccmd.cfdp.handler import CfdpInCcsdsHandler
from tmtccmd.cfdp.mib import DefaultFaultHandlerBase
from tmtccmd.cfdp.user import (
FileSegmentRecvdParams,
MetadataRecvParams,
TransactionFinishedParams,
)
from tmtccmd.config.args import ProcedureParamsWrapper
from tmtccmd.logging import get_current_time_string
from tmtccmd.pus.pus_11_tc_sched import Subservices as Pus11Subservices
from tmtccmd.tc.queue import DefaultPusQueueHelper
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
try:
import spacepackets
except ImportError as error:
print(error)
print("Python tmtccmd module could not be imported. Make sure it is installed")
sys.exit(1)
try:
import tmtccmd
except ImportError as error:
print(error)
print("Python tmtccmd module could not be imported. Make sure it is installed")
sys.exit(1)
from spacepackets.ecss import PusVerificator, PusTelecommand, PusServices
from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands
from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands
from common_tmtc.pus_tc.pus_200_mode import pack_service_200_commands_into
from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into
from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into
from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into
from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into
from examples.tmtcc import (
EXAMPLE_PUS_APID,
EXAMPLE_CFDP_APID,
CFDP_LOCAL_ENTITY_ID,
CFDP_REMOTE_ENTITY_ID,
)
from tmtccmd import (
TcHandlerBase,
get_console_logger,
TmTcCfgHookBase,
CcsdsTmtcBackend,
)
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tc import (
ProcedureWrapper,
FeedWrapper,
TcProcedureType,
TcQueueEntryType,
SendCbParams,
)
from tmtccmd.tc.pus_5_event import pack_generic_service_5_test_into
from tmtccmd.tm import SpecificApidHandlerBase, CcsdsTmHandler
from tmtccmd.logging.pus import RawTmtcTimedLogWrapper
from tmtccmd.config import (
CoreServiceList,
SetupWrapper,
SetupParams,
PreArgsParsingWrapper,
params_to_procedure_conversion,
)
from common_tmtc.pus_tm.factory_hook import pus_factory_hook
LOGGER = get_console_logger()
class ExampleCfdpFaultHandler(DefaultFaultHandlerBase):
def notice_of_suspension_cb(self, cond: ConditionCode):
pass
def notice_of_cancellation_cb(self, cond: ConditionCode):
pass
def abandoned_cb(self, cond: ConditionCode):
pass
def ignore_cb(self, cond: ConditionCode):
pass
class ExampleCfdpUser(CfdpUserBase):
def transaction_indication(self, transaction_id: TransactionId):
LOGGER.info(f"CFDP User: Start of File {transaction_id}")
def eof_sent_indication(self, transaction_id: TransactionId):
LOGGER.info(f"CFDP User: EOF sent for {transaction_id}")
def transaction_finished_indication(self, params: TransactionFinishedParams):
LOGGER.info(f"CFDP User: {params.transaction_id} finished")
def metadata_recv_indication(self, params: MetadataRecvParams):
pass
def file_segment_recv_indication(self, params: FileSegmentRecvdParams):
pass
def report_indication(self, transaction_id: TransactionId, status_report: any):
pass
def suspended_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode
):
pass
def resumed_indication(self, transaction_id: TransactionId, progress: int):
pass
def fault_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int
):
pass
def abandoned_indication(
self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int
):
pass
def eof_recv_indication(self, transaction_id: TransactionId):
pass
class CfdpInCcsdsWrapper(SpecificApidHandlerBase):
def __init__(self, cfdp_in_ccsds_handler: CfdpInCcsdsHandler):
super().__init__(EXAMPLE_CFDP_APID, None)
self.handler = cfdp_in_ccsds_handler
def handle_tm(self, packet: bytes, _user_args: any):
# Ignore the space packet header. Its only purpose is to use the same protocol and
# have a seaprate APID for space packets. If this function is called, the APID is correct.
pdu = packet[SPACE_PACKET_HEADER_SIZE:]
pdu_base = PduFactory.from_raw(pdu)
if pdu_base.pdu_type == PduType.FILE_DATA:
LOGGER.info("Received File Data PDU TM")
else:
if pdu_base.directive_type == DirectiveType.FINISHED_PDU:
LOGGER.info(f"Received Finished PDU TM")
else:
LOGGER.info(
f"Received File Directive PDU with type {pdu_base.directive_type!r} TM"
)
self.handler.pass_pdu_packet(pdu_base)
class PusHandler(SpecificApidHandlerBase):
def __init__(
self,
wrapper: VerificationWrapper,
printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper,
):
super().__init__(EXAMPLE_PUS_APID, None)
self.printer = printer
self.verif_wrapper = wrapper
self.raw_logger = raw_logger
def handle_tm(self, packet: bytes, _user_args: any):
pus_factory_hook(
packet=packet,
wrapper=self.verif_wrapper,
raw_logger=self.raw_logger,
printer=self.printer,
)
class TcHandler(TcHandlerBase):
def __init__(
self,
seq_count_provider: FileSeqCountProvider,
cfdp_in_ccsds_wrapper: CfdpInCcsdsWrapper,
pus_verificator: PusVerificator,
file_logger: logging.Logger,
raw_logger: RawTmtcTimedLogWrapper,
):
super().__init__()
self.cfdp_handler_started = False
self.cfdp_dest_id = CFDP_REMOTE_ENTITY_ID
self.seq_count_provider = seq_count_provider
self.pus_verificator = pus_verificator
self.file_logger = file_logger
self.raw_logger = raw_logger
self.queue_helper = DefaultPusQueueHelper(
queue_wrapper=None,
pus_apid=EXAMPLE_PUS_APID,
seq_cnt_provider=seq_count_provider,
pus_verificator=pus_verificator,
)
self.cfdp_in_ccsds_wrapper = cfdp_in_ccsds_wrapper
def cfdp_done(self) -> bool:
return not self.cfdp_in_ccsds_wrapper.handler.put_request_pending()
def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
self.queue_helper.queue_wrapper = wrapper.queue_wrapper
if info.proc_type == TcProcedureType.DEFAULT:
self.handle_default_procedure(info)
elif info.proc_type == TcProcedureType.CFDP:
self.handle_cfdp_procedure(info)
def handle_default_procedure(self, info: ProcedureWrapper):
def_proc = info.to_def_procedure()
service = def_proc.service
op_code = def_proc.op_code
if service == CoreServiceList.SERVICE_2.value:
return pack_service_2_commands_into(op_code=op_code, q=self.queue_helper)
if service == CoreServiceList.SERVICE_3.value:
return pack_service_3_commands_into(op_code=op_code, q=self.queue_helper)
if service == CoreServiceList.SERVICE_5.value:
return pack_generic_service_5_test_into(q=self.queue_helper)
if service == CoreServiceList.SERVICE_8.value:
return pack_service_8_commands_into(op_code=op_code, q=self.queue_helper)
if service == CoreServiceList.SERVICE_11.value:
return pack_service_11_commands(op_code=op_code, q=self.queue_helper)
if service == CoreServiceList.SERVICE_17.value:
return pack_service_17_commands(op_code=op_code, q=self.queue_helper)
if service == CoreServiceList.SERVICE_20.value:
return pack_service20_commands_into(q=self.queue_helper, op_code=op_code)
if service == CoreServiceList.SERVICE_200.value:
return pack_service_200_commands_into(q=self.queue_helper, op_code=op_code)
LOGGER.warning("Invalid Service !")
def handle_cfdp_procedure(self, info: ProcedureWrapper):
cfdp_procedure = info.to_cfdp_procedure()
if cfdp_procedure.cfdp_request_type == CfdpRequestType.PUT:
if (
not self.cfdp_in_ccsds_wrapper.handler.put_request_pending()
and not self.cfdp_handler_started
):
put_req = cfdp_procedure.request_wrapper.to_put_request()
put_req.cfg.destination_id = self.cfdp_dest_id
LOGGER.info(
f"CFDP: Starting file put request with parameters:\n{put_req}"
)
self.cfdp_in_ccsds_wrapper.handler.cfdp_handler.put_request(put_req)
self.cfdp_handler_started = True
for source_pair, dest_pair in self.cfdp_in_ccsds_wrapper.handler:
pdu, sp = source_pair
pdu = cast(PduHolder, pdu)
if pdu.is_file_directive:
if pdu.pdu_directive_type == DirectiveType.METADATA_PDU:
metadata = pdu.to_metadata_pdu()
self.queue_helper.add_log_cmd(
f"CFDP Source: Sending Metadata PDU for file with size "
f"{metadata.file_size}"
)
elif pdu.pdu_directive_type == DirectiveType.EOF_PDU:
self.queue_helper.add_log_cmd(
f"CFDP Source: Sending EOF PDU"
)
else:
fd_pdu = pdu.to_file_data_pdu()
self.queue_helper.add_log_cmd(
f"CFDP Source: Sending File Data PDU for segment at offset "
f"{fd_pdu.offset} with length {len(fd_pdu.file_data)}"
)
self.queue_helper.add_ccsds_tc(sp)
self.cfdp_in_ccsds_wrapper.handler.confirm_source_packet_sent()
self.cfdp_in_ccsds_wrapper.handler.source_handler.state_machine()
def send_cb(self, params: SendCbParams):
if params.entry.is_tc:
if params.entry.entry_type == TcQueueEntryType.PUS_TC:
self.handle_tc_send_cb(params)
elif params.entry.entry_type == TcQueueEntryType.CCSDS_TC:
cfdp_packet_in_ccsds = params.entry.to_space_packet_entry()
params.com_if.send(cfdp_packet_in_ccsds.space_packet.pack())
elif params.entry.entry_type == TcQueueEntryType.LOG:
log_entry = params.entry.to_log_entry()
LOGGER.info(log_entry.log_str)
self.file_logger.info(log_entry.log_str)
def handle_tc_send_cb(self, params: SendCbParams):
pus_tc_wrapper = params.entry.to_pus_tc_entry()
if (
pus_tc_wrapper.pus_tc.service == PusServices.S11_TC_SCHED
and pus_tc_wrapper.pus_tc.subservice == Pus11Subservices.TC_INSERT
):
wrapped_tc = PusTelecommand.unpack(pus_tc_wrapper.pus_tc.app_data[4:])
tc_info_string = f"Sending time-tagged command {wrapped_tc}"
LOGGER.info(tc_info_string)
self.file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
raw_tc = pus_tc_wrapper.pus_tc.pack()
self.raw_logger.log_tc(pus_tc_wrapper.pus_tc)
tc_info_string = f"Sending {pus_tc_wrapper.pus_tc}"
LOGGER.info(tc_info_string)
self.file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
params.com_if.send(raw_tc)
def queue_finished_cb(self, info: ProcedureWrapper):
if info is not None:
if info.proc_type == TcQueueEntryType.PUS_TC:
def_proc = info.to_def_procedure()
LOGGER.info(
f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}"
)
elif info.proc_type == TcProcedureType.CFDP:
LOGGER.info(f"Finished CFDP queue")
self.cfdp_sending_done = True
def setup_params(hook_obj: TmTcCfgHookBase) -> SetupWrapper:
print(f"-- eive TMTC Commander --")
print(f"-- spacepackets v{spacepackets.__version__} --")
params = SetupParams()
parser_wrapper = PreArgsParsingWrapper()
parser_wrapper.create_default_parent_parser()
parser_wrapper.create_default_parser()
parser_wrapper.add_def_proc_and_cfdp_as_subparsers()
post_arg_parsing_wrapper = parser_wrapper.parse(hook_obj)
tmtccmd.init_printout(post_arg_parsing_wrapper.use_gui)
use_prompts = not post_arg_parsing_wrapper.use_gui
proc_param_wrapper = ProcedureParamsWrapper()
if use_prompts:
post_arg_parsing_wrapper.set_params_with_prompts(params, proc_param_wrapper)
else:
post_arg_parsing_wrapper.set_params_without_prompts(params, proc_param_wrapper)
params.apid = EXAMPLE_PUS_APID
setup_wrapper = SetupWrapper(
hook_obj=hook_obj, setup_params=params, proc_param_wrapper=proc_param_wrapper
)
return setup_wrapper
def setup_cfdp_handler() -> CfdpInCcsdsWrapper:
fh_base = ExampleCfdpFaultHandler()
cfdp_cfg = LocalEntityCfg(
local_entity_id=CFDP_LOCAL_ENTITY_ID,
indication_cfg=IndicationCfg(),
default_fault_handlers=fh_base,
)
remote_cfg = RemoteEntityCfg(
closure_requested=False,
entity_id=CFDP_REMOTE_ENTITY_ID,
max_file_segment_len=1024,
check_limit=None,
crc_on_transmission=False,
crc_type=ChecksumType.CRC_32,
default_transmission_mode=TransmissionMode.UNACKNOWLEDGED,
)
cfdp_seq_count_provider = FileSeqCountProvider(
max_bit_width=16, file_name=Path("seqcnt_cfdp_transaction.txt")
)
cfdp_ccsds_seq_count_provider = PusFileSeqCountProvider(
file_name=Path("seqcnt_cfdp_ccsds_.txt")
)
cfdp_user = ExampleCfdpUser()
cfdp_in_ccsds_handler = CfdpInCcsdsHandler(
cfg=cfdp_cfg,
remote_cfgs=[remote_cfg],
ccsds_apid=EXAMPLE_CFDP_APID,
ccsds_seq_cnt_provider=cfdp_ccsds_seq_count_provider,
cfdp_seq_cnt_provider=cfdp_seq_count_provider,
user=cfdp_user,
)
return CfdpInCcsdsWrapper(cfdp_in_ccsds_handler)
def setup_tmtc_handlers(
verif_wrapper: VerificationWrapper,
printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper,
) -> (CcsdsTmHandler, TcHandler):
cfdp_in_ccsds_wrapper = setup_cfdp_handler()
pus_handler = PusHandler(
printer=printer, raw_logger=raw_logger, wrapper=verif_wrapper
)
ccsds_handler = CcsdsTmHandler(None)
ccsds_handler.add_apid_handler(pus_handler)
ccsds_handler.add_apid_handler(cfdp_in_ccsds_wrapper)
tc_handler = TcHandler(
file_logger=printer.file_logger,
raw_logger=raw_logger,
pus_verificator=verif_wrapper.pus_verificator,
seq_count_provider=PusFileSeqCountProvider(
file_name=Path("seqcnt_pus_ccsds.txt")
),
cfdp_in_ccsds_wrapper=cfdp_in_ccsds_wrapper,
)
return ccsds_handler, tc_handler
def setup_backend(
setup_wrapper: SetupWrapper,
tc_handler: TcHandler,
ccsds_handler: CcsdsTmHandler,
) -> CcsdsTmtcBackend:
init_proc = params_to_procedure_conversion(setup_wrapper.proc_param_wrapper)
tmtc_backend = tmtccmd.create_default_tmtc_backend(
setup_wrapper=setup_wrapper,
tm_handler=ccsds_handler,
tc_handler=tc_handler,
init_procedure=init_proc,
)
tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=setup_wrapper.hook_obj)
return cast(CcsdsTmtcBackend, tmtc_backend)

View File

@ -0,0 +1,6 @@
SW_NAME = "fsfw-tmtc"
SW_VERSION = 1
SW_SUBVERSION = 4
SW_SUBSUBVERSION = 0
__version__ = "1.4.0"

View File

@ -1 +1,2 @@
PUS_APID = 0xEF
TM_SP_IDS = ((0x08 << 8) | PUS_APID,)

View File

@ -1,112 +1,33 @@
import argparse
from typing import Dict, Tuple, Optional
from abc import abstractmethod
from typing import Optional
from spacepackets.ecss.conf import PusVersion
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.config.definitions import ServiceOpCodeDictT
from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tm.service_3_base import Service3Base
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from common_tmtc.config.definitions import PUS_APID
from tmtccmd import BackendBase
from tmtccmd.com_if import ComInterface
from tmtccmd.config import TmTcCfgHookBase, TmtcDefinitionWrapper, default_json_path
from tmtccmd.util import ObjectIdDictT, RetvalDictT
class FsfwHookBase(TmTcHookBase):
def get_json_config_file_path(self) -> str:
return "config/tmtc_config.json"
class CommonFsfwHookBase(TmTcCfgHookBase):
def __init__(self, json_cfg_path: Optional[str] = None):
if json_cfg_path is None:
json_cfg_path = default_json_path()
super().__init__(json_cfg_path=json_cfg_path)
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import (
get_default_service_op_code_dict,
add_service_op_code_entry,
add_op_code_entry,
OpCodeDictKeys,
)
@abstractmethod
def get_tmtc_definitions(self) -> TmtcDefinitionWrapper:
pass
def_dict = get_default_service_op_code_dict()
op_code = dict()
add_op_code_entry(op_code_dict=op_code, keys="test", info="Mode CMD Test")
add_op_code_entry(
op_code_dict=op_code,
keys=["0", "asm_to_normal"],
info="Command test assembly to normal mode",
options={OpCodeDictKeys.TIMEOUT: 6.0},
)
add_service_op_code_entry(
srv_op_code_dict=def_dict,
name="200",
info="Mode MGMT",
op_code_entry=op_code,
)
return def_dict
@abstractmethod
def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
pass
def add_globals_pre_args_parsing(self, gui: bool = False):
from tmtccmd.config.globals import set_default_globals_pre_args_parsing
set_default_globals_pre_args_parsing(
gui=gui,
pus_tm_version=PusVersion.PUS_C,
pus_tc_version=PusVersion.PUS_C,
tc_apid=PUS_APID,
tm_apid=PUS_APID,
)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from tmtccmd.config.globals import set_default_globals_post_args_parsing
set_default_globals_post_args_parsing(
args=args, json_cfg_path=self.get_json_config_file_path()
)
def assign_communication_interface(
self, com_if_key: str, tmtc_printer: TmTcPrinter
) -> Optional[CommunicationInterface]:
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key,
tmtc_printer=tmtc_printer,
json_cfg_path=self.get_json_config_file_path(),
space_packet_ids=(0x08EF,),
)
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
def perform_mode_operation(self, tmtc_backend: BackendBase, mode: int):
print("No custom mode operation implemented")
def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from common_tmtc.pus_tc.tc_packing import pack_service_queue_user
pack_service_queue_user(
service=service, op_code=op_code, tc_queue=service_queue
)
def get_object_ids(self) -> Dict[bytes, list]:
def get_object_ids(self) -> ObjectIdDictT:
from common_tmtc.config.object_ids import get_object_ids
return get_object_ids()
@staticmethod
def handle_service_8_telemetry(
object_id: int, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
from common_tmtc.pus_tm.service_8_handling import custom_service_8_handling
return custom_service_8_handling(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@staticmethod
def handle_service_3_housekeeping(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
from common_tmtc.pus_tm.service_3_hk_handling import service_3_hk_handling
return service_3_hk_handling(
object_id=object_id,
set_id=set_id,
hk_data=hk_data,
service3_packet=service3_packet,
)
def get_retval_dict(self) -> RetvalDictT:
return self.get_retval_dict()

View File

@ -3,7 +3,15 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from typing import Dict
import os
from tmtccmd.fsfw import parse_fsfw_objects_csv
from tmtccmd.logging import get_console_logger
from tmtccmd.util.obj_id import ObjectIdDictT
LOGGER = get_console_logger()
DEFAULT_OBJECTS_CSV_PATH = "config/objects.csv"
__OBJECT_ID_DICT = None
PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17])
TEST_DEVICE_0_ID = bytes([0x44, 0x01, 0xAF, 0xFE])
@ -11,11 +19,13 @@ TEST_DEVICE_1_ID = bytes([0x44, 0x02, 0xAF, 0xFE])
ASSEMBLY_ID = bytes([0x41, 0x00, 0xCA, 0xFE])
def get_object_ids() -> Dict[bytes, list]:
object_id_dict = {
PUS_SERVICE_17_ID: ["PUS Service 17"],
TEST_DEVICE_0_ID: ["Test Device 0"],
TEST_DEVICE_1_ID: ["Test Device 1"],
ASSEMBLY_ID: ["Assembly ID"],
}
return object_id_dict
def get_object_ids() -> ObjectIdDictT:
global __OBJECT_ID_DICT
if not os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
LOGGER.warning(f"No Objects CSV file found at {DEFAULT_OBJECTS_CSV_PATH}")
if __OBJECT_ID_DICT is None:
if os.path.exists(DEFAULT_OBJECTS_CSV_PATH):
__OBJECT_ID_DICT = parse_fsfw_objects_csv(csv_file=DEFAULT_OBJECTS_CSV_PATH)
else:
__OBJECT_ID_DICT = dict()
return __OBJECT_ID_DICT

22
config/retvals.py Normal file
View File

@ -0,0 +1,22 @@
import os
from tmtccmd.fsfw import parse_fsfw_returnvalues_csv, RetvalDictT
from tmtccmd.logging import get_console_logger
DEFAULT_RETVAL_CSV_NAME = "config/returnvalues.csv"
__RETVAL_DICT = None
LOGGER = get_console_logger()
def get_retval_dict() -> RetvalDictT:
global __RETVAL_DICT
if __RETVAL_DICT is None:
if os.path.exists(DEFAULT_RETVAL_CSV_NAME):
__RETVAL_DICT = parse_fsfw_returnvalues_csv(
csv_file=DEFAULT_RETVAL_CSV_NAME
)
else:
LOGGER.warning(
f"No Return Value CSV file found at {DEFAULT_RETVAL_CSV_NAME}"
)
__RETVAL_DICT = dict()
return __RETVAL_DICT

View File

@ -1,4 +0,0 @@
SW_NAME = "fsfw-tmtc"
SW_VERSION = 1
SW_SUBVERSION = 4
SW_SUBSUBVERSION = 0

29
pus_tc/cmd_definitions.py Normal file
View File

@ -0,0 +1,29 @@
from common_tmtc.pus_tc.service_20_parameters import add_param_cmds
from common_tmtc.pus_tc.service_3_housekeeping import add_hk_cmds
from tmtccmd.config import TmtcDefinitionWrapper
from common_tmtc.pus_tc.pus_11_tc_sched import add_tc_sched_cmds
from tmtccmd.config.globals import get_default_tmtc_defs
from tmtccmd.config.tmtc import OpCodeEntry
def common_fsfw_service_op_code_dict() -> TmtcDefinitionWrapper:
def_wrapper = get_default_tmtc_defs()
op_code_entry = OpCodeEntry()
op_code_entry.add(keys="test", info="Mode CMD Test")
op_code_entry.add(
keys=["0", "asm_to_normal"], info="Command test assembly to normal mode"
)
def_wrapper.add_service(
"200", info="PUS Service 200 Mode MGMT", op_code_entry=op_code_entry
)
op_code_entry = OpCodeEntry()
op_code_entry.add(keys="ping", info="Send ping command")
def_wrapper.add_service(
"17", info="PUS Service 17 Ping", op_code_entry=op_code_entry
)
add_tc_sched_cmds(def_wrapper)
add_param_cmds(def_wrapper)
add_hk_cmds(def_wrapper)
return def_wrapper

184
pus_tc/pus_11_tc_sched.py Normal file
View File

@ -0,0 +1,184 @@
import struct
import time
from datetime import timedelta
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.config.tmtc import OpCodeEntry
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId
from tmtccmd.tc.pus_11_tc_sched import (
generate_enable_tc_sched_cmd,
generate_disable_tc_sched_cmd,
generate_reset_tc_sched_cmd,
pack_time_tagged_tc_app_data,
)
from tmtccmd.tc.queue import DefaultPusQueueHelper
class OpCodes:
ENABLE = ["0", "enable"]
DISABLE = ["1", "disable"]
RESET = ["2", "reset"]
TEST_INSERT = ["12", "test-insert"]
TEST_DELETE = ["13", "test-del"]
TEST_RESET = ["14", "test-clear"]
class Info:
ENABLE = "Enable TC scheduling"
DISABLE = "Disable TC scheduling"
RESET = "Reset TC scheduling"
TEST_INSERT = "Test TC scheduling insertion"
TEST_DELETE = "Test TC scheduling deletion"
TEST_RESET = "Test TC scheduling reset command"
def add_tc_sched_cmds(cmd_dict: TmtcDefinitionWrapper):
op_code_entry = OpCodeEntry()
op_code_entry.add(keys=OpCodes.ENABLE, info=Info.ENABLE)
op_code_entry.add(keys=OpCodes.DISABLE, info=Info.DISABLE)
op_code_entry.add(keys=OpCodes.RESET, info=Info.RESET)
op_code_entry.add(keys=OpCodes.TEST_INSERT, info=Info.TEST_INSERT)
op_code_entry.add(keys=OpCodes.TEST_DELETE, info=Info.TEST_DELETE)
op_code_entry.add(keys=OpCodes.TEST_RESET, info=Info.TEST_RESET)
cmd_dict.add_service(
"11", info="PUS Service 11 TC Scheduling", op_code_entry=op_code_entry
)
def __generic_pack_three_time_tagged_cmds(q: DefaultPusQueueHelper):
q.add_log_cmd("Testing Time-Tagged Command insertion")
q.add_pus_tc(generate_enable_tc_sched_cmd())
current_time = int(round(time.time()))
# these TC[17,1] (ping commands) shall be inserted
ping_tcs = [
pack_service_17_ping_command(),
pack_service_17_ping_command(),
pack_service_17_ping_command(),
]
for idx, tc in enumerate(ping_tcs):
release_time = current_time + (idx + 2) * 5
q.add_pus_tc(
PusTelecommand(
service=11,
subservice=Subservices.TC_INSERT,
app_data=pack_time_tagged_tc_app_data(
struct.pack("!I", release_time), tc
),
)
)
q.add_wait(timedelta(seconds=25.0))
def pack_service_11_commands(op_code: str, q: DefaultPusQueueHelper):
if op_code in OpCodes.TEST_INSERT:
q.add_log_cmd("Testing Time-Tagged Command deletion")
__generic_pack_three_time_tagged_cmds(q=q)
if op_code in OpCodes.TEST_DELETE:
current_time = int(round(time.time()))
tc_to_schedule = pack_service_17_ping_command()
q.add_pus_tc(
PusTelecommand(
service=11,
subservice=Subservices.TC_INSERT,
app_data=pack_time_tagged_tc_app_data(
struct.pack("!I", current_time + 20), tc_to_schedule
),
)
)
q.add_pus_tc(
PusTelecommand(
service=11,
subservice=Subservices.TC_DELETE,
app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule),
)
)
if op_code in OpCodes.ENABLE:
q.add_log_cmd("Enabling TC scheduler")
q.add_pus_tc(generate_enable_tc_sched_cmd())
if op_code in OpCodes.DISABLE:
q.add_log_cmd("Disabling TC scheduler")
q.add_pus_tc(generate_disable_tc_sched_cmd())
if op_code in OpCodes.RESET:
q.add_log_cmd("Reset TC scheduler")
q.add_pus_tc(generate_reset_tc_sched_cmd())
if op_code in OpCodes.TEST_RESET:
q.add_log_cmd("Testing Reset command")
__generic_pack_three_time_tagged_cmds(q)
q.add_pus_tc(generate_reset_tc_sched_cmd())
# a TC[11,5] for 3rd inserted ping TC
# TODO: This should be an independent test
# a TC[11,6] for some other previously inserted TCs
# service_11_6_tc = build_filter_delete_tc(TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG,
# current_time+45, current_time+55)
# tc_queue.appendleft(service_11_6_tc.pack_command_tuple())
# TODO: This should be an independent test
# a TC[11,7] for another previously inserted TC
# service_11_7_tc = build_corresponding_timeshift_tc(30, ping_tc_4)
# tc_queue.appendleft(service_11_7_tc.pack_command_tuple())
# TODO: This should be an independent test
# a TC[11,8] with offset time of 20s
# service_11_8_tc = build_filter_timeshift_tc(
# 20,
# TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG,
# current_time + 45,
# current_time + 55,
# )
# tc_queue.appendleft((service_11_8_tc.pack_command_tuple()))
def pack_delete_corresponding_tc_app_data(tc: PusTelecommand) -> bytes:
return TcSchedReqId.build_from_tc(tc).pack()
def build_filter_delete_tc(
time_window_type: TypeOfTimeWindow, *timestamps: int
) -> PusTelecommand:
app_data = bytearray()
app_data.extend(struct.pack("!I", int(time_window_type)))
if time_window_type != TypeOfTimeWindow.SELECT_ALL:
for timestamp in timestamps:
app_data.extend(struct.pack("!I", timestamp))
return PusTelecommand(service=11, subservice=6, app_data=app_data)
def pack_corresponding_timeshift_app_data(
time_delta: bytes, tc: PusTelecommand, ssc: int
) -> bytes:
req_id = TcSchedReqId.build_from_tc(tc)
app_data = bytearray()
app_data.extend(time_delta)
app_data.extend(req_id.pack())
return app_data
def build_filter_timeshift_tc(
time_offset: int, time_window_type: TypeOfTimeWindow, *timestamps: int
) -> PusTelecommand:
app_data = bytearray()
app_data.extend(struct.pack("!I", time_offset))
app_data.extend(struct.pack("!I", int(time_window_type)))
if time_window_type != TypeOfTimeWindow.SELECT_ALL:
for timestamp in timestamps:
app_data.extend(struct.pack("!I", timestamp))
return PusTelecommand(service=11, subservice=8, app_data=app_data)
# waits for a specified amount of seconds and prints ". . ." for each second
def wait_seconds(t: int):
print("Waiting: ", end="")
for x in range(t):
time.sleep(1)
print(". ", end="")
print("")

14
pus_tc/pus_17_test.py Normal file
View File

@ -0,0 +1,14 @@
from tmtccmd.pus.pus_17_test import (
pack_service_17_ping_command,
)
from tmtccmd.tc.queue import DefaultPusQueueHelper
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()
def pack_service_17_commands(op_code: str, q: DefaultPusQueueHelper):
if op_code in ["0", "ping"]:
q.add_pus_tc(pack_service_17_ping_command())
else:
LOGGER.warning(f"Invalid op code {op_code}")

41
pus_tc/pus_200_mode.py Normal file
View File

@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelperBase
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, ASSEMBLY_ID
from tmtccmd.tc.queue import DefaultPusQueueHelper
def pack_service_200_commands_into(q: DefaultPusQueueHelper, op_code: str):
if op_code == "test":
pack_service_200_test_into(q)
elif op_code == "asm_to_normal" or op_code == "0":
# Set Normal mode
q.add_log_cmd("Testing Service 200: Set Mode Normal")
# Command assembly to normal, submode 1 for dual mode,
mode_data = pack_mode_data(ASSEMBLY_ID, Modes.NORMAL, 1)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
def pack_service_200_test_into(q: DefaultPusQueueHelper):
q.add_log_cmd("Testing Service 200")
# Object ID: DUMMY Device
object_id = TEST_DEVICE_0_ID
# Set On Mode
q.add_log_cmd("Testing Service 200: Set Mode On")
mode_data = pack_mode_data(object_id, Modes.ON, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# Set Normal mode
q.add_log_cmd("Testing Service 200: Set Mode Normal")
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# Set Raw Mode
q.add_log_cmd("Testing Service 200: Set Mode Raw")
mode_data = pack_mode_data(object_id, Modes.RAW, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# Set Off Mode
q.add_log_cmd("Testing Service 200: Set Mode Off")
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))

View File

@ -1,14 +0,0 @@
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.pus.service_17_test import (
pack_service_17_ping_command,
pack_generic_service17_test,
)
def pack_service_17_commands(op_code: str, init_ssc: int, tc_queue: TcQueueT):
if op_code == "0":
tc_queue.appendleft(
pack_service_17_ping_command(ssc=init_ssc).pack_command_tuple()
)
else:
pack_generic_service17_test(tc_queue=tc_queue, init_ssc=init_ssc)

View File

@ -1,61 +0,0 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_service_200_mode.py
@brief PUS Service 200: PUS custom service 200: Mode commanding
@author R. Mueller
@date 02.05.2020
"""
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, ASSEMBLY_ID
def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "test":
pack_service_200_test_into(tc_queue=tc_queue, init_ssc=2000)
elif op_code == "asm_to_normal" or op_code == "0":
# Set Normal mode
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 200: Set Mode Normal")
)
# Command assembly to normal, submode 1 for dual mode,
mode_data = pack_mode_data(ASSEMBLY_ID, Modes.NORMAL, 1)
command = PusTelecommand(service=200, subservice=1, ssc=0, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
new_ssc = init_ssc
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200"))
# Object ID: DUMMY Device
object_id = TEST_DEVICE_0_ID
# Set On Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
mode_data = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# Set Normal mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal"))
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw"))
mode_data = pack_mode_data(object_id, Modes.RAW, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
# Set Off Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off"))
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt"))
return new_ssc

View File

@ -1,88 +1,94 @@
import struct
from spacepackets.ecss.tc import PusTelecommand
from spacepackets.ecss import PusServices
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_20_parameter import pack_type_and_matrix_data, pack_parameter_id
from tmtccmd.tc.service_200_mode import pack_mode_data
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
from tmtccmd.tc.pus_20_params import pack_type_and_matrix_data, pack_parameter_id
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.logging import get_console_logger
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
from tmtccmd.tc.queue import DefaultPusQueueHelper
LOGGER = get_console_logger()
def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str):
def add_param_cmds(defs: TmtcDefinitionWrapper):
op_code_entry = OpCodeEntry()
op_code_entry.add(keys=["0", "test"], info="Generic Test")
defs.add_service(
name=str(PusServices.S20_PARAMETER.value),
info="PUS Service 20 Parameters",
op_code_entry=op_code_entry,
)
def pack_service20_commands_into(q: DefaultPusQueueHelper, op_code: str):
if op_code == "0":
pack_service20_test_into(tc_queue=tc_queue)
pack_service20_test_into(q=q)
def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False):
def pack_service20_test_into(q: DefaultPusQueueHelper, called_externally: bool = False):
if called_externally is False:
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20"))
q.add_log_cmd("Testing Service 20")
object_id = TEST_DEVICE_0_ID
# set mode on
q.add_log_cmd("Testing Service 20: Set Mode On")
mode_data = pack_mode_data(object_id, Modes.ON, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# set mode normal
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
q.add_log_cmd("Testing Service 20: Set Normal Mode")
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
load_param_0_simple_test_commands(tc_queue=tc_queue)
load_param_1_simple_test_commands(tc_queue=tc_queue)
load_param_2_simple_test_commands(tc_queue=tc_queue)
if called_externally is False:
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service20.txt"))
load_param_0_simple_test_commands(q)
load_param_1_simple_test_commands(q)
load_param_2_simple_test_commands(q)
def load_param_0_simple_test_commands(tc_queue: TcQueueT):
def load_param_0_simple_test_commands(q: DefaultPusQueueHelper):
object_id = TEST_DEVICE_0_ID
parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0)
# test checking Load for uint32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load uint32_t"))
q.add_log_cmd("Testing Service 20: Load uint32_t")
type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1)
parameter_data = struct.pack("!I", 42)
payload = object_id + parameter_id_0 + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=20, subservice=128, app_data=payload))
# test checking Dump for uint32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump uint32_t"))
q.add_log_cmd("Testing Service 20: Dump uint32_t")
payload = object_id + parameter_id_0
command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload))
def load_param_1_simple_test_commands(tc_queue: TcQueueT):
def load_param_1_simple_test_commands(q: DefaultPusQueueHelper):
object_id = TEST_DEVICE_0_ID
parameter_id_1 = pack_parameter_id(domain_id=0, unique_id=1, linear_index=0)
# test checking Load for int32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load int32_t"))
q.add_log_cmd("Testing Service 20: Load int32_t")
type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1)
parameter_data = struct.pack("!i", -42)
payload = object_id + parameter_id_1 + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=20, subservice=128, app_data=payload))
# test checking Dump for int32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump int32_t"))
q.add_log_cmd("Testing Service 20: Dump int32_t")
payload = object_id + parameter_id_1
command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload))
def load_param_2_simple_test_commands(tc_queue: TcQueueT):
def load_param_2_simple_test_commands(q: DefaultPusQueueHelper):
object_id = TEST_DEVICE_0_ID
parameter_id_2 = pack_parameter_id(domain_id=0, unique_id=2, linear_index=0)
# test checking Load for float
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load float"))
q.add_log_cmd("Testing Service 20: Load float")
type_and_matrix_data = pack_type_and_matrix_data(ptc=5, pfc=1, rows=1, columns=3)
parameter_data = struct.pack("!fff", 4.2, -4.2, 49)
payload = object_id + parameter_id_2 + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=20, subservice=128, app_data=payload))
# test checking Dump for float
# Skip dump for now, still not properly implemented

View File

@ -9,75 +9,51 @@ import struct
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.service_200_mode import Modes
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_200_mode import pack_mode_data
from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data
from common_tmtc.pus_tc import command_data as cmd_data
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
from tmtccmd.tc.queue import DefaultPusQueueHelper
def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str):
def pack_service_2_commands_into(q: DefaultPusQueueHelper, op_code: str):
if op_code == "0":
pack_generic_service_2_test_into(0, tc_queue)
pack_generic_service_2_test_into(0, q)
else:
print(f"pack_service_2_test: Operation code {op_code} unknown!")
def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
def pack_generic_service_2_test_into(init_ssc: int, q: DefaultPusQueueHelper) -> int:
new_ssc = init_ssc
object_id = TEST_DEVICE_0_ID # dummy device
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode"))
q.add_log_cmd("Testing Service 2: Setting Raw Mode")
mode_data = pack_mode_data(object_id, Modes.RAW, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# toggle wiretapping raw
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw")
)
q.add_log_cmd("Testing Service 2: Toggling Wiretapping Raw")
wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1)
toggle_wiretapping_on_command = PusTelecommand(
service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data
q.add_pus_tc(
PusTelecommand(service=2, subservice=129, app_data=wiretapping_toggle_data)
)
tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple())
new_ssc += 1
# send raw command, wiretapping should be returned via TM[2,130] and TC[2,131]
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Sending Raw Command"))
q.add_log_cmd("Testing Service 2: Sending Raw Command")
raw_command = cmd_data.TEST_COMMAND_0
raw_data = object_id + raw_command
raw_command = PusTelecommand(
service=2, subservice=128, ssc=new_ssc, app_data=raw_data
)
tc_queue.appendleft(raw_command.pack_command_tuple())
new_ssc += 1
q.add_pus_tc(PusTelecommand(service=2, subservice=128, app_data=raw_data))
# toggle wiretapping off
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off")
)
q.add_log_cmd("Testing Service 2: Toggle Wiretapping Off")
wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0)
toggle_wiretapping_off_command = PusTelecommand(
service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data
q.add_pus_tc(
PusTelecommand(service=2, subservice=129, app_data=wiretapping_toggle_data)
)
tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple())
new_ssc += 1
# send raw command which should be returned via TM[2,130]
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Send second raw command")
)
command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
q.add_log_cmd("Testing Service 2: Send second raw command")
q.add_pus_tc(PusTelecommand(service=2, subservice=128, app_data=raw_data))
# Set mode off
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Off Mode"))
mode_data = pack_mode_data(object_id, 0, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_2.txt"))
q.add_log_cmd("Testing Service 2: Setting Off Mode")
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
return new_ssc

View File

@ -1,20 +1,23 @@
from spacepackets.ecss.tc import PusTelecommand
from datetime import timedelta
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.tc.service_20_parameter import pack_boolean_parameter_command
from tmtccmd.tc.service_3_housekeeping import (
make_sid,
generate_one_hk_command,
Srv3Subservice,
from spacepackets.ecss.tc import PusTelecommand
from spacepackets.ecss import PusServices
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.tc.pus_20_params import (
pack_boolean_parameter_app_data,
pack_fsfw_load_param_cmd,
)
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
import tmtccmd.tc.pus_3_fsfw_hk as srv3
from tmtccmd.tc.pus_8_funccmd import make_fsfw_action_cmd
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
# Set IDs
from tmtccmd.tc.queue import DefaultPusQueueHelper
TEST_SET_ID = 0
# Action IDs
@ -24,7 +27,17 @@ TEST_NOTIFICATION_ACTION_ID = 3
PARAM_ACTIVATE_CHANGING_DATASETS = 4
def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
def add_hk_cmds(defs: TmtcDefinitionWrapper):
op_code_entry = OpCodeEntry()
op_code_entry.add(keys=["0", "test"], info="Generic Test")
defs.add_service(
name=str(PusServices.S3_HOUSEKEEPING.value),
info="PUS Service 3 Housekeeping",
op_code_entry=op_code_entry,
)
def pack_service_3_commands_into(q: DefaultPusQueueHelper, op_code: str):
current_ssc = 3000
device_idx = 0
if device_idx == 0:
@ -35,178 +48,122 @@ def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "0":
# This will pack all the tests
pack_service_3_test_info(
tc_queue=tc_queue,
init_ssc=current_ssc,
q=q,
object_id=object_id,
device_idx=device_idx,
)
elif op_code == "1":
# Extremely simple, generate one HK packet
pack_gen_one_hk_command(
tc_queue=tc_queue,
q=q,
device_idx=device_idx,
init_ssc=current_ssc,
object_id=object_id,
)
elif op_code == "2":
# Housekeeping basic test
pack_housekeeping_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
pack_housekeeping_basic_test(q=q, object_id=object_id)
elif op_code == "3":
# Notification demo
pack_notification_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
pack_notification_basic_test(q=q, object_id=object_id)
def pack_service_3_test_info(
tc_queue: TcQueueT, device_idx: int, object_id: bytearray, init_ssc: int
q: DefaultPusQueueHelper, device_idx: int, object_id: bytearray
):
tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests")
)
current_ssc = init_ssc
current_ssc += pack_gen_one_hk_command(
tc_queue=tc_queue,
device_idx=device_idx,
q.add_log_cmd("Service 3 (Housekeeping Service): All tests")
pack_gen_one_hk_command(q=q, device_idx=device_idx, object_id=object_id)
pack_housekeeping_basic_test(q=q, object_id=object_id)
pack_notification_basic_test(
q=q,
object_id=object_id,
init_ssc=current_ssc,
)
current_ssc += pack_housekeeping_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
current_ssc += pack_notification_basic_test(
tc_queue=tc_queue,
object_id=object_id,
init_ssc=current_ssc,
enable_normal_mode=False,
)
def pack_gen_one_hk_command(
tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray
) -> int:
q: DefaultPusQueueHelper, device_idx: int, object_id: bytearray
):
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
tc_queue.appendleft(
(
QueueCommands.PRINT,
f"Service 3 Test: Generate one test set packet for "
f"test device {device_idx}",
)
q.add_log_cmd(
f"Service 3 Test: Generate one test set packet for test device {device_idx}"
)
command = generate_one_hk_command(ssc=init_ssc, sid=test_sid)
init_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
return init_ssc
q.add_pus_tc(generate_one_hk_command(sid=test_sid))
def pack_housekeeping_basic_test(
tc_queue: TcQueueT,
q: DefaultPusQueueHelper,
object_id: bytearray,
init_ssc: int,
enable_normal_mode: bool = True,
) -> int:
):
"""
This basic test will request one HK packet, then it will enable periodic packets and listen
to the periodic packets for a few seconds. After that, HK packets will be disabled again.
"""
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
current_ssc = init_ssc
# Enable changing datasets via parameter service (Service 20)
tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests")
)
q.add_log_cmd("Service 3 Test: Performing basic HK tests")
if enable_normal_mode:
# Set mode normal so that sets are changed/read regularly
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
q.add_log_cmd("Service 3 Test: Set Normal Mode")
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(
service=200, subservice=1, ssc=current_ssc, app_data=mode_data
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
tc_queue.appendleft((QueueCommands.PRINT, "Enabling changing datasets"))
command = pack_boolean_parameter_command(
q.add_log_cmd("Enabling changing datasets")
app_data = pack_boolean_parameter_app_data(
object_id=object_id,
domain_id=0,
unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=True,
ssc=current_ssc,
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=app_data))
# Enable periodic reporting
tc_queue.appendleft(
(QueueCommands.PRINT, "Enabling periodic thermal sensor packet generation: ")
q.add_log_cmd("Enabling periodic thermal sensor packet generation: ")
q.add_pus_tc(
PusTelecommand(
service=3,
subservice=srv3.Subservices.TC_ENABLE_PERIODIC_HK_GEN,
app_data=test_sid,
)
)
command = PusTelecommand(
service=3,
subservice=Srv3Subservice.ENABLE_PERIODIC_HK_GEN.value,
ssc=current_ssc,
app_data=test_sid,
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 2.0))
q.add_wait(timedelta(seconds=2.0))
# Disable periodic reporting
tc_queue.appendleft(
(QueueCommands.PRINT, "Disabling periodic thermal sensor packet generation: ")
q.add_log_cmd("Disabling periodic thermal sensor packet generation: ")
q.add_pus_tc(
PusTelecommand(
service=3,
subservice=srv3.Subservices.TC_DISABLE_PERIODIC_HK_GEN,
app_data=test_sid,
)
)
command = PusTelecommand(
service=3,
subservice=Srv3Subservice.DISABLE_PERIODIC_HK_GEN.value,
ssc=current_ssc,
app_data=test_sid,
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
# Disable changing datasets via parameter service (Service 20)
tc_queue.appendleft((QueueCommands.PRINT, "Disabling changing datasets"))
command = pack_boolean_parameter_command(
q.add_log_cmd("Disabling changing datasets")
app_data = pack_boolean_parameter_app_data(
object_id=object_id,
domain_id=0,
unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=False,
ssc=current_ssc,
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
return current_ssc
q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=app_data))
def pack_notification_basic_test(
tc_queue: TcQueueT,
q: DefaultPusQueueHelper,
object_id: bytearray,
init_ssc: int,
enable_normal_mode: bool = True,
) -> int:
current_ssc = init_ssc
tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 Test: Performing notification tests")
)
):
q.add_log_cmd("Service 3 Test: Performing notification tests")
if enable_normal_mode:
# Set mode normal so that sets are changed/read regularly
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
q.add_log_cmd("Service 3 Test: Set Normal Mode")
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(
service=200, subservice=1, ssc=current_ssc, app_data=mode_data
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
tc_queue.appendleft((QueueCommands.PRINT, "Triggering notification"))
command = generate_action_command(
object_id=object_id, action_id=TEST_NOTIFICATION_ACTION_ID, ssc=current_ssc
q.add_log_cmd("Triggering notification")
q.add_pus_tc(
make_fsfw_action_cmd(object_id=object_id, action_id=TEST_NOTIFICATION_ACTION_ID)
)
tc_queue.appendleft(command.pack_command_tuple())
current_ssc += 1
return current_ssc

View File

@ -1,64 +1,49 @@
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
import common_tmtc.pus_tc.command_data as cmd_data
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
from tmtccmd.tc.queue import DefaultPusQueueHelper
def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str):
def pack_service_8_commands_into(q: DefaultPusQueueHelper, op_code: str):
if op_code == "0":
pack_generic_service_8_test_into(tc_queue=tc_queue)
pack_generic_service_8_test_into(q=q)
else:
print(f"pack_service_8_test: Operation code {op_code} unknown!")
def pack_generic_service_8_test_into(tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8"))
def pack_generic_service_8_test_into(q: DefaultPusQueueHelper):
q.add_log_cmd("Testing Service 8")
object_id = TEST_DEVICE_0_ID
# set mode on
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode"))
q.add_log_cmd("Testing Service 8: Set On Mode")
mode_data = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# set mode normal
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Normal Mode"))
q.add_log_cmd("Testing Service 8: Set Normal Mode")
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# Direct command which triggers completion reply
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply")
)
q.add_log_cmd("Testing Service 8: Trigger Step and Completion Reply")
action_id = cmd_data.TEST_COMMAND_0
direct_command = object_id + action_id
command = PusTelecommand(
service=8, subservice=128, ssc=820, app_data=direct_command
)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=direct_command))
# Direct command which triggers _tm_data reply
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Data Reply"))
q.add_log_cmd("Testing Service 8: Trigger Data Reply")
action_id = cmd_data.TEST_COMMAND_1
command_param1 = cmd_data.TEST_COMMAND_1_PARAM_1
command_param2 = cmd_data.TEST_COMMAND_1_PARAM_2
direct_command = object_id + action_id + command_param1 + command_param2
command = PusTelecommand(
service=8, subservice=128, ssc=830, app_data=direct_command
)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=direct_command))
# Set mode off
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Off Mode"))
mode_data = pack_mode_data(object_id, 0, 0)
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service_8.txt"))
q.add_log_cmd("Testing Service 8: Set Off Mode")
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))

View File

@ -1,50 +0,0 @@
"""
@brief This file transfers control of TC packing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import os
from collections import deque
from typing import Union
from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into
from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into
from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into
from common_tmtc.pus_tc.service_17_test import pack_service_17_commands
from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.config.definitions import CoreServiceList
from tmtccmd.tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus.service_17_test import pack_generic_service17_test
from common_tmtc.pus_tc.service_200_mode import pack_service_200_commands_into
LOGGER = get_console_logger()
def pack_service_queue_user(service: Union[str, int], op_code: str, tc_queue: TcQueueT):
if service == CoreServiceList.SERVICE_2.value:
return pack_service_2_commands_into(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_3.value:
return pack_service_3_commands_into(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_5.value:
return pack_generic_service5_test_into(tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_8.value:
return pack_service_8_commands_into(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_17.value:
return pack_service_17_commands(op_code=op_code, tc_queue=tc_queue, init_ssc=0)
if service == CoreServiceList.SERVICE_20.value:
return pack_service20_commands_into(tc_queue=tc_queue, op_code=op_code)
if service == CoreServiceList.SERVICE_200.value:
return pack_service_200_commands_into(tc_queue=tc_queue, op_code=op_code)
LOGGER.warning("Invalid Service !")
def create_total_tc_queue_user() -> TcQueueT:
if not os.path.exists("log"):
os.mkdir("log")
tc_queue = deque()
pack_service_2_commands_into(op_code="0", tc_queue=tc_queue)
pack_generic_service17_test(init_ssc=1700, tc_queue=tc_queue)
return tc_queue

View File

@ -0,0 +1,24 @@
from tmtccmd.logging import get_console_logger
from tmtccmd.tm import Service8FsfwTm
from tmtccmd.util import ObjectIdDictT
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
LOGGER = get_console_logger()
def handle_action_reply(
raw_tm: bytes, printer: FsfwTmTcPrinter, obj_id_dict: ObjectIdDictT
):
"""Core Action reply handler
:return:
"""
tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
object_id = obj_id_dict.get(tm_packet.source_object_id_as_bytes)
custom_data = tm_packet.custom_data
action_id = tm_packet.action_id
generic_print_str = printer.generic_action_packet_tm_print(
packet=tm_packet, obj_id=object_id
)
print(generic_print_str)
printer.file_logger.info(generic_print_str)

60
pus_tm/event_handler.py Normal file
View File

@ -0,0 +1,60 @@
import logging
import os.path
from datetime import datetime
from common_tmtc.config.object_ids import get_object_ids
from tmtccmd.tm import Service5Tm
from tmtccmd.logging import get_console_logger
from tmtccmd.fsfw import parse_fsfw_events_csv, EventDictT, EventInfo
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
LOGGER = get_console_logger()
DEFAULT_EVENTS_CSV_PATH = "config/events.csv"
__EVENT_DICT = None
def get_event_dict() -> EventDictT:
global __EVENT_DICT
if __EVENT_DICT is None:
if os.path.exists(DEFAULT_EVENTS_CSV_PATH):
__EVENT_DICT = parse_fsfw_events_csv(DEFAULT_EVENTS_CSV_PATH)
else:
LOGGER.warning(f"No Event CSV file found at {DEFAULT_EVENTS_CSV_PATH}")
__EVENT_DICT = dict()
return __EVENT_DICT
def handle_event_packet(
raw_tm: bytes, printer: FsfwTmTcPrinter, file_logger: logging.Logger
) -> str:
tm = Service5Tm.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm, info_if=tm)
additional_event_info = ""
event_dict = get_event_dict()
info = event_dict.get(tm.event_id)
if info is None:
LOGGER.warning(f"Event ID {tm.event_id} has no information")
info = EventInfo()
info.name = "Unknown event"
obj_ids = get_object_ids()
obj_id_obj = obj_ids.get(tm.reporter_id.as_bytes)
if obj_id_obj is None:
LOGGER.warning(f"Object ID 0x{tm.reporter_id.name} has no name")
obj_name = tm.reporter_id.name
else:
obj_name = obj_id_obj.name
generic_event_string = (
f"Object {obj_name} generated Event {tm.event_id} | {info.name}"
)
if info.info != "":
additional_event_info = (
f"Additional info: {info.info} | P1: {tm.param_1} | P2: {tm.param_2}"
)
file_logger.info(
f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {generic_event_string}"
)
LOGGER.info(generic_event_string)
if additional_event_info != "":
file_logger.info(additional_event_info)
print(additional_event_info)
return generic_event_string + " | " + additional_event_info

View File

@ -3,56 +3,74 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from spacepackets.ecss.pus_17_test import Service17Tm
import spacepackets.ecss.pus_17_test as pus17
from spacepackets.ecss.tm import PusTelemetry
from spacepackets.util import PrintFormats
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.tm.service_2_raw_cmd import Service2TM
from tmtccmd.tm.service_3_housekeeping import Service3TM
from tmtccmd.tm.service_5_event import Service5TM
from tmtccmd.tm.service_8_functional_cmd import Service8TM
from tmtccmd.pus.service_17_test import Service17TMExtended
from tmtccmd.tm.service_20_parameters import Service20TM
from tmtccmd.tm.service_200_mode import Service200TM
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from common_tmtc.config.definitions import PUS_APID
from tmtccmd.logging.pus import RawTmtcTimedLogWrapper
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tm.pus_2_rawcmd import Service2Tm
from tmtccmd.tm.pus_20_fsfw_parameters import Service20FsfwTm
from tmtccmd.tm.pus_200_fsfw_modes import Service200FsfwTm
from tmtccmd.logging import get_console_logger
from common_tmtc.config.object_ids import get_object_ids
from common_tmtc.pus_tm.action_reply_handling import handle_action_reply
from common_tmtc.pus_tm.event_handler import handle_event_packet
from common_tmtc.pus_tm.verification_handler import handle_service_1_fsfw_packet
from common_tmtc.pus_tm.hk_handling import handle_hk_packet
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
LOGGER = get_console_logger()
def ccsds_tm_handler(
apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter
) -> None:
if apid == PUS_APID:
pus_packet_factory(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer)
def pus_packet_factory(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter):
service_type = raw_tm_packet[7]
tm_packet = None
if service_type == 1:
tm_packet = Service1TMExtended.unpack(raw_tm_packet)
if service_type == 2:
tm_packet = Service2TM.unpack(raw_tm_packet)
if service_type == 3:
tm_packet = Service3TM.unpack(raw_tm_packet, custom_hk_handling=False)
if service_type == 8:
tm_packet = Service8TM.unpack(raw_tm_packet)
if service_type == 5:
tm_packet = Service5TM.unpack(raw_tm_packet)
if service_type == 17:
tm_packet = Service17TMExtended.unpack(raw_tm_packet)
if service_type == 20:
tm_packet = Service20TM.unpack(raw_tm_packet)
if service_type == 200:
tm_packet = Service200TM.unpack(raw_tm_packet)
if tm_packet is None:
LOGGER.info(
"The service "
+ str(service_type)
+ " is not implemented in Telemetry Factory"
)
tm_packet = PusTelemetry.unpack(raw_tm_packet)
tmtc_printer.print_telemetry(packet_if=tm_packet, info_if=tm_packet)
def pus_factory_hook(
wrapper: VerificationWrapper,
packet: bytes,
printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper,
):
if len(packet) < 8:
LOGGER.warning("Detected packet shorter than 8 bytes!")
return
try:
tm_packet = PusTelemetry.unpack(packet)
except ValueError:
LOGGER.warning("Could not generate PUS TM object from raw data")
LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
return
service = tm_packet.service
file_logger = printer.file_logger
obj_id_dict = get_object_ids()
dedicated_handler = True
if service == 1:
handle_service_1_fsfw_packet(wrapper=wrapper, raw_tm=packet)
elif service == 2:
tm_packet = Service2Tm.unpack(packet)
dedicated_handler = False
elif service == 3:
handle_hk_packet(printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict)
elif service == 8:
handle_action_reply(raw_tm=packet, printer=printer, obj_id_dict=obj_id_dict)
elif service == 5:
handle_event_packet(raw_tm=packet, printer=printer, file_logger=file_logger)
elif service == 17:
tm_packet = Service17Tm.unpack(raw_telemetry=packet)
if tm_packet.subservice == pus17.Subservices.TM_REPLY:
wrapper.dlog("Received Ping Reply TM[17,2]")
dedicated_handler = True
elif service == 20:
tm_packet = Service20FsfwTm.unpack(raw_telemetry=packet)
dedicated_handler = False
elif service == 200:
tm_packet = Service200FsfwTm.unpack(raw_telemetry=packet)
dedicated_handler = False
else:
LOGGER.info(f"The service {service} is not implemented in Telemetry Factory")
tm_packet = PusTelemetry.unpack(raw_telemetry=packet)
tm_packet.print_source_data(PrintFormats.HEX)
dedicated_handler = True
if not dedicated_handler and tm_packet is not None:
printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
raw_logger.log_tm(tm_packet)

View File

@ -2,44 +2,57 @@
"""
import struct
from typing import Tuple
from tmtccmd.tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.tm import Service3FsfwTm
from tmtccmd.tm.pus_3_hk_base import HkContentType, Service3Base
from tmtccmd.logging import get_console_logger
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
from tmtccmd.util import ObjectIdDictT, ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
LOGGER = get_console_logger()
def service_3_hk_handling(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
"""This function is called when a Service 3 Housekeeping packet is received.
def handle_hk_packet(
raw_tm: bytes,
obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter,
):
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
if named_obj_id is None:
named_obj_id = tm_packet.object_id
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
hk_data = tm_packet.tm_data[8:]
printer.generic_hk_tm_print(
content_type=HkContentType.HK,
object_id=named_obj_id,
set_id=tm_packet.set_id,
hk_data=hk_data,
)
handle_regular_hk_print(
printer=printer,
object_id=named_obj_id,
hk_packet=tm_packet,
hk_data=hk_data,
)
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
LOGGER.warning("HK definitions printout not implemented yet")
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param set_id:
@param hk_data:
@param service3_packet:
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
The first list contains the header columns, the second list the list with
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
"""
def handle_regular_hk_print(
printer: FsfwTmTcPrinter,
object_id: ObjectIdU32,
hk_packet: Service3Base,
hk_data: bytes,
):
if object_id == TEST_DEVICE_0_ID or object_id == TEST_DEVICE_1_ID:
return handle_test_set_deserialization(hk_data=hk_data)
else:
LOGGER.info("Service3TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0
handle_test_set_deserialization(hk_data=hk_data)
def handle_test_set_deserialization(
hk_data: bytearray,
hk_data: bytes,
) -> Tuple[list, list, bytearray, int]:
header_list = []
content_list = []

View File

@ -1,20 +0,0 @@
from typing import Tuple
def custom_service_8_handling(
object_id: int, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
"""
This function is called by the TMTC core if a Service 8 data reply (subservice 130)
is received. The user can return a tuple of two lists, where the first list
is a list of header strings to print and the second list is a list of values to print.
The TMTC core will take care of printing both lists and logging them.
@param object_id:
@param action_id:
@param custom_data:
@return:
"""
header_list = []
content_list = []
return header_list, content_list

View File

@ -0,0 +1,53 @@
from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tm.pus_1_verification import Service1TmExtended, Service1FsfwWrapper
from tmtccmd.logging import get_console_logger
from common_tmtc.config.retvals import get_retval_dict
LOGGER = get_console_logger()
def handle_service_1_fsfw_packet(wrapper: VerificationWrapper, raw_tm: bytes):
if wrapper.console_logger is None or wrapper.file_logger is None:
raise ValueError(
"Console logger or file logger not valid. Please set a valid one"
)
# Error code with length 2 is FSFW specific
tm_packet = Service1Tm.unpack(data=raw_tm, params=UnpackParams(1, 2))
fsfw_wrapper = Service1FsfwWrapper(tm_packet)
res = wrapper.verificator.add_tm(tm_packet)
if res is None:
LOGGER.info(
f"Received Verification TM[{tm_packet.service}, {tm_packet.subservice}] "
f"with Request ID {tm_packet.tc_req_id.as_u32():#08x}"
)
LOGGER.warning(f"No matching telecommand found for {tm_packet.tc_req_id}")
else:
wrapper.log_to_console(tm_packet, res)
wrapper.log_to_file(tm_packet, res)
retval_dict = get_retval_dict()
if tm_packet.has_failure_notice:
retval_info = retval_dict.get(tm_packet.error_code.val)
if retval_info is None:
raw_err = tm_packet.error_code.val
LOGGER.info(
f"No returnvalue information found for error code with subsystem ID"
f" {(raw_err >> 8) & 0xff} and unique ID {raw_err & 0xff}"
)
else:
retval_string = (
f"Error Code information for code {tm_packet.error_code.val:#06x} | "
f"Name: {retval_info.name} | Info: {retval_info.info}"
)
error_param_1_str = (
f"Error Parameter 1: hex {fsfw_wrapper.error_param_1:#010x} "
f"dec{fsfw_wrapper.error_param_1} "
)
error_param_2_str = (
f"Error Parameter 2: hex {fsfw_wrapper.error_param_2:#010x} "
f"dec {fsfw_wrapper.error_param_2}"
)
wrapper.dlog(retval_string)
wrapper.dlog(error_param_1_str)
wrapper.dlog(error_param_2_str)