import logging import sys from pathlib import Path from typing import Optional, Sequence from spacepackets import SpacePacket, SpacePacketHeader, PacketTypes from spacepackets.cfdp import ( TransmissionModes, PduType, DirectiveType, GenericPduPacket, PduHolder, PduFactory, ) from tmtccmd.cfdp import ( RemoteEntityCfgTable, RemoteEntityCfg, LocalEntityCfg, CfdpUserBase, ) from spacepackets.util import UnsignedByteField from tmtccmd.cfdp.defs import CfdpRequestType from tmtccmd.cfdp.handler import SourceHandler, DestHandler from tmtccmd.cfdp.request import PutRequest, PutRequestCfg from tmtccmd.logging import get_current_time_string from tmtccmd.pus.pus_11_tc_sched import Subservices as Pus11Subservices from tmtccmd.tc.queue import DefaultPusQueueHelper from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider, ProvidesSeqCount from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter try: import spacepackets except ImportError as error: print(error) print("Python tmtccmd module could not be imported. Make sure it is installed") sys.exit(1) try: import tmtccmd except ImportError as error: print(error) print("Python tmtccmd module could not be imported. Make sure it is installed") sys.exit(1) from spacepackets.ecss import PusVerificator, PusTelecommand, PusServices from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands from common_tmtc.pus_tc.pus_200_mode import pack_service_200_commands_into from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into from examples.tmtcc import EXAMPLE_PUS_APID, EXAMPLE_CFDP_APID from tmtccmd import TcHandlerBase, get_console_logger, TmTcCfgHookBase, BackendBase from tmtccmd.pus import VerificationWrapper from tmtccmd.tc import ( ProcedureHelper, FeedWrapper, TcProcedureType, TcQueueEntryType, SendCbParams, ) from tmtccmd.tc.pus_5_event import pack_generic_service_5_test_into from tmtccmd.tm import SpecificApidHandlerBase, CcsdsTmHandler from tmtccmd.logging.pus import RawTmtcTimedLogWrapper from tmtccmd.config import CoreServiceList, SetupWrapper, SetupParams, ArgParserWrapper from common_tmtc.pus_tm.factory_hook import pus_factory_hook LOGGER = get_console_logger() class CfdpCcsdsWrapper(SpecificApidHandlerBase): def __init__( self, cfg: LocalEntityCfg, user: CfdpUserBase, cfdp_seq_cnt_provider: ProvidesSeqCount, remote_cfg: Sequence[RemoteEntityCfg], ccsds_seq_cnt_provider: ProvidesSeqCount, ccsds_apid: int, ): """Wrapper helper type used to wrap PDU packets into CCSDS packets and to extract PDU packets from CCSDS packets. :param cfg: Local CFDP entity configuration. :param user: User wrapper. This contains the indication callback implementations and the virtual filestore implementation. :param cfdp_seq_cnt_provider: Every CFDP file transfer has a transaction sequence number. This provider is used to retrieve that sequence number. :param remote_cfg: :param ccsds_seq_cnt_provider: Each CFDP PDU is wrapped into a CCSDS space packet, and each space packet has a dedicated sequence count. This provider is used to retrieve the sequence count. :param ccsds_apid: APID to use for the CCSDS space packet header wrapped around each PDU. This is important so that the OBSW can distinguish between regular PUS packets and CFDP packets. """ super().__init__(EXAMPLE_CFDP_APID, None) self.handler = CfdpHandler(cfg, user, cfdp_seq_cnt_provider, remote_cfg) self.ccsds_seq_cnt_provider = ccsds_seq_cnt_provider self.ccsds_apid = ccsds_apid def pull_next_dest_packet(self) -> Optional[SpacePacket]: """Retrieves the next PDU to send and wraps it into a space packet""" next_packet = self.handler.pull_next_dest_packet() if next_packet is None: return next_packet sp_header = SpacePacketHeader( packet_type=PacketTypes.TC, apid=self.ccsds_apid, seq_count=self.ccsds_seq_cnt_provider.get_and_increment(), data_len=next_packet.packet_len - 1, ) return SpacePacket(sp_header, None, next_packet.pack()) def confirm_dest_packet_sent(self): self.handler.confirm_dest_packet_sent() def pass_packet(self, space_packet: SpacePacket): # Unwrap the user data and pass it to the handler pdu_raw = space_packet.user_data pdu_base = PduFactory.from_raw(pdu_raw) self.handler.pass_packet(pdu_base) def handle_tm(self, packet: bytes, _user_args: any): ccsds_header_raw = packet[0:6] sp_header = SpacePacketHeader.unpack(ccsds_header_raw) pdu = packet[6:] sp = SpacePacket(sp_header, sec_header=None, user_data=pdu) self.pass_packet(sp) class CfdpHandler: def __init__( self, cfg: LocalEntityCfg, user: CfdpUserBase, seq_cnt_provider: ProvidesSeqCount, remote_cfg: Sequence[RemoteEntityCfg], ): self.dest_id = UnsignedByteField(EXAMPLE_PUS_APID, 2) self.remote_cfg_table = RemoteEntityCfgTable() self.remote_cfg_table.add_remote_entities(remote_cfg) self.dest_handler = DestHandler(cfg, user, self.remote_cfg_table) self.source_handler = SourceHandler(cfg, seq_cnt_provider, user) def put_request_file( self, source_path: Path, dest_path: Path, trans_mode: TransmissionModes, closure_requested: bool, ): put_request_cfg = PutRequestCfg( destination_id=self.dest_id, source_file=source_path, dest_file=dest_path.as_posix(), trans_mode=trans_mode, closure_requested=closure_requested, ) put_request = PutRequest(put_request_cfg) self.source_handler.put_request( put_request, self.remote_cfg_table.get_remote_entity(self.dest_id) ) def pull_next_dest_packet(self) -> Optional[PduHolder]: res = self.dest_handler.state_machine() if res.states.packet_ready: return self.dest_handler.pdu_holder return None def confirm_dest_packet_sent(self): self.dest_handler.confirm_packet_sent_advance_fsm() def pass_packet(self, packet: GenericPduPacket): """This function routes the packets based on PDU type and directive type if applicable. The routing is based on section 4.5 of the CFDP standard whcih specifies the PDU forwarding procedure. """ if packet.pdu_type == PduType.FILE_DATA: self.dest_handler.pass_packet(packet) else: if packet.directive_type in [ DirectiveType.METADATA_PDU, DirectiveType.EOF_PDU, DirectiveType.PROMPT_PDU, ]: # Section b) of 4.5.3: These PDUs should always be targeted towards the file # receiver a.k.a. the destination handler self.dest_handler.pass_packet(packet) elif packet.directive_type in [ DirectiveType.FINISHED_PDU, DirectiveType.NAK_PDU, DirectiveType.KEEP_ALIVE_PDU, ]: # Section c) of 4.5.3: These PDUs should always be targeted towards the file sender # a.k.a. the source handler self.source_handler.pass_packet(packet) elif packet.directive_type == DirectiveType.ACK_PDU: # Section a): Recipient depends on the type of PDU that is being acknowledged. # We can simply extract the PDU type from the raw stream. If it is an EOF PDU, # this packet is passed to the source handler. For a finished PDU, it is # passed to the destination handler pdu_holder = PduHolder(packet) ack_pdu = pdu_holder.to_ack_pdu() if ack_pdu.directive_code_of_acked_pdu == DirectiveType.EOF_PDU: self.source_handler.pass_packet(packet) elif ack_pdu.directive_code_of_acked_pdu == DirectiveType.FINISHED_PDU: self.dest_handler.pass_packet(packet) class PusHandler(SpecificApidHandlerBase): def __init__( self, wrapper: VerificationWrapper, printer: FsfwTmTcPrinter, raw_logger: RawTmtcTimedLogWrapper, ): super().__init__(EXAMPLE_PUS_APID, None) self.printer = printer self.verif_wrapper = wrapper self.raw_logger = raw_logger def handle_tm(self, packet: bytes, _user_args: any): pus_factory_hook( packet=packet, wrapper=self.verif_wrapper, raw_logger=self.raw_logger, printer=self.printer, ) class TcHandler(TcHandlerBase): def __init__( self, seq_count_provider: FileSeqCountProvider, cfdp_handler: CfdpCcsdsWrapper, pus_verificator: PusVerificator, file_logger: logging.Logger, raw_logger: RawTmtcTimedLogWrapper, ): super().__init__() self.seq_count_provider = seq_count_provider self.pus_verificator = pus_verificator self.file_logger = file_logger self.raw_logger = raw_logger self.queue_helper = DefaultPusQueueHelper( queue_wrapper=None, pus_apid=EXAMPLE_PUS_APID, seq_cnt_provider=seq_count_provider, pus_verificator=pus_verificator, ) self.cfdp_handler = cfdp_handler def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper): self.queue_helper.queue_wrapper = wrapper.queue_wrapper if info.proc_type == TcProcedureType.DEFAULT: self.handle_default_procedure(info) elif info.proc_type == TcProcedureType.CFDP: self.handle_cfdp_procedure(info) def handle_default_procedure(self, info: ProcedureHelper): def_proc = info.to_def_procedure() service = def_proc.service op_code = def_proc.op_code if service == CoreServiceList.SERVICE_2.value: return pack_service_2_commands_into(op_code=op_code, q=self.queue_helper) if service == CoreServiceList.SERVICE_3.value: return pack_service_3_commands_into(op_code=op_code, q=self.queue_helper) if service == CoreServiceList.SERVICE_5.value: return pack_generic_service_5_test_into(q=self.queue_helper) if service == CoreServiceList.SERVICE_8.value: return pack_service_8_commands_into(op_code=op_code, q=self.queue_helper) if service == CoreServiceList.SERVICE_11.value: return pack_service_11_commands(op_code=op_code, q=self.queue_helper) if service == CoreServiceList.SERVICE_17.value: return pack_service_17_commands(op_code=op_code, q=self.queue_helper) if service == CoreServiceList.SERVICE_20.value: return pack_service20_commands_into(q=self.queue_helper, op_code=op_code) if service == CoreServiceList.SERVICE_200.value: return pack_service_200_commands_into(q=self.queue_helper, op_code=op_code) LOGGER.warning("Invalid Service !") def handle_cfdp_procedure(self, info: ProcedureHelper): cfdp_procedure = info.to_cfdp_procedure() if cfdp_procedure.cfdp_request_type == CfdpRequestType.PUT: # TODO: Start put request if there isn't one pending yet # self.cfdp_handler.handler.put_request_file(...) pass pass def send_cb(self, params: SendCbParams): if params.entry.is_tc: if params.entry.entry_type == TcQueueEntryType.PUS_TC: self.handle_tc_send_cb(params) elif params.entry.entry_type == TcQueueEntryType.LOG: log_entry = params.entry.to_log_entry() LOGGER.info(log_entry.log_str) self.file_logger.info(log_entry.log_str) def handle_tc_send_cb(self, params: SendCbParams): pus_tc_wrapper = params.entry.to_pus_tc_entry() if ( pus_tc_wrapper.pus_tc.service == PusServices.S11_TC_SCHED and pus_tc_wrapper.pus_tc.subservice == Pus11Subservices.TC_INSERT ): wrapped_tc = PusTelecommand.unpack(pus_tc_wrapper.pus_tc.app_data[4:]) tc_info_string = f"Sending time-tagged command {wrapped_tc}" LOGGER.info(tc_info_string) self.file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}") raw_tc = pus_tc_wrapper.pus_tc.pack() self.raw_logger.log_tc(pus_tc_wrapper.pus_tc) tc_info_string = f"Sending {pus_tc_wrapper.pus_tc}" LOGGER.info(tc_info_string) self.file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}") params.com_if.send(raw_tc) def queue_finished_cb(self, info: ProcedureHelper): if info is not None and info.proc_type == TcQueueEntryType.PUS_TC: def_proc = info.to_def_procedure() LOGGER.info( f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}" ) def setup_params(hook_obj: TmTcCfgHookBase) -> SetupWrapper: print(f"-- eive TMTC Commander --") print(f"-- spacepackets v{spacepackets.__version__} --") params = SetupParams() parser_wrapper = ArgParserWrapper(hook_obj) parser_wrapper.create_default_parent_parser() parser_wrapper.create_default_parser() parser_wrapper.add_def_proc_and_cfdp_as_subparsers() parser_wrapper.parse() tmtccmd.init_printout(parser_wrapper.use_gui) parser_wrapper.set_params(params) params.apid = EXAMPLE_PUS_APID setup_wrapper = SetupWrapper(hook_obj=hook_obj, setup_params=params) return setup_wrapper def setup_tmtc_handlers( verif_wrapper: VerificationWrapper, printer: FsfwTmTcPrinter, cfdp_handler: CfdpCcsdsWrapper, raw_logger: RawTmtcTimedLogWrapper, ) -> (CcsdsTmHandler, TcHandler): pus_handler = PusHandler( printer=printer, raw_logger=raw_logger, wrapper=verif_wrapper ) ccsds_handler = CcsdsTmHandler(None) ccsds_handler.add_apid_handler(pus_handler) tc_handler = TcHandler( file_logger=printer.file_logger, raw_logger=raw_logger, pus_verificator=verif_wrapper.pus_verificator, seq_count_provider=PusFileSeqCountProvider(), cfdp_handler=cfdp_handler, ) return ccsds_handler, tc_handler def setup_backend( setup_wrapper: SetupWrapper, tc_handler: TcHandler, ccsds_handler: CcsdsTmHandler, ) -> BackendBase: tmtc_backend = tmtccmd.create_default_tmtc_backend( setup_wrapper=setup_wrapper, tm_handler=ccsds_handler, tc_handler=tc_handler ) tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=setup_wrapper.hook_obj) return tmtc_backend