import logging
from typing import Any

from eive_tmtc.config.definitions import CFDP_APID
from spacepackets.ccsds import SPACE_PACKET_HEADER_SIZE
from spacepackets.cfdp import PduFactory, PduType
from eive_tmtc.cfdp.handler import CfdpInCcsdsHandler
from tmtccmd.tmtc import SpecificApidHandlerBase

_LOGGER = logging.getLogger(__name__)


class CfdpInCcsdsWrapper(SpecificApidHandlerBase):
    """APID-specific telemetry handler for CFDP PDUs wrapped in space packets.

    Registered for ``CFDP_APID``. Each received packet has its space packet
    header removed and the remaining bytes are parsed as a CFDP PDU, which is
    then passed on to the wrapped :class:`CfdpInCcsdsHandler`.
    """

    def __init__(self, cfdp_in_ccsds_handler: CfdpInCcsdsHandler) -> None:
        """Store the CFDP handler and register this object for ``CFDP_APID``.

        :param cfdp_in_ccsds_handler: Handler which receives the parsed PDUs
            through its ``insert_pdu_packet`` method.
        """
        self.handler = cfdp_in_ccsds_handler
        # No user arguments are passed to the base class.
        super().__init__(CFDP_APID, None)

    def handle_tm(self, packet: bytes, _user_args: Any) -> None:
        """Extract the CFDP PDU from a space packet and pass it to the handler.

        If no PDU object can be created from the packet payload, a warning is
        logged and the packet is dropped.

        :param packet: Raw space packet, including the space packet header.
        :param _user_args: Unused.
        """
        # Ignore the space packet header. Its only purpose is to use the same
        # protocol and have a separate APID for space packets. If this function
        # is called, the APID is correct.
        pdu = packet[SPACE_PACKET_HEADER_SIZE:]
        generic_pdu = PduFactory.from_raw(pdu)
        # Explicit check instead of an assert: asserts are removed when Python
        # runs with -O, which would let a None value slip through.
        if generic_pdu is None:
            _LOGGER.warning(
                "Could not create a CFDP PDU from the received packet, dropping it"
            )
            return
        if generic_pdu.pdu_type == PduType.FILE_DATA:
            _LOGGER.info("Received File Data PDU")
        else:
            directive_type = PduFactory.pdu_directive_type(pdu)
            _LOGGER.info(
                "Received File Directive PDU with type %r", directive_type
            )
        self.handler.insert_pdu_packet(generic_pdu)