diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index 41e977e..22e9d5b 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -64,7 +64,7 @@ optional = true # version = "0.6" # path = "../spacepackets" git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" -rev = "e3d2d885385" +rev = "784564a20ed" default-features = false [dev-dependencies] diff --git a/satrs-core/src/event_man.rs b/satrs-core/src/event_man.rs index 3fb8d68..c3d6182 100644 --- a/satrs-core/src/event_man.rs +++ b/satrs-core/src/event_man.rs @@ -66,7 +66,7 @@ use core::slice::Iter; #[cfg(feature = "alloc")] use hashbrown::HashMap; -use crate::SenderId; +use crate::ChannelId; #[cfg(feature = "std")] pub use stdmod::*; @@ -88,7 +88,7 @@ pub type EventU16WithAuxData = EventWithAuxData; pub trait SendEventProvider { type Error; - fn id(&self) -> SenderId; + fn id(&self) -> ChannelId; fn send_no_data(&self, event: Provider) -> Result<(), Self::Error> { self.send(event, None) } @@ -109,16 +109,16 @@ pub trait EventReceiver { pub trait ListenerTable { fn get_listeners(&self) -> Vec; fn contains_listener(&self, key: &ListenerKey) -> bool; - fn get_listener_ids(&self, key: &ListenerKey) -> Option>; - fn add_listener(&mut self, key: ListenerKey, sender_id: SenderId) -> bool; + fn get_listener_ids(&self, key: &ListenerKey) -> Option>; + fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool; fn remove_duplicates(&mut self, key: &ListenerKey); } pub trait SenderTable { - fn contains_send_event_provider(&self, id: &SenderId) -> bool; + fn contains_send_event_provider(&self, id: &ChannelId) -> bool; fn get_send_event_provider( &mut self, - id: &SenderId, + id: &ChannelId, ) -> Option<&mut Box>>; fn add_send_event_provider( &mut self, @@ -171,7 +171,7 @@ pub enum EventRoutingResult { pub enum EventRoutingError { SendError(E), NoSendersForKey(ListenerKey), - NoSenderForId(SenderId), + NoSenderForId(ChannelId), } #[derive(Debug)] @@ -186,12 +186,12 @@ impl EventManager { } /// Subscribe for a unique event. - pub fn subscribe_single(&mut self, event: &Event, sender_id: SenderId) { + pub fn subscribe_single(&mut self, event: &Event, sender_id: ChannelId) { self.update_listeners(ListenerKey::Single(event.raw_as_largest_type()), sender_id); } /// Subscribe for an event group. - pub fn subscribe_group(&mut self, group_id: LargestGroupIdRaw, sender_id: SenderId) { + pub fn subscribe_group(&mut self, group_id: LargestGroupIdRaw, sender_id: ChannelId) { self.update_listeners(ListenerKey::Group(group_id), sender_id); } @@ -199,7 +199,7 @@ impl EventManager { /// /// For example, this can be useful for a handler component which sends every event as /// a telemetry packet. - pub fn subscribe_all(&mut self, sender_id: SenderId) { + pub fn subscribe_all(&mut self, sender_id: ChannelId) { self.update_listeners(ListenerKey::All, sender_id); } } @@ -245,7 +245,7 @@ impl } } - fn update_listeners(&mut self, key: ListenerKey, sender_id: SenderId) { + fn update_listeners(&mut self, key: ListenerKey, sender_id: ChannelId) { self.listener_table.add_listener(key, sender_id); } @@ -311,7 +311,7 @@ impl #[derive(Default)] pub struct DefaultListenerTableProvider { - listeners: HashMap>, + listeners: HashMap>, } pub struct DefaultSenderTableProvider< @@ -320,7 +320,7 @@ pub struct DefaultSenderTableProvider< AuxDataProvider = Params, > { senders: HashMap< - SenderId, + ChannelId, Box>, >, } @@ -348,11 +348,11 @@ impl ListenerTable for DefaultListenerTableProvider { self.listeners.contains_key(key) } - fn get_listener_ids(&self, key: &ListenerKey) -> Option> { + fn get_listener_ids(&self, key: &ListenerKey) -> Option> { self.listeners.get(key).map(|vec| vec.iter()) } - fn add_listener(&mut self, key: ListenerKey, sender_id: SenderId) -> bool { + fn add_listener(&mut self, key: ListenerKey, sender_id: ChannelId) -> bool { if let Some(existing_list) = self.listeners.get_mut(&key) { existing_list.push(sender_id); } else { @@ -374,13 +374,13 @@ impl SenderTable for DefaultSenderTableProvider { - fn contains_send_event_provider(&self, id: &SenderId) -> bool { + fn contains_send_event_provider(&self, id: &ChannelId) -> bool { self.senders.contains_key(id) } fn get_send_event_provider( &mut self, - id: &SenderId, + id: &ChannelId, ) -> Option<&mut Box>> { self.senders.get_mut(id).filter(|sender| sender.id() == *id) @@ -486,7 +486,7 @@ mod tests { fn id(&self) -> u32 { self.id } - fn send(&mut self, event: EventU32, aux_data: Option) -> Result<(), Self::Error> { + fn send(&self, event: EventU32, aux_data: Option) -> Result<(), Self::Error> { self.mpsc_sender.send((event, aux_data)) } } diff --git a/satrs-core/src/lib.rs b/satrs-core/src/lib.rs index 4fed5c6..940300e 100644 --- a/satrs-core/src/lib.rs +++ b/satrs-core/src/lib.rs @@ -45,5 +45,5 @@ pub mod tmtc; pub use spacepackets; -// Generic sender ID type. -pub type SenderId = u32; +// Generic channel ID type. +pub type ChannelId = u32; diff --git a/satrs-core/src/pus/event.rs b/satrs-core/src/pus/event.rs index b1faf74..c1dfc5d 100644 --- a/satrs-core/src/pus/event.rs +++ b/satrs-core/src/pus/event.rs @@ -244,7 +244,7 @@ mod tests { use crate::events::{EventU32, Severity}; use crate::pus::tests::CommonTmInfo; use crate::pus::{EcssChannel, PusTmWrapper}; - use crate::SenderId; + use crate::ChannelId; use spacepackets::ByteConversionError; use std::cell::RefCell; use std::collections::VecDeque; @@ -269,7 +269,7 @@ mod tests { } impl EcssChannel for TestSender { - fn id(&self) -> SenderId { + fn id(&self) -> ChannelId { 0 } } @@ -425,9 +425,9 @@ mod tests { let err = reporter.event_info(sender, &time_stamp_empty, event, None); assert!(err.is_err()); let err = err.unwrap_err(); - if let EcssTmtcError::EcssTmtcError(EcssTmtcError::ByteConversion( - ByteConversionError::ToSliceTooSmall(missmatch), - )) = err + if let EcssTmtcError::Pus(PusError::ByteConversion(ByteConversionError::ToSliceTooSmall( + missmatch, + ))) = err { assert_eq!(missmatch.expected, 4); assert_eq!(missmatch.found, expected_found_len); diff --git a/satrs-core/src/pus/event_man.rs b/satrs-core/src/pus/event_man.rs index 4c9b6d8..132e24c 100644 --- a/satrs-core/src/pus/event_man.rs +++ b/satrs-core/src/pus/event_man.rs @@ -223,7 +223,7 @@ pub mod alloc_mod { self.backend.disable_event_reporting(event.as_ref()) } - pub fn generate_pus_event_tm( + pub fn generate_pus_event_tm( &mut self, sender: &mut (impl EcssTmSenderCore + ?Sized), time_stamp: &[u8], diff --git a/satrs-core/src/pus/event_srv.rs b/satrs-core/src/pus/event_srv.rs index 2230200..8f3abb4 100644 --- a/satrs-core/src/pus/event_srv.rs +++ b/satrs-core/src/pus/event_srv.rs @@ -1,18 +1,18 @@ use crate::events::EventU32; -use crate::pool::{PoolGuard, SharedPool, StoreAddr}; +use crate::pool::{SharedPool, StoreAddr}; use crate::pus::event_man::{EventRequest, EventRequestWithToken}; use crate::pus::verification::{ StdVerifReporterWithSender, TcStateAccepted, TcStateToken, VerificationToken, }; use crate::pus::{ - AcceptedTc, EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, - PusPacketHandlingError, PusServiceBase, PusServiceHandler, ReceivedTcWrapper, + EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, + PusPacketHandlingError, PusServiceBase, PusServiceHandler, }; use spacepackets::ecss::event::Subservice; use spacepackets::ecss::PusPacket; use spacepackets::tc::PusTc; use std::boxed::Box; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::Sender; pub struct PusService5EventHandler { psb: PusServiceBase, @@ -22,13 +22,20 @@ pub struct PusService5EventHandler { impl PusService5EventHandler { pub fn new( tc_receiver: Box, + shared_tc_store: SharedPool, tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, event_request_tx: Sender, ) -> Self { Self { - psb: PusServiceBase::new(tc_receiver, tm_sender, tm_apid, verification_handler), + psb: PusServiceBase::new( + tc_receiver, + shared_tc_store, + tm_sender, + tm_apid, + verification_handler, + ), event_request_tx, } } @@ -44,16 +51,17 @@ impl PusServiceHandler for PusService5EventHandler { fn handle_one_tc( &mut self, - tc: PusTc, - _tc_guard: PoolGuard, + addr: StoreAddr, token: VerificationToken, ) -> Result { + self.copy_tc_to_buf(addr)?; + let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf)?; let subservice = tc.subservice(); let srv = Subservice::try_from(subservice); if srv.is_err() { return Ok(PusPacketHandlerResult::CustomSubservice( tc.subservice(), - token.into(), + token, )); } let handle_enable_disable_request = |enable: bool, stamp: [u8; 7]| { @@ -115,8 +123,7 @@ impl PusServiceHandler for PusService5EventHandler { } Subservice::TcReportDisabledList | Subservice::TmDisabledEventsReport => { return Ok(PusPacketHandlerResult::SubserviceNotImplemented( - subservice, - token.into(), + subservice, token, )); } } diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index 565aecf..0defc10 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -2,7 +2,7 @@ //! //! This module contains structures to make working with the PUS C standard easier. //! The satrs-example application contains various usage examples of these components. -use crate::SenderId; +use crate::ChannelId; use core::fmt::{Display, Formatter}; #[cfg(feature = "alloc")] use downcast_rs::{impl_downcast, Downcast}; @@ -28,7 +28,7 @@ pub mod verification; #[cfg(feature = "alloc")] pub use alloc_mod::*; -use crate::pool::{PoolGuard, PoolRwGuard, StoreAddr, StoreError}; +use crate::pool::{StoreAddr, StoreError}; use crate::pus::verification::{TcStateAccepted, TcStateToken, VerificationToken}; #[cfg(feature = "std")] pub use std_mod::*; @@ -175,7 +175,7 @@ impl Error for EcssTmtcError { } pub trait EcssChannel: Send { /// Each sender can have an ID associated with it - fn id(&self) -> SenderId; + fn id(&self) -> ChannelId; fn name(&self) -> &'static str { "unset" } @@ -196,15 +196,38 @@ pub trait EcssTcSenderCore: EcssChannel { fn send_tc(&self, tc: PusTc, token: Option) -> Result<(), EcssTmtcError>; } -pub struct ReceivedTcWrapper<'raw_tc> { - pub pool_guard: PoolGuard<'raw_tc>, - pub tc: PusTc<'raw_tc>, +pub struct ReceivedTcWrapper { + pub store_addr: StoreAddr, pub token: Option, } +#[derive(Debug, Clone)] +pub enum TryRecvTmtcError { + Error(EcssTmtcError), + Empty, +} + +impl From for TryRecvTmtcError { + fn from(value: EcssTmtcError) -> Self { + Self::Error(value) + } +} + +impl From for TryRecvTmtcError { + fn from(value: PusError) -> Self { + Self::Error(value.into()) + } +} + +impl From for TryRecvTmtcError { + fn from(value: StoreError) -> Self { + Self::Error(value.into()) + } +} + /// Generic trait for a user supplied receiver object. pub trait EcssTcReceiverCore: EcssChannel { - fn recv_tc<'buf>(&self, buf: &'buf mut [u8]) -> Result, EcssTmtcError>; + fn recv_tc(&self) -> Result; } /// Generic trait for objects which can receive ECSS PUS telecommands. This trait is @@ -271,37 +294,34 @@ mod alloc_mod { /// [Clone]. #[cfg(feature = "alloc")] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] - pub trait EcssTcReceiver: EcssTcReceiverCore + Downcast + DynClone {} + pub trait EcssTcReceiver: EcssTcReceiverCore + Downcast {} /// Blanket implementation for all types which implement [EcssTcReceiverCore] and are clonable. - impl EcssTcReceiver for T where T: EcssTcReceiverCore + Clone + 'static {} + impl EcssTcReceiver for T where T: EcssTcReceiverCore + 'static {} - dyn_clone::clone_trait_object!(EcssTcReceiver); impl_downcast!(EcssTcReceiver); } #[cfg(feature = "std")] pub mod std_mod { - use crate::pool::{PoolGuard, SharedPool, StoreAddr}; + use crate::pool::{SharedPool, StoreAddr}; use crate::pus::verification::{ - StdVerifReporterWithSender, TcStateAccepted, TcStateToken, VerificationToken, + StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; use crate::pus::{ - AcceptedTc, EcssChannel, EcssTcReceiver, EcssTcReceiverCore, EcssTmSender, - EcssTmSenderCore, EcssTmtcError, GenericRecvError, GenericSendError, PusTmWrapper, - ReceivedTcWrapper, TcAddrWithToken, + EcssChannel, EcssTcReceiver, EcssTcReceiverCore, EcssTmSender, EcssTmSenderCore, + EcssTmtcError, GenericRecvError, GenericSendError, PusTmWrapper, ReceivedTcWrapper, + TcAddrWithToken, TryRecvTmtcError, }; use crate::tmtc::tm_helper::SharedTmStore; - use crate::SenderId; + use crate::ChannelId; use alloc::boxed::Box; use alloc::vec::Vec; use spacepackets::ecss::PusError; - use spacepackets::tc::PusTc; use spacepackets::time::cds::TimeProvider; - use spacepackets::time::std_mod::StdTimestampError; + use spacepackets::time::StdTimestampError; use spacepackets::time::TimeWriter; use spacepackets::tm::PusTm; - use spacepackets::{ByteConversionError, SizeMissmatch}; use std::cell::RefCell; use std::string::String; use std::sync::mpsc; @@ -310,7 +330,7 @@ pub mod std_mod { #[derive(Clone)] pub struct MpscTmInStoreSender { - id: SenderId, + id: ChannelId, name: &'static str, shared_tm_store: SharedTmStore, sender: mpsc::Sender, @@ -318,7 +338,7 @@ pub mod std_mod { } impl EcssChannel for MpscTmInStoreSender { - fn id(&self) -> SenderId { + fn id(&self) -> ChannelId { self.id } @@ -352,7 +372,7 @@ pub mod std_mod { impl MpscTmInStoreSender { pub fn new( - id: SenderId, + id: ChannelId, name: &'static str, shared_tm_store: SharedTmStore, sender: mpsc::Sender, @@ -368,15 +388,14 @@ pub mod std_mod { } pub struct MpscTcInStoreReceiver { - id: SenderId, + id: ChannelId, name: &'static str, - shared_tc_pool: SharedPool, receiver: mpsc::Receiver, pub ignore_poison_errors: bool, } impl EcssChannel for MpscTcInStoreReceiver { - fn id(&self) -> SenderId { + fn id(&self) -> ChannelId { self.id } @@ -386,36 +405,32 @@ pub mod std_mod { } impl EcssTcReceiverCore for MpscTcInStoreReceiver { - fn recv_tc<'buf>( - &self, - buf: &'buf mut [u8], - ) -> Result, EcssTmtcError> { - let (addr, token) = self.receiver.try_recv().map_err(|e| match e { - TryRecvError::Empty => GenericRecvError::Empty, - TryRecvError::Disconnected => GenericRecvError::TxDisconnected, + fn recv_tc(&self) -> Result { + let (store_addr, token) = self.receiver.try_recv().map_err(|e| match e { + TryRecvError::Empty => TryRecvTmtcError::Empty, + TryRecvError::Disconnected => { + TryRecvTmtcError::Error(EcssTmtcError::from(GenericRecvError::TxDisconnected)) + } })?; - let mut shared_tc_pool = self - .shared_tc_pool - .read() - .map_err(|_| EcssTmtcError::StoreLock)?; - let pool_guard = shared_tc_pool.read_with_guard(addr); - let tc_raw = pool_guard.read()?; - if buf.len() < tc_raw.len() { - return Err( - PusError::ByteConversion(ByteConversionError::ToSliceTooSmall(SizeMissmatch { - found: buf.len(), - expected: tc_raw.len(), - })) - .into(), - ); - } - buf[..tc_raw.len()].copy_from_slice(tc_raw); - let (tc, _) = PusTc::from_bytes(buf)?; - Ok((ReceivedTcWrapper { - tc, - pool_guard, + Ok(ReceivedTcWrapper { + store_addr, token: Some(token), - })) + }) + } + } + + impl MpscTcInStoreReceiver { + pub fn new( + id: ChannelId, + name: &'static str, + receiver: mpsc::Receiver, + ) -> Self { + Self { + id, + name, + receiver, + ignore_poison_errors: false, + } } } @@ -425,7 +440,7 @@ pub mod std_mod { /// going to be called with direct packets. #[derive(Clone)] pub struct MpscTmAsVecSender { - id: SenderId, + id: ChannelId, sender: mpsc::Sender>, name: &'static str, } @@ -443,7 +458,7 @@ pub mod std_mod { } impl EcssChannel for MpscTmAsVecSender { - fn id(&self) -> SenderId { + fn id(&self) -> ChannelId { self.id } fn name(&self) -> &'static str { @@ -502,8 +517,8 @@ pub mod std_mod { pub enum PusPacketHandlerResult { RequestHandled, RequestHandledPartialSuccess(PartialPusHandlingError), - SubserviceNotImplemented(u8, TcStateToken), - CustomSubservice(u8, TcStateToken), + SubserviceNotImplemented(u8, VerificationToken), + CustomSubservice(u8, VerificationToken), Empty, } @@ -518,6 +533,7 @@ pub mod std_mod { /// is constrained to the [StdVerifReporterWithSender]. pub struct PusServiceBase { pub tc_receiver: Box, + pub shared_tc_store: SharedPool, pub tm_sender: Box, pub tm_apid: u16, /// The verification handler is wrapped in a [RefCell] to allow the interior mutability @@ -530,12 +546,14 @@ pub mod std_mod { impl PusServiceBase { pub fn new( tc_receiver: Box, + shared_tc_store: SharedPool, tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, ) -> Self { Self { tc_receiver, + shared_tc_store, tm_apid, tm_sender, verification_handler: RefCell::new(verification_handler), @@ -571,31 +589,37 @@ pub mod std_mod { fn psb(&self) -> &PusServiceBase; fn handle_one_tc( &mut self, - tc: PusTc, - tc_guard: PoolGuard, + addr: StoreAddr, token: VerificationToken, ) -> Result; + fn copy_tc_to_buf(&mut self, addr: StoreAddr) -> Result<(), PusPacketHandlingError> { + // Keep locked section as short as possible. + let psb_mut = self.psb_mut(); + let mut tc_pool = psb_mut + .shared_tc_store + .write() + .map_err(|_| PusPacketHandlingError::EcssTmtc(EcssTmtcError::StoreLock))?; + let tc_guard = tc_pool.read_with_guard(addr); + let tc_raw = tc_guard.read().unwrap(); + psb_mut.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw); + Ok(()) + } + fn handle_next_packet(&mut self) -> Result { - match self.psb().tc_receiver.recv_tc(&mut self.psb_mut().pus_buf) { - Ok(ReceivedTcWrapper { - tc, - pool_guard, - token, - }) => { + match self.psb().tc_receiver.recv_tc() { + Ok(ReceivedTcWrapper { store_addr, token }) => { if token.is_none() { return Err(PusPacketHandlingError::InvalidVerificationToken); } let token = token.unwrap(); - self.handle_one_tc(tc, pool_guard, token.try_into().unwrap()) + let accepted_token = VerificationToken::::try_from(token) + .map_err(|_| PusPacketHandlingError::InvalidVerificationToken)?; + self.handle_one_tc(store_addr, accepted_token) } Err(e) => match e { - EcssTmtcError::StoreLock => {} - EcssTmtcError::Store(_) => {} - EcssTmtcError::Pus(_) => {} - EcssTmtcError::CantSendAddr(_) => {} - EcssTmtcError::Send(_) => {} - EcssTmtcError::Recv(_) => {} + TryRecvTmtcError::Error(e) => Err(PusPacketHandlingError::EcssTmtc(e)), + TryRecvTmtcError::Empty => Ok(PusPacketHandlerResult::Empty), }, } } diff --git a/satrs-core/src/pus/scheduler_srv.rs b/satrs-core/src/pus/scheduler_srv.rs index 059a66d..52fc33a 100644 --- a/satrs-core/src/pus/scheduler_srv.rs +++ b/satrs-core/src/pus/scheduler_srv.rs @@ -1,15 +1,14 @@ -use crate::pool::{PoolGuard, SharedPool, StoreAddr}; +use crate::pool::{SharedPool, StoreAddr}; use crate::pus::scheduler::PusScheduler; use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; use crate::pus::{ - AcceptedTc, EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, - PusServiceBase, PusServiceHandler, ReceivedTcWrapper, + EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, + PusServiceHandler, }; use spacepackets::ecss::{scheduling, PusPacket}; use spacepackets::tc::PusTc; use spacepackets::time::cds::TimeProvider; use std::boxed::Box; -use std::sync::mpsc::Receiver; /// This is a helper class for [std] environments to handle generic PUS 11 (scheduling service) /// packets. This handler is constrained to using the [PusScheduler], but is able to process @@ -21,22 +20,26 @@ use std::sync::mpsc::Receiver; /// telecommands when applicable. pub struct PusService11SchedHandler { psb: PusServiceBase, - shared_tc_store: SharedPool, scheduler: PusScheduler, } impl PusService11SchedHandler { pub fn new( tc_receiver: Box, + shared_tc_store: SharedPool, tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, - shared_tc_store: SharedPool, scheduler: PusScheduler, ) -> Self { Self { - psb: PusServiceBase::new(tc_receiver, tm_sender, tm_apid, verification_handler), - shared_tc_store, + psb: PusServiceBase::new( + tc_receiver, + shared_tc_store, + tm_sender, + tm_apid, + verification_handler, + ), scheduler, } } @@ -60,15 +63,17 @@ impl PusServiceHandler for PusService11SchedHandler { fn handle_one_tc( &mut self, - tc: PusTc, - tc_guard: PoolGuard, + addr: StoreAddr, token: VerificationToken, ) -> Result { - let std_service = scheduling::Subservice::try_from(tc.subservice()); + self.copy_tc_to_buf(addr)?; + let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf)?; + let subservice = tc.subservice(); + let std_service = scheduling::Subservice::try_from(subservice); if std_service.is_err() { return Ok(PusPacketHandlerResult::CustomSubservice( tc.subservice(), - token.into(), + token, )); } let mut partial_error = None; @@ -120,7 +125,11 @@ impl PusServiceHandler for PusService11SchedHandler { .start_success(token, Some(&time_stamp)) .expect("Error sending start success"); - let mut pool = self.shared_tc_store.write().expect("Locking pool failed"); + let mut pool = self + .psb + .shared_tc_store + .write() + .expect("Locking pool failed"); self.scheduler .reset(pool.as_mut()) @@ -140,7 +149,11 @@ impl PusServiceHandler for PusService11SchedHandler { .start_success(token, Some(&time_stamp)) .expect("error sending start success"); - let mut pool = self.shared_tc_store.write().expect("locking pool failed"); + let mut pool = self + .psb + .shared_tc_store + .write() + .expect("locking pool failed"); self.scheduler .insert_wrapped_tc::(&tc, pool.as_mut()) .expect("insertion of activity into pool failed"); @@ -154,7 +167,7 @@ impl PusServiceHandler for PusService11SchedHandler { _ => { return Ok(PusPacketHandlerResult::CustomSubservice( tc.subservice(), - token.into(), + token, )); } } @@ -165,7 +178,7 @@ impl PusServiceHandler for PusService11SchedHandler { } Ok(PusPacketHandlerResult::CustomSubservice( tc.subservice(), - token.into(), + token, )) } } diff --git a/satrs-core/src/pus/test.rs b/satrs-core/src/pus/test.rs index f593b3e..178493b 100644 --- a/satrs-core/src/pus/test.rs +++ b/satrs-core/src/pus/test.rs @@ -1,15 +1,14 @@ -use crate::pool::{PoolGuard, SharedPool, StoreAddr}; +use crate::pool::{SharedPool, StoreAddr}; use crate::pus::verification::{StdVerifReporterWithSender, TcStateAccepted, VerificationToken}; use crate::pus::{ - AcceptedTc, EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, - PusPacketHandlingError, PusServiceBase, PusServiceHandler, PusTmWrapper, ReceivedTcWrapper, + EcssTcReceiver, EcssTmSender, PartialPusHandlingError, PusPacketHandlerResult, + PusPacketHandlingError, PusServiceBase, PusServiceHandler, PusTmWrapper, }; use spacepackets::ecss::PusPacket; use spacepackets::tc::PusTc; use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; use spacepackets::SpHeader; use std::boxed::Box; -use std::sync::mpsc::Receiver; /// This is a helper class for [std] environments to handle generic PUS 17 (test service) packets. /// This handler only processes ping requests and generates a ping reply for them accordingly. @@ -20,12 +19,19 @@ pub struct PusService17TestHandler { impl PusService17TestHandler { pub fn new( tc_receiver: Box, + shared_tc_store: SharedPool, tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, ) -> Self { Self { - psb: PusServiceBase::new(tc_receiver, tm_sender, tm_apid, verification_handler), + psb: PusServiceBase::new( + tc_receiver, + shared_tc_store, + tm_sender, + tm_apid, + verification_handler, + ), } } } @@ -40,10 +46,11 @@ impl PusServiceHandler for PusService17TestHandler { fn handle_one_tc( &mut self, - tc: PusTc, - _tc_guard: PoolGuard, + addr: StoreAddr, token: VerificationToken, ) -> Result { + self.copy_tc_to_buf(addr)?; + let (tc, _) = PusTc::from_bytes(&self.psb.pus_buf)?; if tc.service() != 17 { return Err(PusPacketHandlingError::WrongService(tc.service())); } @@ -95,7 +102,7 @@ impl PusServiceHandler for PusService17TestHandler { } Ok(PusPacketHandlerResult::CustomSubservice( tc.subservice(), - token.into(), + token, )) } } @@ -107,7 +114,7 @@ mod tests { use crate::pus::verification::{ RequestId, StdVerifReporterWithSender, VerificationReporterCfg, }; - use crate::pus::{MpscTmInStoreSender, PusServiceHandler}; + use crate::pus::{MpscTcInStoreReceiver, MpscTmInStoreSender, PusServiceHandler}; use crate::tmtc::tm_helper::SharedTmStore; use spacepackets::ecss::{PusPacket, SerializablePusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; @@ -126,24 +133,21 @@ mod tests { let tc_pool = LocalPool::new(pool_cfg.clone()); let tm_pool = LocalPool::new(pool_cfg); let tc_pool_shared = SharedPool::new(RwLock::new(Box::new(tc_pool))); - let tm_pool_shared = SharedPool::new(RwLock::new(Box::new(tm_pool))); - let shared_tm_store = SharedTmStore::new(tm_pool_shared.clone()); - let (test_srv_tx, test_srv_rx) = mpsc::channel(); + let shared_tm_store = SharedTmStore::new(Box::new(tm_pool)); + let tm_pool_shared = shared_tm_store.clone_backing_pool(); + let (test_srv_tc_tx, test_srv_tc_rx) = mpsc::channel(); let (tm_tx, tm_rx) = mpsc::channel(); - let verif_sender = MpscTmInStoreSender::new( - 0, - "verif_sender", - shared_tm_store.backing_pool(), - tm_tx.clone(), - ); + let verif_sender = + MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tm_tx.clone()); let verif_cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let mut verification_handler = StdVerifReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); + let test_srv_tm_sender = MpscTmInStoreSender::new(0, "TEST_SENDER", shared_tm_store, tm_tx); + let test_srv_tc_receiver = MpscTcInStoreReceiver::new(0, "TEST_RECEIVER", test_srv_tc_rx); let mut pus_17_handler = PusService17TestHandler::new( - test_srv_rx, + Box::new(test_srv_tc_receiver), tc_pool_shared.clone(), - tm_tx, - shared_tm_store, + Box::new(test_srv_tm_sender), TEST_APID, verification_handler.clone(), ); @@ -160,7 +164,7 @@ mod tests { let addr = tc_pool.add(&pus_buf[..tc_size]).unwrap(); drop(tc_pool); // Send accepted TC to test service handler. - test_srv_tx.send((addr, token)).unwrap(); + test_srv_tc_tx.send((addr, token.into())).unwrap(); let result = pus_17_handler.handle_next_packet(); assert!(result.is_ok()); // We should see 4 replies in the TM queue now: Acceptance TM, Start TM, ping reply and diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index 53f0c8f..1f25c90 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -19,6 +19,7 @@ //! use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; //! use satrs_core::seq_count::SeqCountProviderSimple; //! use satrs_core::pus::MpscTmInStoreSender; +//! use satrs_core::tmtc::tm_helper::SharedTmStore; //! use spacepackets::ecss::PusPacket; //! use spacepackets::SpHeader; //! use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; @@ -28,9 +29,11 @@ //! const TEST_APID: u16 = 0x02; //! //! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); -//! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); +//! let tm_pool = LocalPool::new(pool_cfg.clone()); +//! let shared_tm_store = SharedTmStore::new(Box::new(tm_pool)); +//! let tm_store = shared_tm_store.clone_backing_pool(); //! let (verif_tx, verif_rx) = mpsc::channel(); -//! let sender = MpscTmInStoreSender::new(0, "Test Sender", shared_tm_pool.clone(), verif_tx); +//! let sender = MpscTmInStoreSender::new(0, "Test Sender", shared_tm_store, verif_tx); //! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); //! let mut reporter = VerificationReporterWithSender::new(&cfg , Box::new(sender)); //! @@ -51,7 +54,7 @@ //! let addr = verif_rx.recv_timeout(Duration::from_millis(10)).unwrap(); //! let tm_len; //! { -//! let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); +//! let mut rg = tm_store.write().expect("Error locking shared pool"); //! let store_guard = rg.read_with_guard(addr); //! let slice = store_guard.read().expect("Error reading TM slice"); //! tm_len = slice.len(); @@ -1491,25 +1494,26 @@ mod std_mod { #[cfg(test)] mod tests { - use crate::pool::{LocalPool, PoolCfg, SharedPool}; + use crate::pool::{LocalPool, PoolCfg}; use crate::pus::tests::CommonTmInfo; use crate::pus::verification::{ EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, RequestId, TcStateNone, VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, }; - use crate::pus::{EcssChannel, EcssTmtcError, MpscTmInStoreSender, PusTmWrapper}; - use crate::SenderId; + use crate::pus::{EcssChannel, MpscTmInStoreSender, PusTmWrapper}; + use crate::tmtc::tm_helper::SharedTmStore; + use crate::ChannelId; use alloc::boxed::Box; use alloc::format; - use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, PusPacket}; + use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, PusError, PusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tm::PusTm; use spacepackets::util::UnsignedEnum; use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader}; use std::cell::RefCell; use std::collections::VecDeque; - use std::sync::{mpsc, Arc, RwLock}; + use std::sync::mpsc; use std::time::Duration; use std::vec; use std::vec::Vec; @@ -1534,7 +1538,7 @@ mod tests { } impl EcssChannel for TestSender { - fn id(&self) -> SenderId { + fn id(&self) -> ChannelId { 0 } fn name(&self) -> &'static str { @@ -1543,9 +1547,7 @@ mod tests { } impl EcssTmSenderCore for TestSender { - type Error = (); - - fn send_tm(&self, tm: PusTmWrapper) -> Result<(), Self::Error> { + fn send_tm(&self, tm: PusTmWrapper) -> Result<(), EcssTmtcError> { match tm { PusTmWrapper::InStore(_) => { panic!("TestSender: Can not deal with addresses"); @@ -1576,23 +1578,6 @@ mod tests { } } - #[derive(Debug, Copy, Clone, Eq, PartialEq)] - struct DummyError {} - #[derive(Default, Clone)] - struct FallibleSender {} - - impl EcssChannel for FallibleSender { - fn id(&self) -> SenderId { - 0 - } - } - impl EcssTmSenderCore for FallibleSender { - type Error = DummyError; - fn send_tm(&self, _: PusTmWrapper) -> Result<(), Self::Error> { - Err(DummyError {}) - } - } - struct TestBase<'a> { vr: VerificationReporter, #[allow(dead_code)] @@ -1604,13 +1589,13 @@ mod tests { &mut self.vr } } - struct TestBaseWithHelper<'a, E> { - helper: VerificationReporterWithSender, + struct TestBaseWithHelper<'a> { + helper: VerificationReporterWithSender, #[allow(dead_code)] tc: PusTc<'a>, } - impl<'a, E> TestBaseWithHelper<'a, E> { + impl<'a> TestBaseWithHelper<'a> { fn rep(&mut self) -> &mut VerificationReporter { &mut self.helper.reporter } @@ -1641,10 +1626,7 @@ mod tests { (TestBase { vr: reporter, tc }, init_tok) } - fn base_with_helper_init() -> ( - TestBaseWithHelper<'static, ()>, - VerificationToken, - ) { + fn base_with_helper_init() -> (TestBaseWithHelper<'static>, VerificationToken) { let mut reporter = base_reporter(); let (tc, _) = base_tc_init(None); let init_tok = reporter.add_tc(&tc); @@ -1674,9 +1656,10 @@ mod tests { #[test] fn test_mpsc_verif_send_sync() { let pool = LocalPool::new(PoolCfg::new(vec![(8, 8)])); - let shared_pool: SharedPool = Arc::new(RwLock::new(Box::new(pool))); + let tm_store = Box::new(pool); + let shared_tm_store = SharedTmStore::new(tm_store); let (tx, _) = mpsc::channel(); - let mpsc_verif_sender = MpscTmInStoreSender::new(0, "verif_sender", shared_pool, tx); + let mpsc_verif_sender = MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store, tx); is_send(&mpsc_verif_sender); } @@ -1707,23 +1690,6 @@ mod tests { acceptance_check(sender, &tok.req_id); } - #[test] - fn test_acceptance_send_fails() { - let (mut b, tok) = base_init(false); - let mut faulty_sender = FallibleSender::default(); - let res = - b.vr.acceptance_success(tok, &mut faulty_sender, Some(&EMPTY_STAMP)); - assert!(res.is_err()); - let err = res.unwrap_err(); - assert_eq!(err.1, tok); - match err.0 { - EcssTmtcError::SendError(e) => { - assert_eq!(e, DummyError {}) - } - _ => panic!("{}", format!("Unexpected error {:?}", err.0)), - } - } - fn acceptance_fail_check(sender: &mut TestSender, req_id: RequestId, stamp_buf: [u8; 7]) { let cmp_info = TmInfo { common: CommonTmInfo { @@ -1788,7 +1754,7 @@ mod tests { let err_with_token = res.unwrap_err(); assert_eq!(err_with_token.1, tok); match err_with_token.0 { - EcssTmtcError::EcssTmtcError(EcssTmtcError::ByteConversion(e)) => match e { + EcssTmtcError::Pus(PusError::ByteConversion(e)) => match e { ByteConversionError::ToSliceTooSmall(missmatch) => { assert_eq!( missmatch.expected, @@ -2357,11 +2323,11 @@ mod tests { // TODO: maybe a bit more extensive testing, all I have time for right now fn test_seq_count_increment() { let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); - let shared_tm_pool: SharedPool = - Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); + let tm_pool = Box::new(LocalPool::new(pool_cfg.clone())); + let shared_tm_store = SharedTmStore::new(tm_pool); + let shared_tm_pool = shared_tm_store.clone_backing_pool(); let (verif_tx, verif_rx) = mpsc::channel(); - let sender = - MpscTmInStoreSender::new(0, "Verification Sender", shared_tm_pool.clone(), verif_tx); + let sender = MpscTmInStoreSender::new(0, "Verification Sender", shared_tm_store, verif_tx); let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); let mut reporter = VerificationReporterWithSender::new(&cfg, Box::new(sender)); diff --git a/satrs-core/src/tmtc/tm_helper.rs b/satrs-core/src/tmtc/tm_helper.rs index 5c960b9..da2bc1e 100644 --- a/satrs-core/src/tmtc/tm_helper.rs +++ b/satrs-core/src/tmtc/tm_helper.rs @@ -26,7 +26,7 @@ pub mod std_mod { } } - pub fn backing_pool(&self) -> SharedPool { + pub fn clone_backing_pool(&self) -> SharedPool { self.pool.clone() } diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index 313bc84..a533dbc 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -3,11 +3,12 @@ #[cfg(feature = "crossbeam")] pub mod crossbeam_test { use hashbrown::HashMap; - use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; + use satrs_core::pool::{LocalPool, PoolCfg, PoolProvider}; use satrs_core::pus::verification::{ FailParams, RequestId, VerificationReporterCfg, VerificationReporterWithSender, }; use satrs_core::pus::MpscTmInStoreSender; + use satrs_core::tmtc::tm_helper::SharedTmStore; use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tm::PusTm; @@ -35,13 +36,12 @@ pub mod crossbeam_test { let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8).unwrap(); // Shared pool object to store the verification PUS telemetry let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); - let shared_tm_pool: SharedPool = - Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); + let shared_tm_store = SharedTmStore::new(Box::new(LocalPool::new(pool_cfg.clone()))); let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); let shared_tc_pool_1 = shared_tc_pool_0.clone(); let (tx, rx) = mpsc::channel(); let sender = - MpscTmInStoreSender::new(0, "verif_sender", shared_tm_pool.clone(), tx.clone()); + MpscTmInStoreSender::new(0, "verif_sender", shared_tm_store.clone(), tx.clone()); let mut reporter_with_sender_0 = VerificationReporterWithSender::new(&cfg, Box::new(sender)); let mut reporter_with_sender_1 = reporter_with_sender_0.clone(); @@ -146,8 +146,9 @@ pub mod crossbeam_test { .recv_timeout(Duration::from_millis(50)) .expect("Packet reception timeout"); let tm_len; + let shared_tm_store = shared_tm_store.clone_backing_pool(); { - let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); + let mut rg = shared_tm_store.write().expect("Error locking shared pool"); let store_guard = rg.read_with_guard(verif_addr); let slice = store_guard.read().expect("Error reading TM slice"); tm_len = slice.len(); diff --git a/satrs-example/src/lib.rs b/satrs-example/src/lib.rs index 0e77dad..31fb63d 100644 --- a/satrs-example/src/lib.rs +++ b/satrs-example/src/lib.rs @@ -83,4 +83,14 @@ pub enum TmSenderId { PusHk = 3, PusAction = 4, PusSched = 5, + AllEvents = 6, +} + +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum TcReceiverId { + PusTest = 1, + PusEvent = 2, + PusHk = 3, + PusAction = 4, + PusSched = 5, } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 9f1be77..e6a4241 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -32,8 +32,10 @@ use satrs_core::pus::hk::Subservice as HkSubservice; use satrs_core::pus::scheduler::PusScheduler; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; use satrs_core::pus::test::PusService17TestHandler; -use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; -use satrs_core::pus::MpscTmInStoreSender; +use satrs_core::pus::verification::{ + TcStateStarted, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, +}; +use satrs_core::pus::{MpscTcInStoreReceiver, MpscTmInStoreSender}; use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}; use satrs_core::spacepackets::tm::PusTmZeroCopyWriter; use satrs_core::spacepackets::{ @@ -44,8 +46,8 @@ use satrs_core::spacepackets::{ }; use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::{AddressableId, TargetId}; -use satrs_core::SenderId; -use satrs_example::{RequestTargetId, TmSenderId, OBSW_SERVER_ADDR, SERVER_PORT}; +use satrs_core::ChannelId; +use satrs_example::{RequestTargetId, TcReceiverId, TmSenderId, OBSW_SERVER_ADDR, SERVER_PORT}; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::mpsc::{channel, TryRecvError}; @@ -85,7 +87,7 @@ fn main() { let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); let verif_sender = MpscTmInStoreSender::new( - TmSenderId::PusVerification as SenderId, + TmSenderId::PusVerification as ChannelId, "verif_sender", shared_tm_store.clone(), tm_funnel_tx.clone(), @@ -156,13 +158,18 @@ fn main() { action_service_receiver: pus_action_tx, }; let test_srv_tm_sender = MpscTmInStoreSender::new( - TmSenderId::PusTest as SenderId, + TmSenderId::PusTest as ChannelId, "PUS_17_TM_SENDER", shared_tm_store.clone(), tm_funnel_tx.clone(), ); - let pus17_handler = PusService17TestHandler::new( + let test_srv_receiver = MpscTcInStoreReceiver::new( + TcReceiverId::PusTest as ChannelId, + "PUS_17_TC_RECV", pus_test_rx, + ); + let pus17_handler = PusService17TestHandler::new( + Box::new(test_srv_receiver), tc_store.pool.clone(), Box::new(test_srv_tm_sender), PUS_APID, @@ -174,15 +181,20 @@ fn main() { }; let sched_srv_tm_sender = MpscTmInStoreSender::new( - TmSenderId::PusSched as SenderId, + TmSenderId::PusSched as ChannelId, "PUS_11_TM_SENDER", shared_tm_store.clone(), tm_funnel_tx.clone(), ); + let sched_srv_receiver = MpscTcInStoreReceiver::new( + TcReceiverId::PusSched as ChannelId, + "PUS_11_TC_RECV", + pus_sched_rx, + ); let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) .expect("Creating PUS Scheduler failed"); let pus_11_handler = PusService11SchedHandler::new( - pus_sched_rx, + Box::new(sched_srv_receiver), tc_store.pool.clone(), Box::new(sched_srv_tm_sender), PUS_APID, @@ -195,13 +207,18 @@ fn main() { }; let event_srv_tm_sender = MpscTmInStoreSender::new( - TmSenderId::PusEvent as SenderId, + TmSenderId::PusEvent as ChannelId, "PUS_5_TM_SENDER", shared_tm_store.clone(), tm_funnel_tx.clone(), ); - let pus_5_handler = PusService5EventHandler::new( + let event_srv_receiver = MpscTcInStoreReceiver::new( + TcReceiverId::PusEvent as ChannelId, + "PUS_5_TC_RECV", pus_event_rx, + ); + let pus_5_handler = PusService5EventHandler::new( + Box::new(event_srv_receiver), tc_store.pool.clone(), Box::new(event_srv_tm_sender), PUS_APID, @@ -211,13 +228,18 @@ fn main() { let mut pus_5_wrapper = Pus5Wrapper { pus_5_handler }; let action_srv_tm_sender = MpscTmInStoreSender::new( - TmSenderId::PusAction as SenderId, + TmSenderId::PusAction as ChannelId, "PUS_8_TM_SENDER", shared_tm_store.clone(), tm_funnel_tx.clone(), ); - let pus_8_handler = PusService8ActionHandler::new( + let action_srv_receiver = MpscTcInStoreReceiver::new( + TcReceiverId::PusAction as ChannelId, + "PUS_8_TC_RECV", pus_action_rx, + ); + let pus_8_handler = PusService8ActionHandler::new( + Box::new(action_srv_receiver), tc_store.pool.clone(), Box::new(action_srv_tm_sender), PUS_APID, @@ -227,13 +249,15 @@ fn main() { let mut pus_8_wrapper = Pus8Wrapper { pus_8_handler }; let hk_srv_tm_sender = MpscTmInStoreSender::new( - TmSenderId::PusHk as SenderId, + TmSenderId::PusHk as ChannelId, "PUS_3_TM_SENDER", shared_tm_store.clone(), tm_funnel_tx.clone(), ); + let hk_srv_receiver = + MpscTcInStoreReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_8_TC_RECV", pus_hk_rx); let pus_3_handler = PusService3HkHandler::new( - pus_hk_rx, + Box::new(hk_srv_receiver), tc_store.pool.clone(), Box::new(hk_srv_tm_sender), PUS_APID, @@ -262,7 +286,7 @@ fn main() { if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() { // Read the TM, set sequence counter and message counter, and finally update // the CRC. - let shared_pool = shared_tm_store.backing_pool(); + let shared_pool = shared_tm_store.clone_backing_pool(); let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); let tm_raw = pool_guard .modify(&addr) @@ -297,12 +321,20 @@ fn main() { .name("Event".to_string()) .spawn(move || { let mut timestamp: [u8; 7] = [0; 7]; - let mut sender = - MpscTmInStoreSender::new(1, "event_sender", tm_store_event.clone(), tm_funnel_tx); + let mut sender = MpscTmInStoreSender::new( + TmSenderId::AllEvents as ChannelId, + "ALL_EVENTS_TX", + tm_store_event.clone(), + tm_funnel_tx, + ); let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { + let started_token: VerificationToken = event_req + .token + .try_into() + .expect("expected start verification token"); reporter_event_handler - .completion_success(event_req.token.try_into().unwrap(), Some(timestamp)) + .completion_success(started_token, Some(timestamp)) .expect("Sending completion success failed"); }; loop { diff --git a/satrs-example/src/pus/action.rs b/satrs-example/src/pus/action.rs index ab7885b..4dd3e55 100644 --- a/satrs-example/src/pus/action.rs +++ b/satrs-example/src/pus/action.rs @@ -5,7 +5,7 @@ use satrs_core::pus::verification::{ FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; use satrs_core::pus::{ - AcceptedTc, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, + EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler, }; use satrs_core::spacepackets::ecss::PusPacket; @@ -13,7 +13,7 @@ use satrs_core::spacepackets::tc::PusTc; use satrs_core::tmtc::TargetId; use satrs_example::tmtc_err; use std::collections::HashMap; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::Sender; pub struct PusService8ActionHandler { psb: PusServiceBase, @@ -22,15 +22,21 @@ pub struct PusService8ActionHandler { impl PusService8ActionHandler { pub fn new( - receiver: Receiver, - tc_pool: SharedPool, + tc_receiver: Box, + shared_tc_pool: SharedPool, tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, request_handlers: HashMap>, ) -> Self { Self { - psb: PusServiceBase::new(receiver, tc_pool, tm_sender, tm_apid, verification_handler), + psb: PusServiceBase::new( + tc_receiver, + shared_tc_pool, + tm_sender, + tm_apid, + verification_handler, + ), request_handlers, } } diff --git a/satrs-example/src/pus/hk.rs b/satrs-example/src/pus/hk.rs index a56ed6b..b7e70c2 100644 --- a/satrs-example/src/pus/hk.rs +++ b/satrs-example/src/pus/hk.rs @@ -6,7 +6,7 @@ use satrs_core::pus::verification::{ FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken, }; use satrs_core::pus::{ - AcceptedTc, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, + EcssTcReceiver, EcssTmSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHandler, }; use satrs_core::spacepackets::ecss::{hk, PusPacket}; @@ -14,7 +14,7 @@ use satrs_core::spacepackets::tc::PusTc; use satrs_core::tmtc::{AddressableId, TargetId}; use satrs_example::{hk_err, tmtc_err}; use std::collections::HashMap; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::Sender; pub struct PusService3HkHandler { psb: PusServiceBase, @@ -23,15 +23,21 @@ pub struct PusService3HkHandler { impl PusService3HkHandler { pub fn new( - receiver: Receiver, - tc_pool: SharedPool, + tc_receiver: Box, + shared_tc_pool: SharedPool, tm_sender: Box, tm_apid: u16, verification_handler: StdVerifReporterWithSender, request_handlers: HashMap>, ) -> Self { Self { - psb: PusServiceBase::new(receiver, tc_pool, tm_sender, tm_apid, verification_handler), + psb: PusServiceBase::new( + tc_receiver, + shared_tc_pool, + tm_sender, + tm_apid, + verification_handler, + ), request_handlers, } } diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index 0a879f8..d8b1c5b 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -2,7 +2,7 @@ use crate::tmtc::MpscStoreAndSendError; use log::warn; use satrs_core::pool::StoreAddr; use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender}; -use satrs_core::pus::{AcceptedTc, PusPacketHandlerResult}; +use satrs_core::pus::{PusPacketHandlerResult, TcAddrWithToken}; use satrs_core::spacepackets::ecss::PusServiceId; use satrs_core::spacepackets::tc::PusTc; use satrs_core::spacepackets::time::cds::TimeProvider; @@ -17,11 +17,11 @@ pub mod scheduler; pub mod test; pub struct PusTcMpscRouter { - pub test_service_receiver: Sender, - pub event_service_receiver: Sender, - pub sched_service_receiver: Sender, - pub hk_service_receiver: Sender, - pub action_service_receiver: Sender, + pub test_service_receiver: Sender, + pub event_service_receiver: Sender, + pub sched_service_receiver: Sender, + pub hk_service_receiver: Sender, + pub action_service_receiver: Sender, } pub struct PusReceiver { @@ -86,20 +86,20 @@ impl PusReceiver { PusServiceId::Test => { self.pus_router .test_service_receiver - .send((store_addr, accepted_token))?; + .send((store_addr, accepted_token.into()))?; } PusServiceId::Housekeeping => self .pus_router .hk_service_receiver - .send((store_addr, accepted_token))?, + .send((store_addr, accepted_token.into()))?, PusServiceId::Event => self .pus_router .event_service_receiver - .send((store_addr, accepted_token))?, + .send((store_addr, accepted_token.into()))?, PusServiceId::Scheduling => self .pus_router .sched_service_receiver - .send((store_addr, accepted_token))?, + .send((store_addr, accepted_token.into()))?, _ => { let result = self.verif_reporter.start_failure( accepted_token, diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 95b9590..2a18dd3 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -10,7 +10,7 @@ use crate::ccsds::CcsdsReceiver; use crate::pus::{PusReceiver, PusTcMpscRouter}; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pus::verification::StdVerifReporterWithSender; -use satrs_core::pus::{AcceptedTc, ReceivesEcssPusTc}; +use satrs_core::pus::{ReceivesEcssPusTc, TcAddrWithToken}; use satrs_core::spacepackets::ecss::{PusPacket, SerializablePusPacket}; use satrs_core::spacepackets::tc::PusTc; use satrs_core::spacepackets::SpHeader; @@ -42,7 +42,7 @@ pub enum MpscStoreAndSendError { #[error("Store error: {0}")] Store(#[from] StoreError), #[error("TC send error: {0}")] - TcSend(#[from] SendError), + TcSend(#[from] SendError), #[error("TMTC send error: {0}")] TmTcSend(#[from] SendError), } @@ -123,7 +123,7 @@ pub fn core_tmtc_task( let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, tm_rx: tm_args.tm_server_rx, - tm_store: tm_args.tm_store.backing_pool(), + tm_store: tm_args.tm_store.clone_backing_pool(), }; let mut tc_buf: [u8; 4096] = [0; 4096];