Compare commits

..

1 Commits

Author SHA1 Message Date
04a2e3b166 re-worked TMTC modules
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit
2024-04-15 11:18:51 +02:00
47 changed files with 616 additions and 829 deletions

View File

@@ -103,9 +103,7 @@ class PusHandler(GenericApidHandlerBase):
def handle_tm(self, apid: int, packet: bytes, _user_args: Any):
try:
pus_tm = PusTelemetry.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
except ValueError as e:
_LOGGER.warning("Could not generate PUS TM object from raw data")
_LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
@@ -113,7 +111,7 @@ class PusHandler(GenericApidHandlerBase):
service = pus_tm.service
if service == 1:
tm_packet = Service1Tm.unpack(
data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2)
data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2)
)
res = self.verif_wrapper.add_tm(tm_packet)
if res is None:
@@ -130,9 +128,7 @@ class PusHandler(GenericApidHandlerBase):
elif service == 3:
_LOGGER.info("No handling for HK packets implemented")
_LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]")
pus_tm = PusTelemetry.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
if pus_tm.subservice == 25:
if len(pus_tm.source_data) < 8:
raise ValueError("No addressable ID in HK packet")
@@ -140,7 +136,7 @@ class PusHandler(GenericApidHandlerBase):
_LOGGER.info(json_str)
elif service == 5:
tm_packet = PusTelemetry.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
packet, time_reader=CdsShortTimestamp.empty()
)
src_data = tm_packet.source_data
event_u32 = EventU32.unpack(src_data)
@@ -149,7 +145,7 @@ class PusHandler(GenericApidHandlerBase):
_LOGGER.info("Received test event")
elif service == 17:
tm_packet = Service17Tm.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
packet, time_reader=CdsShortTimestamp.empty()
)
if tm_packet.subservice == 2:
self.file_logger.info("Received Ping Reply TM[17,2]")
@@ -166,7 +162,7 @@ class PusHandler(GenericApidHandlerBase):
f"The service {service} is not implemented in Telemetry Factory"
)
tm_packet = PusTelemetry.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
packet, time_reader=CdsShortTimestamp.empty()
)
self.raw_logger.log_tm(pus_tm)
@@ -201,15 +197,15 @@ class TcHandler(TcHandlerBase):
_LOGGER.info(log_entry.log_str)
def queue_finished_cb(self, info: ProcedureWrapper):
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
if info.proc_type == TcProcedureType.DEFAULT:
def_proc = info.to_def_procedure()
_LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}")
def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
q = self.queue_helper
q.queue_wrapper = wrapper.queue_wrapper
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
if info.proc_type == TcProcedureType.DEFAULT:
def_proc = info.to_def_procedure()
assert def_proc.cmd_path is not None
pus_tc.pack_pus_telecommands(q, def_proc.cmd_path)
@@ -260,7 +256,6 @@ def main():
while True:
state = tmtc_backend.periodic_op(None)
if state.request == BackendRequest.TERMINATION_NO_ERROR:
tmtc_backend.close_com_if()
sys.exit(0)
elif state.request == BackendRequest.DELAY_IDLE:
_LOGGER.info("TMTC Client in IDLE mode")
@@ -275,7 +270,6 @@ def main():
elif state.request == BackendRequest.CALL_NEXT:
pass
except KeyboardInterrupt:
tmtc_backend.close_com_if()
sys.exit(0)

View File

@@ -1,2 +1,2 @@
tmtccmd == 8.0.0rc2
tmtccmd == 8.0.0rc1
# -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd

View File

