moved a lot of code to tmtccmd

This commit is contained in:
Robin Müller 2022-09-14 19:01:47 +02:00
parent 3787164cce
commit 51871a887f
No known key found for this signature in database
GPG Key ID: 11D4952C8CCEF814
1 changed files with 21 additions and 210 deletions

231
common.py
View File

@ -1,43 +1,35 @@
import logging
import sys
from pathlib import Path
from typing import Optional, Sequence, Tuple, cast
from typing import cast
from spacepackets import SpacePacket, SpacePacketHeader, PacketTypes
from spacepackets import SpacePacket, SpacePacketHeader
from spacepackets.cfdp import (
TransmissionModes,
PduType,
DirectiveType,
GenericPduPacket,
PduHolder,
PduFactory,
ChecksumTypes,
ConditionCode,
)
from spacepackets.cfdp.pdu import MetadataPdu, FileDataPdu
from tmtccmd.cfdp import (
RemoteEntityCfgTable,
RemoteEntityCfg,
LocalEntityCfg,
CfdpUserBase,
IndicationCfg,
TransactionId,
)
from tmtccmd.cfdp.defs import CfdpRequestType, CfdpStates
from tmtccmd.cfdp.handler import SourceHandler, DestHandler
from tmtccmd.cfdp.defs import CfdpRequestType
from tmtccmd.cfdp.handler import CfdpInCcsdsHandler
from tmtccmd.cfdp.mib import DefaultFaultHandlerBase
from tmtccmd.cfdp.request import PutRequest
from tmtccmd.cfdp.user import (
FileSegmentRecvdParams,
MetadataRecvParams,
TransactionFinishedParams,
)
from tmtccmd.config.args import ProcedureParamsWrapper
from tmtccmd.core import BackendState
from tmtccmd.logging import get_current_time_string
from tmtccmd.pus.pus_11_tc_sched import Subservices as Pus11Subservices
from tmtccmd.tc.queue import DefaultPusQueueHelper
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider, ProvidesSeqCount
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
try:
@ -73,7 +65,6 @@ from tmtccmd import (
TcHandlerBase,
get_console_logger,
TmTcCfgHookBase,
BackendBase,
CcsdsTmtcBackend,
)
from tmtccmd.pus import VerificationWrapper
@ -155,198 +146,17 @@ class ExampleCfdpUser(CfdpUserBase):
pass
class CfdpCcsdsWrapper(SpecificApidHandlerBase):
def __init__(
self,
cfg: LocalEntityCfg,
user: CfdpUserBase,
remote_cfgs: Sequence[RemoteEntityCfg],
ccsds_apid: int,
cfdp_seq_cnt_provider: ProvidesSeqCount,
ccsds_seq_cnt_provider: ProvidesSeqCount,
):
"""Wrapper helper type used to wrap PDU packets into CCSDS packets and to extract PDU
packets from CCSDS packets.
:param cfg: Local CFDP entity configuration.
:param user: User wrapper. This contains the indication callback implementations and the
virtual filestore implementation.
:param cfdp_seq_cnt_provider: Every CFDP file transfer has a transaction sequence number.
This provider is used to retrieve that sequence number.
:param ccsds_seq_cnt_provider: Each CFDP PDU is wrapped into a CCSDS space packet, and each
space packet has a dedicated sequence count. This provider is used to retrieve the
sequence count.
:param ccsds_apid: APID to use for the CCSDS space packet header wrapped around each PDU.
This is important so that the OBSW can distinguish between regular PUS packets and
CFDP packets.
"""
class CfdpInCcsdsWrapper(SpecificApidHandlerBase):
def __init__(self, cfdp_in_ccsds_handler: CfdpInCcsdsHandler):
super().__init__(EXAMPLE_CFDP_APID, None)
self.handler = CfdpHandler(cfg, user, cfdp_seq_cnt_provider, remote_cfgs)
self.ccsds_seq_cnt_provider = ccsds_seq_cnt_provider
self.ccsds_apid = ccsds_apid
def pull_next_source_packet(self) -> Optional[Tuple[PduHolder, SpacePacket]]:
"""Retrieves the next PDU to send and wraps it into a space packet"""
next_packet = self.handler.pull_next_source_packet()
if next_packet is None:
return next_packet
sp_header = SpacePacketHeader(
packet_type=PacketTypes.TC,
apid=self.ccsds_apid,
seq_count=self.ccsds_seq_cnt_provider.get_and_increment(),
data_len=next_packet.packet_len - 1,
)
return next_packet, SpacePacket(sp_header, None, next_packet.pack())
def pull_next_dest_packet(self) -> Optional[Tuple[PduHolder, SpacePacket]]:
"""Retrieves the next PDU to send and wraps it into a space packet"""
next_packet = self.handler.pull_next_dest_packet()
if next_packet is None:
return next_packet
sp_header = SpacePacketHeader(
packet_type=PacketTypes.TC,
apid=self.ccsds_apid,
seq_count=self.ccsds_seq_cnt_provider.get_and_increment(),
data_len=next_packet.packet_len - 1,
)
return next_packet, SpacePacket(sp_header, None, next_packet.pack())
def confirm_dest_packet_sent(self):
self.handler.confirm_dest_packet_sent()
def confirm_source_packet_sent(self):
self.handler.confirm_source_packet_sent()
def __iter__(self):
return self
def __next__(
self,
) -> (
Optional[Tuple[PduHolder, SpacePacket]],
Optional[Tuple[PduHolder, SpacePacket]],
):
"""The iterator for this class will returns a tuple of optional PDUs wrapped b a
:py:class:`PduHolder`.
:return: Can be a tuple where the first entry can hold a source packet and the second entry
can be a destination packet. If both packets are None, a StopIteration will be raised.
"""
next_source_tuple = self.pull_next_source_packet()
next_dest_tuple = self.pull_next_dest_packet()
if not next_source_tuple and not next_dest_tuple:
raise StopIteration
return next_source_tuple, next_dest_tuple
def pass_packet(self, space_packet: SpacePacket):
# Unwrap the user data and pass it to the handler
pdu_raw = space_packet.user_data
pdu_base = PduFactory.from_raw(pdu_raw)
self.handler.pass_packet(pdu_base)
self.handler = cfdp_in_ccsds_handler
def handle_tm(self, packet: bytes, _user_args: any):
ccsds_header_raw = packet[0:6]
sp_header = SpacePacketHeader.unpack(ccsds_header_raw)
pdu = packet[6:]
sp = SpacePacket(sp_header, sec_header=None, user_data=pdu)
self.pass_packet(sp)
class CfdpHandler:
def __init__(
self,
cfg: LocalEntityCfg,
user: CfdpUserBase,
seq_cnt_provider: ProvidesSeqCount,
remote_cfgs: Sequence[RemoteEntityCfg],
):
self.remote_cfg_table = RemoteEntityCfgTable()
self.remote_cfg_table.add_configs(remote_cfgs)
self.dest_handler = DestHandler(cfg, user, self.remote_cfg_table)
self.source_handler = SourceHandler(cfg, seq_cnt_provider, user)
def put_request(self, request: PutRequest):
if not self.remote_cfg_table.get_cfg(request.cfg.destination_id):
raise ValueError(
f"No remote CFDP config found for entity ID {request.cfg.destination_id}"
)
self.source_handler.put_request(
request, self.remote_cfg_table.get_cfg(request.cfg.destination_id)
)
def pull_next_source_packet(self) -> Optional[PduHolder]:
res = self.source_handler.state_machine()
if res.states.packet_ready:
return self.source_handler.pdu_holder
return None
def pull_next_dest_packet(self) -> Optional[PduHolder]:
res = self.dest_handler.state_machine()
if res.states.packet_ready:
return self.dest_handler.pdu_holder
return None
def __iter__(self):
return self
def __next__(self) -> (Optional[PduHolder], Optional[PduHolder]):
"""The iterator for this class will returns a tuple of optional PDUs wrapped b a
:py:class:`PduHolder`.
:return: Can be a tuple where the first entry can hold a source packet and the second entry
can be a destination packet. If both packets are None, a StopIteration will be raised.
"""
next_source_packet = self.pull_next_source_packet()
next_dest_packet = self.pull_next_dest_packet()
if not next_dest_packet and not next_source_packet:
raise StopIteration
return next_source_packet, next_dest_packet
def put_request_pending(self) -> bool:
return self.source_handler.states.state != CfdpStates.IDLE
def confirm_dest_packet_sent(self):
self.dest_handler.confirm_packet_sent_advance_fsm()
def confirm_source_packet_sent(self):
self.source_handler.confirm_packet_sent_advance_fsm()
def pass_packet(self, packet: GenericPduPacket):
"""This function routes the packets based on PDU type and directive type if applicable.
The routing is based on section 4.5 of the CFDP standard whcih specifies the PDU forwarding
procedure.
"""
if packet.pdu_type == PduType.FILE_DATA:
self.dest_handler.pass_packet(packet)
else:
if packet.directive_type in [
DirectiveType.METADATA_PDU,
DirectiveType.EOF_PDU,
DirectiveType.PROMPT_PDU,
]:
# Section b) of 4.5.3: These PDUs should always be targeted towards the file
# receiver a.k.a. the destination handler
self.dest_handler.pass_packet(packet)
elif packet.directive_type in [
DirectiveType.FINISHED_PDU,
DirectiveType.NAK_PDU,
DirectiveType.KEEP_ALIVE_PDU,
]:
# Section c) of 4.5.3: These PDUs should always be targeted towards the file sender
# a.k.a. the source handler
self.source_handler.pass_packet(packet)
elif packet.directive_type == DirectiveType.ACK_PDU:
# Section a): Recipient depends on the type of PDU that is being acknowledged.
# We can simply extract the PDU type from the raw stream. If it is an EOF PDU,
# this packet is passed to the source handler. For a finished PDU, it is
# passed to the destination handler
pdu_holder = PduHolder(packet)
ack_pdu = pdu_holder.to_ack_pdu()
if ack_pdu.directive_code_of_acked_pdu == DirectiveType.EOF_PDU:
self.source_handler.pass_packet(packet)
elif ack_pdu.directive_code_of_acked_pdu == DirectiveType.FINISHED_PDU:
self.dest_handler.pass_packet(packet)
self.handler.pass_packet(sp)
class PusHandler(SpecificApidHandlerBase):
@ -374,7 +184,7 @@ class TcHandler(TcHandlerBase):
def __init__(
self,
seq_count_provider: FileSeqCountProvider,
cfdp_in_ccsds_wrapper: CfdpCcsdsWrapper,
cfdp_in_ccsds_wrapper: CfdpInCcsdsWrapper,
pus_verificator: PusVerificator,
file_logger: logging.Logger,
raw_logger: RawTmtcTimedLogWrapper,
@ -436,14 +246,14 @@ class TcHandler(TcHandlerBase):
LOGGER.info(
f"CFDP: Starting file put request with parameters:\n{put_req}"
)
self.cfdp_in_ccsds_wrapper.handler.put_request(put_req)
self.cfdp_in_ccsds_wrapper.handler.cfdp_handler.put_request(put_req)
self.cfdp_handler_started = True
(
pdu_holder,
packet,
) = self.cfdp_in_ccsds_wrapper.pull_next_source_packet()
) = self.cfdp_in_ccsds_wrapper.handler.pull_next_source_packet()
self.queue_helper.add_ccsds_tc(packet)
self.cfdp_in_ccsds_wrapper.confirm_source_packet_sent()
self.cfdp_in_ccsds_wrapper.handler.confirm_source_packet_sent()
metadata = cast(MetadataPdu, pdu_holder.to_metadata_pdu())
self.queue_helper.add_log_cmd(
f"CFDP Source: Sending Metadata PDU for file with size {metadata.file_size}"
@ -451,10 +261,10 @@ class TcHandler(TcHandlerBase):
(
pdu_holder,
packet,
) = self.cfdp_in_ccsds_wrapper.pull_next_source_packet()
) = self.cfdp_in_ccsds_wrapper.handler.pull_next_source_packet()
fd_pdu = cast(FileDataPdu, pdu_holder.to_file_data_pdu())
self.queue_helper.add_ccsds_tc(packet)
self.cfdp_in_ccsds_wrapper.confirm_source_packet_sent()
self.cfdp_in_ccsds_wrapper.handler.confirm_source_packet_sent()
self.queue_helper.add_log_cmd(
f"CFDP Source: Sending File Data PDU for segment at offset {fd_pdu.offset} "
f"with length {len(fd_pdu.file_data)}"
@ -462,11 +272,11 @@ class TcHandler(TcHandlerBase):
(
pdu_holder,
packet,
) = self.cfdp_in_ccsds_wrapper.pull_next_source_packet()
) = self.cfdp_in_ccsds_wrapper.handler.pull_next_source_packet()
self.queue_helper.add_log_cmd(f"CFDP Source: Sending EOF PDU")
self.queue_helper.add_ccsds_tc(packet)
self.cfdp_in_ccsds_wrapper.confirm_source_packet_sent()
self.cfdp_in_ccsds_wrapper.handler.confirm_source_packet_sent()
self.cfdp_in_ccsds_wrapper.handler.source_handler.state_machine()
def send_cb(self, params: SendCbParams):
@ -561,7 +371,7 @@ def setup_tmtc_handlers(
file_name=Path("seqcnt_cfdp_ccsds_.txt")
)
cfdp_user = ExampleCfdpUser()
cfdp_in_ccsds_handler = CfdpCcsdsWrapper(
cfdp_in_ccsds_handler = CfdpInCcsdsHandler(
cfg=cfdp_cfg,
remote_cfgs=[remote_cfg],
ccsds_apid=EXAMPLE_CFDP_APID,
@ -569,6 +379,7 @@ def setup_tmtc_handlers(
cfdp_seq_cnt_provider=cfdp_seq_count_provider,
user=cfdp_user,
)
cfdp_in_ccsds_wrapper = CfdpInCcsdsWrapper(cfdp_in_ccsds_handler)
pus_handler = PusHandler(
printer=printer, raw_logger=raw_logger, wrapper=verif_wrapper
@ -582,7 +393,7 @@ def setup_tmtc_handlers(
seq_count_provider=PusFileSeqCountProvider(
file_name=Path("seqcnt_pus_ccsds.txt")
),
cfdp_in_ccsds_wrapper=cfdp_in_ccsds_handler,
cfdp_in_ccsds_wrapper=cfdp_in_ccsds_wrapper,
)
return ccsds_handler, tc_handler
@ -597,7 +408,7 @@ def setup_backend(
setup_wrapper=setup_wrapper,
tm_handler=ccsds_handler,
tc_handler=tc_handler,
init_procedure=init_proc.base,
init_procedure=init_proc,
)
tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=setup_wrapper.hook_obj)
return cast(CcsdsTmtcBackend, tmtc_backend)