Robin Mueller
db70b33bd6
All checks were successful
EIVE/-/pipeline/pr-v6.0.0-dev This commit looks good
260 lines
9.5 KiB
Python
260 lines
9.5 KiB
Python
from dataclasses import dataclass
|
|
from typing import Optional, Tuple
|
|
|
|
import deprecation
|
|
from spacepackets import PacketType, SpacePacket, SpacePacketHeader
|
|
from spacepackets.cfdp import GenericPduPacket, PduFactory
|
|
from spacepackets.cfdp.pdu import PduHolder
|
|
from cfdppy import (
|
|
CfdpUserBase,
|
|
LocalEntityCfg,
|
|
RemoteEntityCfgTable,
|
|
)
|
|
from cfdppy.defs import CfdpState
|
|
from cfdppy.handler import (
|
|
DestHandler,
|
|
DestStateWrapper,
|
|
SourceHandler,
|
|
SourceStateWrapper,
|
|
)
|
|
from cfdppy.handler.common import PacketDestination, get_packet_destination
|
|
from cfdppy.mib import CheckTimerProvider
|
|
from cfdppy.request import PutRequest
|
|
from spacepackets.seqcount import ProvidesSeqCount
|
|
from tmtccmd.version import get_version
|
|
|
|
|
|
@dataclass
|
|
class StateWrapper:
|
|
source_handler_state = SourceStateWrapper()
|
|
dest_handler_state = DestStateWrapper()
|
|
|
|
|
|
class CfdpHandler:
|
|
"""Wrapper class which wraps both the :py:class:`tmtccmd.cfdp.handler.source.SourceHandler` and
|
|
:py:class:`tmtccmd.cfdp.handler.dest.DestHandler` in a sensible way.
|
|
|
|
If you have special requirements, for example you want to spawn a new destination handler
|
|
for each file copy transfer to allow multiple consecutive file transfers, it might be a good
|
|
idea to write a custom wrapper."""
|
|
|
|
def __init__(
|
|
self,
|
|
cfg: LocalEntityCfg,
|
|
user: CfdpUserBase,
|
|
seq_cnt_provider: ProvidesSeqCount,
|
|
remote_cfg_table: RemoteEntityCfgTable,
|
|
check_timer_provider: CheckTimerProvider,
|
|
):
|
|
self.remote_cfg_table = remote_cfg_table
|
|
self.dest_handler = DestHandler(
|
|
cfg=cfg,
|
|
user=user,
|
|
remote_cfg_table=remote_cfg_table,
|
|
check_timer_provider=check_timer_provider,
|
|
)
|
|
self.source_handler = SourceHandler(
|
|
cfg=cfg,
|
|
seq_num_provider=seq_cnt_provider,
|
|
user=user,
|
|
remote_cfg_table=remote_cfg_table,
|
|
check_timer_provider=check_timer_provider,
|
|
)
|
|
|
|
def put_request(self, request: PutRequest):
|
|
if not self.remote_cfg_table.get_cfg(request.destination_id):
|
|
raise ValueError(
|
|
f"No remote CFDP config found for entity ID {request.destination_id}"
|
|
)
|
|
self.source_handler.put_request(
|
|
request, self.remote_cfg_table.get_cfg(request.destination_id) # type: ignore
|
|
)
|
|
|
|
def pull_next_source_packet(self) -> Optional[PduHolder]:
|
|
res = self.source_handler.state_machine()
|
|
if res.states.num_packets_ready:
|
|
return self.source_handler.get_next_packet()
|
|
return None
|
|
|
|
def pull_next_dest_packet(self) -> Optional[PduHolder]:
|
|
res = self.dest_handler.state_machine()
|
|
if res.states.packets_ready:
|
|
return self.dest_handler.get_next_packet()
|
|
return None
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
def __next__(self) -> Tuple[Optional[PduHolder], Optional[PduHolder]]:
|
|
"""The iterator for this class will returns a tuple of optional PDUs wrapped b a
|
|
:py:class:`PduHolder`.
|
|
|
|
:return: Can be a tuple where the first entry can hold a source packet and the second entry
|
|
can be a destination packet. If both packets are None, a StopIteration will be raised.
|
|
"""
|
|
next_source_packet = self.pull_next_source_packet()
|
|
next_dest_packet = self.pull_next_dest_packet()
|
|
if not next_dest_packet and not next_source_packet:
|
|
raise StopIteration
|
|
return next_source_packet, next_dest_packet
|
|
|
|
def put_request_pending(self) -> bool:
|
|
return self.source_handler.states.state != CfdpState.IDLE
|
|
|
|
@deprecation.deprecated(
|
|
deprecated_in="6.0.0rc0",
|
|
current_version=get_version(),
|
|
details="Use insert_packet instead",
|
|
)
|
|
def pass_packet(self, packet: GenericPduPacket):
|
|
self.insert_packet(packet)
|
|
|
|
def insert_packet(self, packet: GenericPduPacket):
|
|
"""This function routes the packets based on PDU type and directive type if applicable.
|
|
|
|
The routing is based on section 4.5 of the CFDP standard which specifies the PDU forwarding
|
|
procedure."""
|
|
if get_packet_destination(packet) == PacketDestination.DEST_HANDLER:
|
|
self.dest_handler.insert_packet(packet)
|
|
elif get_packet_destination(packet) == PacketDestination.SOURCE_HANDLER:
|
|
self.source_handler.insert_packet(packet) # type: ignore
|
|
|
|
|
|
class CfdpInCcsdsHandler:
|
|
"""Wrapper helper type used to wrap PDU packets into CCSDS packets and to extract PDU
|
|
packets from CCSDS packets.
|
|
|
|
:param cfg: Local CFDP entity configuration.
|
|
:param user: User wrapper. This contains the indication callback implementations and the
|
|
virtual filestore implementation.
|
|
:param cfdp_seq_cnt_provider: Every CFDP file transfer has a transaction sequence number.
|
|
This provider is used to retrieve that sequence number.
|
|
:param ccsds_seq_cnt_provider: Each CFDP PDU is wrapped into a CCSDS space packet, and each
|
|
space packet has a dedicated sequence count. This provider is used to retrieve the
|
|
sequence count.
|
|
:param ccsds_apid: APID to use for the CCSDS space packet header wrapped around each PDU.
|
|
This is important so that the OBSW can distinguish between regular PUS packets and
|
|
CFDP packets."""
|
|
|
|
def __init__(
|
|
self,
|
|
cfg: LocalEntityCfg,
|
|
user: CfdpUserBase,
|
|
remote_cfg_table: RemoteEntityCfgTable,
|
|
ccsds_apid: int,
|
|
cfdp_seq_cnt_provider: ProvidesSeqCount,
|
|
ccsds_seq_cnt_provider: ProvidesSeqCount,
|
|
check_timer_provider: CheckTimerProvider,
|
|
):
|
|
self.cfdp_handler = CfdpHandler(
|
|
cfg=cfg,
|
|
user=user,
|
|
seq_cnt_provider=cfdp_seq_cnt_provider,
|
|
remote_cfg_table=remote_cfg_table,
|
|
check_timer_provider=check_timer_provider,
|
|
)
|
|
self.ccsds_seq_cnt_provider = ccsds_seq_cnt_provider
|
|
self.ccsds_apid = ccsds_apid
|
|
|
|
def put_request_pending(self):
|
|
return self.cfdp_handler.put_request_pending()
|
|
|
|
def state_machine(self):
|
|
self.source_handler.state_machine()
|
|
self.dest_handler.state_machine()
|
|
|
|
@deprecation.deprecated(
|
|
deprecated_in="6.0.0rc1",
|
|
current_version=get_version(),
|
|
details="Use state_machine instead",
|
|
)
|
|
def fsm(self):
|
|
self.state_machine()
|
|
|
|
@property
|
|
def source_handler(self):
|
|
return self.cfdp_handler.source_handler
|
|
|
|
@property
|
|
def dest_handler(self):
|
|
return self.cfdp_handler.dest_handler
|
|
|
|
def pull_next_source_packet(self) -> Optional[Tuple[PduHolder, SpacePacket]]:
|
|
"""Retrieves the next PDU to send and wraps it into a space packet"""
|
|
next_packet = self.cfdp_handler.pull_next_source_packet()
|
|
if next_packet is None:
|
|
return next_packet
|
|
sp_header = SpacePacketHeader(
|
|
packet_type=PacketType.TC,
|
|
apid=self.ccsds_apid,
|
|
seq_count=self.ccsds_seq_cnt_provider.get_and_increment(),
|
|
data_len=next_packet.packet_len - 1,
|
|
)
|
|
return next_packet, SpacePacket(sp_header, None, next_packet.pack())
|
|
|
|
def pull_next_dest_packet(self) -> Optional[Tuple[PduHolder, SpacePacket]]:
|
|
"""Retrieves the next PDU to send and wraps it into a space packet"""
|
|
next_packet = self.cfdp_handler.pull_next_dest_packet()
|
|
if next_packet is None:
|
|
return next_packet
|
|
sp_header = SpacePacketHeader(
|
|
packet_type=PacketType.TC,
|
|
apid=self.ccsds_apid,
|
|
seq_count=self.ccsds_seq_cnt_provider.get_and_increment(),
|
|
data_len=next_packet.packet_len - 1,
|
|
)
|
|
return next_packet, SpacePacket(sp_header, None, next_packet.pack())
|
|
|
|
@deprecation.deprecated(
|
|
deprecated_in="6.0.0rc1",
|
|
current_version=get_version(),
|
|
details="Use insert_space_packet instead",
|
|
)
|
|
def pass_space_packet(self, space_packet: SpacePacket):
|
|
self.insert_space_packet(space_packet)
|
|
|
|
def insert_space_packet(self, space_packet: SpacePacket) -> bool:
|
|
if space_packet.user_data is None:
|
|
raise ValueError(
|
|
"space packet is empty, expected packet containing a CFDP PDU"
|
|
)
|
|
# Unwrap the user data and pass it to the handler
|
|
pdu_raw = space_packet.user_data
|
|
pdu_base = PduFactory.from_raw(pdu_raw)
|
|
if pdu_base:
|
|
self.insert_pdu_packet(pdu_base)
|
|
return True
|
|
return False
|
|
|
|
def insert_pdu_packet(self, pdu: GenericPduPacket):
|
|
self.cfdp_handler.insert_packet(pdu)
|
|
|
|
@deprecation.deprecated(
|
|
deprecated_in="6.0.0rc1",
|
|
current_version=get_version(),
|
|
details="Use insert_pdu_packet instead",
|
|
)
|
|
def pass_pdu_packet(self, pdu_base: GenericPduPacket):
|
|
self.insert_pdu_packet(pdu_base)
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
def __next__(
|
|
self,
|
|
) -> Tuple[
|
|
Optional[Tuple[PduHolder, SpacePacket]],
|
|
Optional[Tuple[PduHolder, SpacePacket]],
|
|
]:
|
|
"""The iterator for this class will returns a tuple of optional PDUs wrapped b a
|
|
:py:class:`PduHolder`.
|
|
|
|
:return: Can be a tuple where the first entry can hold a source packet and the second entry
|
|
can be a destination packet. If both packets are None, a StopIteration will be raised.
|
|
"""
|
|
next_source_tuple = self.pull_next_source_packet()
|
|
next_dest_tuple = self.pull_next_dest_packet()
|
|
if not next_source_tuple and not next_dest_tuple:
|
|
raise StopIteration
|
|
return next_source_tuple, next_dest_tuple
|