@@ -1,4 +1,3 @@
use std::time::Duration;
use std::{
collections::{HashSet, VecDeque},
fmt::Debug,
@@ -8,35 +7,14 @@ use std::{
use log::{info, warn};
use satrs::{
encoding::ccsds::{SpValidity, SpacePacketValidator},
hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer},
spacepackets::{CcsdsPacket, PacketId},
spacepackets::PacketId,
tmtc::{PacketSenderRaw, PacketSource},
};
#[derive(Default)]
pub struct ConnectionFinishedHandler {}
pub struct SimplePacketValidator {
pub valid_ids: HashSet<PacketId>,
}
impl SpacePacketValidator for SimplePacketValidator {
fn validate(
&self,
sp_header: &satrs::spacepackets::SpHeader,
_raw_buf: &[u8],
) -> satrs::encoding::ccsds::SpValidity {
if self.valid_ids.contains(&sp_header.packet_id()) {
return SpValidity::Valid;
}
log::warn!("ignoring space packet with header {:?}", sp_header);
// We could perform a CRC check.. but lets keep this simple and assume that TCP ensures
// data integrity.
SpValidity::Skip
}
}
impl HandledConnectionHandler for ConnectionFinishedHandler {
fn handled_connection(&mut self, info: satrs::hal::std::tcp_server::HandledConnectionInfo) {
info!(
@@ -105,7 +83,7 @@ impl PacketSource for SyncTcpTmSource {
pub type TcpServer<ReceivesTc, SendError> = TcpSpacepacketsServer<
SyncTcpTmSource,
ReceivesTc,
SimplePacketValidator,
HashSet<PacketId>,
ConnectionFinishedHandler,
(),
SendError,
@@ -123,14 +101,14 @@ impl<TcSender: PacketSenderRaw<Error = SendError>, SendError: Debug + 'static>
cfg: ServerConfig,
tm_source: SyncTcpTmSource,
tc_sender: TcSender,
valid_ids: HashSet<PacketId>,
packet_id_lookup: HashSet<PacketId>,
) -> Result<Self, std::io::Error> {
Ok(Self(
TcpSpacepacketsServer::new(
cfg,
tm_source,
tc_sender,
SimplePacketValidator { valid_ids },
packet_id_lookup,
ConnectionFinishedHandler::default(),
None,
)?,
@@ -140,9 +118,7 @@ impl<TcSender: PacketSenderRaw<Error = SendError>, SendError: Debug + 'static>
pub fn periodic_operation(&mut self) {
loop {
let result = self
.0
.handle_all_connections(Some(Duration::from_millis(400)));
let result = self.0.handle_next_connection(None);
match result {
Ok(_conn_result) => (),
Err(e) => {

View File

@@ -3,7 +3,8 @@ use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc;
use log::{info, warn};
use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderRaw};
use satrs::pus::{PacketAsVec, PacketInPool};
use satrs::tmtc::PacketSenderRaw;
use satrs::{
hal::std::udp_server::{ReceiveResult, UdpTcServer},
pool::{PoolProviderWithGuards, SharedStaticMemoryPool},
@@ -114,7 +115,6 @@ impl<
#[cfg(test)]
mod tests {
use std::net::Ipv4Addr;
use std::{
cell::RefCell,
collections::VecDeque,
@@ -183,7 +183,7 @@ mod tests {
#[test]
fn test_transactions() {
let sock_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
let test_receiver = TestSender::default();
// let tc_queue = test_receiver.tc_vec.clone();
let udp_tc_server =
@@ -201,8 +201,8 @@ mod tests {
.unwrap();
let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed");
let client_addr = client.local_addr().unwrap();
println!("{}", server_addr);
client.send_to(&ping_tc, server_addr).unwrap();
client.connect(server_addr).unwrap();
client.send(&ping_tc).unwrap();
udp_dyn_server.periodic_operation();
{
let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut();

View File

@@ -12,13 +12,13 @@ use satrs::pus::verification::{
use satrs::pus::{
ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter,
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, MpscTcReceiver,
MpscTmAsVecSender, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper,
MpscTmAsVecSender, PacketAsVec, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper,
PusTcToRequestConverter,
};
use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket};
use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use satrs::tmtc::PacketSenderWithSharedPool;
use satrs_example::config::components::PUS_ACTION_SERVICE;
use satrs_example::config::tmtc_err;
use std::sync::mpsc;
@@ -465,10 +465,7 @@ mod tests {
.verif_reporter()
.check_next_is_acceptance_success(id, accepted_token.request_id());
self.pus_packet_tx
.send(EcssTcAndToken::new(
PacketAsVec::new(self.service.service_helper.id(), tc.to_vec().unwrap()),
accepted_token,
))
.send(EcssTcAndToken::new(tc.to_vec().unwrap(), accepted_token))
.unwrap();
}
}

View File

@@ -8,9 +8,10 @@ use satrs::pus::event_srv::PusEventServiceHandler;
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusServiceHelper,
EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, PacketAsVec, PusPacketHandlerResult,
PusServiceHelper,
};
use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use satrs::tmtc::PacketSenderWithSharedPool;
use satrs_example::config::components::PUS_EVENT_MANAGEMENT;
use super::HandlingStatus;

View File

@@ -9,13 +9,13 @@ use satrs::pus::verification::{
use satrs::pus::{
ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken,
EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTmSender,
EcssTmtcError, GenericConversionError, MpscTcReceiver, MpscTmAsVecSender,
EcssTmtcError, GenericConversionError, MpscTcReceiver, MpscTmAsVecSender, PacketAsVec,
PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
};
use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::{hk, PusPacket};
use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use satrs::tmtc::PacketSenderWithSharedPool;
use satrs_example::config::components::PUS_HK_SERVICE;
use satrs_example::config::{hk_err, tmtc_err};
use std::sync::mpsc;

View File

@@ -1,6 +1,5 @@
use crate::requests::GenericRequestRouter;
use log::warn;
use satrs::pool::PoolAddr;
use satrs::pus::verification::{
self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter,
VerificationReporterCfg, VerificationReportingProvider, VerificationToken,
@@ -14,8 +13,7 @@ use satrs::pus::{
use satrs::queue::{GenericReceiveError, GenericSendError};
use satrs::request::{Apid, GenericMessage, MessageMetadata};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::{PusPacket, PusServiceId};
use satrs::tmtc::{PacketAsVec, PacketInPool};
use satrs::spacepackets::ecss::PusServiceId;
use satrs::ComponentId;
use satrs_example::config::components::PUS_ROUTING_SERVICE;
use satrs_example::config::{tmtc_err, CustomPusServiceId};
@@ -76,54 +74,19 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
}
}
pub fn handle_tc_packet_vec(
pub fn handle_tc_packet(
&mut self,
packet_as_vec: PacketAsVec,
tc_in_memory: TcInMemory,
service: u8,
pus_tc: &PusTcReader,
) -> Result<PusPacketHandlerResult, GenericSendError> {
self.handle_tc_generic(packet_as_vec.sender_id, None, &packet_as_vec.packet)
}
pub fn handle_tc_packet_in_store(
&mut self,
packet_in_pool: PacketInPool,
pus_tc_copy: &[u8],
) -> Result<PusPacketHandlerResult, GenericSendError> {
self.handle_tc_generic(
packet_in_pool.sender_id,
Some(packet_in_pool.store_addr),
pus_tc_copy,
)
}
pub fn handle_tc_generic(
&mut self,
sender_id: ComponentId,
addr_opt: Option<PoolAddr>,
raw_tc: &[u8],
) -> Result<PusPacketHandlerResult, GenericSendError> {
let pus_tc_result = PusTcReader::new(raw_tc);
if pus_tc_result.is_err() {
log::warn!(
"error creating PUS TC from raw data received from {}: {}",
sender_id,
pus_tc_result.unwrap_err()
);
log::warn!("raw data: {:x?}", raw_tc);
return Ok(PusPacketHandlerResult::RequestHandled);
}
let pus_tc = pus_tc_result.unwrap().0;
let init_token = self.verif_reporter.add_tc(&pus_tc);
let init_token = self.verif_reporter.add_tc(pus_tc);
self.stamp_helper.update_from_now();
let accepted_token = self
.verif_reporter
.acceptance_success(&self.tm_sender, init_token, self.stamp_helper.stamp())
.expect("Acceptance success failure");
let service = PusServiceId::try_from(pus_tc.service());
let tc_in_memory: TcInMemory = if let Some(store_addr) = addr_opt {
PacketInPool::new(sender_id, store_addr).into()
} else {
PacketAsVec::new(sender_id, Vec::from(raw_tc)).into()
};
let service = PusServiceId::try_from(service);
match service {
Ok(standard_service) => match standard_service {
PusServiceId::Test => self.pus_router.test_tc_sender.send(EcssTcAndToken {
@@ -498,7 +461,7 @@ pub(crate) mod tests {
use std::time::Duration;
use satrs::pus::test_util::TEST_COMPONENT_ID_0;
use satrs::pus::{MpscTmAsVecSender, PusTmVariant};
use satrs::pus::{MpscTmAsVecSender, PacketAsVec, PusTmVariant};
use satrs::request::RequestId;
use satrs::{
pus::{

View File

@@ -1,6 +1,6 @@
use derive_new::new;
use log::{error, warn};
use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use satrs::tmtc::PacketSenderWithSharedPool;
use std::sync::mpsc;
use std::time::Duration;
@@ -9,7 +9,7 @@ use satrs::pool::SharedStaticMemoryPool;
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
DefaultActiveRequestMap, EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter,
EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult,
EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, PacketAsVec, PusPacketHandlerResult,
PusServiceHelper,
};
use satrs::request::GenericMessage;

View File

@@ -9,9 +9,10 @@ use satrs::pus::scheduler_srv::PusSchedServiceHandler;
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusServiceHelper,
EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, PacketAsVec, PacketInPool,
PusPacketHandlerResult, PusServiceHelper,
};
use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool};
use satrs::tmtc::PacketSenderWithSharedPool;
use satrs::ComponentId;
use satrs_example::config::components::PUS_SCHED_SERVICE;

View File

@@ -7,13 +7,13 @@ use satrs::pus::verification::{FailParams, VerificationReporter, VerificationRep
use satrs::pus::EcssTcInSharedStoreConverter;
use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSender, MpscTcReceiver,
MpscTmAsVecSender, PusPacketHandlerResult, PusServiceHelper,
MpscTmAsVecSender, PacketAsVec, PusPacketHandlerResult, PusServiceHelper,
};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusPacket;
use satrs::spacepackets::time::cds::CdsTime;
use satrs::spacepackets::time::TimeWriter;
use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use satrs::tmtc::PacketSenderWithSharedPool;
use satrs_example::config::components::PUS_TEST_SERVICE;
use satrs_example::config::{tmtc_err, TEST_EVENT};
use std::sync::mpsc;

View File

@@ -1,19 +1,23 @@
use satrs::{
pool::PoolProvider,
tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool, SharedPacketPool},
pus::{PacketAsVec, PacketInPool},
tmtc::{PacketSenderWithSharedPool, SharedPacketPool},
};
use std::sync::mpsc::{self, TryRecvError};
use satrs::pus::MpscTmAsVecSender;
use satrs::{
pus::MpscTmAsVecSender,
spacepackets::ecss::{tc::PusTcReader, PusPacket},
};
use crate::pus::{HandlingStatus, PusTcDistributor};
use crate::pus::PusTcDistributor;
// TC source components where static pools are the backing memory of the received telecommands.
pub struct TcSourceTaskStatic {
shared_tc_pool: SharedPacketPool,
tc_receiver: mpsc::Receiver<PacketInPool>,
tc_buf: [u8; 4096],
pus_distributor: PusTcDistributor<PacketSenderWithSharedPool>,
pus_receiver: PusTcDistributor<PacketSenderWithSharedPool>,
}
impl TcSourceTaskStatic {
@@ -26,7 +30,7 @@ impl TcSourceTaskStatic {
shared_tc_pool,
tc_receiver,
tc_buf: [0; 4096],
pus_distributor: pus_receiver,
pus_receiver,
}
}
@@ -34,9 +38,7 @@ impl TcSourceTaskStatic {
self.poll_tc();
}
pub fn poll_tc(&mut self) -> HandlingStatus {
// Right now, we only expect ECSS PUS packets.
// If packets like CFDP are expected, we might have to check the APID first.
pub fn poll_tc(&mut self) -> bool {
match self.tc_receiver.try_recv() {
Ok(packet_in_pool) => {
let pool = self
@@ -47,16 +49,29 @@ impl TcSourceTaskStatic {
pool.read(&packet_in_pool.store_addr, &mut self.tc_buf)
.expect("reading pool failed");
drop(pool);
self.pus_distributor
.handle_tc_packet_in_store(packet_in_pool, &self.tc_buf)
.ok();
HandlingStatus::HandledOne
match PusTcReader::new(&self.tc_buf) {
Ok((pus_tc, _)) => {
self.pus_receiver
.handle_tc_packet(
satrs::pus::TcInMemory::StoreAddr(packet_in_pool.store_addr),
pus_tc.service(),
&pus_tc,
)
.ok();
true
}
Err(e) => {
log::warn!("error creating PUS TC from raw data: {e}");
log::warn!("raw data: {:x?}", self.tc_buf);
true
}
}
}
Err(e) => match e {
TryRecvError::Empty => HandlingStatus::Empty,
TryRecvError::Empty => false,
TryRecvError::Disconnected => {
log::warn!("tmtc thread: sender disconnected");
HandlingStatus::Empty
false
}
},
}
@@ -66,7 +81,7 @@ impl TcSourceTaskStatic {
// TC source components where the heap is the backing memory of the received telecommands.
pub struct TcSourceTaskDynamic {
pub tc_receiver: mpsc::Receiver<PacketAsVec>,
pus_distributor: PusTcDistributor<MpscTmAsVecSender>,
pus_receiver: PusTcDistributor<MpscTmAsVecSender>,
}
impl TcSourceTaskDynamic {
@@ -76,7 +91,7 @@ impl TcSourceTaskDynamic {
) -> Self {
Self {
tc_receiver,
pus_distributor: pus_receiver,
pus_receiver,
}
}
@@ -84,21 +99,31 @@ impl TcSourceTaskDynamic {
self.poll_tc();
}
pub fn poll_tc(&mut self) -> HandlingStatus {
// Right now, we only expect ECSS PUS packets.
// If packets like CFDP are expected, we might have to check the APID first.
pub fn poll_tc(&mut self) -> bool {
// Right now, we only expect PUS packets.
match self.tc_receiver.try_recv() {
Ok(packet_as_vec) => {
self.pus_distributor
.handle_tc_packet_vec(packet_as_vec)
.ok();
HandlingStatus::HandledOne
}
Ok(packet_as_vec) => match PusTcReader::new(&packet_as_vec.packet) {
Ok((pus_tc, _)) => {
self.pus_receiver
.handle_tc_packet(
satrs::pus::TcInMemory::Vec(packet_as_vec.packet.clone()),
pus_tc.service(),
&pus_tc,
)
.ok();
true
}
Err(e) => {
log::warn!("error creating PUS TC from raw data: {e}");
log::warn!("raw data: {:x?}", packet_as_vec.packet);
true
}
},
Err(e) => match e {
TryRecvError::Empty => HandlingStatus::Empty,
TryRecvError::Empty => false,
TryRecvError::Disconnected => {
log::warn!("tmtc thread: sender disconnected");
HandlingStatus::Empty
false
}
},
}

View File

@@ -4,7 +4,6 @@ use std::{
};
use log::info;
use satrs::tmtc::{PacketAsVec, PacketInPool, SharedPacketPool};
use satrs::{
pool::PoolProvider,
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
@@ -14,6 +13,10 @@ use satrs::{
CcsdsPacket,
},
};
use satrs::{
pus::{PacketAsVec, PacketInPool},
tmtc::SharedPacketPool,
};
use crate::interface::tcp::SyncTcpTmSource;

View File

@@ -8,10 +8,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
# [unreleased]
# [v0.1.2] 2024-04-17
Allow `satrs-shared` from `v0.1.3` to `<v0.2`.
# [v0.1.1] 2024-02-17
- Bumped `spacepackets` to v0.10.0

View File

@@ -1,6 +1,6 @@
[package]
name = "satrs-mib"
version = "0.1.2"
version = "0.1.1"
edition = "2021"
rust-version = "1.61"
authors = ["Robin Mueller <muellerr@irs.uni-stuttgart.de>"]
@@ -23,12 +23,13 @@ version = "1"
optional = true
[dependencies.satrs-shared]
version = ">=0.1.3, <0.2"
path = "../satrs-shared"
version = "0.1.3"
features = ["serde"]
[dependencies.satrs-mib-codegen]
path = "codegen"
version = "0.1.2"
version = "0.1.1"
[dependencies.serde]
version = "1"

View File

@@ -1,6 +1,6 @@
[package]
name = "satrs-mib-codegen"
version = "0.1.2"
version = "0.1.1"
edition = "2021"
description = "satrs-mib proc macro implementation"
homepage = "https://egit.irs.uni-stuttgart.de/rust/sat-rs"
@@ -28,7 +28,8 @@ features = ["full"]
trybuild = { version = "1", features = ["diff"] }
[dev-dependencies.satrs-shared]
version = ">=0.1.3, <0.2"
version = "0.1.3"
path = "../../satrs-shared"
[dev-dependencies.satrs-mib]
path = ".."

View File

@@ -8,10 +8,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
# [unreleased]
# [v0.1.3] 2024-04-16
Allow `spacepackets` range starting with v0.10 and v0.11.
# [v0.1.2] 2024-02-17
- Bumped `spacepackets` to v0.10.0 for `UnsignedEnum` trait change.

View File

@@ -18,7 +18,7 @@ default-features = false
optional = true
[dependencies.spacepackets]
version = ">0.9, <=0.11"
version = "0.11.0-rc.2"
default-features = false
[features]

View File

@@ -8,34 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
# [unreleased]
# [v0.2.0-rc.4] 2024-04-23
## Changed
- The `parse_for_ccsds_space_packets` method now expects a non-mutable slice and does not copy
broken tail packets anymore. It also does not expect a mutable `next_write_idx` argument anymore.
Instead, a `ParseResult` structure is returned which contains the `packets_found` and an
optional `incomplete_tail_start` value.
## Fixed
- `parse_for_ccsds_space_packets` did not detect CCSDS space packets at the buffer end with the
smallest possible size of 7 bytes.
- TCP server component now re-registers the internal `mio::Poll` object if the client reset
the connection unexpectedly. Not doing so prevented the server from functioning properly
after a re-connect.
# [v0.2.0-rc.3] 2024-04-17
docs-rs hotfix 2
# [v0.2.0-rc.2] 2024-04-17
docs-rs hotfix
# [v0.2.0-rc.1] 2024-04-17
- `spacepackets` v0.11
- `spacepackets` v0.11.0
## Added
@@ -58,9 +31,6 @@ docs-rs hotfix
- Renamed `ReceivesTcCore` to `PacketSenderRaw` to better show its primary purpose. It now contains
a `send_raw_tc` method which is not mutable anymore.
- Renamed `TmPacketSourceCore` to `TmPacketSource`.
- Renamed `EcssTmSenderCore` to `EcssTmSender`.
- Renamed `StoreAddr` to `PoolAddr`.
- Reanmed `StoreError` to `PoolError`.
- TCP server generics order. The error generics come last now.
- `encoding::ccsds::PacketIdValidator` renamed to `ValidatorU16Id`, which lives in the crate root.
It can be used for both CCSDS packet ID and CCSDS APID validation.

View File

@@ -1,8 +1,8 @@
[package]
name = "satrs"
version = "0.2.0-rc.4"
version = "0.2.0-rc.0"
edition = "2021"
rust-version = "1.71.1"
rust-version = "1.61"
authors = ["Robin Mueller <muellerr@irs.uni-stuttgart.de>"]
description = "A framework to build software for remote systems"
homepage = "https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/"
@@ -19,26 +19,13 @@ smallvec = "1"
crc = "3"
[dependencies.satrs-shared]
version = ">=0.1.3, <0.2"
version = "0.1.3"
path = "../satrs-shared"
[dependencies.num_enum]
version = ">0.5, <=0.7"
default-features = false
[dependencies.spacepackets]
version = "0.11"
default-features = false
[dependencies.cobs]
git = "https://github.com/robamu/cobs.rs.git"
version = "0.2.3"
branch = "all_features"
default-features = false
[dependencies.num-traits]
version = "0.2"
default-features = false
[dependencies.dyn-clone]
version = "1"
optional = true
@@ -51,6 +38,10 @@ optional = true
version = "0.7"
optional = true
[dependencies.num-traits]
version = "0.2"
default-features = false
[dependencies.downcast-rs]
version = "1.2"
default-features = false
@@ -84,6 +75,17 @@ version = "0.8"
features = ["os-poll", "net"]
optional = true
[dependencies.spacepackets]
# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
version = "0.11.0-rc.2"
default-features = false
[dependencies.cobs]
git = "https://github.com/robamu/cobs.rs.git"
version = "0.2.3"
branch = "all_features"
default-features = false
[dev-dependencies]
serde = "1"
zerocopy = "0.7"
@@ -126,4 +128,4 @@ doc-images = []
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docs_rs", "--generate-link-to-definition"]
rustdoc-args = ["--cfg", "doc_cfg", "--generate-link-to-definition"]

View File

@@ -1,4 +1,4 @@
use crate::{params::Params, pool::PoolAddr};
use crate::{params::Params, pool::StoreAddr};
#[cfg(feature = "alloc")]
pub use alloc_mod::*;
@@ -21,7 +21,7 @@ impl ActionRequest {
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum ActionRequestVariant {
NoData,
StoreData(PoolAddr),
StoreData(StoreAddr),
#[cfg(feature = "alloc")]
VecData(alloc::vec::Vec<u8>),
}

View File

@@ -1,102 +1,68 @@
use spacepackets::{CcsdsPacket, SpHeader};
use crate::{tmtc::PacketSenderRaw, ComponentId};
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum SpValidity {
Valid,
/// The space packet can be assumed to have a valid format, but the packet should
/// be skipped.
Skip,
/// The space packet or space packet header has an invalid format, for example a CRC check
/// failed. In that case, the parser loses the packet synchronization and needs to check for
/// the start of a new space packet header start again. The space packet header
/// [spacepackets::PacketId] can be used as a synchronization marker to detect the start
/// of a possible valid packet again.
Invalid,
}
/// Simple trait to allow user code to check the validity of a space packet.
pub trait SpacePacketValidator {
fn validate(&self, sp_header: &SpHeader, raw_buf: &[u8]) -> SpValidity;
}
#[derive(Default, Debug, PartialEq, Eq)]
pub struct ParseResult {
pub packets_found: u32,
/// If an incomplete space packet was found, its start index is indicated by this value.
pub incomplete_tail_start: Option<usize>,
}
use crate::{tmtc::PacketSenderRaw, ComponentId, ValidatorU16Id};
/// This function parses a given buffer for tightly packed CCSDS space packets. It uses the
/// [spacepackets::SpHeader] of the CCSDS packets and a user provided [SpacePacketValidator]
/// to check whether a received space packet is relevant for processing.
/// [spacepackets::PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet
/// and then uses the length field of the packet to extract CCSDS packets.
///
/// This function is also able to deal with broken tail packets at the end as long a the parser
/// can read the full 7 bytes which constitue a space packet header plus one byte minimal size.
/// If broken tail packets are detected, they are moved to the front of the buffer, and the write
/// index for future write operations will be written to the `next_write_idx` argument.
///
/// The parses will behave differently based on the [SpValidity] returned from the user provided
/// [SpacePacketValidator]:
///
/// 1. [SpValidity::Valid]: The parser will forward all packets to the given `packet_sender` and
/// return the number of packets found.If the [PacketSenderRaw::send_packet] calls fails, the
/// error will be returned.
/// 2. [SpValidity::Invalid]: The parser assumes that the synchronization is lost and tries to
/// find the start of a new space packet header by scanning all the following bytes.
/// 3. [SpValidity::Skip]: The parser skips the packet using the packet length determined from the
/// space packet header.
/// The parser will forward all packets which were decoded successfully to the given
/// `packet_sender` and return the number of packets found. If the [PacketSenderRaw::send_packet]
/// calls fails, the error will be returned.
pub fn parse_buffer_for_ccsds_space_packets<SendError>(
buf: &[u8],
packet_validator: &(impl SpacePacketValidator + ?Sized),
buf: &mut [u8],
packet_id_validator: &(impl ValidatorU16Id + ?Sized),
sender_id: ComponentId,
packet_sender: &(impl PacketSenderRaw<Error = SendError> + ?Sized),
) -> Result<ParseResult, SendError> {
let mut parse_result = ParseResult::default();
next_write_idx: &mut usize,
) -> Result<u32, SendError> {
*next_write_idx = 0;
let mut packets_found = 0;
let mut current_idx = 0;
let buf_len = buf.len();
loop {
if current_idx + 7 > buf.len() {
if current_idx + 7 >= buf.len() {
break;
}
let sp_header = SpHeader::from_be_bytes(&buf[current_idx..]).unwrap().0;
match packet_validator.validate(&sp_header, &buf[current_idx..]) {
SpValidity::Valid => {
let packet_size = sp_header.total_len();
if (current_idx + packet_size) <= buf_len {
packet_sender
.send_packet(sender_id, &buf[current_idx..current_idx + packet_size])?;
parse_result.packets_found += 1;
} else {
// Move packet to start of buffer if applicable.
parse_result.incomplete_tail_start = Some(current_idx);
let packet_id = u16::from_be_bytes(buf[current_idx..current_idx + 2].try_into().unwrap());
if packet_id_validator.validate(packet_id) {
let length_field =
u16::from_be_bytes(buf[current_idx + 4..current_idx + 6].try_into().unwrap());
let packet_size = length_field + 7;
if (current_idx + packet_size as usize) <= buf_len {
packet_sender.send_packet(
sender_id,
&buf[current_idx..current_idx + packet_size as usize],
)?;
packets_found += 1;
} else {
// Move packet to start of buffer if applicable.
if current_idx > 0 {
buf.copy_within(current_idx.., 0);
*next_write_idx = buf.len() - current_idx;
}
current_idx += packet_size;
continue;
}
SpValidity::Skip => {
current_idx += sp_header.total_len();
}
// We might have lost sync. Try to find the start of a new space packet header.
SpValidity::Invalid => {
current_idx += 1;
}
current_idx += packet_size as usize;
continue;
}
current_idx += 1;
}
Ok(parse_result)
Ok(packets_found)
}
#[cfg(test)]
mod tests {
use spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket},
CcsdsPacket, PacketId, PacketSequenceCtrl, PacketType, SequenceFlags, SpHeader,
PacketId, SpHeader,
};
use crate::{encoding::tests::TcCacher, ComponentId};
use super::{parse_buffer_for_ccsds_space_packets, SpValidity, SpacePacketValidator};
use super::parse_buffer_for_ccsds_space_packets;
const PARSER_ID: ComponentId = 0x05;
const TEST_APID_0: u16 = 0x02;
@@ -104,30 +70,6 @@ mod tests {
const TEST_PACKET_ID_0: PacketId = PacketId::new_for_tc(true, TEST_APID_0);
const TEST_PACKET_ID_1: PacketId = PacketId::new_for_tc(true, TEST_APID_1);
#[derive(Default)]
struct SimpleVerificator {
pub enable_second_id: bool,
}
impl SimpleVerificator {
pub fn new_with_second_id() -> Self {
Self {
enable_second_id: true,
}
}
}
impl SpacePacketValidator for SimpleVerificator {
fn validate(&self, sp_header: &SpHeader, _raw_buf: &[u8]) -> super::SpValidity {
if sp_header.packet_id() == TEST_PACKET_ID_0
|| (self.enable_second_id && sp_header.packet_id() == TEST_PACKET_ID_1)
{
return SpValidity::Valid;
}
SpValidity::Skip
}
}
#[test]
fn test_basic() {
let sph = SpHeader::new_from_apid(TEST_APID_0);
@@ -136,16 +78,19 @@ mod tests {
let packet_len = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0];
let tc_cacher = TcCacher::default();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&buffer,
&SimpleVerificator::default(),
&mut buffer,
valid_packet_ids.as_slice(),
PARSER_ID,
&tc_cacher,
&mut next_write_idx,
);
assert!(parse_result.is_ok());
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 1);
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 1);
let mut queue = tc_cacher.tc_queue.borrow_mut();
assert_eq!(queue.len(), 1);
let packet_with_sender = queue.pop_front().unwrap();
@@ -165,16 +110,19 @@ mod tests {
let packet_len_action = action_tc
.write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0];
let tc_cacher = TcCacher::default();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&buffer,
&SimpleVerificator::default(),
&mut buffer,
valid_packet_ids.as_slice(),
PARSER_ID,
&tc_cacher,
&mut next_write_idx,
);
assert!(parse_result.is_ok());
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 2);
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 2);
let mut queue = tc_cacher.tc_queue.borrow_mut();
assert_eq!(queue.len(), 2);
let packet_with_addr = queue.pop_front().unwrap();
@@ -201,13 +149,19 @@ mod tests {
let packet_len_action = action_tc
.write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1];
let tc_cacher = TcCacher::default();
let verificator = SimpleVerificator::new_with_second_id();
let parse_result =
parse_buffer_for_ccsds_space_packets(&buffer, &verificator, PARSER_ID, &tc_cacher);
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer,
valid_packet_ids.as_slice(),
PARSER_ID,
&tc_cacher,
&mut next_write_idx,
);
assert!(parse_result.is_ok());
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 2);
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 2);
let mut queue = tc_cacher.tc_queue.borrow_mut();
assert_eq!(queue.len(), 2);
let packet_with_addr = queue.pop_front().unwrap();
@@ -232,25 +186,25 @@ mod tests {
let packet_len_action = action_tc
.write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1];
let tc_cacher = TcCacher::default();
let verificator = SimpleVerificator::new_with_second_id();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&buffer[..packet_len_ping + packet_len_action - 4],
&verificator,
&mut buffer[..packet_len_ping + packet_len_action - 4],
valid_packet_ids.as_slice(),
PARSER_ID,
&tc_cacher,
&mut next_write_idx,
);
assert!(parse_result.is_ok());
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 1);
assert!(parse_result.incomplete_tail_start.is_some());
let incomplete_tail_idx = parse_result.incomplete_tail_start.unwrap();
assert_eq!(incomplete_tail_idx, packet_len_ping);
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 1);
let queue = tc_cacher.tc_queue.borrow();
assert_eq!(queue.len(), 1);
// The broken packet was moved to the start, so the next write index should be after the
// last segment missing 4 bytes.
assert_eq!(next_write_idx, packet_len_action - 4);
}
#[test]
@@ -261,39 +215,21 @@ mod tests {
let packet_len_ping = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1];
let tc_cacher = TcCacher::default();
let verificator = SimpleVerificator::new_with_second_id();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&buffer[..packet_len_ping - 4],
&verificator,
&mut buffer[..packet_len_ping - 4],
valid_packet_ids.as_slice(),
PARSER_ID,
&tc_cacher,
&mut next_write_idx,
);
assert_eq!(next_write_idx, 0);
assert!(parse_result.is_ok());
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 0);
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 0);
let queue = tc_cacher.tc_queue.borrow();
assert_eq!(queue.len(), 0);
}
#[test]
fn test_smallest_packet() {
let ccsds_header_only = SpHeader::new(
PacketId::new(PacketType::Tc, true, TEST_APID_0),
PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0),
0,
);
let mut buf: [u8; 7] = [0; 7];
ccsds_header_only
.write_to_be_bytes(&mut buf)
.expect("writing failed");
let verificator = SimpleVerificator::default();
let tc_cacher = TcCacher::default();
let parse_result =
parse_buffer_for_ccsds_space_packets(&buf, &verificator, PARSER_ID, &tc_cacher);
assert!(parse_result.is_ok());
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 1);
}
}

View File

@@ -10,10 +10,7 @@ pub(crate) mod tests {
use alloc::collections::VecDeque;
use crate::{
tmtc::{PacketAsVec, PacketSenderRaw},
ComponentId,
};
use crate::{pus::PacketAsVec, tmtc::PacketSenderRaw, ComponentId};
use super::cobs::encode_packet_with_cobs;

View File

@@ -181,8 +181,8 @@ impl<
/// useful if using the port number 0 for OS auto-assignment.
pub fn local_addr(&self) -> std::io::Result<SocketAddr>;
/// Delegation to the [TcpTmtcGenericServer::handle_all_connections] call.
pub fn handle_all_connections(
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
pub fn handle_next_connection(
&mut self,
poll_duration: Option<Duration>,
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
@@ -211,8 +211,8 @@ mod tests {
tests::{ConnectionFinishedHandler, SyncTmSource},
ConnectionResult, ServerConfig,
},
pus::PacketAsVec,
queue::GenericSendError,
tmtc::PacketAsVec,
ComponentId,
};
use alloc::sync::Arc;
@@ -274,7 +274,7 @@ mod tests {
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_all_connections(Some(Duration::from_millis(100)));
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(100)));
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
@@ -330,7 +330,7 @@ mod tests {
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_all_connections(Some(Duration::from_millis(100)));
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(100)));
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
@@ -436,7 +436,7 @@ mod tests {
let start = Instant::now();
// Call the connection handler in separate thread, does block.
let thread_jh = thread::spawn(move || loop {
let result = tcp_server.handle_all_connections(Some(Duration::from_millis(20)));
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(20)));
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
@@ -470,7 +470,7 @@ mod tests {
let start = Instant::now();
// Call the connection handler in separate thread, does block.
let thread_jh = thread::spawn(move || loop {
let result = tcp_server.handle_all_connections(Some(Duration::from_millis(20)));
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(20)));
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}

View File

@@ -9,6 +9,8 @@ use mio::{Events, Interest, Poll, Token};
use socket2::{Domain, Socket, Type};
use std::io::{self, Read};
use std::net::SocketAddr;
// use std::net::TcpListener;
// use std::net::{SocketAddr, TcpStream};
use std::thread;
use crate::tmtc::{PacketSenderRaw, PacketSource};
@@ -17,7 +19,9 @@ use thiserror::Error;
// Re-export the TMTC in COBS server.
pub use crate::hal::std::tcp_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
pub use crate::hal::std::tcp_spacepackets_server::{SpacepacketsTmSender, TcpSpacepacketsServer};
pub use crate::hal::std::tcp_spacepackets_server::{
SpacepacketsTcParser, SpacepacketsTmSender, TcpSpacepacketsServer,
};
/// Configuration struct for the generic TCP TMTC server
///
@@ -161,7 +165,6 @@ pub trait TcpTmSender<TmError, TcError> {
/// Currently, this framework offers the following concrete implementations:
///
/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol.
/// 2. [TcpSpacepacketsServer] to exchange space packets via TCP.
pub struct TcpTmtcGenericServer<
TmSource: PacketSource<Error = TmError>,
TcSender: PacketSenderRaw<Error = TcSendError>,
@@ -242,16 +245,13 @@ impl<
// Create a poll instance.
let poll = Poll::new()?;
// Create storage for events.
let events = Events::with_capacity(32);
let events = Events::with_capacity(10);
let listener: std::net::TcpListener = socket.into();
let mut mio_listener = TcpListener::from_std(listener);
// Start listening for incoming connections.
poll.registry().register(
&mut mio_listener,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)?;
poll.registry()
.register(&mut mio_listener, Token(0), Interest::READABLE)?;
Ok(Self {
id: cfg.id,
@@ -281,11 +281,11 @@ impl<
self.listener.local_addr()
}
/// This call is used to handle all connection from clients. Right now, it performs
/// This call is used to handle the next connection to a client. Right now, it performs
/// the following steps:
///
/// 1. It calls the [std::net::TcpListener::accept] method until a client connects. An optional
/// timeout can be specified for non-blocking acceptance.
/// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API
/// until a client connects.
/// 2. It reads all the telecommands from the client and parses all received data using the
/// user specified [TcpTcParser].
/// 3. After reading and parsing all telecommands, it sends back all telemetry using the
@@ -294,7 +294,7 @@ impl<
/// The server will delay for a user-specified period if the client connects to the server
/// for prolonged periods and there is no traffic for the server. This is the case if the
/// client does not send any telecommands and no telemetry needs to be sent back to the client.
pub fn handle_all_connections(
pub fn handle_next_connection(
&mut self,
poll_timeout: Option<Duration>,
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcSendError>> {
@@ -318,17 +318,11 @@ impl<
loop {
match self.listener.accept() {
Ok((stream, addr)) => {
if let Err(e) = self.handle_accepted_connection(stream, addr) {
self.reregister_poll_interest()?;
return Err(e);
}
self.handle_accepted_connection(stream, addr)?;
handled_connections += 1;
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break,
Err(err) => {
self.reregister_poll_interest()?;
return Err(TcpTmtcError::Io(err));
}
Err(err) => return Err(TcpTmtcError::Io(err)),
}
}
}
@@ -338,14 +332,6 @@ impl<
Ok(ConnectionResult::AcceptTimeout)
}
fn reregister_poll_interest(&mut self) -> io::Result<()> {
self.poll.registry().reregister(
&mut self.listener,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)
}
fn handle_accepted_connection(
&mut self,
mut stream: TcpStream,

View File

@@ -5,9 +5,9 @@ use mio::net::{TcpListener, TcpStream};
use std::{io::Write, net::SocketAddr};
use crate::{
encoding::{ccsds::SpacePacketValidator, parse_buffer_for_ccsds_space_packets},
encoding::parse_buffer_for_ccsds_space_packets,
tmtc::{PacketSenderRaw, PacketSource},
ComponentId,
ComponentId, ValidatorU16Id,
};
use super::tcp_server::{
@@ -15,7 +15,20 @@ use super::tcp_server::{
TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
};
impl<T: SpacePacketValidator, TmError, TcError: 'static> TcpTcParser<TmError, TcError> for T {
/// Concrete [TcpTcParser] implementation for the [TcpSpacepacketsServer].
pub struct SpacepacketsTcParser<PacketIdChecker: ValidatorU16Id> {
packet_id_lookup: PacketIdChecker,
}
impl<PacketIdChecker: ValidatorU16Id> SpacepacketsTcParser<PacketIdChecker> {
pub fn new(packet_id_lookup: PacketIdChecker) -> Self {
Self { packet_id_lookup }
}
}
impl<PacketIdChecker: ValidatorU16Id, TmError, TcError: 'static> TcpTcParser<TmError, TcError>
for SpacepacketsTcParser<PacketIdChecker>
{
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
@@ -26,19 +39,14 @@ impl<T: SpacePacketValidator, TmError, TcError: 'static> TcpTcParser<TmError, Tc
next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>> {
// Reader vec full, need to parse for packets.
let parse_result = parse_buffer_for_ccsds_space_packets(
&tc_buffer[..current_write_idx],
self,
conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets(
&mut tc_buffer[..current_write_idx],
&self.packet_id_lookup,
sender_id,
tc_sender,
next_write_idx,
)
.map_err(|e| TcpTmtcError::TcError(e))?;
if let Some(broken_tail_start) = parse_result.incomplete_tail_start {
// Copy broken tail to front of buffer.
tc_buffer.copy_within(broken_tail_start..current_write_idx, 0);
*next_write_idx = current_write_idx - broken_tail_start;
}
conn_result.num_received_tcs += parse_result.packets_found;
Ok(())
}
}
@@ -79,18 +87,17 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
///
/// This serves only works if
/// [CCSDS 133.0-B-2 space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf) are the only
/// packet type being exchanged. It uses the CCSDS space packet header [spacepackets::SpHeader] and
/// a user specified [SpacePacketValidator] to determine the space packets relevant for further
/// processing.
/// packet type being exchanged. It uses the CCSDS [spacepackets::PacketId] as the packet delimiter
/// and start marker when parsing for packets. The user specifies a set of expected
/// [spacepackets::PacketId]s as part of the server configuration for that purpose.
///
/// ## Example
///
/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/tcp_servers.rs)
/// also serves as the example application for this module.
pub struct TcpSpacepacketsServer<
TmSource: PacketSource<Error = TmError>,
TcSender: PacketSenderRaw<Error = SendError>,
Validator: SpacePacketValidator,
PacketIdChecker: ValidatorU16Id,
HandledConnection: HandledConnectionHandler,
TmError,
SendError: 'static,
@@ -99,7 +106,7 @@ pub struct TcpSpacepacketsServer<
TmSource,
TcSender,
SpacepacketsTmSender,
Validator,
SpacepacketsTcParser<PacketIdChecker>,
HandledConnection,
TmError,
SendError,
@@ -108,12 +115,20 @@ pub struct TcpSpacepacketsServer<
impl<
TmSource: PacketSource<Error = TmError>,
TcSender: PacketSenderRaw<Error = TcError>,
Validator: SpacePacketValidator,
TcReceiver: PacketSenderRaw<Error = TcError>,
PacketIdChecker: ValidatorU16Id,
HandledConnection: HandledConnectionHandler,
TmError: 'static,
TcError: 'static,
> TcpSpacepacketsServer<TmSource, TcSender, Validator, HandledConnection, TmError, TcError>
>
TcpSpacepacketsServer<
TmSource,
TcReceiver,
PacketIdChecker,
HandledConnection,
TmError,
TcError,
>
{
///
/// ## Parameter
@@ -121,30 +136,26 @@ impl<
/// * `cfg` - Configuration of the server.
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
/// then sent back to the client.
/// * `tc_sender` - Any received telecommands which were decoded successfully will be
/// forwarded using this [PacketSenderRaw].
/// * `validator` - Used to determine the space packets relevant for further processing and
/// to detect broken space packets.
/// * `handled_connection_hook` - Called to notify the user about a succesfully handled
/// connection.
/// * `stop_signal` - Can be used to shut down the TCP server even for longer running
/// connections.
/// * `tc_receiver` - Any received telecommands which were decoded successfully will be
/// forwarded to this TC receiver.
/// * `packet_id_lookup` - This lookup table contains the relevant packets IDs for packet
/// parsing. This mechanism is used to have a start marker for finding CCSDS packets.
pub fn new(
cfg: ServerConfig,
tm_source: TmSource,
tc_sender: TcSender,
validator: Validator,
handled_connection_hook: HandledConnection,
tc_receiver: TcReceiver,
packet_id_checker: PacketIdChecker,
handled_connection: HandledConnection,
stop_signal: Option<Arc<AtomicBool>>,
) -> Result<Self, std::io::Error> {
Ok(Self {
generic_server: TcpTmtcGenericServer::new(
cfg,
validator,
SpacepacketsTcParser::new(packet_id_checker),
SpacepacketsTmSender::default(),
tm_source,
tc_sender,
handled_connection_hook,
tc_receiver,
handled_connection,
stop_signal,
)?,
})
@@ -158,8 +169,8 @@ impl<
/// useful if using the port number 0 for OS auto-assignment.
pub fn local_addr(&self) -> std::io::Result<SocketAddr>;
/// Delegation to the [TcpTmtcGenericServer::handle_all_connections] call.
pub fn handle_all_connections(
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
pub fn handle_next_connection(
&mut self,
poll_timeout: Option<Duration>
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
@@ -186,17 +197,16 @@ mod tests {
use hashbrown::HashSet;
use spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket},
CcsdsPacket, PacketId, SpHeader,
PacketId, SpHeader,
};
use crate::{
encoding::ccsds::{SpValidity, SpacePacketValidator},
hal::std::tcp_server::{
tests::{ConnectionFinishedHandler, SyncTmSource},
ConnectionResult, ServerConfig,
},
pus::PacketAsVec,
queue::GenericSendError,
tmtc::PacketAsVec,
ComponentId,
};
@@ -208,29 +218,16 @@ mod tests {
const TEST_APID_1: u16 = 0x10;
const TEST_PACKET_ID_1: PacketId = PacketId::new_for_tc(true, TEST_APID_1);
#[derive(Default)]
pub struct SimpleValidator(pub HashSet<PacketId>);
impl SpacePacketValidator for SimpleValidator {
fn validate(&self, sp_header: &SpHeader, _raw_buf: &[u8]) -> SpValidity {
if self.0.contains(&sp_header.packet_id()) {
return SpValidity::Valid;
}
// Simple case: Assume that the interface always contains valid space packets.
SpValidity::Skip
}
}
fn generic_tmtc_server(
addr: &SocketAddr,
tc_sender: mpsc::Sender<PacketAsVec>,
tm_source: SyncTmSource,
validator: SimpleValidator,
packet_id_lookup: HashSet<PacketId>,
stop_signal: Option<Arc<AtomicBool>>,
) -> TcpSpacepacketsServer<
SyncTmSource,
mpsc::Sender<PacketAsVec>,
SimpleValidator,
HashSet<PacketId>,
ConnectionFinishedHandler,
(),
GenericSendError,
@@ -239,7 +236,7 @@ mod tests {
ServerConfig::new(TCP_SERVER_ID, *addr, Duration::from_millis(2), 1024, 1024),
tm_source,
tc_sender,
validator,
packet_id_lookup,
ConnectionFinishedHandler::default(),
stop_signal,
)
@@ -251,13 +248,13 @@ mod tests {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let (tc_sender, tc_receiver) = mpsc::channel();
let tm_source = SyncTmSource::default();
let mut validator = SimpleValidator::default();
validator.0.insert(TEST_PACKET_ID_0);
let mut packet_id_lookup = HashSet::new();
packet_id_lookup.insert(TEST_PACKET_ID_0);
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_sender.clone(),
tm_source,
validator,
packet_id_lookup,
None,
);
let dest_addr = tcp_server
@@ -267,7 +264,7 @@ mod tests {
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_all_connections(Some(Duration::from_millis(100)));
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(100)));
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
@@ -326,14 +323,14 @@ mod tests {
tm_source.add_tm(&tm_1);
// Set up server
let mut validator = SimpleValidator::default();
validator.0.insert(TEST_PACKET_ID_0);
validator.0.insert(TEST_PACKET_ID_1);
let mut packet_id_lookup = HashSet::new();
packet_id_lookup.insert(TEST_PACKET_ID_0);
packet_id_lookup.insert(TEST_PACKET_ID_1);
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_sender.clone(),
tm_source,
validator,
packet_id_lookup,
None,
);
let dest_addr = tcp_server
@@ -344,7 +341,7 @@ mod tests {
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_all_connections(Some(Duration::from_millis(100)));
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(100)));
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}

View File

@@ -43,7 +43,7 @@
//! This includes the [ParamsHeapless] enumeration for contained values which do not require heap
//! allocation, and the [Params] which enumerates [ParamsHeapless] and some additional types which
//! require [alloc] support but allow for more flexbility.
use crate::pool::PoolAddr;
use crate::pool::StoreAddr;
use core::fmt::Debug;
use core::mem::size_of;
use paste::paste;
@@ -588,15 +588,15 @@ from_conversions_for_raw!(
#[non_exhaustive]
pub enum Params {
Heapless(ParamsHeapless),
Store(PoolAddr),
Store(StoreAddr),
#[cfg(feature = "alloc")]
Vec(Vec<u8>),
#[cfg(feature = "alloc")]
String(String),
}
impl From<PoolAddr> for Params {
fn from(x: PoolAddr) -> Self {
impl From<StoreAddr> for Params {
fn from(x: StoreAddr) -> Self {
Self::Store(x)
}
}

View File

@@ -82,7 +82,7 @@ use spacepackets::ByteConversionError;
use std::error::Error;
type NumBlocks = u16;
pub type PoolAddr = u64;
pub type StoreAddr = u64;
/// Simple address type used for transactions with the local pool.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -100,14 +100,14 @@ impl StaticPoolAddr {
}
}
impl From<StaticPoolAddr> for PoolAddr {
impl From<StaticPoolAddr> for StoreAddr {
fn from(value: StaticPoolAddr) -> Self {
((value.pool_idx as u64) << 16) | value.packet_idx as u64
}
}
impl From<PoolAddr> for StaticPoolAddr {
fn from(value: PoolAddr) -> Self {
impl From<StoreAddr> for StaticPoolAddr {
fn from(value: StoreAddr) -> Self {
Self {
pool_idx: ((value >> 16) & 0xff) as u16,
packet_idx: (value & 0xff) as u16,
@@ -150,59 +150,59 @@ impl Error for StoreIdError {}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum PoolError {
pub enum StoreError {
/// Requested data block is too large
DataTooLarge(usize),
/// The store is full. Contains the index of the full subpool
StoreFull(u16),
/// Store ID is invalid. This also includes partial errors where only the subpool is invalid
InvalidStoreId(StoreIdError, Option<PoolAddr>),
InvalidStoreId(StoreIdError, Option<StoreAddr>),
/// Valid subpool and packet index, but no data is stored at the given address
DataDoesNotExist(PoolAddr),
DataDoesNotExist(StoreAddr),
ByteConversionError(spacepackets::ByteConversionError),
LockError,
/// Internal or configuration errors
InternalError(u32),
}
impl Display for PoolError {
impl Display for StoreError {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self {
PoolError::DataTooLarge(size) => {
StoreError::DataTooLarge(size) => {
write!(f, "data to store with size {size} is too large")
}
PoolError::StoreFull(u16) => {
StoreError::StoreFull(u16) => {
write!(f, "store is too full. index for full subpool: {u16}")
}
PoolError::InvalidStoreId(id_e, addr) => {
StoreError::InvalidStoreId(id_e, addr) => {
write!(f, "invalid store ID: {id_e}, address: {addr:?}")
}
PoolError::DataDoesNotExist(addr) => {
StoreError::DataDoesNotExist(addr) => {
write!(f, "no data exists at address {addr:?}")
}
PoolError::InternalError(e) => {
StoreError::InternalError(e) => {
write!(f, "internal error: {e}")
}
PoolError::ByteConversionError(e) => {
StoreError::ByteConversionError(e) => {
write!(f, "store error: {e}")
}
PoolError::LockError => {
StoreError::LockError => {
write!(f, "lock error")
}
}
}
}
impl From<ByteConversionError> for PoolError {
impl From<ByteConversionError> for StoreError {
fn from(value: ByteConversionError) -> Self {
Self::ByteConversionError(value)
}
}
#[cfg(feature = "std")]
impl Error for PoolError {
impl Error for StoreError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
if let PoolError::InvalidStoreId(e, _) = self {
if let StoreError::InvalidStoreId(e, _) = self {
return Some(e);
}
None
@@ -217,41 +217,44 @@ impl Error for PoolError {
/// pool structure being wrapped inside a lock.
pub trait PoolProvider {
/// Add new data to the pool. The provider should attempt to reserve a memory block with the
/// appropriate size and then copy the given data to the block. Yields a [PoolAddr] which can
/// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can
/// be used to access the data stored in the pool
fn add(&mut self, data: &[u8]) -> Result<PoolAddr, PoolError>;
fn add(&mut self, data: &[u8]) -> Result<StoreAddr, StoreError>;
/// The provider should attempt to reserve a free memory block with the appropriate size first.
/// It then executes a user-provided closure and passes a mutable reference to that memory
/// block to the closure. This allows the user to write data to the memory block.
/// The function should yield a [PoolAddr] which can be used to access the data stored in the
/// The function should yield a [StoreAddr] which can be used to access the data stored in the
/// pool.
fn free_element<W: FnMut(&mut [u8])>(
&mut self,
len: usize,
writer: W,
) -> Result<PoolAddr, PoolError>;
) -> Result<StoreAddr, StoreError>;
/// Modify data added previously using a given [PoolAddr]. The provider should use the store
/// Modify data added previously using a given [StoreAddr]. The provider should use the store
/// address to determine if a memory block exists for that address. If it does, it should
/// call the user-provided closure and pass a mutable reference to the memory block
/// to the closure. This allows the user to modify the memory block.
fn modify<U: FnMut(&mut [u8])>(&mut self, addr: &PoolAddr, updater: U)
-> Result<(), PoolError>;
fn modify<U: FnMut(&mut [u8])>(
&mut self,
addr: &StoreAddr,
updater: U,
) -> Result<(), StoreError>;
/// The provider should copy the data from the memory block to the user-provided buffer if
/// it exists.
fn read(&self, addr: &PoolAddr, buf: &mut [u8]) -> Result<usize, PoolError>;
fn read(&self, addr: &StoreAddr, buf: &mut [u8]) -> Result<usize, StoreError>;
/// Delete data inside the pool given a [PoolAddr].
fn delete(&mut self, addr: PoolAddr) -> Result<(), PoolError>;
fn has_element_at(&self, addr: &PoolAddr) -> Result<bool, PoolError>;
/// Delete data inside the pool given a [StoreAddr].
fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError>;
fn has_element_at(&self, addr: &StoreAddr) -> Result<bool, StoreError>;
/// Retrieve the length of the data at the given store address.
fn len_of_data(&self, addr: &PoolAddr) -> Result<usize, PoolError>;
fn len_of_data(&self, addr: &StoreAddr) -> Result<usize, StoreError>;
#[cfg(feature = "alloc")]
fn read_as_vec(&self, addr: &PoolAddr) -> Result<alloc::vec::Vec<u8>, PoolError> {
fn read_as_vec(&self, addr: &StoreAddr) -> Result<alloc::vec::Vec<u8>, StoreError> {
let mut vec = alloc::vec![0; self.len_of_data(addr)?];
self.read(addr, &mut vec)?;
Ok(vec)
@@ -268,7 +271,7 @@ pub trait PoolProviderWithGuards: PoolProvider {
/// This can prevent memory leaks. Users can read the data and release the guard
/// if the data in the store is valid for further processing. If the data is faulty, no
/// manual deletion is necessary when returning from a processing function prematurely.
fn read_with_guard(&mut self, addr: PoolAddr) -> PoolGuard<Self>;
fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard<Self>;
/// This function behaves like [PoolProvider::modify], but consumes the provided
/// address and returns a RAII conformant guard object.
@@ -278,20 +281,20 @@ pub trait PoolProviderWithGuards: PoolProvider {
/// This can prevent memory leaks. Users can read (and modify) the data and release the guard
/// if the data in the store is valid for further processing. If the data is faulty, no
/// manual deletion is necessary when returning from a processing function prematurely.
fn modify_with_guard(&mut self, addr: PoolAddr) -> PoolRwGuard<Self>;
fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard<Self>;
}
pub struct PoolGuard<'a, MemProvider: PoolProvider + ?Sized> {
pool: &'a mut MemProvider,
pub addr: PoolAddr,
pub addr: StoreAddr,
no_deletion: bool,
deletion_failed_error: Option<PoolError>,
deletion_failed_error: Option<StoreError>,
}
/// This helper object can be used to safely access pool data without worrying about memory
/// leaks.
impl<'a, MemProvider: PoolProvider> PoolGuard<'a, MemProvider> {
pub fn new(pool: &'a mut MemProvider, addr: PoolAddr) -> Self {
pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self {
Self {
pool,
addr,
@@ -300,12 +303,12 @@ impl<'a, MemProvider: PoolProvider> PoolGuard<'a, MemProvider> {
}
}
pub fn read(&self, buf: &mut [u8]) -> Result<usize, PoolError> {
pub fn read(&self, buf: &mut [u8]) -> Result<usize, StoreError> {
self.pool.read(&self.addr, buf)
}
#[cfg(feature = "alloc")]
pub fn read_as_vec(&self) -> Result<alloc::vec::Vec<u8>, PoolError> {
pub fn read_as_vec(&self) -> Result<alloc::vec::Vec<u8>, StoreError> {
self.pool.read_as_vec(&self.addr)
}
@@ -331,19 +334,19 @@ pub struct PoolRwGuard<'a, MemProvider: PoolProvider + ?Sized> {
}
impl<'a, MemProvider: PoolProvider> PoolRwGuard<'a, MemProvider> {
pub fn new(pool: &'a mut MemProvider, addr: PoolAddr) -> Self {
pub fn new(pool: &'a mut MemProvider, addr: StoreAddr) -> Self {
Self {
guard: PoolGuard::new(pool, addr),
}
}
pub fn update<U: FnMut(&mut [u8])>(&mut self, updater: &mut U) -> Result<(), PoolError> {
pub fn update<U: FnMut(&mut [u8])>(&mut self, updater: &mut U) -> Result<(), StoreError> {
self.guard.pool.modify(&self.guard.addr, updater)
}
delegate!(
to self.guard {
pub fn read(&self, buf: &mut [u8]) -> Result<usize, PoolError>;
pub fn read(&self, buf: &mut [u8]) -> Result<usize, StoreError>;
/// Releasing the pool guard will disable the automatic deletion of the data when the guard
/// is dropped.
pub fn release(&mut self);
@@ -354,7 +357,7 @@ impl<'a, MemProvider: PoolProvider> PoolRwGuard<'a, MemProvider> {
#[cfg(feature = "alloc")]
mod alloc_mod {
use super::{PoolGuard, PoolProvider, PoolProviderWithGuards, PoolRwGuard, StaticPoolAddr};
use crate::pool::{NumBlocks, PoolAddr, PoolError, StoreIdError};
use crate::pool::{NumBlocks, StoreAddr, StoreError, StoreIdError};
use alloc::vec;
use alloc::vec::Vec;
use spacepackets::ByteConversionError;
@@ -419,7 +422,7 @@ mod alloc_mod {
/// fitting subpool is full. This might be added in the future.
///
/// Transactions with the [pool][StaticMemoryPool] are done using a generic
/// [address][PoolAddr] type. Adding any data to the pool will yield a store address.
/// [address][StoreAddr] type. Adding any data to the pool will yield a store address.
/// Modification and read operations are done using a reference to a store address. Deletion
/// will consume the store address.
pub struct StaticMemoryPool {
@@ -449,41 +452,41 @@ mod alloc_mod {
local_pool
}
fn addr_check(&self, addr: &StaticPoolAddr) -> Result<usize, PoolError> {
fn addr_check(&self, addr: &StaticPoolAddr) -> Result<usize, StoreError> {
self.validate_addr(addr)?;
let pool_idx = addr.pool_idx as usize;
let size_list = self.sizes_lists.get(pool_idx).unwrap();
let curr_size = size_list[addr.packet_idx as usize];
if curr_size == STORE_FREE {
return Err(PoolError::DataDoesNotExist(PoolAddr::from(*addr)));
return Err(StoreError::DataDoesNotExist(StoreAddr::from(*addr)));
}
Ok(curr_size)
}
fn validate_addr(&self, addr: &StaticPoolAddr) -> Result<(), PoolError> {
fn validate_addr(&self, addr: &StaticPoolAddr) -> Result<(), StoreError> {
let pool_idx = addr.pool_idx as usize;
if pool_idx >= self.pool_cfg.cfg.len() {
return Err(PoolError::InvalidStoreId(
return Err(StoreError::InvalidStoreId(
StoreIdError::InvalidSubpool(addr.pool_idx),
Some(PoolAddr::from(*addr)),
Some(StoreAddr::from(*addr)),
));
}
if addr.packet_idx >= self.pool_cfg.cfg[addr.pool_idx as usize].0 {
return Err(PoolError::InvalidStoreId(
return Err(StoreError::InvalidStoreId(
StoreIdError::InvalidPacketIdx(addr.packet_idx),
Some(PoolAddr::from(*addr)),
Some(StoreAddr::from(*addr)),
));
}
Ok(())
}
fn reserve(&mut self, data_len: usize) -> Result<StaticPoolAddr, PoolError> {
fn reserve(&mut self, data_len: usize) -> Result<StaticPoolAddr, StoreError> {
let mut subpool_idx = self.find_subpool(data_len, 0)?;
if self.pool_cfg.spill_to_higher_subpools {
while let Err(PoolError::StoreFull(_)) = self.find_empty(subpool_idx) {
while let Err(StoreError::StoreFull(_)) = self.find_empty(subpool_idx) {
if (subpool_idx + 1) as usize == self.sizes_lists.len() {
return Err(PoolError::StoreFull(subpool_idx));
return Err(StoreError::StoreFull(subpool_idx));
}
subpool_idx += 1;
}
@@ -497,7 +500,7 @@ mod alloc_mod {
})
}
fn find_subpool(&self, req_size: usize, start_at_subpool: u16) -> Result<u16, PoolError> {
fn find_subpool(&self, req_size: usize, start_at_subpool: u16) -> Result<u16, StoreError> {
for (i, &(_, elem_size)) in self.pool_cfg.cfg.iter().enumerate() {
if i < start_at_subpool as usize {
continue;
@@ -506,21 +509,21 @@ mod alloc_mod {
return Ok(i as u16);
}
}
Err(PoolError::DataTooLarge(req_size))
Err(StoreError::DataTooLarge(req_size))
}
fn write(&mut self, addr: &StaticPoolAddr, data: &[u8]) -> Result<(), PoolError> {
let packet_pos = self.raw_pos(addr).ok_or(PoolError::InternalError(0))?;
fn write(&mut self, addr: &StaticPoolAddr, data: &[u8]) -> Result<(), StoreError> {
let packet_pos = self.raw_pos(addr).ok_or(StoreError::InternalError(0))?;
let subpool = self
.pool
.get_mut(addr.pool_idx as usize)
.ok_or(PoolError::InternalError(1))?;
.ok_or(StoreError::InternalError(1))?;
let pool_slice = &mut subpool[packet_pos..packet_pos + data.len()];
pool_slice.copy_from_slice(data);
Ok(())
}
fn find_empty(&mut self, subpool: u16) -> Result<(u16, &mut usize), PoolError> {
fn find_empty(&mut self, subpool: u16) -> Result<(u16, &mut usize), StoreError> {
if let Some(size_list) = self.sizes_lists.get_mut(subpool as usize) {
for (i, elem_size) in size_list.iter_mut().enumerate() {
if *elem_size == STORE_FREE {
@@ -528,12 +531,12 @@ mod alloc_mod {
}
}
} else {
return Err(PoolError::InvalidStoreId(
return Err(StoreError::InvalidStoreId(
StoreIdError::InvalidSubpool(subpool),
None,
));
}
Err(PoolError::StoreFull(subpool))
Err(StoreError::StoreFull(subpool))
}
fn raw_pos(&self, addr: &StaticPoolAddr) -> Option<usize> {
@@ -543,10 +546,10 @@ mod alloc_mod {
}
impl PoolProvider for StaticMemoryPool {
fn add(&mut self, data: &[u8]) -> Result<PoolAddr, PoolError> {
fn add(&mut self, data: &[u8]) -> Result<StoreAddr, StoreError> {
let data_len = data.len();
if data_len > POOL_MAX_SIZE {
return Err(PoolError::DataTooLarge(data_len));
return Err(StoreError::DataTooLarge(data_len));
}
let addr = self.reserve(data_len)?;
self.write(&addr, data)?;
@@ -557,9 +560,9 @@ mod alloc_mod {
&mut self,
len: usize,
mut writer: W,
) -> Result<PoolAddr, PoolError> {
) -> Result<StoreAddr, StoreError> {
if len > POOL_MAX_SIZE {
return Err(PoolError::DataTooLarge(len));
return Err(StoreError::DataTooLarge(len));
}
let addr = self.reserve(len)?;
let raw_pos = self.raw_pos(&addr).unwrap();
@@ -571,9 +574,9 @@ mod alloc_mod {
fn modify<U: FnMut(&mut [u8])>(
&mut self,
addr: &PoolAddr,
addr: &StoreAddr,
mut updater: U,
) -> Result<(), PoolError> {
) -> Result<(), StoreError> {
let addr = StaticPoolAddr::from(*addr);
let curr_size = self.addr_check(&addr)?;
let raw_pos = self.raw_pos(&addr).unwrap();
@@ -583,7 +586,7 @@ mod alloc_mod {
Ok(())
}
fn read(&self, addr: &PoolAddr, buf: &mut [u8]) -> Result<usize, PoolError> {
fn read(&self, addr: &StoreAddr, buf: &mut [u8]) -> Result<usize, StoreError> {
let addr = StaticPoolAddr::from(*addr);
let curr_size = self.addr_check(&addr)?;
if buf.len() < curr_size {
@@ -601,7 +604,7 @@ mod alloc_mod {
Ok(curr_size)
}
fn delete(&mut self, addr: PoolAddr) -> Result<(), PoolError> {
fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> {
let addr = StaticPoolAddr::from(addr);
self.addr_check(&addr)?;
let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1;
@@ -614,7 +617,7 @@ mod alloc_mod {
Ok(())
}
fn has_element_at(&self, addr: &PoolAddr) -> Result<bool, PoolError> {
fn has_element_at(&self, addr: &StoreAddr) -> Result<bool, StoreError> {
let addr = StaticPoolAddr::from(*addr);
self.validate_addr(&addr)?;
let pool_idx = addr.pool_idx as usize;
@@ -626,7 +629,7 @@ mod alloc_mod {
Ok(true)
}
fn len_of_data(&self, addr: &PoolAddr) -> Result<usize, PoolError> {
fn len_of_data(&self, addr: &StoreAddr) -> Result<usize, StoreError> {
let addr = StaticPoolAddr::from(*addr);
self.validate_addr(&addr)?;
let pool_idx = addr.pool_idx as usize;
@@ -640,11 +643,11 @@ mod alloc_mod {
}
impl PoolProviderWithGuards for StaticMemoryPool {
fn modify_with_guard(&mut self, addr: PoolAddr) -> PoolRwGuard<Self> {
fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard<Self> {
PoolRwGuard::new(self, addr)
}
fn read_with_guard(&mut self, addr: PoolAddr) -> PoolGuard<Self> {
fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard<Self> {
PoolGuard::new(self, addr)
}
}
@@ -653,8 +656,8 @@ mod alloc_mod {
#[cfg(test)]
mod tests {
use crate::pool::{
PoolError, PoolGuard, PoolProvider, PoolProviderWithGuards, PoolRwGuard, StaticMemoryPool,
StaticPoolAddr, StaticPoolConfig, StoreIdError, POOL_MAX_SIZE,
PoolGuard, PoolProvider, PoolProviderWithGuards, PoolRwGuard, StaticMemoryPool,
StaticPoolAddr, StaticPoolConfig, StoreError, StoreIdError, POOL_MAX_SIZE,
};
use std::vec;
@@ -778,7 +781,7 @@ mod tests {
let res = local_pool.free_element(8, |_| {});
assert!(res.is_err());
let err = res.unwrap_err();
assert_eq!(err, PoolError::StoreFull(1));
assert_eq!(err, StoreError::StoreFull(1));
// Verify that the two deletions are successful
assert!(local_pool.delete(addr0).is_ok());
@@ -800,7 +803,7 @@ mod tests {
assert!(res.is_err());
assert!(matches!(
res.unwrap_err(),
PoolError::DataDoesNotExist { .. }
StoreError::DataDoesNotExist { .. }
));
}
@@ -813,8 +816,8 @@ mod tests {
let res = local_pool.add(&test_buf);
assert!(res.is_err());
let err = res.unwrap_err();
assert!(matches!(err, PoolError::StoreFull { .. }));
if let PoolError::StoreFull(subpool) = err {
assert!(matches!(err, StoreError::StoreFull { .. }));
if let StoreError::StoreFull(subpool) = err {
assert_eq!(subpool, 2);
}
}
@@ -832,7 +835,7 @@ mod tests {
let err = res.unwrap_err();
assert!(matches!(
err,
PoolError::InvalidStoreId(StoreIdError::InvalidSubpool(3), Some(_))
StoreError::InvalidStoreId(StoreIdError::InvalidSubpool(3), Some(_))
));
}
@@ -849,7 +852,7 @@ mod tests {
let err = res.unwrap_err();
assert!(matches!(
err,
PoolError::InvalidStoreId(StoreIdError::InvalidPacketIdx(1), Some(_))
StoreError::InvalidStoreId(StoreIdError::InvalidPacketIdx(1), Some(_))
));
}
@@ -860,7 +863,7 @@ mod tests {
let res = local_pool.add(&data_too_large);
assert!(res.is_err());
let err = res.unwrap_err();
assert_eq!(err, PoolError::DataTooLarge(20));
assert_eq!(err, StoreError::DataTooLarge(20));
}
#[test]
@@ -868,7 +871,10 @@ mod tests {
let mut local_pool = basic_small_pool();
let res = local_pool.free_element(POOL_MAX_SIZE + 1, |_| {});
assert!(res.is_err());
assert_eq!(res.unwrap_err(), PoolError::DataTooLarge(POOL_MAX_SIZE + 1));
assert_eq!(
res.unwrap_err(),
StoreError::DataTooLarge(POOL_MAX_SIZE + 1)
);
}
#[test]
@@ -877,7 +883,7 @@ mod tests {
// Try to request a slot which is too large
let res = local_pool.free_element(20, |_| {});
assert!(res.is_err());
assert_eq!(res.unwrap_err(), PoolError::DataTooLarge(20));
assert_eq!(res.unwrap_err(), StoreError::DataTooLarge(20));
}
#[test]
@@ -997,7 +1003,7 @@ mod tests {
let should_fail = local_pool.free_element(8, |_| {});
assert!(should_fail.is_err());
if let Err(err) = should_fail {
assert_eq!(err, PoolError::StoreFull(1));
assert_eq!(err, StoreError::StoreFull(1));
} else {
panic!("unexpected store address");
}
@@ -1028,7 +1034,7 @@ mod tests {
let should_fail = local_pool.free_element(8, |_| {});
assert!(should_fail.is_err());
if let Err(err) = should_fail {
assert_eq!(err, PoolError::StoreFull(2));
assert_eq!(err, StoreError::StoreFull(2));
} else {
panic!("unexpected store address");
}

View File

@@ -7,9 +7,11 @@ use crate::{
use satrs_shared::res_code::ResultU16;
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub use std_mod::*;
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
#[allow(unused_imports)]
pub use alloc_mod::*;
@@ -63,6 +65,7 @@ impl GenericActionReplyPus {
}
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub mod alloc_mod {
use crate::{
action::ActionRequest,
@@ -124,6 +127,7 @@ pub mod alloc_mod {
}
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub mod std_mod {
use std::sync::mpsc;

View File

@@ -13,8 +13,10 @@ use crate::pus::verification::TcStateToken;
use crate::pus::EcssTmSender;
use crate::pus::EcssTmtcError;
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub use alloc_mod::*;
#[cfg(feature = "heapless")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "heapless")))]
pub use heapless_mod::*;
/// This trait allows the PUS event manager implementation to stay generic over various types
@@ -42,6 +44,7 @@ pub mod heapless_mod {
use crate::events::LargestEventRaw;
use core::marker::PhantomData;
#[cfg_attr(doc_cfg, doc(cfg(feature = "heapless")))]
// TODO: After a new version of heapless is released which uses hash32 version 0.3, try using
// regular Event type again.
#[derive(Default)]
@@ -254,8 +257,9 @@ pub mod alloc_mod {
#[cfg(test)]
mod tests {
use super::*;
use crate::events::SeverityInfo;
use crate::pus::PacketAsVec;
use crate::request::UniqueApidTargetId;
use crate::{events::SeverityInfo, tmtc::PacketAsVec};
use std::sync::mpsc::{self, TryRecvError};
const INFO_EVENT: EventU32TypedSev<SeverityInfo> =

View File

@@ -213,13 +213,9 @@ mod tests {
.expect("acceptance success failure")
}
fn send_tc(&self, token: &VerificationToken<TcStateAccepted>, tc: &PusTcCreator) {
self.common
.send_tc(self.handler.service_helper.id(), token, tc);
}
delegate! {
to self.common {
fn send_tc(&self, token: &VerificationToken<TcStateAccepted>, tc: &PusTcCreator);
fn read_next_tm(&mut self) -> PusTmReader<'_>;
fn check_no_tm_available(&self) -> bool;
fn check_next_verification_tm(&self, subservice: u8, expected_request_id: RequestId);

View File

@@ -2,13 +2,10 @@
//!
//! This module contains structures to make working with the PUS C standard easier.
//! The satrs-example application contains various usage examples of these components.
use crate::pool::{PoolAddr, PoolError};
use crate::pool::{StoreAddr, StoreError};
use crate::pus::verification::{TcStateAccepted, TcStateToken, VerificationToken};
use crate::queue::{GenericReceiveError, GenericSendError};
use crate::request::{GenericMessage, MessageMetadata, RequestId};
#[cfg(feature = "alloc")]
use crate::tmtc::PacketAsVec;
use crate::tmtc::PacketInPool;
use crate::ComponentId;
use core::fmt::{Display, Formatter};
use core::time::Duration;
@@ -47,12 +44,12 @@ use self::verification::VerificationReportingProvider;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PusTmVariant<'time, 'src_data> {
InStore(PoolAddr),
InStore(StoreAddr),
Direct(PusTmCreator<'time, 'src_data>),
}
impl From<PoolAddr> for PusTmVariant<'_, '_> {
fn from(value: PoolAddr) -> Self {
impl From<StoreAddr> for PusTmVariant<'_, '_> {
fn from(value: StoreAddr) -> Self {
Self::InStore(value)
}
}
@@ -65,10 +62,10 @@ impl<'time, 'src_data> From<PusTmCreator<'time, 'src_data>> for PusTmVariant<'ti
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EcssTmtcError {
Store(PoolError),
Store(StoreError),
ByteConversion(ByteConversionError),
Pus(PusError),
CantSendAddr(PoolAddr),
CantSendAddr(StoreAddr),
CantSendDirectTm,
Send(GenericSendError),
Receive(GenericReceiveError),
@@ -102,8 +99,8 @@ impl Display for EcssTmtcError {
}
}
impl From<PoolError> for EcssTmtcError {
fn from(value: PoolError) -> Self {
impl From<StoreError> for EcssTmtcError {
fn from(value: StoreError) -> Self {
Self::Store(value)
}
}
@@ -178,26 +175,26 @@ impl EcssTmSender for EcssTmDummySender {
}
}
/// A PUS telecommand packet can be stored in memory and sent using different methods. Right now,
/// A PUS telecommand packet can be stored in memory using different methods. Right now,
/// storage inside a pool structure like [crate::pool::StaticMemoryPool], and storage inside a
/// `Vec<u8>` are supported.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TcInMemory {
Pool(PacketInPool),
StoreAddr(StoreAddr),
#[cfg(feature = "alloc")]
Vec(PacketAsVec),
Vec(alloc::vec::Vec<u8>),
}
impl From<PacketInPool> for TcInMemory {
fn from(value: PacketInPool) -> Self {
Self::Pool(value)
impl From<StoreAddr> for TcInMemory {
fn from(value: StoreAddr) -> Self {
Self::StoreAddr(value)
}
}
#[cfg(feature = "alloc")]
impl From<PacketAsVec> for TcInMemory {
fn from(value: PacketAsVec) -> Self {
impl From<alloc::vec::Vec<u8>> for TcInMemory {
fn from(value: alloc::vec::Vec<u8>) -> Self {
Self::Vec(value)
}
}
@@ -265,8 +262,8 @@ impl From<PusError> for TryRecvTmtcError {
}
}
impl From<PoolError> for TryRecvTmtcError {
fn from(value: PoolError) -> Self {
impl From<StoreError> for TryRecvTmtcError {
fn from(value: StoreError) -> Self {
Self::Tmtc(value.into())
}
}
@@ -368,6 +365,7 @@ pub mod alloc_mod {
/// [DynClone] allows cloning the trait object as long as the boxed object implements
/// [Clone].
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub trait EcssTmSenderExt: EcssTmSender + Downcast + DynClone {
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
@@ -408,6 +406,7 @@ pub mod alloc_mod {
/// [DynClone] allows cloning the trait object as long as the boxed object implements
/// [Clone].
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub trait EcssTcSenderExt: EcssTcSender + Downcast + DynClone {}
/// Blanket implementation for all types which implement [EcssTcSender] and are clonable.
@@ -427,6 +426,7 @@ pub mod alloc_mod {
/// [DynClone] allows cloning the trait object as long as the boxed object implements
/// [Clone].
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub trait EcssTcReceiverExt: EcssTcReceiver + Downcast {}
/// Blanket implementation for all types which implement [EcssTcReceiver] and are clonable.
@@ -548,6 +548,7 @@ pub mod alloc_mod {
>
{
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn new_from_now(
active_request_map: ActiveRequestMap,
fail_data_buf_size: usize,
@@ -634,6 +635,7 @@ pub mod alloc_mod {
/// Update the current time used for timeout checks based on the current OS time.
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn update_time_from_now(&mut self) -> Result<(), std::time::SystemTimeError> {
self.current_time = UnixTimestamp::from_now()?;
Ok(())
@@ -648,16 +650,17 @@ pub mod alloc_mod {
}
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub mod std_mod {
use crate::pool::{
PoolAddr, PoolError, PoolProvider, PoolProviderWithGuards, SharedStaticMemoryPool,
PoolProvider, PoolProviderWithGuards, SharedStaticMemoryPool, StoreAddr, StoreError,
};
use crate::pus::verification::{TcStateAccepted, VerificationToken};
use crate::pus::{
EcssTcAndToken, EcssTcReceiver, EcssTmSender, EcssTmtcError, GenericReceiveError,
GenericSendError, PusTmVariant, TryRecvTmtcError,
};
use crate::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use crate::tmtc::PacketSenderWithSharedPool;
use crate::ComponentId;
use alloc::vec::Vec;
use core::time::Duration;
@@ -675,10 +678,24 @@ pub mod std_mod {
use super::verification::{TcStateToken, VerificationReportingProvider};
use super::{AcceptedEcssTcAndToken, ActiveRequestProvider, TcInMemory};
use crate::tmtc::PacketInPool;
impl From<mpsc::SendError<PoolAddr>> for EcssTmtcError {
fn from(_: mpsc::SendError<PoolAddr>) -> Self {
#[derive(Debug)]
pub struct PacketInPool {
pub sender_id: ComponentId,
pub store_addr: StoreAddr,
}
impl PacketInPool {
pub fn new(sender_id: ComponentId, store_addr: StoreAddr) -> Self {
Self {
sender_id,
store_addr,
}
}
}
impl From<mpsc::SendError<StoreAddr>> for EcssTmtcError {
fn from(_: mpsc::SendError<StoreAddr>) -> Self {
Self::Send(GenericSendError::RxDisconnected)
}
}
@@ -713,6 +730,18 @@ pub mod std_mod {
}
}
#[derive(Debug)]
pub struct PacketAsVec {
pub sender_id: ComponentId,
pub packet: Vec<u8>,
}
impl PacketAsVec {
pub fn new(sender_id: ComponentId, packet: Vec<u8>) -> Self {
Self { sender_id, packet }
}
}
pub type MpscTmAsVecSender = mpsc::Sender<PacketAsVec>;
impl EcssTmSender for MpscTmAsVecSender {
@@ -765,14 +794,14 @@ pub mod std_mod {
use super::*;
use crossbeam_channel as cb;
impl From<cb::SendError<PoolAddr>> for EcssTmtcError {
fn from(_: cb::SendError<PoolAddr>) -> Self {
impl From<cb::SendError<StoreAddr>> for EcssTmtcError {
fn from(_: cb::SendError<StoreAddr>) -> Self {
Self::Send(GenericSendError::RxDisconnected)
}
}
impl From<cb::TrySendError<PoolAddr>> for EcssTmtcError {
fn from(value: cb::TrySendError<PoolAddr>) -> Self {
impl From<cb::TrySendError<StoreAddr>> for EcssTmtcError {
fn from(value: cb::TrySendError<StoreAddr>) -> Self {
match value {
cb::TrySendError::Full(_) => Self::Send(GenericSendError::QueueFull(None)),
cb::TrySendError::Disconnected(_) => {
@@ -948,8 +977,6 @@ pub mod std_mod {
fn tc_slice_raw(&self) -> &[u8];
fn sender_id(&self) -> Option<ComponentId>;
fn cache_and_convert(
&mut self,
possible_packet: &TcInMemory,
@@ -972,7 +999,6 @@ pub mod std_mod {
/// [SharedStaticMemoryPool].
#[derive(Default, Clone)]
pub struct EcssTcInVecConverter {
sender_id: Option<ComponentId>,
pub pus_tc_raw: Option<Vec<u8>>,
}
@@ -980,21 +1006,16 @@ pub mod std_mod {
fn cache(&mut self, tc_in_memory: &TcInMemory) -> Result<(), PusTcFromMemError> {
self.pus_tc_raw = None;
match tc_in_memory {
super::TcInMemory::Pool(_packet_in_pool) => {
super::TcInMemory::StoreAddr(_) => {
return Err(PusTcFromMemError::InvalidFormat(tc_in_memory.clone()));
}
super::TcInMemory::Vec(packet_with_sender) => {
self.pus_tc_raw = Some(packet_with_sender.packet.clone());
self.sender_id = Some(packet_with_sender.sender_id);
super::TcInMemory::Vec(vec) => {
self.pus_tc_raw = Some(vec.clone());
}
};
Ok(())
}
fn sender_id(&self) -> Option<ComponentId> {
self.sender_id
}
fn tc_slice_raw(&self) -> &[u8] {
if self.pus_tc_raw.is_none() {
return &[];
@@ -1008,7 +1029,6 @@ pub mod std_mod {
/// packets should be avoided. Please note that this structure is not able to convert TCs which
/// are stored as a `Vec<u8>`.
pub struct EcssTcInSharedStoreConverter {
sender_id: Option<ComponentId>,
shared_tc_store: SharedStaticMemoryPool,
pus_buf: Vec<u8>,
}
@@ -1016,16 +1036,15 @@ pub mod std_mod {
impl EcssTcInSharedStoreConverter {
pub fn new(shared_tc_store: SharedStaticMemoryPool, max_expected_tc_size: usize) -> Self {
Self {
sender_id: None,
shared_tc_store,
pus_buf: alloc::vec![0; max_expected_tc_size],
}
}
pub fn copy_tc_to_buf(&mut self, addr: PoolAddr) -> Result<(), PusTcFromMemError> {
pub fn copy_tc_to_buf(&mut self, addr: StoreAddr) -> Result<(), PusTcFromMemError> {
// Keep locked section as short as possible.
let mut tc_pool = self.shared_tc_store.write().map_err(|_| {
PusTcFromMemError::EcssTmtc(EcssTmtcError::Store(PoolError::LockError))
PusTcFromMemError::EcssTmtc(EcssTmtcError::Store(StoreError::LockError))
})?;
let tc_size = tc_pool.len_of_data(&addr).map_err(EcssTmtcError::Store)?;
if tc_size > self.pus_buf.len() {
@@ -1047,9 +1066,8 @@ pub mod std_mod {
impl EcssTcInMemConverter for EcssTcInSharedStoreConverter {
fn cache(&mut self, tc_in_memory: &TcInMemory) -> Result<(), PusTcFromMemError> {
match tc_in_memory {
super::TcInMemory::Pool(packet_in_pool) => {
self.copy_tc_to_buf(packet_in_pool.store_addr)?;
self.sender_id = Some(packet_in_pool.sender_id);
super::TcInMemory::StoreAddr(addr) => {
self.copy_tc_to_buf(*addr)?;
}
super::TcInMemory::Vec(_) => {
return Err(PusTcFromMemError::InvalidFormat(tc_in_memory.clone()));
@@ -1061,10 +1079,6 @@ pub mod std_mod {
fn tc_slice_raw(&self) -> &[u8] {
self.pus_buf.as_ref()
}
fn sender_id(&self) -> Option<ComponentId> {
self.sender_id
}
}
pub struct PusServiceBase<
@@ -1266,7 +1280,7 @@ pub mod tests {
use crate::pool::{PoolProvider, SharedStaticMemoryPool, StaticMemoryPool, StaticPoolConfig};
use crate::pus::verification::{RequestId, VerificationReporter};
use crate::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool, SharedPacketPool};
use crate::tmtc::{PacketSenderWithSharedPool, SharedPacketPool};
use crate::ComponentId;
use super::test_util::{TEST_APID, TEST_COMPONENT_ID_0};
@@ -1375,12 +1389,7 @@ pub mod tests {
),
)
}
pub fn send_tc(
&self,
sender_id: ComponentId,
token: &VerificationToken<TcStateAccepted>,
tc: &PusTcCreator,
) {
pub fn send_tc(&self, token: &VerificationToken<TcStateAccepted>, tc: &PusTcCreator) {
let mut mut_buf = self.pus_buf.borrow_mut();
let tc_size = tc.write_to_bytes(mut_buf.as_mut_slice()).unwrap();
let mut tc_pool = self.tc_pool.write().unwrap();
@@ -1388,10 +1397,7 @@ pub mod tests {
drop(tc_pool);
// Send accepted TC to test service handler.
self.tc_sender
.send(EcssTcAndToken::new(
PacketInPool::new(sender_id, addr),
*token,
))
.send(EcssTcAndToken::new(addr, *token))
.expect("sending tc failed");
}
@@ -1505,19 +1511,11 @@ pub mod tests {
}
impl PusServiceHandlerWithVecCommon {
pub fn send_tc(
&self,
sender_id: ComponentId,
token: &VerificationToken<TcStateAccepted>,
tc: &PusTcCreator,
) {
pub fn send_tc(&self, token: &VerificationToken<TcStateAccepted>, tc: &PusTcCreator) {
// Send accepted TC to test service handler.
self.tc_sender
.send(EcssTcAndToken::new(
TcInMemory::Vec(PacketAsVec::new(
sender_id,
tc.to_vec().expect("pus tc conversion to vec failed"),
)),
TcInMemory::Vec(tc.to_vec().expect("pus tc conversion to vec failed")),
*token,
))
.expect("sending tc failed");

View File

@@ -26,9 +26,11 @@ pub enum Subservice {
}
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub mod alloc_mod {}
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub mod std_mod {}
#[cfg(test)]

View File

@@ -14,7 +14,7 @@ use spacepackets::{ByteConversionError, CcsdsPacket};
#[cfg(feature = "std")]
use std::error::Error;
use crate::pool::{PoolError, PoolProvider};
use crate::pool::{PoolProvider, StoreError};
#[cfg(feature = "alloc")]
pub use alloc_mod::*;
@@ -151,7 +151,7 @@ pub enum ScheduleError {
},
/// Nested time-tagged commands are not allowed.
NestedScheduledTc,
StoreError(PoolError),
StoreError(StoreError),
TcDataEmpty,
TimestampError(TimestampError),
WrongSubservice(u8),
@@ -206,8 +206,8 @@ impl From<PusError> for ScheduleError {
}
}
impl From<PoolError> for ScheduleError {
fn from(e: PoolError) -> Self {
impl From<StoreError> for ScheduleError {
fn from(e: StoreError) -> Self {
Self::StoreError(e)
}
}
@@ -240,7 +240,7 @@ impl Error for ScheduleError {
pub trait PusSchedulerProvider {
type TimeProvider: CcsdsTimeProvider + TimeReader;
fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), PoolError>;
fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError>;
fn is_enabled(&self) -> bool;
@@ -347,7 +347,7 @@ pub mod alloc_mod {
};
use spacepackets::time::cds::{self, DaysLen24Bits};
use crate::pool::PoolAddr;
use crate::pool::StoreAddr;
use super::*;
@@ -368,8 +368,8 @@ pub mod alloc_mod {
}
enum DeletionResult {
WithoutStoreDeletion(Option<PoolAddr>),
WithStoreDeletion(Result<bool, PoolError>),
WithoutStoreDeletion(Option<StoreAddr>),
WithStoreDeletion(Result<bool, StoreError>),
}
/// This is the core data structure for scheduling PUS telecommands with [alloc] support.
@@ -423,6 +423,7 @@ pub mod alloc_mod {
/// Like [Self::new], but sets the `init_current_time` parameter to the current system time.
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn new_with_current_init_time(time_margin: Duration) -> Result<Self, SystemTimeError> {
Ok(Self::new(UnixTime::now()?, time_margin))
}
@@ -524,7 +525,7 @@ pub mod alloc_mod {
&mut self,
time_window: TimeWindow<TimeProvider>,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, PoolError)> {
) -> Result<u64, (u64, StoreError)> {
let range = self.retrieve_by_time_filter(time_window);
let mut del_packets = 0;
let mut res_if_fails = None;
@@ -554,7 +555,7 @@ pub mod alloc_mod {
pub fn delete_all(
&mut self,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, PoolError)> {
) -> Result<u64, (u64, StoreError)> {
self.delete_by_time_filter(TimeWindow::<cds::CdsTime>::new_select_all(), pool)
}
@@ -600,7 +601,7 @@ pub mod alloc_mod {
/// Please note that this function will stop on the first telecommand with a request ID match.
/// In case of duplicate IDs (which should generally not happen), this function needs to be
/// called repeatedly.
pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option<PoolAddr> {
pub fn delete_by_request_id(&mut self, req_id: &RequestId) -> Option<StoreAddr> {
if let DeletionResult::WithoutStoreDeletion(v) =
self.delete_by_request_id_internal_without_store_deletion(req_id)
{
@@ -614,7 +615,7 @@ pub mod alloc_mod {
&mut self,
req_id: &RequestId,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<bool, PoolError> {
) -> Result<bool, StoreError> {
if let DeletionResult::WithStoreDeletion(v) =
self.delete_by_request_id_internal_with_store_deletion(req_id, pool)
{
@@ -666,6 +667,7 @@ pub mod alloc_mod {
}
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn update_time_from_now(&mut self) -> Result<(), SystemTimeError> {
self.current_time = UnixTime::now()?;
Ok(())
@@ -691,7 +693,7 @@ pub mod alloc_mod {
releaser: R,
tc_store: &mut (impl PoolProvider + ?Sized),
tc_buf: &mut [u8],
) -> Result<u64, (u64, PoolError)> {
) -> Result<u64, (u64, StoreError)> {
self.release_telecommands_internal(releaser, tc_store, Some(tc_buf))
}
@@ -705,7 +707,7 @@ pub mod alloc_mod {
&mut self,
releaser: R,
tc_store: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, PoolError)> {
) -> Result<u64, (u64, StoreError)> {
self.release_telecommands_internal(releaser, tc_store, None)
}
@@ -714,7 +716,7 @@ pub mod alloc_mod {
mut releaser: R,
tc_store: &mut (impl PoolProvider + ?Sized),
mut tc_buf: Option<&mut [u8]>,
) -> Result<u64, (u64, PoolError)> {
) -> Result<u64, (u64, StoreError)> {
let tcs_to_release = self.telecommands_to_release();
let mut released_tcs = 0;
let mut store_error = Ok(());
@@ -760,7 +762,7 @@ pub mod alloc_mod {
mut releaser: R,
tc_store: &(impl PoolProvider + ?Sized),
tc_buf: &mut [u8],
) -> Result<alloc::vec::Vec<TcInfo>, (alloc::vec::Vec<TcInfo>, PoolError)> {
) -> Result<alloc::vec::Vec<TcInfo>, (alloc::vec::Vec<TcInfo>, StoreError)> {
let tcs_to_release = self.telecommands_to_release();
let mut released_tcs = alloc::vec::Vec::new();
for tc in tcs_to_release {
@@ -791,7 +793,7 @@ pub mod alloc_mod {
/// The holding store for the telecommands needs to be passed so all the stored telecommands
/// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error
/// will be returned but the method will still try to delete all the commands in the schedule.
fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), PoolError> {
fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> {
self.enabled = false;
let mut deletion_ok = Ok(());
for tc_lists in &mut self.tc_map {
@@ -849,7 +851,7 @@ pub mod alloc_mod {
mod tests {
use super::*;
use crate::pool::{
PoolAddr, PoolError, PoolProvider, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig,
PoolProvider, StaticMemoryPool, StaticPoolAddr, StaticPoolConfig, StoreAddr, StoreError,
};
use alloc::collections::btree_map::Range;
use spacepackets::ecss::tc::{PusTcCreator, PusTcReader, PusTcSecondaryHeader};
@@ -988,7 +990,7 @@ mod tests {
.insert_unwrapped_and_stored_tc(
UnixTime::new_only_secs(100),
TcInfo::new(
PoolAddr::from(StaticPoolAddr {
StoreAddr::from(StaticPoolAddr {
pool_idx: 0,
packet_idx: 1,
}),
@@ -1005,7 +1007,7 @@ mod tests {
.insert_unwrapped_and_stored_tc(
UnixTime::new_only_secs(100),
TcInfo::new(
PoolAddr::from(StaticPoolAddr {
StoreAddr::from(StaticPoolAddr {
pool_idx: 0,
packet_idx: 2,
}),
@@ -1049,8 +1051,8 @@ mod tests {
fn common_check(
enabled: bool,
store_addr: &PoolAddr,
expected_store_addrs: Vec<PoolAddr>,
store_addr: &StoreAddr,
expected_store_addrs: Vec<StoreAddr>,
counter: &mut usize,
) {
assert!(enabled);
@@ -1059,8 +1061,8 @@ mod tests {
}
fn common_check_disabled(
enabled: bool,
store_addr: &PoolAddr,
expected_store_addrs: Vec<PoolAddr>,
store_addr: &StoreAddr,
expected_store_addrs: Vec<StoreAddr>,
counter: &mut usize,
) {
assert!(!enabled);
@@ -1514,7 +1516,7 @@ mod tests {
// TC could not even be read..
assert_eq!(err.0, 0);
match err.1 {
PoolError::DataDoesNotExist(addr) => {
StoreError::DataDoesNotExist(addr) => {
assert_eq!(tc_info_0.addr(), addr);
}
_ => panic!("unexpected error {}", err.1),
@@ -1537,7 +1539,7 @@ mod tests {
assert!(reset_res.is_err());
let err = reset_res.unwrap_err();
match err {
PoolError::DataDoesNotExist(addr) => {
StoreError::DataDoesNotExist(addr) => {
assert_eq!(addr, tc_info_0.addr());
}
_ => panic!("unexpected error {err}"),
@@ -1639,7 +1641,7 @@ mod tests {
let err = insert_res.unwrap_err();
match err {
ScheduleError::StoreError(e) => match e {
PoolError::StoreFull(_) => {}
StoreError::StoreFull(_) => {}
_ => panic!("unexpected store error {e}"),
},
_ => panic!("unexpected error {err}"),

View File

@@ -2,11 +2,11 @@ use super::scheduler::PusSchedulerProvider;
use super::verification::{VerificationReporter, VerificationReportingProvider};
use super::{
EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, EcssTcReceiver,
EcssTmSender, MpscTcReceiver, PusServiceHelper,
EcssTmSender, MpscTcReceiver, PacketAsVec, PusServiceHelper,
};
use crate::pool::PoolProvider;
use crate::pus::{PusPacketHandlerResult, PusPacketHandlingError};
use crate::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use crate::tmtc::PacketSenderWithSharedPool;
use alloc::string::ToString;
use spacepackets::ecss::{scheduling, PusPacket};
use spacepackets::time::cds::CdsTime;
@@ -315,13 +315,9 @@ mod tests {
.expect("acceptance success failure")
}
fn send_tc(&self, token: &VerificationToken<TcStateAccepted>, tc: &PusTcCreator) {
self.common
.send_tc(self.handler.service_helper.id(), token, tc);
}
delegate! {
to self.common {
fn send_tc(&self, token: &VerificationToken<TcStateAccepted>, tc: &PusTcCreator);
fn read_next_tm(&mut self) -> PusTmReader<'_>;
fn check_no_tm_available(&self) -> bool;
fn check_next_verification_tm(&self, subservice: u8, expected_request_id: RequestId);
@@ -344,7 +340,7 @@ mod tests {
fn reset(
&mut self,
_store: &mut (impl crate::pool::PoolProvider + ?Sized),
) -> Result<(), crate::pool::PoolError> {
) -> Result<(), crate::pool::StoreError> {
self.reset_count += 1;
Ok(())
}

View File

@@ -1,7 +1,8 @@
use crate::pus::{
PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError, PusTmVariant,
PacketAsVec, PartialPusHandlingError, PusPacketHandlerResult, PusPacketHandlingError,
PusTmVariant,
};
use crate::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
use crate::tmtc::PacketSenderWithSharedPool;
use spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader};
use spacepackets::ecss::PusPacket;
use spacepackets::SpHeader;
@@ -203,14 +204,10 @@ mod tests {
.expect("acceptance success failure")
}
fn send_tc(&self, token: &VerificationToken<TcStateAccepted>, tc: &PusTcCreator) {
self.common
.send_tc(self.handler.service_helper.id(), token, tc);
}
delegate! {
to self.common {
fn read_next_tm(&mut self) -> PusTmReader<'_>;
fn send_tc(&self, token: &VerificationToken<TcStateAccepted>, tc: &PusTcCreator);
fn check_no_tm_available(&self) -> bool;
fn check_next_verification_tm(
&self,
@@ -258,13 +255,9 @@ mod tests {
.expect("acceptance success failure")
}
fn send_tc(&self, token: &VerificationToken<TcStateAccepted>, tc: &PusTcCreator) {
self.common
.send_tc(self.handler.service_helper.id(), token, tc);
}
delegate! {
to self.common {
fn send_tc(&self, token: &VerificationToken<TcStateAccepted>, tc: &PusTcCreator);
fn read_next_tm(&mut self) -> PusTmReader<'_>;
fn check_no_tm_available(&self) -> bool;
fn check_next_verification_tm(

View File

@@ -98,11 +98,18 @@ pub use crate::seq_count::SeqCountProviderSimple;
pub use spacepackets::ecss::verification::*;
#[cfg(feature = "alloc")]
#[cfg_attr(feature = "doc_cfg", doc(cfg(feature = "alloc")))]
pub use alloc_mod::*;
use crate::request::Apid;
use crate::ComponentId;
/*
#[cfg(feature = "std")]
#[cfg_attr(feature = "doc_cfg", doc(cfg(feature = "std")))]
pub use std_mod::*;
*/
/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard.
///
/// This field equivalent to the first two bytes of the CCSDS space packet header.

View File

@@ -7,19 +7,21 @@
//! all received telecommands are sent to a special handler object called TC source. Using
//! a design like this makes it simpler to add new TC packet sources or new telemetry generators:
//! They only need to send the received and generated data to these objects.
#[cfg(feature = "std")]
use crate::queue::GenericSendError;
use crate::{
pool::{PoolAddr, PoolError},
pool::{PoolProvider, StoreAddr, StoreError},
pus::PacketAsVec,
ComponentId,
};
#[cfg(feature = "std")]
pub use alloc_mod::*;
use core::cell::RefCell;
#[cfg(feature = "alloc")]
use downcast_rs::{impl_downcast, Downcast};
use spacepackets::{
ecss::{
tc::PusTcReader,
tm::{PusTmCreator, PusTmReader},
WritablePusPacket,
},
SpHeader,
};
@@ -30,23 +32,6 @@ pub use std_mod::*;
pub mod tm_helper;
/// Simple type modelling packet stored inside a pool structure. This structure is intended to
/// be used when sending a packet via a message queue, so it also contains the sender ID.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PacketInPool {
pub sender_id: ComponentId,
pub store_addr: PoolAddr,
}
impl PacketInPool {
pub fn new(sender_id: ComponentId, store_addr: PoolAddr) -> Self {
Self {
sender_id,
store_addr,
}
}
}
/// Generic trait for object which can send any packets in form of a raw bytestream, with
/// no assumptions about the received protocol.
pub trait PacketSenderRaw: Send {
@@ -54,6 +39,29 @@ pub trait PacketSenderRaw: Send {
fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error>;
}
#[cfg(feature = "std")]
impl PacketSenderRaw for mpsc::Sender<PacketAsVec> {
type Error = GenericSendError;
fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> {
self.send(PacketAsVec::new(sender_id, packet.to_vec()))
.map_err(|_| GenericSendError::RxDisconnected)
}
}
#[cfg(feature = "std")]
impl PacketSenderRaw for mpsc::SyncSender<PacketAsVec> {
type Error = GenericSendError;
fn send_packet(&self, sender_id: ComponentId, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.try_send(PacketAsVec::new(sender_id, tc_raw.to_vec()))
.map_err(|e| match e {
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
})
}
}
/// Extension trait of [PacketSenderRaw] which allows downcasting by implementing [Downcast].
#[cfg(feature = "alloc")]
pub trait PacketSenderRawExt: PacketSenderRaw + Downcast {
@@ -169,24 +177,75 @@ where
}
}
/// Newtype wrapper around the [SharedStaticMemoryPool] to enable extension helper traits on
/// top of the regular shared memory pool API.
#[derive(Clone)]
pub struct SharedPacketPool(pub SharedStaticMemoryPool);
impl SharedPacketPool {
pub fn new(pool: &SharedStaticMemoryPool) -> Self {
Self(pool.clone())
}
}
/// Helper trait for any generic (static) store which allows storing raw or CCSDS packets.
pub trait CcsdsPacketPool {
fn add_ccsds_tc(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<PoolAddr, PoolError> {
fn add_ccsds_tc(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<StoreAddr, StoreError> {
self.add_raw_tc(tc_raw)
}
fn add_raw_tc(&mut self, tc_raw: &[u8]) -> Result<PoolAddr, PoolError>;
fn add_raw_tc(&mut self, tc_raw: &[u8]) -> Result<StoreAddr, StoreError>;
}
/// Helper trait for any generic (static) store which allows storing ECSS PUS Telecommand packets.
pub trait PusTcPool {
fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<PoolAddr, PoolError>;
fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<StoreAddr, StoreError>;
}
/// Helper trait for any generic (static) store which allows storing ECSS PUS Telemetry packets.
pub trait PusTmPool {
fn add_pus_tm_from_reader(&mut self, pus_tm: &PusTmReader) -> Result<PoolAddr, PoolError>;
fn add_pus_tm_from_creator(&mut self, pus_tm: &PusTmCreator) -> Result<PoolAddr, PoolError>;
fn add_pus_tm_from_reader(&mut self, pus_tm: &PusTmReader) -> Result<StoreAddr, StoreError>;
fn add_pus_tm_from_creator(&mut self, pus_tm: &PusTmCreator) -> Result<StoreAddr, StoreError>;
}
impl PusTcPool for SharedPacketPool {
fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<StoreAddr, StoreError> {
let mut pg = self.0.write().map_err(|_| StoreError::LockError)?;
let addr = pg.free_element(pus_tc.len_packed(), |buf| {
buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
})?;
Ok(addr)
}
}
impl PusTmPool for SharedPacketPool {
fn add_pus_tm_from_reader(&mut self, pus_tm: &PusTmReader) -> Result<StoreAddr, StoreError> {
let mut pg = self.0.write().map_err(|_| StoreError::LockError)?;
let addr = pg.free_element(pus_tm.len_packed(), |buf| {
buf[0..pus_tm.len_packed()].copy_from_slice(pus_tm.raw_data());
})?;
Ok(addr)
}
fn add_pus_tm_from_creator(&mut self, pus_tm: &PusTmCreator) -> Result<StoreAddr, StoreError> {
let mut pg = self.0.write().map_err(|_| StoreError::LockError)?;
let mut result = Ok(0);
let addr = pg.free_element(pus_tm.len_written(), |buf| {
result = pus_tm.write_to_bytes(buf);
})?;
result?;
Ok(addr)
}
}
impl CcsdsPacketPool for SharedPacketPool {
fn add_raw_tc(&mut self, tc_raw: &[u8]) -> Result<StoreAddr, StoreError> {
let mut pg = self.0.write().map_err(|_| StoreError::LockError)?;
let addr = pg.free_element(tc_raw.len(), |buf| {
buf[0..tc_raw.len()].copy_from_slice(tc_raw);
})?;
Ok(addr)
}
}
/// Generic trait for any sender component able to send packets stored inside a pool structure.
@@ -194,126 +253,25 @@ pub trait PacketInPoolSender: Send {
fn send_packet(
&self,
sender_id: ComponentId,
store_addr: PoolAddr,
store_addr: StoreAddr,
) -> Result<(), GenericSendError>;
}
#[cfg(feature = "alloc")]
pub mod alloc_mod {
use alloc::vec::Vec;
use super::*;
/// Simple type modelling packet stored in the heap. This structure is intended to
/// be used when sending a packet via a message queue, so it also contains the sender ID.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PacketAsVec {
pub sender_id: ComponentId,
pub packet: Vec<u8>,
}
impl PacketAsVec {
pub fn new(sender_id: ComponentId, packet: Vec<u8>) -> Self {
Self { sender_id, packet }
}
}
}
#[cfg(feature = "std")]
pub mod std_mod {
use core::cell::RefCell;
#[cfg(feature = "crossbeam")]
use crossbeam_channel as cb;
use spacepackets::ecss::WritablePusPacket;
use thiserror::Error;
use crate::pool::PoolProvider;
use crate::pus::{EcssTmSender, EcssTmtcError, PacketSenderPusTc};
use crate::pus::{EcssTmSender, EcssTmtcError, PacketInPool, PacketSenderPusTc};
use super::*;
/// Newtype wrapper around the [SharedStaticMemoryPool] to enable extension helper traits on
/// top of the regular shared memory pool API.
#[derive(Clone)]
pub struct SharedPacketPool(pub SharedStaticMemoryPool);
impl SharedPacketPool {
pub fn new(pool: &SharedStaticMemoryPool) -> Self {
Self(pool.clone())
}
}
impl PusTcPool for SharedPacketPool {
fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<PoolAddr, PoolError> {
let mut pg = self.0.write().map_err(|_| PoolError::LockError)?;
let addr = pg.free_element(pus_tc.len_packed(), |buf| {
buf[0..pus_tc.len_packed()].copy_from_slice(pus_tc.raw_data());
})?;
Ok(addr)
}
}
impl PusTmPool for SharedPacketPool {
fn add_pus_tm_from_reader(&mut self, pus_tm: &PusTmReader) -> Result<PoolAddr, PoolError> {
let mut pg = self.0.write().map_err(|_| PoolError::LockError)?;
let addr = pg.free_element(pus_tm.len_packed(), |buf| {
buf[0..pus_tm.len_packed()].copy_from_slice(pus_tm.raw_data());
})?;
Ok(addr)
}
fn add_pus_tm_from_creator(
&mut self,
pus_tm: &PusTmCreator,
) -> Result<PoolAddr, PoolError> {
let mut pg = self.0.write().map_err(|_| PoolError::LockError)?;
let mut result = Ok(0);
let addr = pg.free_element(pus_tm.len_written(), |buf| {
result = pus_tm.write_to_bytes(buf);
})?;
result?;
Ok(addr)
}
}
impl CcsdsPacketPool for SharedPacketPool {
fn add_raw_tc(&mut self, tc_raw: &[u8]) -> Result<PoolAddr, PoolError> {
let mut pg = self.0.write().map_err(|_| PoolError::LockError)?;
let addr = pg.free_element(tc_raw.len(), |buf| {
buf[0..tc_raw.len()].copy_from_slice(tc_raw);
})?;
Ok(addr)
}
}
#[cfg(feature = "std")]
impl PacketSenderRaw for mpsc::Sender<PacketAsVec> {
type Error = GenericSendError;
fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> {
self.send(PacketAsVec::new(sender_id, packet.to_vec()))
.map_err(|_| GenericSendError::RxDisconnected)
}
}
#[cfg(feature = "std")]
impl PacketSenderRaw for mpsc::SyncSender<PacketAsVec> {
type Error = GenericSendError;
fn send_packet(&self, sender_id: ComponentId, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.try_send(PacketAsVec::new(sender_id, tc_raw.to_vec()))
.map_err(|e| match e {
mpsc::TrySendError::Full(_) => GenericSendError::QueueFull(None),
mpsc::TrySendError::Disconnected(_) => GenericSendError::RxDisconnected,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum StoreAndSendError {
#[error("Store error: {0}")]
Store(#[from] PoolError),
Store(#[from] StoreError),
#[error("Genreric send error: {0}")]
Send(#[from] GenericSendError),
}
@@ -324,7 +282,7 @@ pub mod std_mod {
fn send_packet(
&self,
sender_id: ComponentId,
store_addr: PoolAddr,
store_addr: StoreAddr,
) -> Result<(), GenericSendError> {
self.send(PacketInPool::new(sender_id, store_addr))
.map_err(|_| GenericSendError::RxDisconnected)
@@ -335,7 +293,7 @@ pub mod std_mod {
fn send_packet(
&self,
sender_id: ComponentId,
store_addr: PoolAddr,
store_addr: StoreAddr,
) -> Result<(), GenericSendError> {
self.try_send(PacketInPool::new(sender_id, store_addr))
.map_err(|e| match e {
@@ -350,7 +308,7 @@ pub mod std_mod {
fn send_packet(
&self,
sender_id: ComponentId,
store_addr: PoolAddr,
store_addr: StoreAddr,
) -> Result<(), GenericSendError> {
self.try_send(PacketInPool::new(sender_id, store_addr))
.map_err(|e| match e {
@@ -463,7 +421,7 @@ pub mod std_mod {
sender_id: crate::ComponentId,
tm: crate::pus::PusTmVariant,
) -> Result<(), crate::pus::EcssTmtcError> {
let send_addr = |store_addr: PoolAddr| {
let send_addr = |store_addr: StoreAddr| {
self.sender
.send_packet(sender_id, store_addr)
.map_err(EcssTmtcError::Send)
@@ -640,7 +598,7 @@ pub(crate) mod tests {
assert!(result.is_err());
matches!(
result.unwrap_err(),
StoreAndSendError::Store(PoolError::StoreFull(..))
StoreAndSendError::Store(StoreError::StoreFull(..))
);
let packet_in_pool = tc_rx.try_recv().unwrap();
let mut pool = shared_pool.0.write().unwrap();

View File

@@ -1,4 +1,4 @@
use satrs::pool::{PoolAddr, PoolGuard, PoolProvider, StaticMemoryPool, StaticPoolConfig};
use satrs::pool::{PoolGuard, PoolProvider, StaticMemoryPool, StaticPoolConfig, StoreAddr};
use std::ops::DerefMut;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
@@ -12,7 +12,7 @@ fn threaded_usage() {
let pool_cfg = StaticPoolConfig::new(vec![(16, 6), (32, 3), (8, 12)], false);
let shared_pool = Arc::new(RwLock::new(StaticMemoryPool::new(pool_cfg)));
let shared_clone = shared_pool.clone();
let (tx, rx): (Sender<PoolAddr>, Receiver<PoolAddr>) = mpsc::channel();
let (tx, rx): (Sender<StoreAddr>, Receiver<StoreAddr>) = mpsc::channel();
let jh0 = thread::spawn(move || {
let mut dummy = shared_pool.write().unwrap();
let addr = dummy.add(&DUMMY_DATA).expect("Writing data failed");

View File

@@ -7,8 +7,8 @@ use satrs::params::U32Pair;
use satrs::params::{Params, ParamsHeapless, WritableToBeBytes};
use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher};
use satrs::pus::test_util::TEST_COMPONENT_ID_0;
use satrs::pus::PacketAsVec;
use satrs::request::UniqueApidTargetId;
use satrs::tmtc::PacketAsVec;
use spacepackets::ecss::tm::PusTmReader;
use spacepackets::ecss::{PusError, PusPacket};
use std::sync::mpsc::{self, SendError, TryRecvError};

View File

@@ -23,10 +23,7 @@ use std::{
use hashbrown::HashSet;
use satrs::{
encoding::{
ccsds::{SpValidity, SpacePacketValidator},
cobs::encode_packet_with_cobs,
},
encoding::cobs::encode_packet_with_cobs,
hal::std::tcp_server::{
ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig,
TcpSpacepacketsServer, TcpTmtcInCobsServer,
@@ -36,7 +33,7 @@ use satrs::{
};
use spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket},
CcsdsPacket, PacketId, SpHeader,
PacketId, SpHeader,
};
use std::{collections::VecDeque, sync::Arc, vec::Vec};
@@ -133,7 +130,7 @@ fn test_cobs_server() {
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_all_connections(Some(Duration::from_millis(400)));
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(400)));
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
@@ -195,24 +192,6 @@ fn test_cobs_server() {
const TEST_APID_0: u16 = 0x02;
const TEST_PACKET_ID_0: PacketId = PacketId::new_for_tc(true, TEST_APID_0);
#[derive(Default)]
pub struct SimpleVerificator {
pub valid_ids: HashSet<PacketId>,
}
impl SpacePacketValidator for SimpleVerificator {
fn validate(
&self,
sp_header: &SpHeader,
_raw_buf: &[u8],
) -> satrs::encoding::ccsds::SpValidity {
if self.valid_ids.contains(&sp_header.packet_id()) {
return SpValidity::Valid;
}
SpValidity::Skip
}
}
#[test]
fn test_ccsds_server() {
let (tc_sender, tc_receiver) = mpsc::channel();
@@ -221,8 +200,8 @@ fn test_ccsds_server() {
let verif_tm = PusTcCreator::new_simple(sph, 1, 1, &[], true);
let tm_0 = verif_tm.to_vec().expect("tm generation failed");
tm_source.add_tm(&tm_0);
let mut packet_id_lookup = SimpleVerificator::default();
packet_id_lookup.valid_ids.insert(TEST_PACKET_ID_0);
let mut packet_id_lookup = HashSet::new();
packet_id_lookup.insert(TEST_PACKET_ID_0);
let mut tcp_server = TcpSpacepacketsServer::new(
ServerConfig::new(
TCP_SERVER_ID,
@@ -245,7 +224,7 @@ fn test_ccsds_server() {
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_all_connections(Some(Duration::from_millis(500)));
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(500)));
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}