From 51871a887f8f3ffbcdb9f12c83b1ad309e8cf7b5 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 14 Sep 2022 19:01:47 +0200 Subject: [PATCH] moved a lot of code to tmtccmd --- common.py | 231 +++++------------------------------------------------- 1 file changed, 21 insertions(+), 210 deletions(-) diff --git a/common.py b/common.py index 50647d6..3336fcb 100644 --- a/common.py +++ b/common.py @@ -1,43 +1,35 @@ import logging import sys from pathlib import Path -from typing import Optional, Sequence, Tuple, cast +from typing import cast -from spacepackets import SpacePacket, SpacePacketHeader, PacketTypes +from spacepackets import SpacePacket, SpacePacketHeader from spacepackets.cfdp import ( TransmissionModes, - PduType, - DirectiveType, - GenericPduPacket, - PduHolder, - PduFactory, ChecksumTypes, ConditionCode, ) from spacepackets.cfdp.pdu import MetadataPdu, FileDataPdu from tmtccmd.cfdp import ( - RemoteEntityCfgTable, RemoteEntityCfg, LocalEntityCfg, CfdpUserBase, IndicationCfg, TransactionId, ) -from tmtccmd.cfdp.defs import CfdpRequestType, CfdpStates -from tmtccmd.cfdp.handler import SourceHandler, DestHandler +from tmtccmd.cfdp.defs import CfdpRequestType +from tmtccmd.cfdp.handler import CfdpInCcsdsHandler from tmtccmd.cfdp.mib import DefaultFaultHandlerBase -from tmtccmd.cfdp.request import PutRequest from tmtccmd.cfdp.user import ( FileSegmentRecvdParams, MetadataRecvParams, TransactionFinishedParams, ) from tmtccmd.config.args import ProcedureParamsWrapper -from tmtccmd.core import BackendState from tmtccmd.logging import get_current_time_string from tmtccmd.pus.pus_11_tc_sched import Subservices as Pus11Subservices from tmtccmd.tc.queue import DefaultPusQueueHelper -from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider, ProvidesSeqCount +from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter try: @@ -73,7 +65,6 @@ from tmtccmd import ( TcHandlerBase, get_console_logger, TmTcCfgHookBase, - BackendBase, CcsdsTmtcBackend, ) from tmtccmd.pus import VerificationWrapper @@ -155,198 +146,17 @@ class ExampleCfdpUser(CfdpUserBase): pass -class CfdpCcsdsWrapper(SpecificApidHandlerBase): - def __init__( - self, - cfg: LocalEntityCfg, - user: CfdpUserBase, - remote_cfgs: Sequence[RemoteEntityCfg], - ccsds_apid: int, - cfdp_seq_cnt_provider: ProvidesSeqCount, - ccsds_seq_cnt_provider: ProvidesSeqCount, - ): - """Wrapper helper type used to wrap PDU packets into CCSDS packets and to extract PDU - packets from CCSDS packets. - - :param cfg: Local CFDP entity configuration. - :param user: User wrapper. This contains the indication callback implementations and the - virtual filestore implementation. - :param cfdp_seq_cnt_provider: Every CFDP file transfer has a transaction sequence number. - This provider is used to retrieve that sequence number. - :param ccsds_seq_cnt_provider: Each CFDP PDU is wrapped into a CCSDS space packet, and each - space packet has a dedicated sequence count. This provider is used to retrieve the - sequence count. - :param ccsds_apid: APID to use for the CCSDS space packet header wrapped around each PDU. - This is important so that the OBSW can distinguish between regular PUS packets and - CFDP packets. - """ +class CfdpInCcsdsWrapper(SpecificApidHandlerBase): + def __init__(self, cfdp_in_ccsds_handler: CfdpInCcsdsHandler): super().__init__(EXAMPLE_CFDP_APID, None) - self.handler = CfdpHandler(cfg, user, cfdp_seq_cnt_provider, remote_cfgs) - self.ccsds_seq_cnt_provider = ccsds_seq_cnt_provider - self.ccsds_apid = ccsds_apid - - def pull_next_source_packet(self) -> Optional[Tuple[PduHolder, SpacePacket]]: - """Retrieves the next PDU to send and wraps it into a space packet""" - next_packet = self.handler.pull_next_source_packet() - if next_packet is None: - return next_packet - sp_header = SpacePacketHeader( - packet_type=PacketTypes.TC, - apid=self.ccsds_apid, - seq_count=self.ccsds_seq_cnt_provider.get_and_increment(), - data_len=next_packet.packet_len - 1, - ) - return next_packet, SpacePacket(sp_header, None, next_packet.pack()) - - def pull_next_dest_packet(self) -> Optional[Tuple[PduHolder, SpacePacket]]: - """Retrieves the next PDU to send and wraps it into a space packet""" - next_packet = self.handler.pull_next_dest_packet() - if next_packet is None: - return next_packet - sp_header = SpacePacketHeader( - packet_type=PacketTypes.TC, - apid=self.ccsds_apid, - seq_count=self.ccsds_seq_cnt_provider.get_and_increment(), - data_len=next_packet.packet_len - 1, - ) - return next_packet, SpacePacket(sp_header, None, next_packet.pack()) - - def confirm_dest_packet_sent(self): - self.handler.confirm_dest_packet_sent() - - def confirm_source_packet_sent(self): - self.handler.confirm_source_packet_sent() - - def __iter__(self): - return self - - def __next__( - self, - ) -> ( - Optional[Tuple[PduHolder, SpacePacket]], - Optional[Tuple[PduHolder, SpacePacket]], - ): - """The iterator for this class will returns a tuple of optional PDUs wrapped b a - :py:class:`PduHolder`. - - :return: Can be a tuple where the first entry can hold a source packet and the second entry - can be a destination packet. If both packets are None, a StopIteration will be raised. - """ - next_source_tuple = self.pull_next_source_packet() - next_dest_tuple = self.pull_next_dest_packet() - if not next_source_tuple and not next_dest_tuple: - raise StopIteration - return next_source_tuple, next_dest_tuple - - def pass_packet(self, space_packet: SpacePacket): - # Unwrap the user data and pass it to the handler - pdu_raw = space_packet.user_data - pdu_base = PduFactory.from_raw(pdu_raw) - self.handler.pass_packet(pdu_base) + self.handler = cfdp_in_ccsds_handler def handle_tm(self, packet: bytes, _user_args: any): ccsds_header_raw = packet[0:6] sp_header = SpacePacketHeader.unpack(ccsds_header_raw) pdu = packet[6:] sp = SpacePacket(sp_header, sec_header=None, user_data=pdu) - self.pass_packet(sp) - - -class CfdpHandler: - def __init__( - self, - cfg: LocalEntityCfg, - user: CfdpUserBase, - seq_cnt_provider: ProvidesSeqCount, - remote_cfgs: Sequence[RemoteEntityCfg], - ): - self.remote_cfg_table = RemoteEntityCfgTable() - self.remote_cfg_table.add_configs(remote_cfgs) - self.dest_handler = DestHandler(cfg, user, self.remote_cfg_table) - self.source_handler = SourceHandler(cfg, seq_cnt_provider, user) - - def put_request(self, request: PutRequest): - if not self.remote_cfg_table.get_cfg(request.cfg.destination_id): - raise ValueError( - f"No remote CFDP config found for entity ID {request.cfg.destination_id}" - ) - self.source_handler.put_request( - request, self.remote_cfg_table.get_cfg(request.cfg.destination_id) - ) - - def pull_next_source_packet(self) -> Optional[PduHolder]: - res = self.source_handler.state_machine() - if res.states.packet_ready: - return self.source_handler.pdu_holder - return None - - def pull_next_dest_packet(self) -> Optional[PduHolder]: - res = self.dest_handler.state_machine() - if res.states.packet_ready: - return self.dest_handler.pdu_holder - return None - - def __iter__(self): - return self - - def __next__(self) -> (Optional[PduHolder], Optional[PduHolder]): - """The iterator for this class will returns a tuple of optional PDUs wrapped b a - :py:class:`PduHolder`. - - :return: Can be a tuple where the first entry can hold a source packet and the second entry - can be a destination packet. If both packets are None, a StopIteration will be raised. - """ - next_source_packet = self.pull_next_source_packet() - next_dest_packet = self.pull_next_dest_packet() - if not next_dest_packet and not next_source_packet: - raise StopIteration - return next_source_packet, next_dest_packet - - def put_request_pending(self) -> bool: - return self.source_handler.states.state != CfdpStates.IDLE - - def confirm_dest_packet_sent(self): - self.dest_handler.confirm_packet_sent_advance_fsm() - - def confirm_source_packet_sent(self): - self.source_handler.confirm_packet_sent_advance_fsm() - - def pass_packet(self, packet: GenericPduPacket): - """This function routes the packets based on PDU type and directive type if applicable. - - The routing is based on section 4.5 of the CFDP standard whcih specifies the PDU forwarding - procedure. - """ - if packet.pdu_type == PduType.FILE_DATA: - self.dest_handler.pass_packet(packet) - else: - if packet.directive_type in [ - DirectiveType.METADATA_PDU, - DirectiveType.EOF_PDU, - DirectiveType.PROMPT_PDU, - ]: - # Section b) of 4.5.3: These PDUs should always be targeted towards the file - # receiver a.k.a. the destination handler - self.dest_handler.pass_packet(packet) - elif packet.directive_type in [ - DirectiveType.FINISHED_PDU, - DirectiveType.NAK_PDU, - DirectiveType.KEEP_ALIVE_PDU, - ]: - # Section c) of 4.5.3: These PDUs should always be targeted towards the file sender - # a.k.a. the source handler - self.source_handler.pass_packet(packet) - elif packet.directive_type == DirectiveType.ACK_PDU: - # Section a): Recipient depends on the type of PDU that is being acknowledged. - # We can simply extract the PDU type from the raw stream. If it is an EOF PDU, - # this packet is passed to the source handler. For a finished PDU, it is - # passed to the destination handler - pdu_holder = PduHolder(packet) - ack_pdu = pdu_holder.to_ack_pdu() - if ack_pdu.directive_code_of_acked_pdu == DirectiveType.EOF_PDU: - self.source_handler.pass_packet(packet) - elif ack_pdu.directive_code_of_acked_pdu == DirectiveType.FINISHED_PDU: - self.dest_handler.pass_packet(packet) + self.handler.pass_packet(sp) class PusHandler(SpecificApidHandlerBase): @@ -374,7 +184,7 @@ class TcHandler(TcHandlerBase): def __init__( self, seq_count_provider: FileSeqCountProvider, - cfdp_in_ccsds_wrapper: CfdpCcsdsWrapper, + cfdp_in_ccsds_wrapper: CfdpInCcsdsWrapper, pus_verificator: PusVerificator, file_logger: logging.Logger, raw_logger: RawTmtcTimedLogWrapper, @@ -436,14 +246,14 @@ class TcHandler(TcHandlerBase): LOGGER.info( f"CFDP: Starting file put request with parameters:\n{put_req}" ) - self.cfdp_in_ccsds_wrapper.handler.put_request(put_req) + self.cfdp_in_ccsds_wrapper.handler.cfdp_handler.put_request(put_req) self.cfdp_handler_started = True ( pdu_holder, packet, - ) = self.cfdp_in_ccsds_wrapper.pull_next_source_packet() + ) = self.cfdp_in_ccsds_wrapper.handler.pull_next_source_packet() self.queue_helper.add_ccsds_tc(packet) - self.cfdp_in_ccsds_wrapper.confirm_source_packet_sent() + self.cfdp_in_ccsds_wrapper.handler.confirm_source_packet_sent() metadata = cast(MetadataPdu, pdu_holder.to_metadata_pdu()) self.queue_helper.add_log_cmd( f"CFDP Source: Sending Metadata PDU for file with size {metadata.file_size}" @@ -451,10 +261,10 @@ class TcHandler(TcHandlerBase): ( pdu_holder, packet, - ) = self.cfdp_in_ccsds_wrapper.pull_next_source_packet() + ) = self.cfdp_in_ccsds_wrapper.handler.pull_next_source_packet() fd_pdu = cast(FileDataPdu, pdu_holder.to_file_data_pdu()) self.queue_helper.add_ccsds_tc(packet) - self.cfdp_in_ccsds_wrapper.confirm_source_packet_sent() + self.cfdp_in_ccsds_wrapper.handler.confirm_source_packet_sent() self.queue_helper.add_log_cmd( f"CFDP Source: Sending File Data PDU for segment at offset {fd_pdu.offset} " f"with length {len(fd_pdu.file_data)}" @@ -462,11 +272,11 @@ class TcHandler(TcHandlerBase): ( pdu_holder, packet, - ) = self.cfdp_in_ccsds_wrapper.pull_next_source_packet() + ) = self.cfdp_in_ccsds_wrapper.handler.pull_next_source_packet() self.queue_helper.add_log_cmd(f"CFDP Source: Sending EOF PDU") self.queue_helper.add_ccsds_tc(packet) - self.cfdp_in_ccsds_wrapper.confirm_source_packet_sent() + self.cfdp_in_ccsds_wrapper.handler.confirm_source_packet_sent() self.cfdp_in_ccsds_wrapper.handler.source_handler.state_machine() def send_cb(self, params: SendCbParams): @@ -561,7 +371,7 @@ def setup_tmtc_handlers( file_name=Path("seqcnt_cfdp_ccsds_.txt") ) cfdp_user = ExampleCfdpUser() - cfdp_in_ccsds_handler = CfdpCcsdsWrapper( + cfdp_in_ccsds_handler = CfdpInCcsdsHandler( cfg=cfdp_cfg, remote_cfgs=[remote_cfg], ccsds_apid=EXAMPLE_CFDP_APID, @@ -569,6 +379,7 @@ def setup_tmtc_handlers( cfdp_seq_cnt_provider=cfdp_seq_count_provider, user=cfdp_user, ) + cfdp_in_ccsds_wrapper = CfdpInCcsdsWrapper(cfdp_in_ccsds_handler) pus_handler = PusHandler( printer=printer, raw_logger=raw_logger, wrapper=verif_wrapper @@ -582,7 +393,7 @@ def setup_tmtc_handlers( seq_count_provider=PusFileSeqCountProvider( file_name=Path("seqcnt_pus_ccsds.txt") ), - cfdp_in_ccsds_wrapper=cfdp_in_ccsds_handler, + cfdp_in_ccsds_wrapper=cfdp_in_ccsds_wrapper, ) return ccsds_handler, tc_handler @@ -597,7 +408,7 @@ def setup_backend( setup_wrapper=setup_wrapper, tm_handler=ccsds_handler, tc_handler=tc_handler, - init_procedure=init_proc.base, + init_procedure=init_proc, ) tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=setup_wrapper.hook_obj) return cast(CcsdsTmtcBackend, tmtc_backend)