From f5a45c008e62059e43bc7051f93be6f672bb91a7 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sat, 10 Sep 2022 13:34:04 +0200 Subject: [PATCH] this is complicated --- Cargo.lock | 16 +- fsrc-core/Cargo.toml | 2 +- fsrc-core/src/pool.rs | 215 +++++++++++++++------------ fsrc-core/src/pus/verification.rs | 46 +++--- fsrc-core/tests/pool_test.rs | 2 +- fsrc-core/tests/verification_test.rs | 5 +- fsrc-example/Cargo.toml | 1 + fsrc-example/src/bin/obsw/main.rs | 33 +++- fsrc-example/src/bin/obsw/pus.rs | 22 +-- fsrc-example/src/bin/obsw/tmtc.rs | 31 ++-- fsrc-example/src/bin/test.rs | 5 +- 11 files changed, 227 insertions(+), 151 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a09bb11..8d3a336 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -202,6 +202,17 @@ dependencies = [ "syn", ] +[[package]] +name = "delegate" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "082a24a9967533dc5d743c602157637116fc1b52806d694a5a45e6f32567fcdd" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "downcast-rs" version = "1.2.0" @@ -224,7 +235,7 @@ version = "0.1.0" dependencies = [ "bus", "crossbeam-channel", - "delegate", + "delegate 0.8.0", "downcast-rs", "hashbrown", "num-traits", @@ -241,6 +252,7 @@ name = "fsrc-example" version = "0.1.0" dependencies = [ "crossbeam-channel", + "delegate 0.8.0", "fsrc-core", "spacepackets", ] @@ -573,7 +585,7 @@ version = "0.1.0" dependencies = [ "chrono", "crc", - "delegate", + "delegate 0.7.0", "num-traits", "postcard", "serde", diff --git a/fsrc-core/Cargo.toml b/fsrc-core/Cargo.toml index 160c727..482f2ab 100644 --- a/fsrc-core/Cargo.toml +++ b/fsrc-core/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] thiserror = "1.0" -delegate = "0.7.0" +delegate = "0.8.0" hashbrown = "0.12.3" [dependencies.num-traits] diff --git a/fsrc-core/src/pool.rs b/fsrc-core/src/pool.rs index e1a00a7..66b68a5 100644 --- a/fsrc-core/src/pool.rs +++ b/fsrc-core/src/pool.rs @@ -13,7 +13,7 @@ //! # Example //! //! ``` -//! use fsrc_core::pool::{LocalPool, PoolCfg}; +//! use fsrc_core::pool::{LocalPool, PoolCfg, PoolProvider}; //! //! // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes //! let pool_cfg = PoolCfg::new(vec![(4, 4), (2, 8), (1, 16)]); @@ -78,9 +78,18 @@ use alloc::string::String; use alloc::vec; use alloc::vec::Vec; use delegate::delegate; +#[cfg(feature = "std")] +use std::boxed::Box; +#[cfg(feature = "std")] +use std::sync::{Arc, RwLock}; type NumBlocks = u16; +#[cfg(feature = "std")] +pub type ShareablePoolProvider = Box; +#[cfg(feature = "std")] +pub type SharedPool = Arc>; + /// Configuration structure of the [local pool][LocalPool] /// /// # Parameters @@ -151,10 +160,52 @@ pub enum StoreError { InternalError(String), } +pub trait PoolProvider { + /// Add new data to the pool. The provider should attempt to reserve a memory block with the + /// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can + /// be used to access the data stored in the pool + fn add(&mut self, data: &[u8]) -> Result; + + /// The provider should attempt to reserve a free memory block with the appropriate size and + /// then return a mutable reference to it. Yields a [StoreAddr] which can be used to access + /// the data stored in the pool + fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError>; + + /// Modify data added previously using a given [StoreAddr] by yielding a mutable reference + /// to it + fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError>; + + /// This function behaves like [Self::modify], but consumes the provided address and returns a + /// RAII conformant guard object. + /// + /// Unless the guard [PoolRwGuard::release] method is called, the data for the + /// given address will be deleted automatically when the guard is dropped. + /// This can prevent memory leaks. Users can read (and modify) the data and release the guard + /// if the data in the store is valid for further processing. If the data is faulty, no + /// manual deletion is necessary when returning from a processing function prematurely. + fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard; + + /// Read data by yielding a read-only reference given a [StoreAddr] + fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError>; + + /// This function behaves like [Self::read], but consumes the provided address and returns a + /// RAII conformant guard object. + /// + /// Unless the guard [PoolRwGuard::release] method is called, the data for the + /// given address will be deleted automatically when the guard is dropped. + /// This can prevent memory leaks. Users can read the data and release the guard + /// if the data in the store is valid for further processing. If the data is faulty, no + /// manual deletion is necessary when returning from a processing function prematurely. + fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard; + + /// Delete data inside the pool given a [StoreAddr] + fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError>; + fn has_element_at(&self, addr: &StoreAddr) -> Result; +} + impl LocalPool { const STORE_FREE: PoolSize = PoolSize::MAX; const MAX_SIZE: PoolSize = Self::STORE_FREE - 1; - /// Create a new local pool from the [given configuration][PoolCfg]. This function will sanitize /// the given configuration as well. pub fn new(mut cfg: PoolCfg) -> LocalPool { @@ -175,96 +226,6 @@ impl LocalPool { local_pool } - /// Add new data to the pool. It will attempt to reserve a memory block with the appropriate - /// size and then copy the given data to the block. Yields a [StoreAddr] which can be used - /// to access the data stored in the pool - pub fn add(&mut self, data: &[u8]) -> Result { - let data_len = data.len(); - if data_len > Self::MAX_SIZE { - return Err(StoreError::DataTooLarge(data_len)); - } - let addr = self.reserve(data_len)?; - self.write(&addr, data)?; - Ok(addr) - } - - /// Reserves a free memory block with the appropriate size and returns a mutable reference - /// to it. Yields a [StoreAddr] which can be used to access the data stored in the pool - pub fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError> { - if len > Self::MAX_SIZE { - return Err(StoreError::DataTooLarge(len)); - } - let addr = self.reserve(len)?; - let raw_pos = self.raw_pos(&addr).unwrap(); - let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + len]; - Ok((addr, block)) - } - - /// Modify data added previously using a given [StoreAddr] by yielding a mutable reference - /// to it - pub fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> { - let curr_size = self.addr_check(addr)?; - let raw_pos = self.raw_pos(addr).unwrap(); - let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..curr_size]; - Ok(block) - } - - /// This function behaves like [Self::modify], but consumes the provided address and returns a - /// RAII conformant guard object. - /// - /// Unless the guard [PoolRwGuard::release] method is called, the data for the - /// given address will be deleted automatically when the guard is dropped. - /// This can prevent memory leaks. Users can read (and modify) the data and release the guard - /// if the data in the store is valid for further processing. If the data is faulty, no - /// manual deletion is necessary when returning from a processing function prematurely. - pub fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard { - PoolRwGuard::new(self, addr) - } - - /// Read data by yielding a read-only reference given a [StoreAddr] - pub fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> { - let curr_size = self.addr_check(addr)?; - let raw_pos = self.raw_pos(addr).unwrap(); - let block = &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size]; - Ok(block) - } - - /// This function behaves like [Self::read], but consumes the provided address and returns a - /// RAII conformant guard object. - /// - /// Unless the guard [PoolRwGuard::release] method is called, the data for the - /// given address will be deleted automatically when the guard is dropped. - /// This can prevent memory leaks. Users can read the data and release the guard - /// if the data in the store is valid for further processing. If the data is faulty, no - /// manual deletion is necessary when returning from a processing function prematurely. - pub fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard { - PoolGuard::new(self, addr) - } - - /// Delete data inside the pool given a [StoreAddr] - pub fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> { - self.addr_check(&addr)?; - let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1; - let raw_pos = self.raw_pos(&addr).unwrap(); - let block = - &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + block_size]; - let size_list = self.sizes_lists.get_mut(addr.pool_idx as usize).unwrap(); - size_list[addr.packet_idx as usize] = Self::STORE_FREE; - block.fill(0); - Ok(()) - } - - pub fn has_element_at(&self, addr: &StoreAddr) -> Result { - self.validate_addr(addr)?; - let pool_idx = addr.pool_idx as usize; - let size_list = self.sizes_lists.get(pool_idx).unwrap(); - let curr_size = size_list[addr.packet_idx as usize]; - if curr_size == Self::STORE_FREE { - return Ok(false); - } - Ok(true) - } - fn addr_check(&self, addr: &StoreAddr) -> Result { self.validate_addr(addr)?; let pool_idx = addr.pool_idx as usize; @@ -355,6 +316,73 @@ impl LocalPool { } } +impl PoolProvider for LocalPool { + fn add(&mut self, data: &[u8]) -> Result { + let data_len = data.len(); + if data_len > Self::MAX_SIZE { + return Err(StoreError::DataTooLarge(data_len)); + } + let addr = self.reserve(data_len)?; + self.write(&addr, data)?; + Ok(addr) + } + + fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError> { + if len > Self::MAX_SIZE { + return Err(StoreError::DataTooLarge(len)); + } + let addr = self.reserve(len)?; + let raw_pos = self.raw_pos(&addr).unwrap(); + let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + len]; + Ok((addr, block)) + } + + fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> { + let curr_size = self.addr_check(addr)?; + let raw_pos = self.raw_pos(addr).unwrap(); + let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..curr_size]; + Ok(block) + } + + fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard { + PoolRwGuard::new(self, addr) + } + + fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> { + let curr_size = self.addr_check(addr)?; + let raw_pos = self.raw_pos(addr).unwrap(); + let block = &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size]; + Ok(block) + } + + fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard { + PoolGuard::new(self, addr) + } + + fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> { + self.addr_check(&addr)?; + let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1; + let raw_pos = self.raw_pos(&addr).unwrap(); + let block = + &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + block_size]; + let size_list = self.sizes_lists.get_mut(addr.pool_idx as usize).unwrap(); + size_list[addr.packet_idx as usize] = Self::STORE_FREE; + block.fill(0); + Ok(()) + } + + fn has_element_at(&self, addr: &StoreAddr) -> Result { + self.validate_addr(addr)?; + let pool_idx = addr.pool_idx as usize; + let size_list = self.sizes_lists.get(pool_idx).unwrap(); + let curr_size = size_list[addr.packet_idx as usize]; + if curr_size == Self::STORE_FREE { + return Ok(false); + } + Ok(true) + } +} + pub struct PoolGuard<'a> { pool: &'a mut LocalPool, pub addr: StoreAddr, @@ -418,7 +446,8 @@ impl<'a> PoolRwGuard<'a> { #[cfg(test)] mod tests { use crate::pool::{ - LocalPool, PoolCfg, PoolGuard, PoolRwGuard, StoreAddr, StoreError, StoreIdError, + LocalPool, PoolCfg, PoolGuard, PoolProvider, PoolRwGuard, StoreAddr, StoreError, + StoreIdError, }; use std::vec; diff --git a/fsrc-core/src/pus/verification.rs b/fsrc-core/src/pus/verification.rs index e3646bb..e53d5c6 100644 --- a/fsrc-core/src/pus/verification.rs +++ b/fsrc-core/src/pus/verification.rs @@ -15,7 +15,7 @@ //! ``` //! use std::sync::{Arc, RwLock}; //! use std::time::Duration; -//! use fsrc_core::pool::{LocalPool, PoolCfg}; +//! use fsrc_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; //! use fsrc_core::pus::verification::{CrossbeamVerifSender, VerificationReporterCfg, VerificationReporterWithSender}; //! use spacepackets::ecss::PusPacket; //! use spacepackets::SpHeader; @@ -26,7 +26,7 @@ //! const TEST_APID: u16 = 0x02; //! //! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); -//! let shared_tm_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg.clone()))); +//! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); //! let (verif_tx, verif_rx) = crossbeam_channel::bounded(10); //! let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), verif_tx); //! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8); @@ -87,7 +87,10 @@ use spacepackets::{ByteConversionError, SizeMissmatch, SpHeader}; use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl}; #[cfg(feature = "std")] -pub use stdmod::{CrossbeamVerifSender, StdVerifSender, StdVerifSenderError}; +pub use stdmod::{ + CrossbeamVerifSender, MpscVerifSender, SharedStdVerifReporterWithSender, + StdVerifReporterWithSender, StdVerifSenderError, +}; /// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard /// This field equivalent to the first two bytes of the CCSDS space packet header. @@ -711,11 +714,16 @@ impl VerificationReporterWithSender { #[cfg(feature = "std")] mod stdmod { - use crate::pool::{LocalPool, StoreAddr, StoreError}; - use crate::pus::verification::{VerificationError, VerificationSender}; + use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError}; + use crate::pus::verification::{ + VerificationError, VerificationReporterWithSender, VerificationSender, + }; use delegate::delegate; use spacepackets::tm::PusTm; - use std::sync::{mpsc, Arc, RwLock, RwLockWriteGuard}; + use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard}; + + pub type StdVerifReporterWithSender = VerificationReporterWithSender; + pub type SharedStdVerifReporterWithSender = Arc>; #[derive(Debug, Eq, PartialEq)] pub enum StdVerifSenderError { @@ -730,12 +738,12 @@ mod stdmod { struct StdSenderBase { pub ignore_poison_error: bool, - tm_store: Arc>, + tm_store: SharedPool, tx: S, } impl StdSenderBase { - pub fn new(tm_store: Arc>, tx: S) -> Self { + pub fn new(tm_store: SharedPool, tx: S) -> Self { Self { ignore_poison_error: false, tm_store, @@ -744,20 +752,23 @@ mod stdmod { } } + unsafe impl Sync for StdSenderBase {} + unsafe impl Send for StdSenderBase {} + impl SendBackend for mpsc::Sender { fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> { self.send(addr).map_err(|_| addr) } } - pub struct StdVerifSender { + pub struct MpscVerifSender { base: StdSenderBase>, } /// Verification sender with a [mpsc::Sender] backend. /// It implements the [VerificationSender] trait to be used as PUS Verification TM sender. - impl StdVerifSender { - pub fn new(tm_store: Arc>, tx: mpsc::Sender) -> Self { + impl MpscVerifSender { + pub fn new(tm_store: SharedPool, tx: mpsc::Sender) -> Self { Self { base: StdSenderBase::new(tm_store, tx), } @@ -765,15 +776,15 @@ mod stdmod { } //noinspection RsTraitImplementation - impl VerificationSender for StdVerifSender { + impl VerificationSender for MpscVerifSender { delegate!( to self.base { fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError>; } ); } - unsafe impl Sync for StdVerifSender {} - unsafe impl Send for StdVerifSender {} + unsafe impl Sync for MpscVerifSender {} + unsafe impl Send for MpscVerifSender {} impl SendBackend for crossbeam_channel::Sender { fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> { @@ -788,10 +799,7 @@ mod stdmod { } impl CrossbeamVerifSender { - pub fn new( - tm_store: Arc>, - tx: crossbeam_channel::Sender, - ) -> Self { + pub fn new(tm_store: SharedPool, tx: crossbeam_channel::Sender) -> Self { Self { base: StdSenderBase::new(tm_store, tx), } @@ -815,7 +823,7 @@ mod stdmod { &mut self, tm: PusTm, ) -> Result<(), VerificationError> { - let operation = |mut mg: RwLockWriteGuard| { + let operation = |mut mg: RwLockWriteGuard| { let (addr, buf) = mg.free_element(tm.len_packed()).map_err(|e| { VerificationError::SendError(StdVerifSenderError::StoreError(e)) })?; diff --git a/fsrc-core/tests/pool_test.rs b/fsrc-core/tests/pool_test.rs index e116130..3187fd9 100644 --- a/fsrc-core/tests/pool_test.rs +++ b/fsrc-core/tests/pool_test.rs @@ -1,4 +1,4 @@ -use fsrc_core::pool::{LocalPool, PoolCfg, PoolGuard, StoreAddr}; +use fsrc_core::pool::{LocalPool, PoolCfg, PoolGuard, PoolProvider, StoreAddr}; use std::ops::DerefMut; use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender}; diff --git a/fsrc-core/tests/verification_test.rs b/fsrc-core/tests/verification_test.rs index 4576471..22f351e 100644 --- a/fsrc-core/tests/verification_test.rs +++ b/fsrc-core/tests/verification_test.rs @@ -1,4 +1,4 @@ -use fsrc_core::pool::{LocalPool, PoolCfg}; +use fsrc_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; use fsrc_core::pus::verification::{ CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, @@ -28,7 +28,8 @@ fn test_shared_reporter() { let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8); // Shared pool object to store the verification PUS telemetry let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); - let shared_tm_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg.clone()))); + let shared_tm_pool: SharedPool = + Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); let shared_tc_pool_1 = shared_tc_pool_0.clone(); let (tx, rx) = crossbeam_channel::bounded(5); diff --git a/fsrc-example/Cargo.toml b/fsrc-example/Cargo.toml index ede00be..8bccc84 100644 --- a/fsrc-example/Cargo.toml +++ b/fsrc-example/Cargo.toml @@ -6,6 +6,7 @@ authors = ["Robin Mueller "] [dependencies] crossbeam-channel = "0.5" +delegate = "0.8" [dependencies.spacepackets] path = "../spacepackets" diff --git a/fsrc-example/src/bin/obsw/main.rs b/fsrc-example/src/bin/obsw/main.rs index 728196c..00ad086 100644 --- a/fsrc-example/src/bin/obsw/main.rs +++ b/fsrc-example/src/bin/obsw/main.rs @@ -2,13 +2,16 @@ mod ccsds; mod pus; mod tmtc; -use crate::tmtc::{core_tmtc_task, TmStore}; +use crate::tmtc::{core_tmtc_task, TmStore, PUS_APID}; use fsrc_core::hal::host::udp_server::UdpTcServer; -use fsrc_core::pool::{LocalPool, PoolCfg, StoreAddr}; +use fsrc_core::pool::{LocalPool, PoolCfg, SharedPool, StoreAddr}; +use fsrc_core::pus::verification::{ + MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender, +}; use fsrc_core::tmtc::CcsdsError; use fsrc_example::{OBSW_SERVER_ADDR, SERVER_PORT}; use std::net::{IpAddr, SocketAddr}; -use std::sync::{mpsc, Arc, Mutex}; +use std::sync::{mpsc, Arc, Mutex, RwLock}; use std::thread; struct TmFunnel { @@ -19,7 +22,7 @@ struct TmFunnel { struct UdpTmtcServer { udp_tc_server: UdpTcServer>, tm_rx: mpsc::Receiver, - tm_store: Arc>, + tm_store: SharedPool, } unsafe impl Send for UdpTmtcServer {} @@ -28,13 +31,27 @@ fn main() { println!("Running OBSW example"); let pool_cfg = PoolCfg::new(vec![(8, 32), (4, 64), (2, 128)]); let tm_pool = LocalPool::new(pool_cfg); - let tm_store = Arc::new(Mutex::new(TmStore { pool: tm_pool })); + let tm_store: SharedPool = Arc::new(RwLock::new(Box::new(tm_pool))); + let tm_store_helper = TmStore { + pool: tm_store.clone(), + }; let addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let (tm_creator_tx, tm_funnel_rx) = mpsc::channel(); + let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); let (tm_server_tx, tm_server_rx) = mpsc::channel(); - + let sender = MpscVerifSender::new(tm_store.clone(), tm_funnel_tx.clone()); + let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8); + let reporter_with_sender_0 = Arc::new(Mutex::new(VerificationReporterWithSender::new( + verif_cfg, + Box::new(sender), + ))); let jh0 = thread::spawn(move || { - core_tmtc_task(tm_creator_tx.clone(), tm_server_rx, tm_store.clone(), addr); + core_tmtc_task( + tm_funnel_tx.clone(), + tm_server_rx, + tm_store_helper.clone(), + addr, + reporter_with_sender_0, + ); }); let jh1 = thread::spawn(move || { diff --git a/fsrc-example/src/bin/obsw/pus.rs b/fsrc-example/src/bin/obsw/pus.rs index ee7e9cf..eb76d09 100644 --- a/fsrc-example/src/bin/obsw/pus.rs +++ b/fsrc-example/src/bin/obsw/pus.rs @@ -1,23 +1,31 @@ -use crate::TmStore; +use crate::tmtc::TmStore; use fsrc_core::pool::StoreAddr; +use fsrc_core::pus::verification::SharedStdVerifReporterWithSender; use fsrc_core::tmtc::tm_helper::PusTmWithCdsShortHelper; use fsrc_core::tmtc::PusServiceProvider; use spacepackets::tc::{PusTc, PusTcSecondaryHeaderT}; use spacepackets::SpHeader; -use std::sync::{mpsc, Arc, Mutex}; +use std::sync::mpsc; pub struct PusReceiver { pub tm_helper: PusTmWithCdsShortHelper, pub tm_tx: mpsc::Sender, - pub tm_store: Arc>, + pub tm_store: TmStore, + pub verif_reporter: SharedStdVerifReporterWithSender, } impl PusReceiver { - pub fn new(apid: u16, tm_tx: mpsc::Sender, tm_store: Arc>) -> Self { + pub fn new( + apid: u16, + tm_tx: mpsc::Sender, + tm_store: TmStore, + verif_reporter: SharedStdVerifReporterWithSender, + ) -> Self { Self { tm_helper: PusTmWithCdsShortHelper::new(apid), tm_tx, tm_store, + verif_reporter, } } } @@ -44,11 +52,7 @@ impl PusReceiver { println!("Received PUS ping command TC[17,1]"); println!("Sending ping reply PUS TM[17,2]"); let ping_reply = self.tm_helper.create_pus_tm_timestamp_now(17, 2, None); - let addr = self - .tm_store - .lock() - .expect("Locking TM store failed") - .add_pus_tm(&ping_reply); + let addr = self.tm_store.add_pus_tm(&ping_reply); self.tm_tx .send(addr) .expect("Sending TM to TM funnel failed"); diff --git a/fsrc-example/src/bin/obsw/tmtc.rs b/fsrc-example/src/bin/obsw/tmtc.rs index 683569a..1c9ad9e 100644 --- a/fsrc-example/src/bin/obsw/tmtc.rs +++ b/fsrc-example/src/bin/obsw/tmtc.rs @@ -1,28 +1,28 @@ use fsrc_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; use std::net::SocketAddr; -use std::sync::{mpsc, Arc, Mutex}; +use std::sync::mpsc; use std::thread; use std::time::Duration; use crate::ccsds::CcsdsReceiver; use crate::pus::PusReceiver; use crate::UdpTmtcServer; -use fsrc_core::pool::{LocalPool, StoreAddr}; +use fsrc_core::pool::{SharedPool, StoreAddr}; +use fsrc_core::pus::verification::SharedStdVerifReporterWithSender; use fsrc_core::tmtc::{CcsdsDistributor, CcsdsError, PusDistributor}; use spacepackets::tm::PusTm; pub const PUS_APID: u16 = 0x02; +#[derive(Clone)] pub struct TmStore { - pub pool: LocalPool, + pub pool: SharedPool, } impl TmStore { pub fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr { - let (addr, buf) = self - .pool - .free_element(pus_tm.len_packed()) - .expect("Store error"); + let mut pg = self.pool.write().expect("Error locking TM store"); + let (addr, buf) = pg.free_element(pus_tm.len_packed()).expect("Store error"); pus_tm .write_to(buf) .expect("Writing PUS TM to store failed"); @@ -33,10 +33,16 @@ impl TmStore { pub fn core_tmtc_task( tm_creator_tx: mpsc::Sender, tm_server_rx: mpsc::Receiver, - tm_store: Arc>, + tm_store_helper: TmStore, addr: SocketAddr, + verif_reporter: SharedStdVerifReporterWithSender, ) { - let pus_receiver = PusReceiver::new(PUS_APID, tm_creator_tx, tm_store.clone()); + let pus_receiver = PusReceiver::new( + PUS_APID, + tm_creator_tx, + tm_store_helper.clone(), + verif_reporter, + ); let pus_distributor = PusDistributor::new(Box::new(pus_receiver)); let ccsds_receiver = CcsdsReceiver { pus_handler: pus_distributor, @@ -44,10 +50,11 @@ pub fn core_tmtc_task( let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); let udp_tc_server = UdpTcServer::new(addr, 2048, Box::new(ccsds_distributor)) .expect("Creating UDP TMTC server failed"); + let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, tm_rx: tm_server_rx, - tm_store, + tm_store: tm_store_helper.pool.clone(), }; loop { core_tmtc_loop(&mut udp_tmtc_server); @@ -89,9 +96,9 @@ fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() { let mut store_lock = udp_tmtc_server .tm_store - .lock() + .write() .expect("Locking TM store failed"); - let pg = store_lock.pool.read_with_guard(addr); + let pg = store_lock.read_with_guard(addr); let buf = pg.read().expect("Error reading TM pool data"); println!("Sending TM"); udp_tmtc_server diff --git a/fsrc-example/src/bin/test.rs b/fsrc-example/src/bin/test.rs index 026ccfc..a979878 100644 --- a/fsrc-example/src/bin/test.rs +++ b/fsrc-example/src/bin/test.rs @@ -27,10 +27,7 @@ impl FieldDataProvider for FixedFieldDataWrapper { type FieldDataTraitObj = Box; fn main() { - let (s0, r0): ( - Sender, - Receiver, - ) = bounded(5); + let (s0, r0): (Sender, Receiver) = bounded(5); let data_wrapper = FixedFieldDataWrapper::from_two_u32(2, 3); s0.send(Box::new(data_wrapper)).unwrap(); let jh0 = thread::spawn(move || {