building infastructure to build queue

This commit is contained in:
Robin Müller 2022-09-13 11:59:29 +02:00
parent e9aaa5502d
commit a3d1808457
No known key found for this signature in database
GPG Key ID: 11D4952C8CCEF814

View File

@ -1,7 +1,7 @@
import logging import logging
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Optional, Sequence from typing import Optional, Sequence, Tuple
from spacepackets import SpacePacket, SpacePacketHeader, PacketTypes from spacepackets import SpacePacket, SpacePacketHeader, PacketTypes
from spacepackets.cfdp import ( from spacepackets.cfdp import (
@ -178,7 +178,20 @@ class CfdpCcsdsWrapper(SpecificApidHandlerBase):
self.ccsds_seq_cnt_provider = ccsds_seq_cnt_provider self.ccsds_seq_cnt_provider = ccsds_seq_cnt_provider
self.ccsds_apid = ccsds_apid self.ccsds_apid = ccsds_apid
def pull_next_dest_packet(self) -> Optional[SpacePacket]: def pull_next_source_packet(self) -> Optional[Tuple[PduHolder, SpacePacket]]:
"""Retrieves the next PDU to send and wraps it into a space packet"""
next_packet = self.handler.pull_next_source_packet()
if next_packet is None:
return next_packet
sp_header = SpacePacketHeader(
packet_type=PacketTypes.TC,
apid=self.ccsds_apid,
seq_count=self.ccsds_seq_cnt_provider.get_and_increment(),
data_len=next_packet.packet_len - 1,
)
return next_packet, SpacePacket(sp_header, None, next_packet.pack())
def pull_next_dest_packet(self) -> Optional[Tuple[PduHolder, SpacePacket]]:
"""Retrieves the next PDU to send and wraps it into a space packet""" """Retrieves the next PDU to send and wraps it into a space packet"""
next_packet = self.handler.pull_next_dest_packet() next_packet = self.handler.pull_next_dest_packet()
if next_packet is None: if next_packet is None:
@ -189,11 +202,35 @@ class CfdpCcsdsWrapper(SpecificApidHandlerBase):
seq_count=self.ccsds_seq_cnt_provider.get_and_increment(), seq_count=self.ccsds_seq_cnt_provider.get_and_increment(),
data_len=next_packet.packet_len - 1, data_len=next_packet.packet_len - 1,
) )
return SpacePacket(sp_header, None, next_packet.pack()) return next_packet, SpacePacket(sp_header, None, next_packet.pack())
def confirm_dest_packet_sent(self): def confirm_dest_packet_sent(self):
self.handler.confirm_dest_packet_sent() self.handler.confirm_dest_packet_sent()
def confirm_source_packet_sent(self):
self.handler.confirm_source_packet_sent()
def __iter__(self):
return self
def __next__(
self,
) -> (
Optional[Tuple[PduHolder, SpacePacket]],
Optional[Tuple[PduHolder, SpacePacket]],
):
"""The iterator for this class will returns a tuple of optional PDUs wrapped b a
:py:class:`PduHolder`.
:return: Can be a tuple where the first entry can hold a source packet and the second entry
can be a destination packet. If both packets are None, a StopIteration will be raised.
"""
next_source_tuple = self.pull_next_source_packet()
next_dest_tuple = self.pull_next_dest_packet()
if not next_source_tuple and not next_dest_tuple:
raise StopIteration
return next_source_tuple, next_dest_tuple
def pass_packet(self, space_packet: SpacePacket): def pass_packet(self, space_packet: SpacePacket):
# Unwrap the user data and pass it to the handler # Unwrap the user data and pass it to the handler
pdu_raw = space_packet.user_data pdu_raw = space_packet.user_data
@ -230,18 +267,43 @@ class CfdpHandler:
request, self.remote_cfg_table.get_cfg(request.cfg.destination_id) request, self.remote_cfg_table.get_cfg(request.cfg.destination_id)
) )
def pull_next_source_packet(self) -> Optional[PduHolder]:
res = self.source_handler.state_machine()
if res.states.packet_ready:
return self.source_handler.pdu_holder
return None
def pull_next_dest_packet(self) -> Optional[PduHolder]: def pull_next_dest_packet(self) -> Optional[PduHolder]:
res = self.dest_handler.state_machine() res = self.dest_handler.state_machine()
if res.states.packet_ready: if res.states.packet_ready:
return self.dest_handler.pdu_holder return self.dest_handler.pdu_holder
return None return None
def __iter__(self):
return self
def __next__(self) -> (Optional[PduHolder], Optional[PduHolder]):
"""The iterator for this class will returns a tuple of optional PDUs wrapped b a
:py:class:`PduHolder`.
:return: Can be a tuple where the first entry can hold a source packet and the second entry
can be a destination packet. If both packets are None, a StopIteration will be raised.
"""
next_source_packet = self.pull_next_source_packet()
next_dest_packet = self.pull_next_dest_packet()
if not next_dest_packet and not next_source_packet:
raise StopIteration
return next_source_packet, next_dest_packet
def put_request_pending(self) -> bool: def put_request_pending(self) -> bool:
return self.source_handler.states.state != CfdpStates.IDLE return self.source_handler.states.state != CfdpStates.IDLE
def confirm_dest_packet_sent(self): def confirm_dest_packet_sent(self):
self.dest_handler.confirm_packet_sent_advance_fsm() self.dest_handler.confirm_packet_sent_advance_fsm()
def confirm_source_packet_sent(self):
self.source_handler.confirm_packet_sent_advance_fsm()
def pass_packet(self, packet: GenericPduPacket): def pass_packet(self, packet: GenericPduPacket):
"""This function routes the packets based on PDU type and directive type if applicable. """This function routes the packets based on PDU type and directive type if applicable.
@ -311,6 +373,7 @@ class TcHandler(TcHandlerBase):
raw_logger: RawTmtcTimedLogWrapper, raw_logger: RawTmtcTimedLogWrapper,
): ):
super().__init__() super().__init__()
self.cfdp_handler_started = False
self.cfdp_dest_id = CFDP_REMOTE_ENTITY_ID self.cfdp_dest_id = CFDP_REMOTE_ENTITY_ID
self.seq_count_provider = seq_count_provider self.seq_count_provider = seq_count_provider
self.pus_verificator = pus_verificator self.pus_verificator = pus_verificator
@ -356,12 +419,32 @@ class TcHandler(TcHandlerBase):
def handle_cfdp_procedure(self, info: ProcedureWrapper): def handle_cfdp_procedure(self, info: ProcedureWrapper):
cfdp_procedure = info.to_cfdp_procedure() cfdp_procedure = info.to_cfdp_procedure()
if cfdp_procedure.cfdp_request_type == CfdpRequestType.PUT: if cfdp_procedure.cfdp_request_type == CfdpRequestType.PUT:
if not self.cfdp_in_ccsds_wrapper.handler.put_request_pending(): if (
not self.cfdp_in_ccsds_wrapper.handler.put_request_pending()
and not self.cfdp_handler_started
):
put_req = cfdp_procedure.request_wrapper.to_put_request() put_req = cfdp_procedure.request_wrapper.to_put_request()
put_req.cfg.destination_id = self.cfdp_dest_id put_req.cfg.destination_id = self.cfdp_dest_id
LOGGER.info(f"Starting put request with parameters:\n{put_req}") LOGGER.info(f"Starting put request with parameters:\n{put_req}")
self.cfdp_in_ccsds_wrapper.handler.put_request(put_req) self.cfdp_in_ccsds_wrapper.handler.put_request(put_req)
pass self.cfdp_handler_started = True
(
pdu_holder,
packet,
) = self.cfdp_in_ccsds_wrapper.pull_next_source_packet()
print(pdu_holder)
self.cfdp_in_ccsds_wrapper.confirm_source_packet_sent()
self.queue_helper.add_ccsds_tc(packet)
(
pdu_holder,
packet,
) = self.cfdp_in_ccsds_wrapper.pull_next_source_packet()
print(pdu_holder)
self.cfdp_in_ccsds_wrapper.confirm_source_packet_sent()
self.queue_helper.add_ccsds_tc(packet)
# for sp in self.cfdp_in_ccsds_wrapper:
# print(sp)
# self.queue_helper.add_ccsds_tc(sp)
pass pass
def send_cb(self, params: SendCbParams): def send_cb(self, params: SendCbParams):