diff --git a/satrs-core/src/pool.rs b/satrs-core/src/pool.rs index ebc404e..a37e406 100644 --- a/satrs-core/src/pool.rs +++ b/satrs-core/src/pool.rs @@ -77,10 +77,13 @@ use alloc::format; use alloc::string::String; use alloc::vec; use alloc::vec::Vec; +use core::fmt::{Display, Formatter}; use delegate::delegate; #[cfg(feature = "std")] use std::boxed::Box; #[cfg(feature = "std")] +use std::error::Error; +#[cfg(feature = "std")] use std::sync::{Arc, RwLock}; type NumBlocks = u16; @@ -146,6 +149,22 @@ pub enum StoreIdError { InvalidPacketIdx(u16), } +impl Display for StoreIdError { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + match self { + StoreIdError::InvalidSubpool(pool) => { + write!(f, "invalid subpool, index: {}", pool) + } + StoreIdError::InvalidPacketIdx(packet_idx) => { + write!(f, "invalid packet index: {}", packet_idx) + } + } + } +} + +#[cfg(feature = "std")] +impl Error for StoreIdError {} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum StoreError { /// Requested data block is too large @@ -160,6 +179,38 @@ pub enum StoreError { InternalError(String), } +impl Display for StoreError { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + match self { + StoreError::DataTooLarge(size) => { + write!(f, "data to store with size {} is too large", size) + } + StoreError::StoreFull(u16) => { + write!(f, "store is too full. index for full subpool: {}", u16) + } + StoreError::InvalidStoreId(id_e, addr) => { + write!(f, "invalid store ID: {}, address: {:?}", id_e, addr) + } + StoreError::DataDoesNotExist(addr) => { + write!(f, "no data exists at address {:?}", addr) + } + StoreError::InternalError(e) => { + write!(f, "internal error: {}", e) + } + } + } +} + +#[cfg(feature = "std")] +impl Error for StoreError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + if let StoreError::InvalidStoreId(e, _) = self { + return Some(e); + } + None + } +} + pub trait PoolProvider { /// Add new data to the pool. The provider should attempt to reserve a memory block with the /// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can diff --git a/satrs-example/src/ccsds.rs b/satrs-example/src/ccsds.rs index cbaf5c7..75c4ae8 100644 --- a/satrs-example/src/ccsds.rs +++ b/satrs-example/src/ccsds.rs @@ -1,13 +1,13 @@ -use crate::tmtc::PUS_APID; -use satrs_core::tmtc::{CcsdsPacketHandler, PusDistributor, ReceivesCcsdsTc}; +use crate::tmtc::{MpscStoreAndSendError, PusTcSource, PUS_APID}; +use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; use spacepackets::{CcsdsPacket, SpHeader}; pub struct CcsdsReceiver { - pub pus_handler: PusDistributor<()>, + pub tc_source: PusTcSource, } impl CcsdsPacketHandler for CcsdsReceiver { - type Error = (); + type Error = MpscStoreAndSendError; fn valid_apids(&self) -> &'static [u16] { &[PUS_APID] @@ -19,9 +19,7 @@ impl CcsdsPacketHandler for CcsdsReceiver { tc_raw: &[u8], ) -> Result<(), Self::Error> { if sp_header.apid() == PUS_APID { - self.pus_handler - .pass_ccsds(sp_header, tc_raw) - .expect("Handling PUS packet failed"); + return self.tc_source.pass_ccsds(sp_header, tc_raw); } Ok(()) } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index fddd1b1..238c62e 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -5,13 +5,14 @@ mod requests; mod tmtc; use crate::requests::RequestWithToken; -use crate::tmtc::{core_tmtc_task, CoreTmtcArgs, TmStore, PUS_APID}; +use crate::tmtc::{ + core_tmtc_task, OtherArgs, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmStore, PUS_APID, +}; use satrs_core::event_man::{ EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, }; use satrs_core::events::EventU32; -use satrs_core::hal::host::udp_server::UdpTcServer; -use satrs_core::pool::{LocalPool, PoolCfg, SharedPool, StoreAddr}; +use satrs_core::pool::{LocalPool, PoolCfg, StoreAddr}; use satrs_core::pus::event_man::{ DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, PusEventDispatcher, @@ -21,7 +22,6 @@ use satrs_core::pus::verification::{ }; use satrs_core::pus::{EcssTmError, EcssTmSender}; use satrs_core::seq_count::SimpleSeqCountProvider; -use satrs_core::tmtc::CcsdsError; use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT}; use spacepackets::time::cds::TimeProvider; use spacepackets::time::TimeWriter; @@ -33,17 +33,6 @@ use std::sync::{mpsc, Arc, RwLock}; use std::thread; use std::time::Duration; -struct TmFunnel { - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, -} - -struct UdpTmtcServer { - udp_tc_server: UdpTcServer>, - tm_rx: mpsc::Receiver, - tm_store: SharedPool, -} - #[derive(Clone)] struct EventTmSender { store_helper: TmStore, @@ -70,16 +59,33 @@ impl EcssTmSender for EventTmSender { fn main() { println!("Running OBSW example"); - let pool_cfg = PoolCfg::new(vec![(8, 32), (4, 64), (2, 128)]); - let tm_pool = LocalPool::new(pool_cfg); - let tm_store: SharedPool = Arc::new(RwLock::new(Box::new(tm_pool))); - let tm_store_helper = TmStore { - pool: tm_store.clone(), + let tm_pool = LocalPool::new(PoolCfg::new(vec![ + (30, 32), + (15, 64), + (15, 128), + (15, 256), + (15, 1024), + (15, 2048), + ])); + let tm_store = TmStore { + pool: Arc::new(RwLock::new(Box::new(tm_pool))), }; - let addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let tc_pool = LocalPool::new(PoolCfg::new(vec![ + (30, 32), + (15, 64), + (15, 128), + (15, 256), + (15, 1024), + (15, 2048), + ])); + let tc_store = TcStore { + pool: Arc::new(RwLock::new(Box::new(tc_pool))), + }; + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let (tc_source_tx, tc_source_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); - let sender = MpscVerifSender::new(tm_store.clone(), tm_funnel_tx.clone()); + let sender = MpscVerifSender::new(tm_store.pool.clone(), tm_funnel_tx.clone()); let verif_cfg = VerificationReporterCfg::new( PUS_APID, #[allow(clippy::box_default)] @@ -89,7 +95,7 @@ fn main() { 8, ) .unwrap(); - let reporter_with_sender_0 = VerificationReporterWithSender::new(&verif_cfg, Box::new(sender)); + let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(sender)); // Create event handling components let (event_request_tx, event_request_rx) = channel::(); @@ -102,26 +108,40 @@ fn main() { PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend)); let (pus_event_man_tx, pus_event_man_rx) = channel(); let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx); - let mut reporter_event_handler = reporter_with_sender_0.clone(); - let mut reporter_aocs = reporter_with_sender_0.clone(); + let mut reporter_event_handler = verif_reporter.clone(); + let mut reporter_aocs = verif_reporter.clone(); event_man.subscribe_all(pus_event_man_send_provider.id()); let mut request_map = HashMap::new(); let (acs_thread_tx, acs_thread_rx) = channel::(); request_map.insert(RequestTargetId::AcsSubsystem as u32, acs_thread_tx); - // Create clones here to allow move for thread 0 - let core_args = CoreTmtcArgs { - tm_store: tm_store_helper.clone(), - tm_sender: tm_funnel_tx.clone(), + let tc_source = PusTcSource { + tc_store, + tc_source: tc_source_tx, + }; + + // Create clones here to allow moving the values + let core_args = OtherArgs { + sock_addr, + verif_reporter, event_sender, event_request_tx, request_map, }; + let tc_args = TcArgs { + tc_source, + tc_receiver: tc_source_rx, + }; + let tm_args = TmArgs { + tm_store: tm_store.clone(), + tm_sink_sender: tm_funnel_tx.clone(), + tm_server_rx, + }; println!("Starting TMTC task"); let jh0 = thread::spawn(move || { - core_tmtc_task(core_args, tm_server_rx, addr, reporter_with_sender_0); + core_tmtc_task(core_args, tc_args, tm_args); }); println!("Starting TM funnel task"); @@ -143,7 +163,7 @@ fn main() { println!("Starting event handling task"); let jh2 = thread::spawn(move || { let mut timestamp: [u8; 7] = [0; 7]; - let mut sender = EventTmSender::new(tm_store_helper, tm_funnel_tx); + let mut sender = EventTmSender::new(tm_store, tm_funnel_tx); let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { reporter_event_handler diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index 49dc6bc..d0480f4 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -1,6 +1,6 @@ use crate::hk::{CollectionIntervalFactor, HkRequest}; use crate::requests::{Request, RequestWithToken}; -use crate::tmtc::TmStore; +use crate::tmtc::{PusTcSource, TmStore}; use satrs_core::events::EventU32; use satrs_core::pool::StoreAddr; use satrs_core::pus::event::Subservices; @@ -26,6 +26,8 @@ pub struct PusReceiver { pub tm_tx: Sender, pub tm_store: TmStore, pub verif_reporter: StdVerifReporterWithSender, + #[allow(dead_code)] + tc_source: PusTcSource, event_request_tx: Sender, request_map: HashMap>, stamper: TimeProvider, @@ -38,6 +40,7 @@ impl PusReceiver { tm_tx: Sender, tm_store: TmStore, verif_reporter: StdVerifReporterWithSender, + tc_source: PusTcSource, event_request_tx: Sender, request_map: HashMap>, ) -> Self { @@ -46,6 +49,7 @@ impl PusReceiver { tm_tx, tm_store, verif_reporter, + tc_source, event_request_tx, request_map, stamper: TimeProvider::new_with_u16_days(0, 0), diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index e63502a..32546cc 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -2,89 +2,225 @@ use satrs_core::events::EventU32; use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; use satrs_core::params::Params; use std::collections::HashMap; +use std::error::Error; +use std::fmt::{Display, Formatter}; use std::net::SocketAddr; -use std::sync::mpsc; -use std::sync::mpsc::Sender; +use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use std::thread; use std::time::Duration; use crate::ccsds::CcsdsReceiver; use crate::pus::PusReceiver; use crate::requests::RequestWithToken; -use crate::UdpTmtcServer; -use satrs_core::pool::{SharedPool, StoreAddr}; +use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pus::event_man::EventRequestWithToken; use satrs_core::pus::verification::StdVerifReporterWithSender; -use satrs_core::tmtc::{CcsdsDistributor, CcsdsError, PusDistributor}; +use satrs_core::tmtc::{ + CcsdsDistributor, CcsdsError, PusServiceProvider, ReceivesCcsdsTc, ReceivesEcssPusTc, +}; +use spacepackets::ecss::PusPacket; +use spacepackets::tc::PusTc; use spacepackets::tm::PusTm; +use spacepackets::SpHeader; pub const PUS_APID: u16 = 0x02; -pub struct CoreTmtcArgs { - pub tm_store: TmStore, - pub tm_sender: Sender, +pub struct OtherArgs { + pub sock_addr: SocketAddr, + pub verif_reporter: StdVerifReporterWithSender, pub event_sender: Sender<(EventU32, Option)>, pub event_request_tx: Sender, pub request_map: HashMap>, } +pub struct TmArgs { + pub tm_store: TmStore, + pub tm_sink_sender: Sender, + pub tm_server_rx: Receiver, +} + +pub struct TcArgs { + pub tc_source: PusTcSource, + pub tc_receiver: Receiver, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MpscStoreAndSendError { + StoreError(StoreError), + SendError(SendError), +} + +impl Display for MpscStoreAndSendError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + MpscStoreAndSendError::StoreError(s) => { + write!(f, "store error {}", s) + } + MpscStoreAndSendError::SendError(s) => { + write!(f, "send error {}", s) + } + } + } +} + +impl Error for MpscStoreAndSendError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + MpscStoreAndSendError::StoreError(s) => Some(s), + MpscStoreAndSendError::SendError(s) => Some(s), + } + } +} + +impl From for MpscStoreAndSendError { + fn from(value: StoreError) -> Self { + Self::StoreError(value) + } +} + +impl From> for MpscStoreAndSendError { + fn from(value: SendError) -> Self { + Self::SendError(value) + } +} + #[derive(Clone)] pub struct TmStore { pub pool: SharedPool, } +#[derive(Clone)] +pub struct TcStore { + pub pool: SharedPool, +} + impl TmStore { pub fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr { - let mut pg = self.pool.write().expect("Error locking TM store"); + let mut pg = self.pool.write().expect("error locking TM store"); let (addr, buf) = pg.free_element(pus_tm.len_packed()).expect("Store error"); pus_tm .write_to_bytes(buf) - .expect("Writing PUS TM to store failed"); + .expect("writing PUS TM to store failed"); addr } } -pub fn core_tmtc_task( - args: CoreTmtcArgs, - tm_server_rx: mpsc::Receiver, - addr: SocketAddr, - verif_reporter: StdVerifReporterWithSender, -) { - let pus_receiver = PusReceiver::new( +impl TcStore { + pub fn add_pus_tc(&mut self, pus_tc: &PusTc) -> Result { + let mut pg = self.pool.write().expect("error locking TC store"); + let (addr, buf) = pg.free_element(pus_tc.len_packed())?; + pus_tc + .write_to_bytes(buf) + .expect("writing PUS TC to store failed"); + Ok(addr) + } +} + +pub struct TmFunnel { + pub tm_funnel_rx: Receiver, + pub tm_server_tx: Sender, +} + +pub struct UdpTmtcServer { + udp_tc_server: UdpTcServer>, + tm_rx: Receiver, + tm_store: SharedPool, +} + +#[derive(Clone)] +pub struct PusTcSource { + pub tc_source: Sender, + pub tc_store: TcStore, +} + +impl ReceivesEcssPusTc for PusTcSource { + type Error = MpscStoreAndSendError; + + fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTc) -> Result<(), Self::Error> { + let addr = self.tc_store.add_pus_tc(pus_tc)?; + self.tc_source.send(addr)?; + Ok(()) + } +} + +impl ReceivesCcsdsTc for PusTcSource { + type Error = MpscStoreAndSendError; + + fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut pool = self.tc_store.pool.write().expect("locking pool failed"); + let addr = pool.add(tc_raw)?; + drop(pool); + self.tc_source.send(addr)?; + Ok(()) + } +} +pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { + let mut pus_receiver = PusReceiver::new( PUS_APID, - args.tm_sender, - args.tm_store.clone(), - verif_reporter, + tm_args.tm_sink_sender, + tm_args.tm_store.clone(), + args.verif_reporter, + tc_args.tc_source.clone(), args.event_request_tx, args.request_map, ); - let pus_distributor = PusDistributor::new(Box::new(pus_receiver)); let ccsds_receiver = CcsdsReceiver { - pus_handler: pus_distributor, + tc_source: tc_args.tc_source.clone(), }; let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); - let udp_tc_server = UdpTcServer::new(addr, 2048, Box::new(ccsds_distributor)) + let udp_tc_server = UdpTcServer::new(args.sock_addr, 2048, Box::new(ccsds_distributor)) .expect("Creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, - tm_rx: tm_server_rx, - tm_store: args.tm_store.pool.clone(), + tm_rx: tm_args.tm_server_rx, + tm_store: tm_args.tm_store.pool.clone(), }; loop { - core_tmtc_loop(&mut udp_tmtc_server); + core_tmtc_loop(&mut udp_tmtc_server, &mut tc_args, &mut pus_receiver); thread::sleep(Duration::from_millis(400)); } } -fn core_tmtc_loop(udp_tmtc_server: &mut UdpTmtcServer) { - while core_tc_handling(udp_tmtc_server) {} +fn core_tmtc_loop( + udp_tmtc_server: &mut UdpTmtcServer, + tc_args: &mut TcArgs, + pus_receiver: &mut PusReceiver, +) { + while poll_tc_server(udp_tmtc_server) {} + match tc_args.tc_receiver.try_recv() { + Ok(addr) => { + let pool = tc_args + .tc_source + .tc_store + .pool + .read() + .expect("locking tc pool failed"); + let data = pool.read(&addr).expect("reading pool failed"); + match PusTc::from_bytes(data) { + Ok((pus_tc, _)) => { + pus_receiver + .handle_pus_tc_packet(pus_tc.service(), pus_tc.sp_header(), &pus_tc) + .ok(); + } + Err(e) => { + println!("error creating PUS TC from raw data: {}", e); + println!("raw data: {:x?}", data); + } + } + } + Err(e) => { + if let TryRecvError::Disconnected = e { + println!("tmtc thread: sender disconnected") + } + } + } if let Some(recv_addr) = udp_tmtc_server.udp_tc_server.last_sender() { core_tm_handling(udp_tmtc_server, &recv_addr); } } -fn core_tc_handling(udp_tmtc_server: &mut UdpTmtcServer) -> bool { +fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { match udp_tmtc_server.udp_tc_server.try_recv_tc() { Ok(_) => true, Err(e) => match e { @@ -112,9 +248,9 @@ fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) let mut store_lock = udp_tmtc_server .tm_store .write() - .expect("Locking TM store failed"); + .expect("locking TM store failed"); let pg = store_lock.read_with_guard(addr); - let buf = pg.read().expect("Error reading TM pool data"); + let buf = pg.read().expect("error reading TM pool data"); if buf.len() > 9 { let service = buf[7]; let subservice = buf[8]; @@ -126,6 +262,6 @@ fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) .udp_tc_server .socket .send_to(buf, recv_addr) - .expect("Sending TM failed"); + .expect("sending TM failed"); } } diff --git a/spacepackets b/spacepackets index 9b091e3..f8feb02 160000 --- a/spacepackets +++ b/spacepackets @@ -1 +1 @@ -Subproject commit 9b091e3a3a6f599b093c96327751bcf1bc911ca1 +Subproject commit f8feb027dee212702d1609433348784be3cb602b