move around some components

This commit is contained in:
Robin Müller 2022-08-19 12:35:21 +02:00
parent a17995c168
commit efb9f757ce
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
2 changed files with 86 additions and 87 deletions

View File

@ -172,12 +172,12 @@ impl LocalPool {
/// size and then copy the given data to the block. Yields a [StoreAddr] which can be used /// size and then copy the given data to the block. Yields a [StoreAddr] which can be used
/// to access the data stored in the pool /// to access the data stored in the pool
pub fn add(&mut self, data: &[u8]) -> Result<StoreAddr, StoreError> { pub fn add(&mut self, data: &[u8]) -> Result<StoreAddr, StoreError> {
let data_len = data.as_ref().len(); let data_len = data.len();
if data_len > Self::MAX_SIZE { if data_len > Self::MAX_SIZE {
return Err(StoreError::DataTooLarge(data_len)); return Err(StoreError::DataTooLarge(data_len));
} }
let addr = self.reserve(data_len)?; let addr = self.reserve(data_len)?;
self.write(&addr, data.as_ref())?; self.write(&addr, data)?;
Ok(addr) Ok(addr)
} }
@ -206,7 +206,7 @@ impl LocalPool {
pub fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> { pub fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> {
let curr_size = self.addr_check(addr)?; let curr_size = self.addr_check(addr)?;
let raw_pos = self.raw_pos(addr).unwrap(); let raw_pos = self.raw_pos(addr).unwrap();
let block = &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..curr_size]; let block = &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size];
Ok(block) Ok(block)
} }
@ -215,14 +215,36 @@ impl LocalPool {
self.addr_check(&addr)?; self.addr_check(&addr)?;
let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1; let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1;
let raw_pos = self.raw_pos(&addr).unwrap(); let raw_pos = self.raw_pos(&addr).unwrap();
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..block_size]; let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + block_size];
let size_list = self.sizes_lists.get_mut(addr.pool_idx as usize).unwrap(); let size_list = self.sizes_lists.get_mut(addr.pool_idx as usize).unwrap();
size_list[addr.packet_idx as usize] = Self::STORE_FREE; size_list[addr.packet_idx as usize] = Self::STORE_FREE;
block.fill(0); block.fill(0);
Ok(()) Ok(())
} }
pub fn has_element_at(&self, addr: &StoreAddr) -> Result<bool, StoreError> {
self.validate_addr(addr)?;
let pool_idx = addr.pool_idx as usize;
let size_list = self.sizes_lists.get(pool_idx).unwrap();
let curr_size = size_list[addr.packet_idx as usize];
if curr_size == Self::STORE_FREE {
return Ok(false)
}
Ok(true)
}
fn addr_check(&self, addr: &StoreAddr) -> Result<usize, StoreError> { fn addr_check(&self, addr: &StoreAddr) -> Result<usize, StoreError> {
self.validate_addr(addr)?;
let pool_idx = addr.pool_idx as usize;
let size_list = self.sizes_lists.get(pool_idx).unwrap();
let curr_size = size_list[addr.packet_idx as usize];
if curr_size == Self::STORE_FREE {
return Err(StoreError::DataDoesNotExist(*addr));
}
Ok(curr_size)
}
fn validate_addr(&self, addr: &StoreAddr) -> Result<(), StoreError> {
let pool_idx = addr.pool_idx as usize; let pool_idx = addr.pool_idx as usize;
if pool_idx as usize >= self.pool_cfg.cfg.len() { if pool_idx as usize >= self.pool_cfg.cfg.len() {
return Err(StoreError::InvalidStoreId( return Err(StoreError::InvalidStoreId(
@ -236,12 +258,7 @@ impl LocalPool {
Some(*addr), Some(*addr),
)); ));
} }
let size_list = self.sizes_lists.get(pool_idx).unwrap(); Ok(())
let curr_size = size_list[addr.packet_idx as usize];
if curr_size == Self::STORE_FREE {
return Err(StoreError::DataDoesNotExist(*addr));
}
Ok(curr_size)
} }
fn reserve(&mut self, data_len: usize) -> Result<StoreAddr, StoreError> { fn reserve(&mut self, data_len: usize) -> Result<StoreAddr, StoreError> {
@ -279,7 +296,7 @@ impl LocalPool {
addr addr
)) ))
})?; })?;
let pool_slice = &mut subpool[packet_pos..self.pool_cfg.cfg[addr.pool_idx as usize].1]; let pool_slice = &mut subpool[packet_pos..packet_pos + data.len()];
pool_slice.copy_from_slice(data); pool_slice.copy_from_slice(data);
Ok(()) Ok(())
} }
@ -306,6 +323,43 @@ impl LocalPool {
} }
} }
pub struct PoolGuard<'a> {
pool: &'a mut LocalPool,
pub addr: StoreAddr,
no_deletion: bool,
deletion_failed_error: Option<StoreError>
}
impl<'a> PoolGuard<'a> {
pub fn new(pool: &'a mut LocalPool, addr: StoreAddr) -> Self {
Self {
pool,
addr,
no_deletion: false,
deletion_failed_error: None
}
}
pub fn read(&self) -> Result<&[u8], StoreError> {
self.pool.read(&self.addr)
}
pub fn release(&mut self) {
self.no_deletion = true;
}
}
impl Drop for PoolGuard<'_> {
fn drop(&mut self) {
if !self.no_deletion {
let res = self.pool.delete(self.addr);
if res.is_err() {
self.deletion_failed_error = Some(res.unwrap_err());
}
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::pool::{LocalPool, PoolCfg, StoreAddr, StoreError, StoreIdError}; use crate::pool::{LocalPool, PoolCfg, StoreAddr, StoreError, StoreIdError};

View File

@ -1,91 +1,36 @@
use std::ops::DerefMut;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
use fsrc_core::pool::{LocalPool, PoolCfg, StoreAddr, PoolGuard};
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
struct PoolDummy { const DUMMY_DATA: [u8; 4] = [0, 1, 2, 3];
test_buf: [u8; 128],
}
struct PoolAccessDummy<'a> { #[test]
pool_dummy: &'a mut PoolDummy, fn threaded_usage() {
no_deletion: bool, let pool_cfg = PoolCfg::new(vec![(16, 6), (32, 3), (8, 12)]);
} let shared_dummy = Arc::new(RwLock::new(LocalPool::new(pool_cfg)));
impl PoolAccessDummy<'_> {
fn modify(&mut self) -> &mut [u8] {
self.pool_dummy.modify()
}
fn release(&mut self) {
self.no_deletion = true;
}
}
impl Drop for PoolAccessDummy<'_> {
fn drop(&mut self) {
if self.no_deletion {
println!("Pool access: Drop with no deletion")
} else {
self.pool_dummy.delete();
println!("Pool access: Drop with deletion");
}
}
}
impl Default for PoolDummy {
fn default() -> Self {
PoolDummy { test_buf: [0; 128] }
}
}
impl PoolDummy {
fn modify(&mut self) -> &mut [u8] {
self.test_buf.as_mut_slice()
}
fn modify_with_accessor(&mut self) -> PoolAccessDummy {
PoolAccessDummy {
pool_dummy: self,
no_deletion: false,
}
}
fn read(&self) -> &[u8] {
self.test_buf.as_slice()
}
fn delete(&mut self) {
println!("Store content was deleted");
}
}
fn pool_test() {
println!("Hello World");
let shared_dummy = Arc::new(RwLock::new(PoolDummy::default()));
let shared_clone = shared_dummy.clone(); let shared_clone = shared_dummy.clone();
let jh0 = thread::spawn(move || loop { let (tx, rx): (Sender<StoreAddr>, Receiver<StoreAddr>) = mpsc::channel();
let jh0 = thread::spawn(move || {
{ {
let mut dummy = shared_dummy.write().unwrap(); let mut dummy = shared_dummy.write().unwrap();
let buf = dummy.modify(); let addr = dummy.add(&DUMMY_DATA).expect("Writing data failed");
buf[0] = 1; tx.send(addr).expect("Sending store address failed");
let mut accessor = dummy.modify_with_accessor();
let buf = accessor.modify();
buf[0] = 2;
} }
}); });
let jh1 = thread::spawn(move || loop { let jh1 = thread::spawn(move || {
let mut pool_access = shared_clone.write().unwrap();
let addr;
{ {
let dummy = shared_clone.read().unwrap(); addr = rx.recv().expect("Receiving store address failed");
let buf = dummy.read(); let pg = PoolGuard::new(pool_access.deref_mut(), addr);
println!("Buffer 0: {:?}", buf[0]); let read_res = pg.read().expect("Reading failed");
assert_eq!(read_res, DUMMY_DATA);
} }
assert!(!pool_access.has_element_at(&addr).expect("Invalid address"));
let mut dummy = shared_clone.write().unwrap();
let mut accessor = dummy.modify_with_accessor();
let buf = accessor.modify();
buf[0] = 3;
accessor.release();
}); });
jh0.join().unwrap(); jh0.join().unwrap();
jh1.join().unwrap(); jh1.join().unwrap();