split up crate and create workspace
This commit is contained in:
16
fsrc-core/Cargo.toml
Normal file
16
fsrc-core/Cargo.toml
Normal file
@ -0,0 +1,16 @@
|
||||
[package]
|
||||
name = "fsrc-core"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
thiserror = "1.0"
|
||||
bus = "2.2.3"
|
||||
num = "0.4"
|
||||
heapless = "0.7.13"
|
||||
postcard = { version = "0.7.3", features = ["use-std"] }
|
||||
serde = "1.0.137"
|
||||
deku = "0.13"
|
||||
zerocopy = "0.6.1"
|
311
fsrc-core/src/event_man.rs
Normal file
311
fsrc-core/src/event_man.rs
Normal file
@ -0,0 +1,311 @@
|
||||
//! [Event][crate::core::events::Event] management and forwarding
|
||||
use crate::events::{Event, EventRaw, GroupId};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(PartialEq, Eq, Hash, Copy, Clone)]
|
||||
enum ListenerType {
|
||||
Single(EventRaw),
|
||||
Group(GroupId),
|
||||
}
|
||||
|
||||
pub trait EventListener {
|
||||
type Error;
|
||||
|
||||
fn id(&self) -> u32;
|
||||
fn send_to(&mut self, event: Event) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
struct Listener<E> {
|
||||
ltype: ListenerType,
|
||||
dest: Box<dyn EventListener<Error = E>>,
|
||||
}
|
||||
|
||||
pub trait ReceivesAllEvent {
|
||||
fn receive(&mut self) -> Option<Event>;
|
||||
}
|
||||
|
||||
pub struct EventManager<E> {
|
||||
listeners: HashMap<ListenerType, Vec<Listener<E>>>,
|
||||
event_receiver: Box<dyn ReceivesAllEvent>,
|
||||
}
|
||||
|
||||
pub enum HandlerResult {
|
||||
Empty,
|
||||
Handled(u32, Event),
|
||||
}
|
||||
|
||||
impl<E> EventManager<E> {
|
||||
pub fn new(event_receiver: Box<dyn ReceivesAllEvent>) -> Self {
|
||||
EventManager {
|
||||
listeners: HashMap::new(),
|
||||
event_receiver,
|
||||
}
|
||||
}
|
||||
pub fn subscribe_single(
|
||||
&mut self,
|
||||
event: Event,
|
||||
dest: impl EventListener<Error = E> + 'static,
|
||||
) {
|
||||
self.update_listeners(ListenerType::Single(event.raw()), dest);
|
||||
}
|
||||
|
||||
pub fn subscribe_group(
|
||||
&mut self,
|
||||
group_id: GroupId,
|
||||
dest: impl EventListener<Error = E> + 'static,
|
||||
) {
|
||||
self.update_listeners(ListenerType::Group(group_id), dest);
|
||||
}
|
||||
|
||||
fn update_listeners(
|
||||
&mut self,
|
||||
key: ListenerType,
|
||||
dest: impl EventListener<Error = E> + 'static,
|
||||
) {
|
||||
if let std::collections::hash_map::Entry::Vacant(e) = self.listeners.entry(key) {
|
||||
e.insert(vec![Listener {
|
||||
ltype: key,
|
||||
dest: Box::new(dest),
|
||||
}]);
|
||||
} else {
|
||||
let vec = self.listeners.get_mut(&key).unwrap();
|
||||
// To prevent double insertions
|
||||
for entry in vec.iter() {
|
||||
if entry.ltype == key && entry.dest.id() == dest.id() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
vec.push(Listener {
|
||||
ltype: key,
|
||||
dest: Box::new(dest),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_event_handling(&mut self) -> Result<HandlerResult, E> {
|
||||
let mut err_status = None;
|
||||
let mut num_recipients = 0;
|
||||
let mut send_handler = |event, llist: &mut Vec<Listener<E>>| {
|
||||
for listener in llist.iter_mut() {
|
||||
if let Err(e) = listener.dest.send_to(event) {
|
||||
err_status = Some(Err(e));
|
||||
} else {
|
||||
num_recipients += 1;
|
||||
}
|
||||
}
|
||||
};
|
||||
if let Some(event) = self.event_receiver.receive() {
|
||||
let single_key = ListenerType::Single(event.raw());
|
||||
if self.listeners.contains_key(&single_key) {
|
||||
send_handler(event, self.listeners.get_mut(&single_key).unwrap());
|
||||
}
|
||||
let group_key = ListenerType::Group(event.group_id());
|
||||
if self.listeners.contains_key(&group_key) {
|
||||
send_handler(event, self.listeners.get_mut(&group_key).unwrap());
|
||||
}
|
||||
if let Some(err) = err_status {
|
||||
return err;
|
||||
}
|
||||
return Ok(HandlerResult::Handled(num_recipients, event));
|
||||
}
|
||||
Ok(HandlerResult::Empty)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{EventListener, HandlerResult, ReceivesAllEvent};
|
||||
use crate::event_man::EventManager;
|
||||
use crate::events::{Event, Severity};
|
||||
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
struct EventReceiver {
|
||||
mpsc_receiver: Receiver<Event>,
|
||||
}
|
||||
impl ReceivesAllEvent for EventReceiver {
|
||||
fn receive(&mut self) -> Option<Event> {
|
||||
self.mpsc_receiver.try_recv().ok()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MpscEventSenderQueue {
|
||||
id: u32,
|
||||
mpsc_sender: Sender<Event>,
|
||||
}
|
||||
|
||||
impl EventListener for MpscEventSenderQueue {
|
||||
type Error = SendError<Event>;
|
||||
|
||||
fn id(&self) -> u32 {
|
||||
self.id
|
||||
}
|
||||
fn send_to(&mut self, event: Event) -> Result<(), Self::Error> {
|
||||
self.mpsc_sender.send(event)
|
||||
}
|
||||
}
|
||||
|
||||
fn check_next_event(expected: Event, receiver: &Receiver<Event>) {
|
||||
for _ in 0..5 {
|
||||
if let Ok(event) = receiver.try_recv() {
|
||||
assert_eq!(event, expected);
|
||||
break;
|
||||
}
|
||||
thread::sleep(Duration::from_millis(1));
|
||||
}
|
||||
}
|
||||
|
||||
fn check_handled_event(res: HandlerResult, expected: Event, expected_num_sent: u32) {
|
||||
assert!(matches!(res, HandlerResult::Handled { .. }));
|
||||
if let HandlerResult::Handled(num_recipients, event) = res {
|
||||
assert_eq!(event, expected);
|
||||
assert_eq!(num_recipients, expected_num_sent);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_basic() {
|
||||
let (event_sender, manager_queue) = channel();
|
||||
let event_man_receiver = EventReceiver {
|
||||
mpsc_receiver: manager_queue,
|
||||
};
|
||||
let mut event_man: EventManager<SendError<Event>> =
|
||||
EventManager::new(Box::new(event_man_receiver));
|
||||
let event_grp_0 = Event::new(Severity::INFO, 0, 0).unwrap();
|
||||
let event_grp_1_0 = Event::new(Severity::HIGH, 1, 0).unwrap();
|
||||
let (single_event_sender, single_event_receiver) = channel();
|
||||
let single_event_listener = MpscEventSenderQueue {
|
||||
id: 0,
|
||||
mpsc_sender: single_event_sender,
|
||||
};
|
||||
event_man.subscribe_single(event_grp_0, single_event_listener);
|
||||
let (group_event_sender_0, group_event_receiver_0) = channel();
|
||||
let group_event_listener = MpscEventSenderQueue {
|
||||
id: 1,
|
||||
mpsc_sender: group_event_sender_0,
|
||||
};
|
||||
event_man.subscribe_group(event_grp_1_0.group_id(), group_event_listener);
|
||||
|
||||
// Test event with one listener
|
||||
event_sender
|
||||
.send(event_grp_0)
|
||||
.expect("Sending single error failed");
|
||||
let res = event_man.try_event_handling();
|
||||
assert!(res.is_ok());
|
||||
check_handled_event(res.unwrap(), event_grp_0, 1);
|
||||
check_next_event(event_grp_0, &single_event_receiver);
|
||||
|
||||
// Test event which is sent to all group listeners
|
||||
event_sender
|
||||
.send(event_grp_1_0)
|
||||
.expect("Sending group error failed");
|
||||
let res = event_man.try_event_handling();
|
||||
assert!(res.is_ok());
|
||||
check_handled_event(res.unwrap(), event_grp_1_0, 1);
|
||||
check_next_event(event_grp_1_0, &group_event_receiver_0);
|
||||
}
|
||||
|
||||
/// Test listening for multiple groups
|
||||
#[test]
|
||||
fn test_multi_group() {
|
||||
let (event_sender, manager_queue) = channel();
|
||||
let event_man_receiver = EventReceiver {
|
||||
mpsc_receiver: manager_queue,
|
||||
};
|
||||
let mut event_man: EventManager<SendError<Event>> =
|
||||
EventManager::new(Box::new(event_man_receiver));
|
||||
let res = event_man.try_event_handling();
|
||||
assert!(res.is_ok());
|
||||
let hres = res.unwrap();
|
||||
assert!(matches!(hres, HandlerResult::Empty));
|
||||
|
||||
let event_grp_0 = Event::new(Severity::INFO, 0, 0).unwrap();
|
||||
let event_grp_1_0 = Event::new(Severity::HIGH, 1, 0).unwrap();
|
||||
let (event_grp_0_sender, event_grp_0_receiver) = channel();
|
||||
let event_grp_0_and_1_listener = MpscEventSenderQueue {
|
||||
id: 0,
|
||||
mpsc_sender: event_grp_0_sender,
|
||||
};
|
||||
event_man.subscribe_group(event_grp_0.group_id(), event_grp_0_and_1_listener.clone());
|
||||
event_man.subscribe_group(event_grp_1_0.group_id(), event_grp_0_and_1_listener);
|
||||
|
||||
event_sender
|
||||
.send(event_grp_0)
|
||||
.expect("Sending Event Group 0 failed");
|
||||
event_sender
|
||||
.send(event_grp_1_0)
|
||||
.expect("Sendign Event Group 1 failed");
|
||||
let res = event_man.try_event_handling();
|
||||
assert!(res.is_ok());
|
||||
check_handled_event(res.unwrap(), event_grp_0, 1);
|
||||
let res = event_man.try_event_handling();
|
||||
assert!(res.is_ok());
|
||||
check_handled_event(res.unwrap(), event_grp_1_0, 1);
|
||||
|
||||
check_next_event(event_grp_0, &event_grp_0_receiver);
|
||||
check_next_event(event_grp_1_0, &event_grp_0_receiver);
|
||||
}
|
||||
|
||||
/// Test listening to the same event from multiple listeners. Also test listening
|
||||
/// to both group and single events from one listener
|
||||
#[test]
|
||||
fn test_listening_to_same_event_and_multi_type() {
|
||||
let (event_sender, manager_queue) = channel();
|
||||
let event_man_receiver = EventReceiver {
|
||||
mpsc_receiver: manager_queue,
|
||||
};
|
||||
let mut event_man: EventManager<SendError<Event>> =
|
||||
EventManager::new(Box::new(event_man_receiver));
|
||||
let event_0 = Event::new(Severity::INFO, 0, 5).unwrap();
|
||||
let event_1 = Event::new(Severity::HIGH, 1, 0).unwrap();
|
||||
let (event_0_tx_0, event_0_rx_0) = channel();
|
||||
let (event_0_tx_1, event_0_rx_1) = channel();
|
||||
let event_listener_0 = MpscEventSenderQueue {
|
||||
id: 0,
|
||||
mpsc_sender: event_0_tx_0,
|
||||
};
|
||||
let event_listener_1 = MpscEventSenderQueue {
|
||||
id: 1,
|
||||
mpsc_sender: event_0_tx_1,
|
||||
};
|
||||
event_man.subscribe_single(event_0, event_listener_0.clone());
|
||||
event_man.subscribe_single(event_0, event_listener_1);
|
||||
event_sender
|
||||
.send(event_0)
|
||||
.expect("Triggering Event 0 failed");
|
||||
let res = event_man.try_event_handling();
|
||||
assert!(res.is_ok());
|
||||
check_handled_event(res.unwrap(), event_0, 2);
|
||||
check_next_event(event_0, &event_0_rx_0);
|
||||
check_next_event(event_0, &event_0_rx_1);
|
||||
event_man.subscribe_group(event_1.group_id(), event_listener_0.clone());
|
||||
event_sender
|
||||
.send(event_0)
|
||||
.expect("Triggering Event 0 failed");
|
||||
event_sender
|
||||
.send(event_1)
|
||||
.expect("Triggering Event 1 failed");
|
||||
|
||||
// 3 Events messages will be sent now
|
||||
let res = event_man.try_event_handling();
|
||||
assert!(res.is_ok());
|
||||
check_handled_event(res.unwrap(), event_0, 2);
|
||||
let res = event_man.try_event_handling();
|
||||
assert!(res.is_ok());
|
||||
check_handled_event(res.unwrap(), event_1, 1);
|
||||
// Both the single event and the group event should arrive now
|
||||
check_next_event(event_0, &event_0_rx_0);
|
||||
check_next_event(event_1, &event_0_rx_0);
|
||||
|
||||
// Double insertion should be detected, result should remain the same
|
||||
event_man.subscribe_group(event_1.group_id(), event_listener_0);
|
||||
event_sender
|
||||
.send(event_1)
|
||||
.expect("Triggering Event 1 failed");
|
||||
let res = event_man.try_event_handling();
|
||||
assert!(res.is_ok());
|
||||
check_handled_event(res.unwrap(), event_1, 1);
|
||||
}
|
||||
}
|
133
fsrc-core/src/events.rs
Normal file
133
fsrc-core/src/events.rs
Normal file
@ -0,0 +1,133 @@
|
||||
//! Event support module
|
||||
use num::pow;
|
||||
|
||||
pub type GroupId = u16;
|
||||
pub type UniqueId = u16;
|
||||
pub type EventRaw = u32;
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Debug)]
|
||||
pub enum Severity {
|
||||
INFO = 1,
|
||||
LOW = 2,
|
||||
MEDIUM = 3,
|
||||
HIGH = 4,
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for Severity {
|
||||
type Error = ();
|
||||
|
||||
fn try_from(value: u8) -> Result<Self, Self::Error> {
|
||||
match value {
|
||||
x if x == Severity::INFO as u8 => Ok(Severity::INFO),
|
||||
x if x == Severity::LOW as u8 => Ok(Severity::LOW),
|
||||
x if x == Severity::MEDIUM as u8 => Ok(Severity::MEDIUM),
|
||||
x if x == Severity::HIGH as u8 => Ok(Severity::HIGH),
|
||||
_ => Err(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||
pub struct Event {
|
||||
severity: Severity,
|
||||
group_id: GroupId,
|
||||
unique_id: UniqueId,
|
||||
}
|
||||
|
||||
impl Event {
|
||||
/// Generate an event. The raw representation of an event has 32 bits.
|
||||
/// If the passed group ID is invalid (too large), None wil be returned
|
||||
///
|
||||
/// # Parameter
|
||||
///
|
||||
/// * `severity`: Each event has a [severity][Severity]. The raw value of the severity will
|
||||
/// be stored inside the uppermost 3 bits of the raw event ID
|
||||
/// * `group_id`: Related events can be grouped using a group ID. The group ID will occupy the
|
||||
/// next 13 bits after the severity. Therefore, the size is limited by dec 8191 hex 0x1FFF.
|
||||
/// * `unique_id`: Each event has a unique 16 bit ID occupying the last 16 bits of the
|
||||
/// raw event ID
|
||||
pub fn new(severity: Severity, group_id: GroupId, unique_id: UniqueId) -> Option<Event> {
|
||||
if group_id > (pow::pow(2u8 as u16, 13) - 1) {
|
||||
return None;
|
||||
}
|
||||
Some(Event {
|
||||
severity,
|
||||
group_id,
|
||||
unique_id,
|
||||
})
|
||||
}
|
||||
|
||||
/// Retrieve the severity of an event. Returns None if that severity bit field of the raw event
|
||||
/// ID is invalid
|
||||
pub fn severity(&self) -> Severity {
|
||||
self.severity
|
||||
}
|
||||
|
||||
pub fn group_id(&self) -> GroupId {
|
||||
self.group_id
|
||||
}
|
||||
|
||||
pub fn unique_id(&self) -> UniqueId {
|
||||
self.unique_id
|
||||
}
|
||||
|
||||
pub fn raw(&self) -> EventRaw {
|
||||
(((self.severity as u32) << 29) as u32
|
||||
| ((self.group_id as u32) << 16) as u32
|
||||
| self.unique_id as u32) as EventRaw
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<EventRaw> for Event {
|
||||
type Error = ();
|
||||
|
||||
fn try_from(raw: u32) -> Result<Self, Self::Error> {
|
||||
let severity: Option<Severity> = (((raw >> 29) & 0b111) as u8).try_into().ok();
|
||||
if severity.is_none() {
|
||||
return Err(());
|
||||
}
|
||||
let group_id = ((raw >> 16) & 0x1FFF) as u16;
|
||||
let unique_id = (raw & 0xFFFF) as u16;
|
||||
Event::new(severity.unwrap(), group_id, unique_id).ok_or(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::Event;
|
||||
use crate::events::Severity;
|
||||
|
||||
#[test]
|
||||
fn test_events() {
|
||||
let event = Event::new(Severity::INFO, 0, 0).unwrap();
|
||||
assert_eq!(event.severity(), Severity::INFO);
|
||||
assert_eq!(event.unique_id(), 0);
|
||||
assert_eq!(event.group_id(), 0);
|
||||
|
||||
let raw_event = event.raw();
|
||||
assert_eq!(raw_event, 0x20000000);
|
||||
let conv_from_raw = Event::try_from(raw_event);
|
||||
assert!(conv_from_raw.is_ok());
|
||||
let opt_event = conv_from_raw.ok();
|
||||
assert!(opt_event.is_some());
|
||||
let event = opt_event.unwrap();
|
||||
assert_eq!(event.severity(), Severity::INFO);
|
||||
assert_eq!(event.unique_id(), 0);
|
||||
assert_eq!(event.group_id(), 0);
|
||||
|
||||
let event = Event::new(Severity::HIGH, 0x1FFF, 0xFFFF).unwrap();
|
||||
assert_eq!(event.severity(), Severity::HIGH);
|
||||
assert_eq!(event.group_id(), 0x1FFF);
|
||||
assert_eq!(event.unique_id(), 0xFFFF);
|
||||
let raw_event = event.raw();
|
||||
assert_eq!(raw_event, 0x9FFFFFFF);
|
||||
let conv_from_raw = Event::try_from(raw_event);
|
||||
assert!(conv_from_raw.is_ok());
|
||||
let opt_event = conv_from_raw.ok();
|
||||
assert!(opt_event.is_some());
|
||||
let event = opt_event.unwrap();
|
||||
assert_eq!(event.severity(), Severity::HIGH);
|
||||
assert_eq!(event.group_id(), 0x1FFF);
|
||||
assert_eq!(event.unique_id(), 0xFFFF);
|
||||
}
|
||||
}
|
497
fsrc-core/src/executable.rs
Normal file
497
fsrc-core/src/executable.rs
Normal file
@ -0,0 +1,497 @@
|
||||
//! Task scheduling module
|
||||
use bus::BusReader;
|
||||
use std::error::Error;
|
||||
use std::sync::mpsc::TryRecvError;
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum OpResult {
|
||||
Ok,
|
||||
TerminationRequested,
|
||||
}
|
||||
|
||||
pub enum ExecutionType {
|
||||
Infinite,
|
||||
Cycles(u32),
|
||||
OneShot,
|
||||
}
|
||||
|
||||
pub trait Executable: Send {
|
||||
type Error;
|
||||
|
||||
fn exec_type(&self) -> ExecutionType;
|
||||
fn task_name(&self) -> &'static str;
|
||||
fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, Self::Error>;
|
||||
}
|
||||
|
||||
/// This function allows executing one task which implements the [Executable][Executable] trait
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `executable`: Executable task
|
||||
/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks
|
||||
/// * `op_code`: Operation code which is passed to the executable task [operation call][Executable::periodic_op]
|
||||
/// * `termination`: Optional termination handler which can cancel threads with a broadcast
|
||||
pub fn exec_sched_single<
|
||||
T: Executable<Error = E> + Send + 'static + ?Sized,
|
||||
E: Error + Send + 'static,
|
||||
>(
|
||||
mut executable: Box<T>,
|
||||
task_freq: Option<Duration>,
|
||||
op_code: i32,
|
||||
mut termination: Option<BusReader<()>>,
|
||||
) -> JoinHandle<Result<OpResult, E>> {
|
||||
let mut cycle_count = 0;
|
||||
thread::spawn(move || loop {
|
||||
if let Some(ref mut terminator) = termination {
|
||||
match terminator.try_recv() {
|
||||
Ok(_) | Err(TryRecvError::Disconnected) => {
|
||||
return Ok(OpResult::Ok);
|
||||
}
|
||||
Err(TryRecvError::Empty) => (),
|
||||
}
|
||||
}
|
||||
match executable.exec_type() {
|
||||
ExecutionType::OneShot => {
|
||||
executable.periodic_op(op_code)?;
|
||||
return Ok(OpResult::Ok);
|
||||
}
|
||||
ExecutionType::Infinite => {
|
||||
executable.periodic_op(op_code)?;
|
||||
}
|
||||
ExecutionType::Cycles(cycles) => {
|
||||
executable.periodic_op(op_code)?;
|
||||
cycle_count += 1;
|
||||
if cycle_count == cycles {
|
||||
return Ok(OpResult::Ok);
|
||||
}
|
||||
}
|
||||
}
|
||||
let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified"));
|
||||
thread::sleep(freq);
|
||||
})
|
||||
}
|
||||
|
||||
/// This function allows executing multiple tasks as long as the tasks implement the
|
||||
/// [Executable][Executable] trait
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `executable_vec`: Vector of executable objects
|
||||
/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks
|
||||
/// * `op_code`: Operation code which is passed to the executable task [operation call][Executable::periodic_op]
|
||||
/// * `termination`: Optional termination handler which can cancel threads with a broadcast
|
||||
pub fn exec_sched_multi<
|
||||
T: Executable<Error = E> + Send + 'static + ?Sized,
|
||||
E: Error + Send + 'static,
|
||||
>(
|
||||
mut executable_vec: Vec<Box<T>>,
|
||||
task_freq: Option<Duration>,
|
||||
op_code: i32,
|
||||
mut termination: Option<BusReader<()>>,
|
||||
) -> JoinHandle<Result<OpResult, E>> {
|
||||
let mut cycle_counts = vec![0; executable_vec.len()];
|
||||
let mut removal_flags = vec![false; executable_vec.len()];
|
||||
thread::spawn(move || loop {
|
||||
if let Some(ref mut terminator) = termination {
|
||||
match terminator.try_recv() {
|
||||
Ok(_) | Err(TryRecvError::Disconnected) => {
|
||||
removal_flags.iter_mut().for_each(|x| *x = true);
|
||||
}
|
||||
Err(TryRecvError::Empty) => (),
|
||||
}
|
||||
}
|
||||
for (idx, executable) in executable_vec.iter_mut().enumerate() {
|
||||
match executable.exec_type() {
|
||||
ExecutionType::OneShot => {
|
||||
executable.periodic_op(op_code)?;
|
||||
removal_flags[idx] = true;
|
||||
}
|
||||
ExecutionType::Infinite => {
|
||||
executable.periodic_op(op_code)?;
|
||||
}
|
||||
ExecutionType::Cycles(cycles) => {
|
||||
executable.periodic_op(op_code)?;
|
||||
cycle_counts[idx] += 1;
|
||||
if cycle_counts[idx] == cycles {
|
||||
removal_flags[idx] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut removal_iter = removal_flags.iter();
|
||||
executable_vec.retain(|_| !*removal_iter.next().unwrap());
|
||||
removal_iter = removal_flags.iter();
|
||||
cycle_counts.retain(|_| !*removal_iter.next().unwrap());
|
||||
removal_flags.retain(|&i| !i);
|
||||
if executable_vec.is_empty() {
|
||||
return Ok(OpResult::Ok);
|
||||
}
|
||||
let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified"));
|
||||
thread::sleep(freq);
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{exec_sched_multi, exec_sched_single, Executable, ExecutionType, OpResult};
|
||||
use bus::Bus;
|
||||
use std::error::Error;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use std::{fmt, thread};
|
||||
|
||||
struct TestInfo {
|
||||
exec_num: u32,
|
||||
op_code: i32,
|
||||
}
|
||||
struct OneShotTask {
|
||||
exec_num: Arc<Mutex<TestInfo>>,
|
||||
}
|
||||
struct FixedCyclesTask {
|
||||
cycles: u32,
|
||||
exec_num: Arc<Mutex<TestInfo>>,
|
||||
}
|
||||
struct PeriodicTask {
|
||||
exec_num: Arc<Mutex<TestInfo>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct ExampleError {
|
||||
kind: ErrorKind,
|
||||
}
|
||||
|
||||
/// The kind of an error that can occur.
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum ErrorKind {
|
||||
Generic(String, i32),
|
||||
}
|
||||
|
||||
impl ExampleError {
|
||||
fn new(msg: &str, code: i32) -> ExampleError {
|
||||
ExampleError {
|
||||
kind: ErrorKind::Generic(msg.to_string(), code),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the kind of this error.
|
||||
pub fn kind(&self) -> &ErrorKind {
|
||||
&self.kind
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ExampleError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self.kind() {
|
||||
ErrorKind::Generic(str, code) => {
|
||||
write!(f, "{str} with code {code}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for ExampleError {}
|
||||
|
||||
const ONE_SHOT_TASK_NAME: &str = "One Shot Task";
|
||||
|
||||
impl Executable for OneShotTask {
|
||||
type Error = ExampleError;
|
||||
|
||||
fn exec_type(&self) -> ExecutionType {
|
||||
ExecutionType::OneShot
|
||||
}
|
||||
|
||||
fn task_name(&self) -> &'static str {
|
||||
ONE_SHOT_TASK_NAME
|
||||
}
|
||||
|
||||
fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, ExampleError> {
|
||||
let mut data = self.exec_num.lock().expect("Locking Mutex failed");
|
||||
data.exec_num += 1;
|
||||
data.op_code = op_code;
|
||||
std::mem::drop(data);
|
||||
if op_code >= 0 {
|
||||
Ok(OpResult::Ok)
|
||||
} else {
|
||||
Err(ExampleError::new("One Shot Task Failure", op_code))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const CYCLE_TASK_NAME: &str = "Fixed Cycles Task";
|
||||
|
||||
impl Executable for FixedCyclesTask {
|
||||
type Error = ExampleError;
|
||||
|
||||
fn exec_type(&self) -> ExecutionType {
|
||||
ExecutionType::Cycles(self.cycles)
|
||||
}
|
||||
|
||||
fn task_name(&self) -> &'static str {
|
||||
CYCLE_TASK_NAME
|
||||
}
|
||||
|
||||
fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, ExampleError> {
|
||||
let mut data = self.exec_num.lock().expect("Locking Mutex failed");
|
||||
data.exec_num += 1;
|
||||
data.op_code = op_code;
|
||||
std::mem::drop(data);
|
||||
if op_code >= 0 {
|
||||
Ok(OpResult::Ok)
|
||||
} else {
|
||||
Err(ExampleError::new("Fixed Cycle Task Failure", op_code))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const PERIODIC_TASK_NAME: &str = "Periodic Task";
|
||||
|
||||
impl Executable for PeriodicTask {
|
||||
type Error = ExampleError;
|
||||
|
||||
fn exec_type(&self) -> ExecutionType {
|
||||
ExecutionType::Infinite
|
||||
}
|
||||
|
||||
fn task_name(&self) -> &'static str {
|
||||
PERIODIC_TASK_NAME
|
||||
}
|
||||
|
||||
fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, ExampleError> {
|
||||
let mut data = self.exec_num.lock().expect("Locking Mutex failed");
|
||||
data.exec_num += 1;
|
||||
data.op_code = op_code;
|
||||
std::mem::drop(data);
|
||||
if op_code >= 0 {
|
||||
Ok(OpResult::Ok)
|
||||
} else {
|
||||
Err(ExampleError::new("Example Task Failure", op_code))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_one_shot() {
|
||||
let expected_op_code = 42;
|
||||
let shared = Arc::new(Mutex::new(TestInfo {
|
||||
exec_num: 0,
|
||||
op_code: 0,
|
||||
}));
|
||||
let exec_task = OneShotTask {
|
||||
exec_num: shared.clone(),
|
||||
};
|
||||
let task = Box::new(exec_task);
|
||||
let jhandle = exec_sched_single(
|
||||
task,
|
||||
Some(Duration::from_millis(100)),
|
||||
expected_op_code,
|
||||
None,
|
||||
);
|
||||
let thread_res = jhandle.join().expect("One Shot Task failed");
|
||||
assert!(thread_res.is_ok());
|
||||
assert_eq!(thread_res.unwrap(), OpResult::Ok);
|
||||
let data = shared.lock().expect("Locking Mutex failed");
|
||||
assert_eq!(data.exec_num, 1);
|
||||
assert_eq!(data.op_code, expected_op_code);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_failed_one_shot() {
|
||||
let op_code_inducing_failure = -1;
|
||||
let shared = Arc::new(Mutex::new(TestInfo {
|
||||
exec_num: 0,
|
||||
op_code: 0,
|
||||
}));
|
||||
let exec_task = OneShotTask {
|
||||
exec_num: shared.clone(),
|
||||
};
|
||||
let task = Box::new(exec_task);
|
||||
let jhandle = exec_sched_single(
|
||||
task,
|
||||
Some(Duration::from_millis(100)),
|
||||
op_code_inducing_failure,
|
||||
None,
|
||||
);
|
||||
let thread_res = jhandle.join().expect("One Shot Task failed");
|
||||
assert!(thread_res.is_err());
|
||||
let error = thread_res.unwrap_err();
|
||||
let err = error.kind();
|
||||
assert!(matches!(err, &ErrorKind::Generic { .. }));
|
||||
match err {
|
||||
ErrorKind::Generic(str, op_code) => {
|
||||
assert_eq!(str, &String::from("One Shot Task Failure"));
|
||||
assert_eq!(op_code, &op_code_inducing_failure);
|
||||
}
|
||||
}
|
||||
let error_display = error.to_string();
|
||||
assert_eq!(error_display, "One Shot Task Failure with code -1");
|
||||
let data = shared.lock().expect("Locking Mutex failed");
|
||||
assert_eq!(data.exec_num, 1);
|
||||
assert_eq!(data.op_code, op_code_inducing_failure);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_multi_one_shot() {
|
||||
let expected_op_code = 43;
|
||||
let shared = Arc::new(Mutex::new(TestInfo {
|
||||
exec_num: 0,
|
||||
op_code: 0,
|
||||
}));
|
||||
let exec_task_0 = OneShotTask {
|
||||
exec_num: shared.clone(),
|
||||
};
|
||||
let exec_task_1 = OneShotTask {
|
||||
exec_num: shared.clone(),
|
||||
};
|
||||
let task_vec = vec![Box::new(exec_task_0), Box::new(exec_task_1)];
|
||||
for task in task_vec.iter() {
|
||||
assert_eq!(task.task_name(), ONE_SHOT_TASK_NAME);
|
||||
}
|
||||
let jhandle = exec_sched_multi(
|
||||
task_vec,
|
||||
Some(Duration::from_millis(100)),
|
||||
expected_op_code,
|
||||
None,
|
||||
);
|
||||
let thread_res = jhandle.join().expect("One Shot Task failed");
|
||||
assert!(thread_res.is_ok());
|
||||
assert_eq!(thread_res.unwrap(), OpResult::Ok);
|
||||
let data = shared.lock().expect("Locking Mutex failed");
|
||||
assert_eq!(data.exec_num, 2);
|
||||
assert_eq!(data.op_code, expected_op_code);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cycles_single() {
|
||||
let expected_op_code = 44;
|
||||
let shared = Arc::new(Mutex::new(TestInfo {
|
||||
exec_num: 0,
|
||||
op_code: 0,
|
||||
}));
|
||||
let cycled_task = Box::new(FixedCyclesTask {
|
||||
exec_num: shared.clone(),
|
||||
cycles: 1,
|
||||
});
|
||||
assert_eq!(cycled_task.task_name(), CYCLE_TASK_NAME);
|
||||
let jh = exec_sched_single(
|
||||
cycled_task,
|
||||
Some(Duration::from_millis(100)),
|
||||
expected_op_code,
|
||||
None,
|
||||
);
|
||||
let thread_res = jh.join().expect("Cycles Task failed");
|
||||
assert!(thread_res.is_ok());
|
||||
let data = shared.lock().expect("Locking Mutex failed");
|
||||
assert_eq!(thread_res.unwrap(), OpResult::Ok);
|
||||
assert_eq!(data.exec_num, 1);
|
||||
assert_eq!(data.op_code, expected_op_code);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_single_and_cycles() {
|
||||
let expected_op_code = 50;
|
||||
let shared = Arc::new(Mutex::new(TestInfo {
|
||||
exec_num: 0,
|
||||
op_code: 0,
|
||||
}));
|
||||
let one_shot_task = Box::new(OneShotTask {
|
||||
exec_num: shared.clone(),
|
||||
});
|
||||
let cycled_task_0 = Box::new(FixedCyclesTask {
|
||||
exec_num: shared.clone(),
|
||||
cycles: 1,
|
||||
});
|
||||
let cycled_task_1 = Box::new(FixedCyclesTask {
|
||||
exec_num: shared.clone(),
|
||||
cycles: 1,
|
||||
});
|
||||
assert_eq!(cycled_task_0.task_name(), CYCLE_TASK_NAME);
|
||||
assert_eq!(one_shot_task.task_name(), ONE_SHOT_TASK_NAME);
|
||||
let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> =
|
||||
vec![one_shot_task, cycled_task_0, cycled_task_1];
|
||||
let jh = exec_sched_multi(
|
||||
task_vec,
|
||||
Some(Duration::from_millis(100)),
|
||||
expected_op_code,
|
||||
None,
|
||||
);
|
||||
let thread_res = jh.join().expect("Cycles Task failed");
|
||||
assert!(thread_res.is_ok());
|
||||
let data = shared.lock().expect("Locking Mutex failed");
|
||||
assert_eq!(thread_res.unwrap(), OpResult::Ok);
|
||||
assert_eq!(data.exec_num, 3);
|
||||
assert_eq!(data.op_code, expected_op_code);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_periodic_single() {
|
||||
let mut terminator = Bus::new(5);
|
||||
let expected_op_code = 45;
|
||||
let shared = Arc::new(Mutex::new(TestInfo {
|
||||
exec_num: 0,
|
||||
op_code: 0,
|
||||
}));
|
||||
let periodic_task = Box::new(PeriodicTask {
|
||||
exec_num: shared.clone(),
|
||||
});
|
||||
assert_eq!(periodic_task.task_name(), PERIODIC_TASK_NAME);
|
||||
let jh = exec_sched_single(
|
||||
periodic_task,
|
||||
Some(Duration::from_millis(20)),
|
||||
expected_op_code,
|
||||
Some(terminator.add_rx()),
|
||||
);
|
||||
thread::sleep(Duration::from_millis(40));
|
||||
terminator.broadcast(());
|
||||
let thread_res = jh.join().expect("Periodic Task failed");
|
||||
assert!(thread_res.is_ok());
|
||||
let data = shared.lock().expect("Locking Mutex failed");
|
||||
assert_eq!(thread_res.unwrap(), OpResult::Ok);
|
||||
let range = 2..4;
|
||||
assert!(range.contains(&data.exec_num));
|
||||
assert_eq!(data.op_code, expected_op_code);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_periodic_multi() {
|
||||
let mut terminator = Bus::new(5);
|
||||
let expected_op_code = 46;
|
||||
let shared = Arc::new(Mutex::new(TestInfo {
|
||||
exec_num: 0,
|
||||
op_code: 0,
|
||||
}));
|
||||
let cycled_task = Box::new(FixedCyclesTask {
|
||||
exec_num: shared.clone(),
|
||||
cycles: 1,
|
||||
});
|
||||
let periodic_task_0 = Box::new(PeriodicTask {
|
||||
exec_num: shared.clone(),
|
||||
});
|
||||
let periodic_task_1 = Box::new(PeriodicTask {
|
||||
exec_num: shared.clone(),
|
||||
});
|
||||
assert_eq!(periodic_task_0.task_name(), PERIODIC_TASK_NAME);
|
||||
assert_eq!(periodic_task_1.task_name(), PERIODIC_TASK_NAME);
|
||||
let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> =
|
||||
vec![cycled_task, periodic_task_0, periodic_task_1];
|
||||
let jh = exec_sched_multi(
|
||||
task_vec,
|
||||
Some(Duration::from_millis(20)),
|
||||
expected_op_code,
|
||||
Some(terminator.add_rx()),
|
||||
);
|
||||
thread::sleep(Duration::from_millis(60));
|
||||
terminator.broadcast(());
|
||||
let thread_res = jh.join().expect("Periodic Task failed");
|
||||
assert!(thread_res.is_ok());
|
||||
let data = shared.lock().expect("Locking Mutex failed");
|
||||
assert_eq!(thread_res.unwrap(), OpResult::Ok);
|
||||
let range = 7..11;
|
||||
assert!(range.contains(&data.exec_num));
|
||||
assert_eq!(data.op_code, expected_op_code);
|
||||
}
|
||||
}
|
6
fsrc-core/src/lib.rs
Normal file
6
fsrc-core/src/lib.rs
Normal file
@ -0,0 +1,6 @@
|
||||
//! # Core components of the Flight Software Rust Crate (FSRC) collection
|
||||
pub mod event_man;
|
||||
pub mod events;
|
||||
pub mod executable;
|
||||
pub mod objects;
|
||||
pub mod pool;
|
289
fsrc-core/src/objects.rs
Normal file
289
fsrc-core/src/objects.rs
Normal file
@ -0,0 +1,289 @@
|
||||
//! # Module providing addressable object support and a manager for them
|
||||
//!
|
||||
//! Each addressable object can be identified using an [object ID][ObjectId].
|
||||
//! The [system object][ManagedSystemObject] trait also allows storing these objects into the
|
||||
//! [object manager][ObjectManager]. They can then be retrieved and casted back to a known type
|
||||
//! using the object ID.
|
||||
//!
|
||||
//! # Examples
|
||||
//!
|
||||
//! ```
|
||||
//! use std::any::Any;
|
||||
//! use std::error::Error;
|
||||
//! use fsrc_core::objects::{ManagedSystemObject, ObjectId, ObjectManager, SystemObject};
|
||||
//!
|
||||
//! struct ExampleSysObj {
|
||||
//! id: ObjectId,
|
||||
//! dummy: u32,
|
||||
//! was_initialized: bool,
|
||||
//! }
|
||||
//!
|
||||
//! impl ExampleSysObj {
|
||||
//! fn new(id: ObjectId, dummy: u32) -> ExampleSysObj {
|
||||
//! ExampleSysObj {
|
||||
//! id,
|
||||
//! dummy,
|
||||
//! was_initialized: false,
|
||||
//! }
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! impl SystemObject for ExampleSysObj {
|
||||
//! fn as_any(&self) -> &dyn Any {
|
||||
//! self
|
||||
//! }
|
||||
//!
|
||||
//! fn get_object_id(&self) -> &ObjectId {
|
||||
//! &self.id
|
||||
//! }
|
||||
//!
|
||||
//! fn initialize(&mut self) -> Result<(), Box<dyn Error>> {
|
||||
//! self.was_initialized = true;
|
||||
//! Ok(())
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! impl ManagedSystemObject for ExampleSysObj {}
|
||||
//!
|
||||
//!
|
||||
//! let mut obj_manager = ObjectManager::default();
|
||||
//! let obj_id = ObjectId { id: 0, name: "Example 0"};
|
||||
//! let example_obj = ExampleSysObj::new(obj_id, 42);
|
||||
//! obj_manager.insert(Box::new(example_obj));
|
||||
//! let obj_back_casted: Option<&ExampleSysObj> = obj_manager.get(&obj_id);
|
||||
//! let example_obj = obj_back_casted.unwrap();
|
||||
//! assert_eq!(example_obj.id, obj_id);
|
||||
//! assert_eq!(example_obj.dummy, 42);
|
||||
//! ```
|
||||
|
||||
use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
|
||||
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)]
|
||||
pub struct ObjectId {
|
||||
pub id: u32,
|
||||
pub name: &'static str,
|
||||
}
|
||||
|
||||
/// Each object which is stored inside the [object manager][ObjectManager] needs to implemented
|
||||
/// this trait
|
||||
pub trait SystemObject {
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
fn get_object_id(&self) -> &ObjectId;
|
||||
fn initialize(&mut self) -> Result<(), Box<dyn Error>>;
|
||||
}
|
||||
|
||||
pub trait ManagedSystemObject: SystemObject + Any + Send {}
|
||||
|
||||
/// Helper module to manage multiple [ManagedSystemObjects][ManagedSystemObject] by mapping them
|
||||
/// using an [object ID][ObjectId]
|
||||
pub struct ObjectManager {
|
||||
obj_map: HashMap<ObjectId, Box<dyn ManagedSystemObject>>,
|
||||
}
|
||||
|
||||
impl Default for ObjectManager {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl ObjectManager {
|
||||
pub fn new() -> ObjectManager {
|
||||
ObjectManager {
|
||||
obj_map: HashMap::new(),
|
||||
}
|
||||
}
|
||||
pub fn insert(&mut self, sys_obj: Box<dyn ManagedSystemObject>) -> bool {
|
||||
let obj_id = sys_obj.get_object_id();
|
||||
if self.obj_map.contains_key(obj_id) {
|
||||
return false;
|
||||
}
|
||||
self.obj_map.insert(*obj_id, sys_obj).is_none()
|
||||
}
|
||||
|
||||
/// Initializes all System Objects in the hash map and returns the number of successful
|
||||
/// initializations
|
||||
pub fn initialize(&mut self) -> Result<u32, Box<dyn Error>> {
|
||||
let mut init_success = 0;
|
||||
for val in self.obj_map.values_mut() {
|
||||
if val.initialize().is_ok() {
|
||||
init_success += 1
|
||||
}
|
||||
}
|
||||
Ok(init_success)
|
||||
}
|
||||
|
||||
/// Retrieve an object stored inside the manager. The type to retrieve needs to be explicitly
|
||||
/// passed as a generic parameter
|
||||
pub fn get<T: Any>(&self, key: &ObjectId) -> Option<&T> {
|
||||
self.obj_map
|
||||
.get(key)
|
||||
.and_then(|o| o.as_ref().as_any().downcast_ref::<T>())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::objects::{ManagedSystemObject, ObjectId, ObjectManager, SystemObject};
|
||||
use std::any::Any;
|
||||
use std::error::Error;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
|
||||
struct ExampleSysObj {
|
||||
id: ObjectId,
|
||||
dummy: u32,
|
||||
was_initialized: bool,
|
||||
}
|
||||
|
||||
impl ExampleSysObj {
|
||||
fn new(id: ObjectId, dummy: u32) -> ExampleSysObj {
|
||||
ExampleSysObj {
|
||||
id,
|
||||
dummy,
|
||||
was_initialized: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SystemObject for ExampleSysObj {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn get_object_id(&self) -> &ObjectId {
|
||||
&self.id
|
||||
}
|
||||
|
||||
fn initialize(&mut self) -> Result<(), Box<dyn Error>> {
|
||||
self.was_initialized = true;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ManagedSystemObject for ExampleSysObj {}
|
||||
|
||||
struct OtherExampleObject {
|
||||
id: ObjectId,
|
||||
string: String,
|
||||
was_initialized: bool,
|
||||
}
|
||||
|
||||
impl SystemObject for OtherExampleObject {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn get_object_id(&self) -> &ObjectId {
|
||||
&self.id
|
||||
}
|
||||
|
||||
fn initialize(&mut self) -> Result<(), Box<dyn Error>> {
|
||||
self.was_initialized = true;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ManagedSystemObject for OtherExampleObject {}
|
||||
|
||||
#[test]
|
||||
fn test_obj_manager_simple() {
|
||||
let mut obj_manager = ObjectManager::default();
|
||||
let expl_obj_id = ObjectId {
|
||||
id: 0,
|
||||
name: "Example 0",
|
||||
};
|
||||
let example_obj = ExampleSysObj::new(expl_obj_id, 42);
|
||||
assert!(obj_manager.insert(Box::new(example_obj)));
|
||||
let res = obj_manager.initialize();
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), 1);
|
||||
let obj_back_casted: Option<&ExampleSysObj> = obj_manager.get(&expl_obj_id);
|
||||
assert!(obj_back_casted.is_some());
|
||||
let expl_obj_back_casted = obj_back_casted.unwrap();
|
||||
assert_eq!(expl_obj_back_casted.dummy, 42);
|
||||
assert!(expl_obj_back_casted.was_initialized);
|
||||
|
||||
let second_obj_id = ObjectId {
|
||||
id: 12,
|
||||
name: "Example 1",
|
||||
};
|
||||
let second_example_obj = OtherExampleObject {
|
||||
id: second_obj_id,
|
||||
string: String::from("Hello Test"),
|
||||
was_initialized: false,
|
||||
};
|
||||
|
||||
assert!(obj_manager.insert(Box::new(second_example_obj)));
|
||||
let res = obj_manager.initialize();
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), 2);
|
||||
let obj_back_casted: Option<&OtherExampleObject> = obj_manager.get(&second_obj_id);
|
||||
assert!(obj_back_casted.is_some());
|
||||
let expl_obj_back_casted = obj_back_casted.unwrap();
|
||||
assert_eq!(expl_obj_back_casted.string, String::from("Hello Test"));
|
||||
assert!(expl_obj_back_casted.was_initialized);
|
||||
|
||||
let existing_obj_id = ObjectId {
|
||||
id: 12,
|
||||
name: "Example 1",
|
||||
};
|
||||
let invalid_obj = OtherExampleObject {
|
||||
id: existing_obj_id,
|
||||
string: String::from("Hello Test"),
|
||||
was_initialized: false,
|
||||
};
|
||||
|
||||
assert!(!obj_manager.insert(Box::new(invalid_obj)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn object_man_threaded() {
|
||||
let obj_manager = Arc::new(Mutex::new(ObjectManager::new()));
|
||||
let expl_obj_id = ObjectId {
|
||||
id: 0,
|
||||
name: "Example 0",
|
||||
};
|
||||
let example_obj = ExampleSysObj::new(expl_obj_id, 42);
|
||||
let second_obj_id = ObjectId {
|
||||
id: 12,
|
||||
name: "Example 1",
|
||||
};
|
||||
let second_example_obj = OtherExampleObject {
|
||||
id: second_obj_id,
|
||||
string: String::from("Hello Test"),
|
||||
was_initialized: false,
|
||||
};
|
||||
|
||||
let mut obj_man_handle = obj_manager.lock().expect("Mutex lock failed");
|
||||
assert!(obj_man_handle.insert(Box::new(example_obj)));
|
||||
assert!(obj_man_handle.insert(Box::new(second_example_obj)));
|
||||
let res = obj_man_handle.initialize();
|
||||
std::mem::drop(obj_man_handle);
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), 2);
|
||||
let obj_man_0 = obj_manager.clone();
|
||||
let jh0 = thread::spawn(move || {
|
||||
let locked_man = obj_man_0.lock().expect("Mutex lock failed");
|
||||
let obj_back_casted: Option<&ExampleSysObj> = locked_man.get(&expl_obj_id);
|
||||
assert!(obj_back_casted.is_some());
|
||||
let expl_obj_back_casted = obj_back_casted.unwrap();
|
||||
assert_eq!(expl_obj_back_casted.dummy, 42);
|
||||
assert!(expl_obj_back_casted.was_initialized);
|
||||
std::mem::drop(locked_man)
|
||||
});
|
||||
|
||||
let jh1 = thread::spawn(move || {
|
||||
let locked_man = obj_manager.lock().expect("Mutex lock failed");
|
||||
let obj_back_casted: Option<&OtherExampleObject> = locked_man.get(&second_obj_id);
|
||||
assert!(obj_back_casted.is_some());
|
||||
let expl_obj_back_casted = obj_back_casted.unwrap();
|
||||
assert_eq!(expl_obj_back_casted.string, String::from("Hello Test"));
|
||||
assert!(expl_obj_back_casted.was_initialized);
|
||||
std::mem::drop(locked_man)
|
||||
});
|
||||
jh0.join().expect("Joining thread 0 failed");
|
||||
jh1.join().expect("Joining thread 1 failed");
|
||||
}
|
||||
}
|
479
fsrc-core/src/pool.rs
Normal file
479
fsrc-core/src/pool.rs
Normal file
@ -0,0 +1,479 @@
|
||||
//! # Pool implementation providing pre-allocated sub-pools with fixed size memory blocks
|
||||
//!
|
||||
//! This is a simple memory pool implementation which pre-allocates all sub-pools using a given pool
|
||||
//! configuration. After the pre-allocation, no dynamic memory allocation will be performed
|
||||
//! during run-time. This makes the implementation suitable for real-time applications and
|
||||
//! embedded environments. The pool implementation will also track the size of the data stored
|
||||
//! inside it.
|
||||
//!
|
||||
//! Transaction with the [pool][LocalPool] are done using a special [address][StoreAddr] type.
|
||||
//! Adding any data to the pool will yield a store address. Modification and read operations are
|
||||
//! done using a reference to a store address. Deletion will consume the store address.
|
||||
//!
|
||||
//! # Example
|
||||
//!
|
||||
//! ```
|
||||
//! use fsrc_core::pool::{LocalPool, PoolCfg};
|
||||
//!
|
||||
//! // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes
|
||||
//! let pool_cfg = PoolCfg::new(vec![(4, 4), (2, 8), (1, 16)]);
|
||||
//! let mut local_pool = LocalPool::new(pool_cfg);
|
||||
//! let mut addr;
|
||||
//! {
|
||||
//! // Add new data to the pool
|
||||
//! let mut example_data = [0; 4];
|
||||
//! example_data[0] = 42;
|
||||
//! let res = local_pool.add(example_data);
|
||||
//! assert!(res.is_ok());
|
||||
//! addr = res.unwrap();
|
||||
//! }
|
||||
//!
|
||||
//! {
|
||||
//! // Read the store data back
|
||||
//! let res = local_pool.read(&addr);
|
||||
//! assert!(res.is_ok());
|
||||
//! let buf_read_back = res.unwrap();
|
||||
//! assert_eq!(buf_read_back.len(), 4);
|
||||
//! assert_eq!(buf_read_back[0], 42);
|
||||
//! // Modify the stored data
|
||||
//! let res = local_pool.modify(&addr);
|
||||
//! assert!(res.is_ok());
|
||||
//! let buf_read_back = res.unwrap();
|
||||
//! buf_read_back[0] = 12;
|
||||
//! }
|
||||
//!
|
||||
//! {
|
||||
//! // Read the modified data back
|
||||
//! let res = local_pool.read(&addr);
|
||||
//! assert!(res.is_ok());
|
||||
//! let buf_read_back = res.unwrap();
|
||||
//! assert_eq!(buf_read_back.len(), 4);
|
||||
//! assert_eq!(buf_read_back[0], 12);
|
||||
//! }
|
||||
//!
|
||||
//! // Delete the stored data
|
||||
//! local_pool.delete(addr);
|
||||
//!
|
||||
//! // Get a free element in the pool with an appropriate size
|
||||
//! {
|
||||
//! let res = local_pool.free_element(12);
|
||||
//! assert!(res.is_ok());
|
||||
//! let (tmp, mut_buf) = res.unwrap();
|
||||
//! addr = tmp;
|
||||
//! mut_buf[0] = 7;
|
||||
//! }
|
||||
//!
|
||||
//! // Read back the data
|
||||
//! {
|
||||
//! // Read the store data back
|
||||
//! let res = local_pool.read(&addr);
|
||||
//! assert!(res.is_ok());
|
||||
//! let buf_read_back = res.unwrap();
|
||||
//! assert_eq!(buf_read_back.len(), 12);
|
||||
//! assert_eq!(buf_read_back[0], 7);
|
||||
//! }
|
||||
//! ```
|
||||
type NumBlocks = u16;
|
||||
|
||||
/// Configuration structure of the [local pool][LocalPool]
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// * `cfg`: Vector of tuples which represent a subpool. The first entry in the tuple specifies the
|
||||
/// number of memory blocks in the subpool, the second entry the size of the blocks
|
||||
pub struct PoolCfg {
|
||||
cfg: Vec<(NumBlocks, usize)>,
|
||||
}
|
||||
|
||||
impl PoolCfg {
|
||||
pub fn new(cfg: Vec<(NumBlocks, usize)>) -> Self {
|
||||
PoolCfg { cfg }
|
||||
}
|
||||
|
||||
pub fn sanitize(&mut self) -> usize {
|
||||
self.cfg
|
||||
.retain(|&(bucket_num, size)| bucket_num > 0 && size < LocalPool::MAX_SIZE);
|
||||
self.cfg
|
||||
.sort_unstable_by(|(_, sz0), (_, sz1)| sz0.partial_cmp(sz1).unwrap());
|
||||
self.cfg.len()
|
||||
}
|
||||
}
|
||||
|
||||
type PoolSize = usize;
|
||||
|
||||
/// Pool implementation providing sub-pools with fixed size memory blocks. More details in
|
||||
/// the [module documentation][super::pool]
|
||||
pub struct LocalPool {
|
||||
pool_cfg: PoolCfg,
|
||||
pool: Vec<Vec<u8>>,
|
||||
sizes_lists: Vec<Vec<PoolSize>>,
|
||||
}
|
||||
|
||||
/// Simple address type used for transactions with the local pool.
|
||||
#[derive(Debug, Copy, Clone, PartialEq)]
|
||||
pub struct StoreAddr {
|
||||
pool_idx: u16,
|
||||
packet_idx: NumBlocks,
|
||||
}
|
||||
|
||||
impl StoreAddr {
|
||||
pub const INVALID_ADDR: u32 = 0xFFFFFFFF;
|
||||
|
||||
pub fn raw(&self) -> u32 {
|
||||
((self.pool_idx as u32) << 16) as u32 | self.packet_idx as u32
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum StoreIdError {
|
||||
InvalidSubpool(u16),
|
||||
InvalidPacketIdx(u16),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum StoreError {
|
||||
/// Requested data block is too large
|
||||
DataTooLarge(usize),
|
||||
/// The store is full. Contains the index of the full subpool
|
||||
StoreFull(u16),
|
||||
/// Store ID is invalid. This also includes partial errors where only the subpool is invalid
|
||||
InvalidStoreId(StoreIdError, Option<StoreAddr>),
|
||||
/// Valid subpool and packet index, but no data is stored at the given address
|
||||
DataDoesNotExist(StoreAddr),
|
||||
/// Internal or configuration errors
|
||||
InternalError(String),
|
||||
}
|
||||
|
||||
impl LocalPool {
|
||||
const STORE_FREE: PoolSize = PoolSize::MAX;
|
||||
const MAX_SIZE: PoolSize = Self::STORE_FREE - 1;
|
||||
|
||||
/// Create a new local pool from the [given configuration][PoolCfg]. This function will sanitize
|
||||
/// the given configuration as well.
|
||||
pub fn new(mut cfg: PoolCfg) -> LocalPool {
|
||||
let subpools_num = cfg.sanitize();
|
||||
let mut local_pool = LocalPool {
|
||||
pool_cfg: cfg,
|
||||
pool: Vec::with_capacity(subpools_num),
|
||||
sizes_lists: Vec::with_capacity(subpools_num),
|
||||
};
|
||||
for &(num_elems, elem_size) in local_pool.pool_cfg.cfg.iter() {
|
||||
let next_pool_len = elem_size * num_elems as usize;
|
||||
local_pool.pool.push(vec![0; next_pool_len]);
|
||||
let next_sizes_list_len = num_elems as usize;
|
||||
local_pool
|
||||
.sizes_lists
|
||||
.push(vec![Self::STORE_FREE; next_sizes_list_len]);
|
||||
}
|
||||
local_pool
|
||||
}
|
||||
|
||||
/// Add new data to the pool. It will attempt to reserve a memory block with the appropriate
|
||||
/// size and then copy the given data to the block. Yields a [StoreAddr] which can be used
|
||||
/// to access the data stored in the pool
|
||||
pub fn add(&mut self, data: impl AsRef<[u8]>) -> Result<StoreAddr, StoreError> {
|
||||
let data_len = data.as_ref().len();
|
||||
if data_len > Self::MAX_SIZE {
|
||||
return Err(StoreError::DataTooLarge(data_len));
|
||||
}
|
||||
let addr = self.reserve(data_len)?;
|
||||
self.write(&addr, data.as_ref())?;
|
||||
Ok(addr)
|
||||
}
|
||||
|
||||
/// Reserves a free memory block with the appropriate size and returns a mutable reference
|
||||
/// to it. Yields a [StoreAddr] which can be used to access the data stored in the pool
|
||||
pub fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError> {
|
||||
if len > Self::MAX_SIZE {
|
||||
return Err(StoreError::DataTooLarge(len));
|
||||
}
|
||||
let addr = self.reserve(len)?;
|
||||
let raw_pos = self.raw_pos(&addr).unwrap();
|
||||
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..len];
|
||||
Ok((addr, block))
|
||||
}
|
||||
|
||||
/// Modify data added previously using a given [StoreAddr] by yielding a mutable reference
|
||||
/// to it
|
||||
pub fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> {
|
||||
let curr_size = self.addr_check(addr)?;
|
||||
let raw_pos = self.raw_pos(addr).unwrap();
|
||||
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..curr_size];
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
/// Read data by yielding a read-only reference given a [StoreAddr]
|
||||
pub fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> {
|
||||
let curr_size = self.addr_check(addr)?;
|
||||
let raw_pos = self.raw_pos(addr).unwrap();
|
||||
let block = &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..curr_size];
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
/// Delete data inside the pool given a [StoreAddr]
|
||||
pub fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> {
|
||||
self.addr_check(&addr)?;
|
||||
let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1;
|
||||
let raw_pos = self.raw_pos(&addr).unwrap();
|
||||
let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..block_size];
|
||||
let size_list = self.sizes_lists.get_mut(addr.pool_idx as usize).unwrap();
|
||||
size_list[addr.packet_idx as usize] = Self::STORE_FREE;
|
||||
block.fill(0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn addr_check(&self, addr: &StoreAddr) -> Result<usize, StoreError> {
|
||||
let pool_idx = addr.pool_idx as usize;
|
||||
if pool_idx as usize >= self.pool_cfg.cfg.len() {
|
||||
return Err(StoreError::InvalidStoreId(
|
||||
StoreIdError::InvalidSubpool(addr.pool_idx),
|
||||
Some(*addr),
|
||||
));
|
||||
}
|
||||
if addr.packet_idx >= self.pool_cfg.cfg[addr.pool_idx as usize].0 {
|
||||
return Err(StoreError::InvalidStoreId(
|
||||
StoreIdError::InvalidPacketIdx(addr.packet_idx),
|
||||
Some(*addr),
|
||||
));
|
||||
}
|
||||
let size_list = self.sizes_lists.get(pool_idx).unwrap();
|
||||
let curr_size = size_list[addr.packet_idx as usize];
|
||||
if curr_size == Self::STORE_FREE {
|
||||
return Err(StoreError::DataDoesNotExist(*addr));
|
||||
}
|
||||
Ok(curr_size)
|
||||
}
|
||||
|
||||
fn reserve(&mut self, data_len: usize) -> Result<StoreAddr, StoreError> {
|
||||
let subpool_idx = self.find_subpool(data_len, 0)?;
|
||||
let (slot, size_slot_ref) = self.find_empty(subpool_idx)?;
|
||||
*size_slot_ref = data_len;
|
||||
Ok(StoreAddr {
|
||||
pool_idx: subpool_idx,
|
||||
packet_idx: slot,
|
||||
})
|
||||
}
|
||||
|
||||
fn find_subpool(&self, req_size: usize, start_at_subpool: u16) -> Result<u16, StoreError> {
|
||||
for (i, &(_, elem_size)) in self.pool_cfg.cfg.iter().enumerate() {
|
||||
if i < start_at_subpool as usize {
|
||||
continue;
|
||||
}
|
||||
if elem_size >= req_size {
|
||||
return Ok(i as u16);
|
||||
}
|
||||
}
|
||||
Err(StoreError::DataTooLarge(req_size))
|
||||
}
|
||||
|
||||
fn write(&mut self, addr: &StoreAddr, data: &[u8]) -> Result<(), StoreError> {
|
||||
let packet_pos = self.raw_pos(addr).ok_or_else(|| {
|
||||
StoreError::InternalError(format!(
|
||||
"write: Error in raw_pos func with address {:?}",
|
||||
addr
|
||||
))
|
||||
})?;
|
||||
let subpool = self.pool.get_mut(addr.pool_idx as usize).ok_or_else(|| {
|
||||
StoreError::InternalError(format!(
|
||||
"write: Error retrieving pool slice with address {:?}",
|
||||
addr
|
||||
))
|
||||
})?;
|
||||
let pool_slice = &mut subpool[packet_pos..self.pool_cfg.cfg[addr.pool_idx as usize].1];
|
||||
pool_slice.copy_from_slice(data);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn find_empty(&mut self, subpool: u16) -> Result<(u16, &mut usize), StoreError> {
|
||||
if let Some(size_list) = self.sizes_lists.get_mut(subpool as usize) {
|
||||
for (i, elem_size) in size_list.iter_mut().enumerate() {
|
||||
if *elem_size == Self::STORE_FREE {
|
||||
return Ok((i as u16, elem_size));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(StoreError::InvalidStoreId(
|
||||
StoreIdError::InvalidSubpool(subpool),
|
||||
None,
|
||||
));
|
||||
}
|
||||
Err(StoreError::StoreFull(subpool))
|
||||
}
|
||||
|
||||
fn raw_pos(&self, addr: &StoreAddr) -> Option<usize> {
|
||||
let (_, size) = self.pool_cfg.cfg.get(addr.pool_idx as usize)?;
|
||||
Some(addr.packet_idx as usize * size)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::pool::{LocalPool, PoolCfg, StoreAddr, StoreError, StoreIdError};
|
||||
|
||||
#[test]
|
||||
fn test_cfg() {
|
||||
// Values where number of buckets is 0 or size is too large should be removed
|
||||
let mut pool_cfg = PoolCfg::new(vec![(0, 0), (1, 0), (2, LocalPool::MAX_SIZE)]);
|
||||
pool_cfg.sanitize();
|
||||
assert_eq!(pool_cfg.cfg, vec![(1, 0)]);
|
||||
// Entries should be ordered according to bucket size
|
||||
pool_cfg = PoolCfg::new(vec![(16, 6), (32, 3), (8, 12)]);
|
||||
pool_cfg.sanitize();
|
||||
assert_eq!(pool_cfg.cfg, vec![(32, 3), (16, 6), (8, 12)]);
|
||||
// Unstable sort is used, so order of entries with same block length should not matter
|
||||
pool_cfg = PoolCfg::new(vec![(12, 12), (14, 16), (10, 12)]);
|
||||
pool_cfg.sanitize();
|
||||
assert!(
|
||||
pool_cfg.cfg == vec![(12, 12), (10, 12), (14, 16)]
|
||||
|| pool_cfg.cfg == vec![(10, 12), (12, 12), (14, 16)]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_basic() {
|
||||
// 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes
|
||||
let pool_cfg = PoolCfg::new(vec![(4, 4), (2, 8), (1, 16)]);
|
||||
let mut local_pool = LocalPool::new(pool_cfg);
|
||||
// Try to access data which does not exist
|
||||
let res = local_pool.read(&StoreAddr {
|
||||
packet_idx: 0,
|
||||
pool_idx: 0,
|
||||
});
|
||||
assert!(res.is_err());
|
||||
assert!(matches!(
|
||||
res.unwrap_err(),
|
||||
StoreError::DataDoesNotExist { .. }
|
||||
));
|
||||
let mut test_buf: [u8; 16] = [0; 16];
|
||||
for (i, val) in test_buf.iter_mut().enumerate() {
|
||||
*val = i as u8;
|
||||
}
|
||||
let res = local_pool.add(test_buf);
|
||||
assert!(res.is_ok());
|
||||
let addr = res.unwrap();
|
||||
// Only the second subpool has enough storage and only one bucket
|
||||
assert_eq!(
|
||||
addr,
|
||||
StoreAddr {
|
||||
pool_idx: 2,
|
||||
packet_idx: 0
|
||||
}
|
||||
);
|
||||
|
||||
// The subpool is now full and the call should fail accordingly
|
||||
let res = local_pool.add(test_buf);
|
||||
assert!(res.is_err());
|
||||
let err = res.unwrap_err();
|
||||
assert!(matches!(err, StoreError::StoreFull { .. }));
|
||||
if let StoreError::StoreFull(subpool) = err {
|
||||
assert_eq!(subpool, 2);
|
||||
}
|
||||
|
||||
// Read back data and verify correctness
|
||||
let res = local_pool.read(&addr);
|
||||
assert!(res.is_ok());
|
||||
let buf_read_back = res.unwrap();
|
||||
assert_eq!(buf_read_back.len(), 16);
|
||||
for (i, &val) in buf_read_back.iter().enumerate() {
|
||||
assert_eq!(val, i as u8);
|
||||
}
|
||||
|
||||
// Delete the data
|
||||
let res = local_pool.delete(addr);
|
||||
assert!(res.is_ok());
|
||||
|
||||
{
|
||||
// Verify that the slot is free by trying to get a reference to it
|
||||
let res = local_pool.free_element(12);
|
||||
assert!(res.is_ok());
|
||||
let (addr, buf_ref) = res.unwrap();
|
||||
assert_eq!(
|
||||
addr,
|
||||
StoreAddr {
|
||||
pool_idx: 2,
|
||||
packet_idx: 0
|
||||
}
|
||||
);
|
||||
assert_eq!(buf_ref.len(), 12);
|
||||
assert_eq!(buf_ref, [0; 12]);
|
||||
buf_ref[0] = 5;
|
||||
buf_ref[11] = 12;
|
||||
}
|
||||
|
||||
{
|
||||
// Try to request a slot which is too large
|
||||
let res = local_pool.free_element(20);
|
||||
assert!(res.is_err());
|
||||
assert_eq!(res.unwrap_err(), StoreError::DataTooLarge(20));
|
||||
|
||||
// Try to modify the 12 bytes requested previously
|
||||
let res = local_pool.modify(&addr);
|
||||
assert!(res.is_ok());
|
||||
let buf_ref = res.unwrap();
|
||||
assert_eq!(buf_ref[0], 5);
|
||||
assert_eq!(buf_ref[11], 12);
|
||||
buf_ref[0] = 0;
|
||||
buf_ref[11] = 0;
|
||||
}
|
||||
|
||||
{
|
||||
let addr = StoreAddr {
|
||||
pool_idx: 3,
|
||||
packet_idx: 0,
|
||||
};
|
||||
let res = local_pool.read(&addr);
|
||||
assert!(res.is_err());
|
||||
let err = res.unwrap_err();
|
||||
assert!(matches!(
|
||||
err,
|
||||
StoreError::InvalidStoreId(StoreIdError::InvalidSubpool(3), Some(_))
|
||||
));
|
||||
}
|
||||
|
||||
{
|
||||
let addr = StoreAddr {
|
||||
pool_idx: 2,
|
||||
packet_idx: 1,
|
||||
};
|
||||
assert_eq!(addr.raw(), 0x00020001);
|
||||
let res = local_pool.read(&addr);
|
||||
assert!(res.is_err());
|
||||
let err = res.unwrap_err();
|
||||
assert!(matches!(
|
||||
err,
|
||||
StoreError::InvalidStoreId(StoreIdError::InvalidPacketIdx(1), Some(_))
|
||||
));
|
||||
|
||||
let data_too_large = [0; 20];
|
||||
let res = local_pool.add(data_too_large);
|
||||
assert!(res.is_err());
|
||||
let err = res.unwrap_err();
|
||||
assert_eq!(err, StoreError::DataTooLarge(20));
|
||||
|
||||
let res = local_pool.free_element(LocalPool::MAX_SIZE + 1);
|
||||
assert!(res.is_err());
|
||||
assert_eq!(
|
||||
res.unwrap_err(),
|
||||
StoreError::DataTooLarge(LocalPool::MAX_SIZE + 1)
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
// Reserve two smaller blocks consecutively and verify that the third reservation fails
|
||||
let res = local_pool.free_element(8);
|
||||
assert!(res.is_ok());
|
||||
let (addr0, _) = res.unwrap();
|
||||
let res = local_pool.free_element(8);
|
||||
assert!(res.is_ok());
|
||||
let (addr1, _) = res.unwrap();
|
||||
let res = local_pool.free_element(8);
|
||||
assert!(res.is_err());
|
||||
let err = res.unwrap_err();
|
||||
assert_eq!(err, StoreError::StoreFull(1));
|
||||
|
||||
// Verify that the two deletions are successful
|
||||
assert!(local_pool.delete(addr0).is_ok());
|
||||
assert!(local_pool.delete(addr1).is_ok());
|
||||
}
|
||||
}
|
||||
}
|
92
fsrc-core/src/pool_test.rs
Normal file
92
fsrc-core/src/pool_test.rs
Normal file
@ -0,0 +1,92 @@
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread;
|
||||
|
||||
struct PoolDummy {
|
||||
test_buf: [u8; 128],
|
||||
}
|
||||
|
||||
struct PoolAccessDummy<'a> {
|
||||
pool_dummy: &'a mut PoolDummy,
|
||||
no_deletion: bool,
|
||||
}
|
||||
|
||||
impl PoolAccessDummy<'_> {
|
||||
fn modify(&mut self) -> &mut [u8] {
|
||||
self.pool_dummy.modify()
|
||||
}
|
||||
|
||||
fn release(&mut self) {
|
||||
self.no_deletion = true;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PoolAccessDummy<'_> {
|
||||
fn drop(&mut self) {
|
||||
if self.no_deletion {
|
||||
println!("Pool access: Drop with no deletion")
|
||||
} else {
|
||||
self.pool_dummy.delete();
|
||||
println!("Pool access: Drop with deletion");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for PoolDummy {
|
||||
fn default() -> Self {
|
||||
PoolDummy { test_buf: [0; 128] }
|
||||
}
|
||||
}
|
||||
|
||||
impl PoolDummy {
|
||||
fn modify(&mut self) -> &mut [u8] {
|
||||
self.test_buf.as_mut_slice()
|
||||
}
|
||||
|
||||
fn modify_with_accessor(&mut self) -> PoolAccessDummy {
|
||||
PoolAccessDummy {
|
||||
pool_dummy: self,
|
||||
no_deletion: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn read(&self) -> &[u8] {
|
||||
self.test_buf.as_slice()
|
||||
}
|
||||
|
||||
fn delete(&mut self) {
|
||||
println!("Store content was deleted");
|
||||
}
|
||||
}
|
||||
|
||||
fn pool_test() {
|
||||
println!("Hello World");
|
||||
let shared_dummy = Arc::new(RwLock::new(PoolDummy::default()));
|
||||
let shared_clone = shared_dummy.clone();
|
||||
let jh0 = thread::spawn(move || loop {
|
||||
{
|
||||
let mut dummy = shared_dummy.write().unwrap();
|
||||
let buf = dummy.modify();
|
||||
buf[0] = 1;
|
||||
|
||||
let mut accessor = dummy.modify_with_accessor();
|
||||
let buf = accessor.modify();
|
||||
buf[0] = 2;
|
||||
}
|
||||
});
|
||||
|
||||
let jh1 = thread::spawn(move || loop {
|
||||
{
|
||||
let dummy = shared_clone.read().unwrap();
|
||||
let buf = dummy.read();
|
||||
println!("Buffer 0: {:?}", buf[0]);
|
||||
}
|
||||
|
||||
let mut dummy = shared_clone.write().unwrap();
|
||||
let mut accessor = dummy.modify_with_accessor();
|
||||
let buf = accessor.modify();
|
||||
buf[0] = 3;
|
||||
accessor.release();
|
||||
});
|
||||
jh0.join().unwrap();
|
||||
jh1.join().unwrap();
|
||||
}
|
Reference in New Issue
Block a user