#!/usr/bin/env python3 from datetime import timedelta from pathlib import Path import os import ipaddress import tempfile import socket import select import threading import argparse import logging import time import copy from threading import Thread, Event from typing import Any, Dict, List, Tuple, Optional from multiprocessing import Queue from queue import Empty from cfdppy.handler import DestHandler, RemoteEntityCfgTable, SourceHandler from cfdppy.exceptions import InvalidDestinationId, SourceFileDoesNotExist from cfdppy import ( CfdpUserBase, LocalEntityCfg, PacketDestination, PutRequest, TransactionId, get_packet_destination, CfdpState, ) from cfdppy.mib import ( CheckTimerProvider, DefaultFaultHandlerBase, EntityType, IndicationCfg, RemoteEntityCfg, ) from cfdppy.user import ( FileSegmentRecvdParams, MetadataRecvParams, TransactionFinishedParams, TransactionParams, ) from spacepackets.cfdp import ChecksumType, ConditionCode, TransmissionMode from spacepackets.cfdp.pdu import AbstractFileDirectiveBase, PduFactory, PduHolder from spacepackets.cfdp.tlv import ( MessageToUserTlv, OriginatingTransactionId, ProxyMessageType, ProxyPutResponse, ReservedCfdpMessage, ) from spacepackets.cfdp.tlv.msg_to_user import ProxyPutResponseParams from spacepackets.countdown import Countdown from spacepackets.seqcount import SeqCountProvider from spacepackets.util import ByteFieldU16, UnsignedByteField PYTHON_ENTITY_ID = ByteFieldU16(1) RUST_ENTITY_ID = ByteFieldU16(2) # Enable all indications for both local and remote entity. INDICATION_CFG = IndicationCfg() BASE_STR_SRC = "PY SRC" BASE_STR_DEST = "PY DEST" FILE_CONTENT = "Hello World!\n" FILE_SEGMENT_SIZE = 256 MAX_PACKET_LEN = 512 # This queue is used to send put requests. PUT_REQ_QUEUE = Queue() # All telecommands which should go to the source handler should be put into this queue by # the UDP server. SOURCE_ENTITY_QUEUE = Queue() # All telecommands which should go to the destination handler should be put into this queue by # the UDP server. DEST_ENTITY_QUEUE = Queue() # All telemetry which should be sent to the remote entity is put into this queue and will then # be sent by the UDP server. TM_QUEUE = Queue() REMOTE_CFG_OF_PY_ENTITY = RemoteEntityCfg( entity_id=PYTHON_ENTITY_ID, max_packet_len=MAX_PACKET_LEN, max_file_segment_len=FILE_SEGMENT_SIZE, closure_requested=True, crc_on_transmission=False, default_transmission_mode=TransmissionMode.ACKNOWLEDGED, crc_type=ChecksumType.CRC_32, ) REMOTE_CFG_OF_REMOTE_ENTITY = copy.copy(REMOTE_CFG_OF_PY_ENTITY) REMOTE_CFG_OF_REMOTE_ENTITY.entity_id = RUST_ENTITY_ID RUST_PORT = 5111 PY_PORT = 5222 _LOGGER = logging.getLogger(__name__) class UdpServer(Thread): def __init__( self, sleep_time: float, addr: Tuple[str, int], explicit_remote_addr: Optional[Tuple[str, int]], tx_queue: Queue, source_entity_rx_queue: Queue, dest_entity_rx_queue: Queue, stop_signal: Event, ): super().__init__() self.sleep_time = sleep_time self.udp_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) self.addr = addr self.explicit_remote_addr = explicit_remote_addr self.udp_socket.bind(addr) self.tm_queue = tx_queue self.last_sender = None self.stop_signal = stop_signal self.source_entity_queue = source_entity_rx_queue self.dest_entity_queue = dest_entity_rx_queue def run(self): _LOGGER.info(f"Starting UDP server on {self.addr}") while True: if self.stop_signal.is_set(): break self.periodic_operation() time.sleep(self.sleep_time) def periodic_operation(self): while True: next_packet = self.poll_next_udp_packet() if next_packet is None or next_packet.pdu is None: break # Perform PDU routing. packet_dest = get_packet_destination(next_packet.pdu) _LOGGER.debug(f"UDP server: Routing {next_packet} to {packet_dest}") if packet_dest == PacketDestination.DEST_HANDLER: self.dest_entity_queue.put(next_packet.pdu) elif packet_dest == PacketDestination.SOURCE_HANDLER: self.source_entity_queue.put(next_packet.pdu) self.send_packets() def poll_next_udp_packet(self) -> Optional[PduHolder]: ready = select.select([self.udp_socket], [], [], 0) if ready[0]: data, self.last_sender = self.udp_socket.recvfrom(4096) return PduFactory.from_raw_to_holder(data) return None def send_packets(self): while True: try: next_tm = self.tm_queue.get(False) if not isinstance(next_tm, bytes) and not isinstance( next_tm, bytearray ): _LOGGER.error( f"UDP server can only sent bytearray, received {next_tm}" ) continue if self.explicit_remote_addr is not None: self.udp_socket.sendto(next_tm, self.explicit_remote_addr) elif self.last_sender is not None: self.udp_socket.sendto(next_tm, self.last_sender) else: _LOGGER.warning( "UDP Server: No packet destination found, dropping TM" ) except Empty: break class SourceEntityHandler(Thread): def __init__( self, base_str: str, verbose_level: int, source_handler: SourceHandler, put_req_queue: Queue, source_entity_queue: Queue, tm_queue: Queue, stop_signal: Event, ): super().__init__() self.base_str = base_str self.verbose_level = verbose_level self.source_handler = source_handler self.put_req_queue = put_req_queue self.source_entity_queue = source_entity_queue self.tm_queue = tm_queue self.stop_signal = stop_signal def _idle_handling(self) -> bool: try: put_req: PutRequest = self.put_req_queue.get(False) _LOGGER.info(f"{self.base_str}: Handling Put Request: {put_req}") if put_req.destination_id not in [PYTHON_ENTITY_ID, RUST_ENTITY_ID]: _LOGGER.warning( f"can only handle put requests target towards {RUST_ENTITY_ID} or " f"{PYTHON_ENTITY_ID}" ) else: try: self.source_handler.put_request(put_req) except SourceFileDoesNotExist as e: _LOGGER.warning( f"can not handle put request, source file {e.file} does not exist" ) return True except Empty: return False def _busy_handling(self): # We are getting the packets from a Queue here, they could for example also be polled # from a network. packet_received = False packet = None try: # We are getting the packets from a Queue here, they could for example also be polled # from a network. packet = self.source_entity_queue.get(False) packet_received = True except Empty: pass try: packet_sent = self._call_source_state_machine(packet) # If there is no work to do, put the thread to sleep. if not packet_received and not packet_sent: return False except SourceFileDoesNotExist: _LOGGER.warning("Source file does not exist") self.source_handler.reset() def _call_source_state_machine( self, packet: Optional[AbstractFileDirectiveBase] ) -> bool: """Returns whether a packet was sent.""" if packet is not None: _LOGGER.debug(f"{self.base_str}: Inserting {packet}") try: fsm_result = self.source_handler.state_machine(packet) except InvalidDestinationId as e: _LOGGER.warning( f"invalid destination ID {e.found_dest_id} on packet {packet}, expected " f"{e.expected_dest_id}" ) fsm_result = self.source_handler.state_machine(None) packet_sent = False if fsm_result.states.num_packets_ready > 0: while fsm_result.states.num_packets_ready > 0: next_pdu_wrapper = self.source_handler.get_next_packet() assert next_pdu_wrapper is not None if self.verbose_level >= 1: _LOGGER.debug( f"{self.base_str}: Sending packet {next_pdu_wrapper.pdu}" ) # Send all packets which need to be sent. self.tm_queue.put(next_pdu_wrapper.pack()) packet_sent = True return packet_sent def run(self): _LOGGER.info(f"Starting {self.base_str}") while True: if self.stop_signal.is_set(): break if self.source_handler.state == CfdpState.IDLE: if not self._idle_handling(): time.sleep(0.2) continue if self.source_handler.state == CfdpState.BUSY: if not self._busy_handling(): time.sleep(0.2) class DestEntityHandler(Thread): def __init__( self, base_str: str, verbose_level: int, dest_handler: DestHandler, dest_entity_queue: Queue, tm_queue: Queue, stop_signal: Event, ): super().__init__() self.base_str = base_str self.verbose_level = verbose_level self.dest_handler = dest_handler self.dest_entity_queue = dest_entity_queue self.tm_queue = tm_queue self.stop_signal = stop_signal def run(self): _LOGGER.info( f"Starting {self.base_str}. Local ID {self.dest_handler.cfg.local_entity_id}" ) while True: packet_received = False packet = None if self.stop_signal.is_set(): break try: packet = self.dest_entity_queue.get(False) packet_received = True except Empty: pass if packet is not None: _LOGGER.debug(f"{self.base_str}: Inserting {packet}") fsm_result = self.dest_handler.state_machine(packet) packet_sent = False if fsm_result.states.num_packets_ready > 0: while fsm_result.states.num_packets_ready > 0: next_pdu_wrapper = self.dest_handler.get_next_packet() assert next_pdu_wrapper is not None if self.verbose_level >= 1: _LOGGER.debug( f"{self.base_str}: Sending packet {next_pdu_wrapper.pdu}" ) self.tm_queue.put(next_pdu_wrapper.pack()) packet_sent = True # If there is no work to do, put the thread to sleep. if not packet_received and not packet_sent: time.sleep(0.5) class CfdpFaultHandler(DefaultFaultHandlerBase): def __init__(self, base_str: str): self.base_str = base_str super().__init__() def notice_of_suspension_cb( self, transaction_id: TransactionId, cond: ConditionCode, progress: int ): _LOGGER.warning( f"{self.base_str}: Received Notice of Suspension for transaction {transaction_id!r} " f"with condition code {cond!r}. Progress: {progress}" ) def notice_of_cancellation_cb( self, transaction_id: TransactionId, cond: ConditionCode, progress: int ): _LOGGER.warning( f"{self.base_str}: Received Notice of Cancellation for transaction {transaction_id!r} " f"with condition code {cond!r}. Progress: {progress}" ) def abandoned_cb( self, transaction_id: TransactionId, cond: ConditionCode, progress: int ): _LOGGER.warning( f"{self.base_str}: Abandoned fault for transaction {transaction_id!r} " f"with condition code {cond!r}. Progress: {progress}" ) def ignore_cb( self, transaction_id: TransactionId, cond: ConditionCode, progress: int ): _LOGGER.warning( f"{self.base_str}: Ignored fault for transaction {transaction_id!r} " f"with condition code {cond!r}. Progress: {progress}" ) class CfdpUser(CfdpUserBase): def __init__(self, base_str: str, put_req_queue: Queue): self.base_str = base_str self.put_req_queue = put_req_queue # This is a dictionary where the key is the current transaction ID for a transaction which # was triggered by a proxy request with a originating ID. self.active_proxy_put_reqs: Dict[TransactionId, TransactionId] = {} super().__init__() def transaction_indication( self, transaction_indication_params: TransactionParams, ): """This indication is used to report the transaction ID to the CFDP user""" _LOGGER.info( f"{self.base_str}: Transaction.indication for {transaction_indication_params.transaction_id}" ) if transaction_indication_params.originating_transaction_id is not None: _LOGGER.info( f"Originating Transaction ID: {transaction_indication_params.originating_transaction_id}" ) self.active_proxy_put_reqs.update( { transaction_indication_params.transaction_id: transaction_indication_params.originating_transaction_id } ) def eof_sent_indication(self, transaction_id: TransactionId): _LOGGER.info(f"{self.base_str}: EOF-Sent.indication for {transaction_id}") def transaction_finished_indication(self, params: TransactionFinishedParams): _LOGGER.info( f"{self.base_str}: Transaction-Finished.indication for {params.transaction_id}." ) _LOGGER.info(f"Condition Code: {params.finished_params.condition_code!r}") _LOGGER.info(f"Delivery Code: {params.finished_params.delivery_code!r}") _LOGGER.info(f"File Status: {params.finished_params.file_status!r}") if params.transaction_id in self.active_proxy_put_reqs: proxy_put_response = ProxyPutResponse( ProxyPutResponseParams.from_finished_params(params.finished_params) ).to_generic_msg_to_user_tlv() originating_id = self.active_proxy_put_reqs.get(params.transaction_id) assert originating_id is not None put_req = PutRequest( destination_id=originating_id.source_id, source_file=None, dest_file=None, trans_mode=None, closure_requested=None, msgs_to_user=[ proxy_put_response, OriginatingTransactionId( originating_id ).to_generic_msg_to_user_tlv(), ], ) _LOGGER.info( f"Requesting Proxy Put Response concluding Proxy Put originating from " f"{originating_id}" ) self.put_req_queue.put(put_req) self.active_proxy_put_reqs.pop(params.transaction_id) def metadata_recv_indication(self, params: MetadataRecvParams): _LOGGER.info( f"{self.base_str}: Metadata-Recv.indication for {params.transaction_id}." ) if params.msgs_to_user is not None: self._handle_msgs_to_user(params.transaction_id, params.msgs_to_user) def _handle_msgs_to_user( self, transaction_id: TransactionId, msgs_to_user: List[MessageToUserTlv] ): for msg_to_user in msgs_to_user: if msg_to_user.is_reserved_cfdp_message(): reserved_msg_tlv = msg_to_user.to_reserved_msg_tlv() assert reserved_msg_tlv is not None self._handle_reserved_cfdp_message(transaction_id, reserved_msg_tlv) else: _LOGGER.info(f"Received custom message to user: {msg_to_user}") def _handle_reserved_cfdp_message( self, transaction_id: TransactionId, reserved_cfdp_msg: ReservedCfdpMessage ): if reserved_cfdp_msg.is_cfdp_proxy_operation(): self._handle_cfdp_proxy_operation(transaction_id, reserved_cfdp_msg) elif reserved_cfdp_msg.is_originating_transaction_id(): _LOGGER.info( f"Received originating transaction ID: " f"{reserved_cfdp_msg.get_originating_transaction_id()}" ) def _handle_cfdp_proxy_operation( self, transaction_id: TransactionId, reserved_cfdp_msg: ReservedCfdpMessage ): if ( reserved_cfdp_msg.get_cfdp_proxy_message_type() == ProxyMessageType.PUT_REQUEST ): put_req_params = reserved_cfdp_msg.get_proxy_put_request_params() _LOGGER.info(f"Received Proxy Put Request: {put_req_params}") assert put_req_params is not None put_req = PutRequest( destination_id=put_req_params.dest_entity_id, source_file=Path(put_req_params.source_file_as_path), dest_file=Path(put_req_params.dest_file_as_path), trans_mode=None, closure_requested=None, msgs_to_user=[ OriginatingTransactionId( transaction_id ).to_generic_msg_to_user_tlv() ], ) self.put_req_queue.put(put_req) elif ( reserved_cfdp_msg.get_cfdp_proxy_message_type() == ProxyMessageType.PUT_RESPONSE ): put_response_params = reserved_cfdp_msg.get_proxy_put_response_params() _LOGGER.info(f"Received Proxy Put Response: {put_response_params}") def file_segment_recv_indication(self, params: FileSegmentRecvdParams): _LOGGER.info( f"{self.base_str}: File-Segment-Recv.indication for {params.transaction_id}." ) def report_indication(self, transaction_id: TransactionId, status_report: Any): # TODO: p.28 of the CFDP standard specifies what information the status report parameter # could contain. I think it would be better to not hardcode the type of the status # report here, but something like Union[any, CfdpStatusReport] with CfdpStatusReport # being an implementation which supports all three information suggestions would be # nice pass def suspended_indication( self, transaction_id: TransactionId, cond_code: ConditionCode ): _LOGGER.info( f"{self.base_str}: Suspended.indication for {transaction_id} | Condition Code: {cond_code}" ) def resumed_indication(self, transaction_id: TransactionId, progress: int): _LOGGER.info( f"{self.base_str}: Resumed.indication for {transaction_id} | Progress: {progress} bytes" ) def fault_indication( self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int ): _LOGGER.info( f"{self.base_str}: Fault.indication for {transaction_id} | Condition Code: {cond_code} | " f"Progress: {progress} bytes" ) def abandoned_indication( self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int ): _LOGGER.info( f"{self.base_str}: Abandoned.indication for {transaction_id} | Condition Code: {cond_code} |" f" Progress: {progress} bytes" ) def eof_recv_indication(self, transaction_id: TransactionId): _LOGGER.info(f"{self.base_str}: EOF-Recv.indication for {transaction_id}") class CustomCheckTimerProvider(CheckTimerProvider): def provide_check_timer( self, local_entity_id: UnsignedByteField, remote_entity_id: UnsignedByteField, entity_type: EntityType, ) -> Countdown: return Countdown(timedelta(seconds=5.0)) def main(): parser = argparse.ArgumentParser( prog="CFDP Local Entity Application", formatter_class=argparse.RawTextHelpFormatter, ) parser.add_argument("-v", "--verbose", action="count", default=0) parser.add_argument( "-f", help="Perform a file-copy operation", action="store_true", dest="file_copy", ) parser.add_argument( "-m", "--mode", dest="transmission_mode", help=( f"Specify the transfer type{os.linesep}" f' - "0" or "ack" for unacknowledged (Class 0) transfers{os.linesep}' f' - "1" or "nak" for acknowledged (Class 1) transfers. Default value' ), default="nak", ) # Optional Boolean argument where you can specify True/False parser.add_argument( "-c", type=bool, nargs="?", const=True, default=None, dest="closure_requested", help="Request transaction closure for the unacknowledged mode", ) args = parser.parse_args() stop_signal = threading.Event() logging_level = logging.INFO if args.verbose >= 1: logging_level = logging.DEBUG logging.basicConfig(level=logging_level) remote_cfg_table = RemoteEntityCfgTable() remote_cfg_table.add_config(REMOTE_CFG_OF_REMOTE_ENTITY) src_fault_handler = CfdpFaultHandler(BASE_STR_SRC) # 16 bit sequence count for transactions. src_seq_count_provider = SeqCountProvider(16) src_user = CfdpUser(BASE_STR_SRC, PUT_REQ_QUEUE) check_timer_provider = CustomCheckTimerProvider() source_handler = SourceHandler( cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, src_fault_handler), seq_num_provider=src_seq_count_provider, remote_cfg_table=remote_cfg_table, user=src_user, check_timer_provider=check_timer_provider, ) source_entity_task = SourceEntityHandler( BASE_STR_SRC, logging_level, source_handler, PUT_REQ_QUEUE, SOURCE_ENTITY_QUEUE, TM_QUEUE, stop_signal, ) # Enable all indications. dest_fault_handler = CfdpFaultHandler(BASE_STR_DEST) dest_user = CfdpUser(BASE_STR_DEST, PUT_REQ_QUEUE) dest_handler = DestHandler( cfg=LocalEntityCfg(PYTHON_ENTITY_ID, INDICATION_CFG, dest_fault_handler), user=dest_user, remote_cfg_table=remote_cfg_table, check_timer_provider=check_timer_provider, ) dest_entity_task = DestEntityHandler( BASE_STR_DEST, logging_level, dest_handler, DEST_ENTITY_QUEUE, TM_QUEUE, stop_signal, ) # Address Any to accept CFDP packets from other address than localhost. local_addr = ipaddress.ip_address("0.0.0.0") # Localhost as default. remote_addr = ipaddress.ip_address("127.0.0.1") udp_server = UdpServer( sleep_time=0.1, addr=(str(local_addr), PY_PORT), explicit_remote_addr=(str(remote_addr), RUST_PORT), tx_queue=TM_QUEUE, source_entity_rx_queue=SOURCE_ENTITY_QUEUE, dest_entity_rx_queue=DEST_ENTITY_QUEUE, stop_signal=stop_signal, ) # Prepare a put request / file copy operation if the user specifies it. if args.file_copy: _LOGGER.info("Performing file copy operation") transmission_mode = None if args.transmission_mode == "ack": transmission_mode = TransmissionMode.ACKNOWLEDGED elif args.transmission_mode == "nak": transmission_mode = TransmissionMode.UNACKNOWLEDGED with tempfile.NamedTemporaryFile(delete=False) as srcfile: srcfile.write(FILE_CONTENT.encode()) srcfile_path = srcfile.name tempdir = tempfile.TemporaryDirectory() put_req = PutRequest( destination_id=RUST_ENTITY_ID, source_file=Path(srcfile_path), dest_file=Path(tempdir.name).joinpath("test.txt"), closure_requested=args.closure_requested, trans_mode=transmission_mode, ) PUT_REQ_QUEUE.put(put_req) source_entity_task.start() dest_entity_task.start() udp_server.start() try: while True: time.sleep(1) except KeyboardInterrupt: stop_signal.set() source_entity_task.join() dest_entity_task.join() udp_server.join() if __name__ == "__main__": main()