moved high level handler from tmtccmd to EIVE
All checks were successful
EIVE/-/pipeline/head This commit looks good
All checks were successful
EIVE/-/pipeline/head This commit looks good
This commit is contained in:
parent
0f6e7eb159
commit
6a8f48c493
259
eive_tmtc/cfdp/handler.py
Normal file
259
eive_tmtc/cfdp/handler.py
Normal file
@ -0,0 +1,259 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
|
import deprecation
|
||||||
|
from spacepackets import PacketType, SpacePacket, SpacePacketHeader
|
||||||
|
from spacepackets.cfdp import GenericPduPacket, PduFactory
|
||||||
|
from spacepackets.cfdp.pdu import PduHolder
|
||||||
|
from tmtccmd.cfdp import (
|
||||||
|
CfdpUserBase,
|
||||||
|
LocalEntityCfg,
|
||||||
|
RemoteEntityCfgTable,
|
||||||
|
)
|
||||||
|
from tmtccmd.cfdp.defs import CfdpState
|
||||||
|
from tmtccmd.cfdp.handler import (
|
||||||
|
DestHandler,
|
||||||
|
DestStateWrapper,
|
||||||
|
SourceHandler,
|
||||||
|
SourceStateWrapper,
|
||||||
|
)
|
||||||
|
from tmtccmd.cfdp.handler.common import PacketDestination, get_packet_destination
|
||||||
|
from tmtccmd.cfdp.mib import CheckTimerProvider
|
||||||
|
from tmtccmd.cfdp.request import PutRequest
|
||||||
|
from tmtccmd.util import ProvidesSeqCount
|
||||||
|
from tmtccmd.version import get_version
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StateWrapper:
|
||||||
|
source_handler_state = SourceStateWrapper()
|
||||||
|
dest_handler_state = DestStateWrapper()
|
||||||
|
|
||||||
|
|
||||||
|
class CfdpHandler:
|
||||||
|
"""Wrapper class which wraps both the :py:class:`tmtccmd.cfdp.handler.source.SourceHandler` and
|
||||||
|
:py:class:`tmtccmd.cfdp.handler.dest.DestHandler` in a sensible way.
|
||||||
|
|
||||||
|
If you have special requirements, for example you want to spawn a new destination handler
|
||||||
|
for each file copy transfer to allow multiple consecutive file transfers, it might be a good
|
||||||
|
idea to write a custom wrapper."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
cfg: LocalEntityCfg,
|
||||||
|
user: CfdpUserBase,
|
||||||
|
seq_cnt_provider: ProvidesSeqCount,
|
||||||
|
remote_cfg_table: RemoteEntityCfgTable,
|
||||||
|
check_timer_provider: CheckTimerProvider,
|
||||||
|
):
|
||||||
|
self.remote_cfg_table = remote_cfg_table
|
||||||
|
self.dest_handler = DestHandler(
|
||||||
|
cfg=cfg,
|
||||||
|
user=user,
|
||||||
|
remote_cfg_table=remote_cfg_table,
|
||||||
|
check_timer_provider=check_timer_provider,
|
||||||
|
)
|
||||||
|
self.source_handler = SourceHandler(
|
||||||
|
cfg=cfg,
|
||||||
|
seq_num_provider=seq_cnt_provider,
|
||||||
|
user=user,
|
||||||
|
remote_cfg_table=remote_cfg_table,
|
||||||
|
check_timer_provider=check_timer_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
def put_request(self, request: PutRequest):
|
||||||
|
if not self.remote_cfg_table.get_cfg(request.destination_id):
|
||||||
|
raise ValueError(
|
||||||
|
f"No remote CFDP config found for entity ID {request.destination_id}"
|
||||||
|
)
|
||||||
|
self.source_handler.put_request(
|
||||||
|
request, self.remote_cfg_table.get_cfg(request.destination_id) # type: ignore
|
||||||
|
)
|
||||||
|
|
||||||
|
def pull_next_source_packet(self) -> Optional[PduHolder]:
|
||||||
|
res = self.source_handler.state_machine()
|
||||||
|
if res.states.num_packets_ready:
|
||||||
|
return self.source_handler.get_next_packet()
|
||||||
|
return None
|
||||||
|
|
||||||
|
def pull_next_dest_packet(self) -> Optional[PduHolder]:
|
||||||
|
res = self.dest_handler.state_machine()
|
||||||
|
if res.states.packets_ready:
|
||||||
|
return self.dest_handler.get_next_packet()
|
||||||
|
return None
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __next__(self) -> Tuple[Optional[PduHolder], Optional[PduHolder]]:
|
||||||
|
"""The iterator for this class will returns a tuple of optional PDUs wrapped b a
|
||||||
|
:py:class:`PduHolder`.
|
||||||
|
|
||||||
|
:return: Can be a tuple where the first entry can hold a source packet and the second entry
|
||||||
|
can be a destination packet. If both packets are None, a StopIteration will be raised.
|
||||||
|
"""
|
||||||
|
next_source_packet = self.pull_next_source_packet()
|
||||||
|
next_dest_packet = self.pull_next_dest_packet()
|
||||||
|
if not next_dest_packet and not next_source_packet:
|
||||||
|
raise StopIteration
|
||||||
|
return next_source_packet, next_dest_packet
|
||||||
|
|
||||||
|
def put_request_pending(self) -> bool:
|
||||||
|
return self.source_handler.states.state != CfdpState.IDLE
|
||||||
|
|
||||||
|
@deprecation.deprecated(
|
||||||
|
deprecated_in="6.0.0rc0",
|
||||||
|
current_version=get_version(),
|
||||||
|
details="Use insert_packet instead",
|
||||||
|
)
|
||||||
|
def pass_packet(self, packet: GenericPduPacket):
|
||||||
|
self.insert_packet(packet)
|
||||||
|
|
||||||
|
def insert_packet(self, packet: GenericPduPacket):
|
||||||
|
"""This function routes the packets based on PDU type and directive type if applicable.
|
||||||
|
|
||||||
|
The routing is based on section 4.5 of the CFDP standard which specifies the PDU forwarding
|
||||||
|
procedure."""
|
||||||
|
if get_packet_destination(packet) == PacketDestination.DEST_HANDLER:
|
||||||
|
self.dest_handler.insert_packet(packet)
|
||||||
|
elif get_packet_destination(packet) == PacketDestination.SOURCE_HANDLER:
|
||||||
|
self.source_handler.insert_packet(packet) # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
class CfdpInCcsdsHandler:
|
||||||
|
"""Wrapper helper type used to wrap PDU packets into CCSDS packets and to extract PDU
|
||||||
|
packets from CCSDS packets.
|
||||||
|
|
||||||
|
:param cfg: Local CFDP entity configuration.
|
||||||
|
:param user: User wrapper. This contains the indication callback implementations and the
|
||||||
|
virtual filestore implementation.
|
||||||
|
:param cfdp_seq_cnt_provider: Every CFDP file transfer has a transaction sequence number.
|
||||||
|
This provider is used to retrieve that sequence number.
|
||||||
|
:param ccsds_seq_cnt_provider: Each CFDP PDU is wrapped into a CCSDS space packet, and each
|
||||||
|
space packet has a dedicated sequence count. This provider is used to retrieve the
|
||||||
|
sequence count.
|
||||||
|
:param ccsds_apid: APID to use for the CCSDS space packet header wrapped around each PDU.
|
||||||
|
This is important so that the OBSW can distinguish between regular PUS packets and
|
||||||
|
CFDP packets."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
cfg: LocalEntityCfg,
|
||||||
|
user: CfdpUserBase,
|
||||||
|
remote_cfg_table: RemoteEntityCfgTable,
|
||||||
|
ccsds_apid: int,
|
||||||
|
cfdp_seq_cnt_provider: ProvidesSeqCount,
|
||||||
|
ccsds_seq_cnt_provider: ProvidesSeqCount,
|
||||||
|
check_timer_provider: CheckTimerProvider,
|
||||||
|
):
|
||||||
|
self.cfdp_handler = CfdpHandler(
|
||||||
|
cfg=cfg,
|
||||||
|
user=user,
|
||||||
|
seq_cnt_provider=cfdp_seq_cnt_provider,
|
||||||
|
remote_cfg_table=remote_cfg_table,
|
||||||
|
check_timer_provider=check_timer_provider,
|
||||||
|
)
|
||||||
|
self.ccsds_seq_cnt_provider = ccsds_seq_cnt_provider
|
||||||
|
self.ccsds_apid = ccsds_apid
|
||||||
|
|
||||||
|
def put_request_pending(self):
|
||||||
|
return self.cfdp_handler.put_request_pending()
|
||||||
|
|
||||||
|
def state_machine(self):
|
||||||
|
self.source_handler.state_machine()
|
||||||
|
self.dest_handler.state_machine()
|
||||||
|
|
||||||
|
@deprecation.deprecated(
|
||||||
|
deprecated_in="6.0.0rc1",
|
||||||
|
current_version=get_version(),
|
||||||
|
details="Use state_machine instead",
|
||||||
|
)
|
||||||
|
def fsm(self):
|
||||||
|
self.state_machine()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def source_handler(self):
|
||||||
|
return self.cfdp_handler.source_handler
|
||||||
|
|
||||||
|
@property
|
||||||
|
def dest_handler(self):
|
||||||
|
return self.cfdp_handler.dest_handler
|
||||||
|
|
||||||
|
def pull_next_source_packet(self) -> Optional[Tuple[PduHolder, SpacePacket]]:
|
||||||
|
"""Retrieves the next PDU to send and wraps it into a space packet"""
|
||||||
|
next_packet = self.cfdp_handler.pull_next_source_packet()
|
||||||
|
if next_packet is None:
|
||||||
|
return next_packet
|
||||||
|
sp_header = SpacePacketHeader(
|
||||||
|
packet_type=PacketType.TC,
|
||||||
|
apid=self.ccsds_apid,
|
||||||
|
seq_count=self.ccsds_seq_cnt_provider.get_and_increment(),
|
||||||
|
data_len=next_packet.packet_len - 1,
|
||||||
|
)
|
||||||
|
return next_packet, SpacePacket(sp_header, None, next_packet.pack())
|
||||||
|
|
||||||
|
def pull_next_dest_packet(self) -> Optional[Tuple[PduHolder, SpacePacket]]:
|
||||||
|
"""Retrieves the next PDU to send and wraps it into a space packet"""
|
||||||
|
next_packet = self.cfdp_handler.pull_next_dest_packet()
|
||||||
|
if next_packet is None:
|
||||||
|
return next_packet
|
||||||
|
sp_header = SpacePacketHeader(
|
||||||
|
packet_type=PacketType.TC,
|
||||||
|
apid=self.ccsds_apid,
|
||||||
|
seq_count=self.ccsds_seq_cnt_provider.get_and_increment(),
|
||||||
|
data_len=next_packet.packet_len - 1,
|
||||||
|
)
|
||||||
|
return next_packet, SpacePacket(sp_header, None, next_packet.pack())
|
||||||
|
|
||||||
|
@deprecation.deprecated(
|
||||||
|
deprecated_in="6.0.0rc1",
|
||||||
|
current_version=get_version(),
|
||||||
|
details="Use insert_space_packet instead",
|
||||||
|
)
|
||||||
|
def pass_space_packet(self, space_packet: SpacePacket):
|
||||||
|
self.insert_space_packet(space_packet)
|
||||||
|
|
||||||
|
def insert_space_packet(self, space_packet: SpacePacket) -> bool:
|
||||||
|
if space_packet.user_data is None:
|
||||||
|
raise ValueError(
|
||||||
|
"space packet is empty, expected packet containing a CFDP PDU"
|
||||||
|
)
|
||||||
|
# Unwrap the user data and pass it to the handler
|
||||||
|
pdu_raw = space_packet.user_data
|
||||||
|
pdu_base = PduFactory.from_raw(pdu_raw)
|
||||||
|
if pdu_base:
|
||||||
|
self.insert_pdu_packet(pdu_base)
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def insert_pdu_packet(self, pdu: GenericPduPacket):
|
||||||
|
self.cfdp_handler.insert_packet(pdu)
|
||||||
|
|
||||||
|
@deprecation.deprecated(
|
||||||
|
deprecated_in="6.0.0rc1",
|
||||||
|
current_version=get_version(),
|
||||||
|
details="Use insert_pdu_packet instead",
|
||||||
|
)
|
||||||
|
def pass_pdu_packet(self, pdu_base: GenericPduPacket):
|
||||||
|
self.insert_pdu_packet(pdu_base)
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __next__(
|
||||||
|
self,
|
||||||
|
) -> Tuple[
|
||||||
|
Optional[Tuple[PduHolder, SpacePacket]],
|
||||||
|
Optional[Tuple[PduHolder, SpacePacket]],
|
||||||
|
]:
|
||||||
|
"""The iterator for this class will returns a tuple of optional PDUs wrapped b a
|
||||||
|
:py:class:`PduHolder`.
|
||||||
|
|
||||||
|
:return: Can be a tuple where the first entry can hold a source packet and the second entry
|
||||||
|
can be a destination packet. If both packets are None, a StopIteration will be raised.
|
||||||
|
"""
|
||||||
|
next_source_tuple = self.pull_next_source_packet()
|
||||||
|
next_dest_tuple = self.pull_next_dest_packet()
|
||||||
|
if not next_source_tuple and not next_dest_tuple:
|
||||||
|
raise StopIteration
|
||||||
|
return next_source_tuple, next_dest_tuple
|
Loading…
Reference in New Issue
Block a user