#!/usr/bin/env python3 """Example client for the sat-rs example application""" from __future__ import annotations import logging import sys import time from typing import Optional from prompt_toolkit.history import History from prompt_toolkit.history import FileHistory from spacepackets.ccsds import PacketId, PacketType import tmtccmd from spacepackets.ecss import PusVerificator from spacepackets.ccsds.time import CdsShortTimestamp from tmtccmd import TcHandlerBase, ProcedureParamsWrapper from tmtccmd.core.base import BackendRequest from tmtccmd.pus import VerificationWrapper from tmtccmd.tmtc import CcsdsTmHandler from tmtccmd.com import ComInterface from tmtccmd.config import ( CmdTreeNode, default_json_path, SetupParams, HookBase, params_to_procedure_conversion, ) from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper from tmtccmd.logging import add_colorlog_console_logger from tmtccmd.logging.pus import ( RegularTmtcLogWrapper, RawTmtcTimedLogWrapper, TimedLogWhen, ) from tmtccmd.tmtc import ( TcQueueEntryType, ProcedureWrapper, TcProcedureType, FeedWrapper, SendCbParams, DefaultPusQueueHelper, QueueWrapper, ) from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util.obj_id import ObjectIdDictT from opssat_tmtc.pus_tc import create_cmd_definition_tree, pack_pus_telecommands from opssat_tmtc.common import EXPERIMENT_APID from opssat_tmtc.pus_tm import PusHandler _LOGGER = logging.getLogger() class SatRsConfigHook(HookBase): def __init__(self, json_cfg_path: str): super().__init__(json_cfg_path) def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: from tmtccmd.config.com import ( create_com_interface_default, create_com_interface_cfg_default, ) assert self.cfg_path is not None packet_id_list = [] packet_id_list.append(PacketId(PacketType.TM, True, EXPERIMENT_APID)) cfg = create_com_interface_cfg_default( com_if_key=com_if_key, json_cfg_path=self.cfg_path, space_packet_ids=packet_id_list, ) assert cfg is not None return create_com_interface_default(cfg) def get_command_definitions(self) -> CmdTreeNode: """This function should return the root node of the command definition tree.""" return create_cmd_definition_tree() def get_cmd_history(self) -> Optional[History]: """Optionlly return a history class for the past command paths which will be used when prompting a command path from the user in CLI mode.""" return FileHistory(".tmtc-history.txt") def get_object_ids(self) -> ObjectIdDictT: from tmtccmd.config.objects import get_core_object_ids return get_core_object_ids() class TcHandler(TcHandlerBase): def __init__( self, seq_count_provider: FileSeqCountProvider, verif_wrapper: VerificationWrapper, ): super(TcHandler, self).__init__() self.seq_count_provider = seq_count_provider self.verif_wrapper = verif_wrapper self.queue_helper = DefaultPusQueueHelper( queue_wrapper=QueueWrapper.empty(), tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, seq_cnt_provider=seq_count_provider, pus_verificator=self.verif_wrapper.pus_verificator, default_pus_apid=EXPERIMENT_APID, ) def send_cb(self, send_params: SendCbParams): entry_helper = send_params.entry if entry_helper.is_tc: if entry_helper.entry_type == TcQueueEntryType.PUS_TC: pus_tc_wrapper = entry_helper.to_pus_tc_entry() raw_tc = pus_tc_wrapper.pus_tc.pack() _LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}") send_params.com_if.send(raw_tc) elif entry_helper.entry_type == TcQueueEntryType.LOG: log_entry = entry_helper.to_log_entry() _LOGGER.info(log_entry.log_str) def queue_finished_cb(self, info: ProcedureWrapper): if info.proc_type == TcProcedureType.TREE_COMMANDING: def_proc = info.to_tree_commanding_procedure() _LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}") def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): q = self.queue_helper q.queue_wrapper = wrapper.queue_wrapper if info.proc_type == TcProcedureType.TREE_COMMANDING: def_proc = info.to_tree_commanding_procedure() assert def_proc.cmd_path is not None pack_pus_telecommands(q, def_proc.cmd_path) def main(): add_colorlog_console_logger(_LOGGER) tmtccmd.init_printout(False) hook_obj = SatRsConfigHook(json_cfg_path=default_json_path()) parser_wrapper = PreArgsParsingWrapper() parser_wrapper.create_default_parent_parser() parser_wrapper.create_default_parser() parser_wrapper.add_def_proc_args() params = SetupParams() post_args_wrapper = parser_wrapper.parse(hook_obj, params) proc_wrapper = ProcedureParamsWrapper() if post_args_wrapper.use_gui: post_args_wrapper.set_params_without_prompts(proc_wrapper) else: post_args_wrapper.set_params_with_prompts(proc_wrapper) setup_args = SetupWrapper( hook_obj=hook_obj, setup_params=params, proc_param_wrapper=proc_wrapper ) # Create console logger helper and file loggers tmtc_logger = RegularTmtcLogWrapper() file_logger = tmtc_logger.logger raw_logger = RawTmtcTimedLogWrapper(when=TimedLogWhen.PER_HOUR, interval=1) verificator = PusVerificator() verification_wrapper = VerificationWrapper(verificator, _LOGGER, file_logger) # Create primary TM handlers and add it to the CCSDS Packet Handler tm_handler = PusHandler(file_logger, verification_wrapper, raw_logger) ccsds_handler = CcsdsTmHandler(generic_handler=tm_handler) # TODO: We could add the CFDP handlers for the CFDP APID at a later stage. # ccsds_handler.add_apid_handler(tm_handler) # Create TC handlers seq_count_provider = PusFileSeqCountProvider() tc_handler = TcHandler(seq_count_provider, verification_wrapper) tmtccmd.setup(setup_args=setup_args) init_proc = params_to_procedure_conversion(setup_args.proc_param_wrapper) tmtc_backend = tmtccmd.create_default_tmtc_backend( setup_wrapper=setup_args, tm_handler=ccsds_handler, tc_handler=tc_handler, init_procedure=init_proc, ) tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=hook_obj) try: while True: state = tmtc_backend.periodic_op(None) if state.request == BackendRequest.TERMINATION_NO_ERROR: tmtc_backend.close_com_if() sys.exit(0) elif state.request == BackendRequest.DELAY_IDLE: _LOGGER.info("TMTC Client in IDLE mode") time.sleep(3.0) elif state.request == BackendRequest.DELAY_LISTENER: time.sleep(0.8) elif state.request == BackendRequest.DELAY_CUSTOM: if state.next_delay.total_seconds() <= 0.4: time.sleep(state.next_delay.total_seconds()) else: time.sleep(0.4) elif state.request == BackendRequest.CALL_NEXT: pass except KeyboardInterrupt: tmtc_backend.close_com_if() sys.exit(0) if __name__ == "__main__": main()