diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index bd69787..9f5eea2 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -9,7 +9,7 @@ use log::{info, warn}; use crate::hk::AcsHkIds; use crate::logging::setup_logger; -use crate::pus::test::{PacketHandlerResult, PusService17TestHandler}; +use crate::pus::test::{PusService17TestHandler, Service17CustomWrapper}; use crate::pus::PusTcMpscRouter; use crate::requests::{Request, RequestWithToken}; use crate::tmtc::{ @@ -109,6 +109,7 @@ fn main() { // The event manager will receive the RX handle to receive all the events. let (event_sender, event_man_rx) = channel(); let event_recv = MpscEventReceiver::::new(event_man_rx); + let test_srv_event_sender = event_sender.clone(); let mut event_man = EventManagerWithMpscQueue::new(Box::new(event_recv)); // All events sent to the manager are routed to the PUS event manager, which generates PUS event @@ -174,6 +175,10 @@ fn main() { tm_store.clone(), verif_reporter.clone(), ); + let mut srv_17_wrapper = Service17CustomWrapper { + pus17_handler, + test_srv_event_sender, + }; info!("Starting TMTC task"); let jh0 = thread::Builder::new() @@ -337,38 +342,10 @@ fn main() { info!("Starting PUS handler thread"); let jh4 = thread::Builder::new() .name("PUS".to_string()) - .spawn(move || { - loop { - let mut handled_pings = 0; - // TODO: Better error handling - let res = pus17_handler.handle_next_packet().unwrap(); - match res { - PacketHandlerResult::PingRequestHandled => { - handled_pings += 1; - } - PacketHandlerResult::CustomSubservice => { - let (buf, _) = pus17_handler.pus_tc_buf(); - let (tc, size) = PusTc::from_bytes(&buf).unwrap(); - if tc.subservice() == 128 { - info!("Generating test event"); - event_sender - .send((TEST_EVENT.into(), None)) - .expect("Sending test event failed"); - let start_token = pus17_handler - .verification_handler() - .start_success(token, Some(&stamp_buf)) - .expect("Error sending start success"); - pus17_handler - .verification_handler() - .completion_success(start_token, Some(&stamp_buf)) - .expect("Error sending completion success"); - } - } - PacketHandlerResult::Empty => { - thread::sleep(Duration::from_millis(400)); - } - } - res.expect("some PUS17 error"); + .spawn(move || loop { + let queue_empty = srv_17_wrapper.perform_operation(); + if queue_empty { + thread::sleep(Duration::from_millis(400)); } }) .unwrap(); diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index 5c38663..62dd423 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -5,7 +5,7 @@ use satrs_core::hk::{CollectionIntervalFactor, HkRequest}; use satrs_core::mode::{ModeAndSubmode, ModeRequest}; use satrs_core::objects::ObjectId; use satrs_core::params::Params; -use satrs_core::pool::{PoolProvider, SharedPool, StoreAddr}; +use satrs_core::pool::{PoolProvider, SharedPool, StoreAddr, StoreError}; use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken}; use satrs_core::pus::hk; use satrs_core::pus::mode::Subservice; @@ -18,8 +18,8 @@ use satrs_core::pus::{event, EcssTcSenderCore, GenericTcCheckError, MpscTmtcInSt use satrs_core::pus::{mode, EcssTcSender}; use satrs_core::res_code::ResultU16; use satrs_core::seq_count::{SeqCountProviderSyncClonable, SequenceCountProviderCore}; -use satrs_core::spacepackets::ecss::{scheduling, PusServiceId}; -use satrs_core::spacepackets::time::CcsdsTimeProvider; +use satrs_core::spacepackets::ecss::{scheduling, PusError, PusServiceId}; +use satrs_core::spacepackets::time::{CcsdsTimeProvider, StdTimestampError}; use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore}; use satrs_core::tmtc::{AddressableId, PusServiceProvider, TargetId}; use satrs_core::{ @@ -36,6 +36,36 @@ use std::sync::mpsc::{Receiver, SendError, Sender}; pub mod scheduler; pub mod test; +#[derive(Debug, Clone)] +pub enum PusPacketHandlingError { + PusError(PusError), + WrongService(u8), + StoreError(StoreError), + RwGuardError(String), + TimeError(StdTimestampError), + TmSendError(String), + QueueDisconnected, + OtherError(String), +} + +impl From for PusPacketHandlingError { + fn from(value: PusError) -> Self { + Self::PusError(value) + } +} + +impl From for PusPacketHandlingError { + fn from(value: StdTimestampError) -> Self { + Self::TimeError(value) + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum PusPacketHandlerResult { + RequestHandled, + CustomSubservice(VerificationToken), + Empty, +} pub struct PusServiceBase { tc_rx: Receiver, tc_store: SharedPool, diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index cf35b5a..5b3b418 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -1,4 +1,4 @@ -use crate::pus::{AcceptedTc, PusServiceBase}; +use crate::pus::{AcceptedTc, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase}; use delegate::delegate; use satrs_core::pool::{SharedPool, StoreAddr}; use satrs_core::pus::scheduling::PusScheduler; @@ -12,7 +12,7 @@ use satrs_core::spacepackets::tc::PusTc; use satrs_core::spacepackets::time::cds::TimeProvider; use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore}; use satrs_example::tmtc_err; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::{Receiver, Sender, TryRecvError}; pub struct PusService11SchedHandler { psb: PusServiceBase, @@ -41,12 +41,27 @@ impl PusService11SchedHandler { scheduler, } } - // TODO: Return errors which occured - pub fn periodic_operation(&mut self) -> Result { - Ok(self.psb.handled_tcs) + + pub fn handle_next_packet(&mut self) -> Result { + return match self.psb.tc_rx.try_recv() { + Ok((addr, token)) => { + if self.handle_one_tc(addr, token)? { + return Ok(PusPacketHandlerResult::RequestHandled); + } + Ok(PusPacketHandlerResult::CustomSubservice(token)) + } + Err(e) => match e { + TryRecvError::Empty => Ok(PusPacketHandlerResult::Empty), + TryRecvError::Disconnected => Err(PusPacketHandlingError::QueueDisconnected), + }, + }; } - pub fn handle_one_tc(&mut self, addr: StoreAddr, token: VerificationToken) { + pub fn handle_one_tc( + &mut self, + addr: StoreAddr, + token: VerificationToken, + ) -> Result { let time_provider = TimeProvider::from_now_with_u16_days().unwrap(); // TODO: Better error handling { diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index c3130f1..2ec1b73 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -1,36 +1,91 @@ -use crate::pus::{AcceptedTc, PusServiceBase}; +use crate::pus::{AcceptedTc, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase}; use delegate::delegate; -use log::info; +use log::{error, info, warn}; use satrs_core::events::EventU32; use satrs_core::params::Params; -use satrs_core::pool::{SharedPool, StoreAddr}; +use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pus::verification::{ - StdVerifReporterWithSender, TcStateAccepted, VerificationToken, + FailParams, StdVerifReporterWithSender, TcStateAccepted, TcStateStarted, + VerificationOrSendErrorWithToken, VerificationToken, }; use satrs_core::seq_count::{SeqCountProviderSyncClonable, SequenceCountProviderCore}; -use satrs_core::spacepackets::ecss::PusPacket; +use satrs_core::spacepackets::ecss::{PusError, PusPacket}; use satrs_core::spacepackets::tc::PusTc; use satrs_core::spacepackets::time::cds::TimeProvider; -use satrs_core::spacepackets::time::TimeWriter; +use satrs_core::spacepackets::time::{StdTimestampError, TimeWriter}; use satrs_core::spacepackets::tm::PusTm; use satrs_core::tmtc::tm_helper::{PusTmWithCdsShortHelper, SharedTmStore}; -use satrs_example::TEST_EVENT; +use satrs_example::{tmtc_err, TEST_EVENT}; use std::sync::mpsc::{Receiver, Sender, TryRecvError}; +use std::thread; +use std::time::Duration; -pub struct SatrsTestServiceCustomHandler { - pub event_sender: Sender<(EventU32, Option)>, +pub struct Service17CustomWrapper { + pub pus17_handler: PusService17TestHandler, + pub test_srv_event_sender: Sender<(EventU32, Option)>, +} + +impl Service17CustomWrapper { + pub fn perform_operation(&mut self) -> bool { + let mut handled_pings = 0; + let res = self.pus17_handler.handle_next_packet(); + if res.is_err() { + warn!("PUS17 handler failed with error {:?}", res.unwrap_err()); + return true; + } + match res.unwrap() { + PusPacketHandlerResult::RequestHandled => { + info!("Received PUS ping command TC[17,1]"); + info!("Sent ping reply PUS TM[17,2]"); + handled_pings += 1; + } + PusPacketHandlerResult::CustomSubservice(token) => { + let (buf, _) = self.pus17_handler.pus_tc_buf(); + let (tc, size) = PusTc::from_bytes(buf).unwrap(); + let time_stamper = TimeProvider::from_now_with_u16_days().unwrap(); + let mut stamp_buf: [u8; 7] = [0; 7]; + time_stamper.write_to_bytes(&mut stamp_buf).unwrap(); + if tc.subservice() == 128 { + info!("Generating test event"); + self.test_srv_event_sender + .send((TEST_EVENT.into(), None)) + .expect("Sending test event failed"); + let start_token = self + .pus17_handler + .verification_handler() + .start_success(token, Some(&stamp_buf)) + .expect("Error sending start success"); + self.pus17_handler + .verification_handler() + .completion_success(start_token, Some(&stamp_buf)) + .expect("Error sending completion success"); + } else { + let fail_data = [tc.subservice()]; + self.pus17_handler + .verification_handler() + .start_failure( + token, + FailParams::new( + Some(&stamp_buf), + &tmtc_err::INVALID_PUS_SUBSERVICE, + Some(&fail_data), + ), + ) + .expect("Sending start failure verification failed"); + } + } + PusPacketHandlerResult::Empty => { + return false; + } + } + true + } } pub struct PusService17TestHandler { psb: PusServiceBase, } -pub enum PacketHandlerResult { - PingRequestHandled, - CustomSubservice(VerificationToken), - Empty, -} - impl PusService17TestHandler { pub fn new( receiver: Receiver, @@ -60,75 +115,57 @@ impl PusService17TestHandler { (&self.psb.pus_buf, self.psb.pus_size) } - // TODO: Return errors which occured - pub fn periodic_operation(&mut self) -> Result { - self.psb.handled_tcs = 0; - loop { - match self.psb.tc_rx.try_recv() { - Ok((addr, token)) => { - self.handle_one_tc(addr, token); - } - Err(e) => { - match e { - TryRecvError::Empty => return Ok(self.psb.handled_tcs), - TryRecvError::Disconnected => { - // TODO: Replace panic by something cleaner - panic!("PusService17Handler: Sender disconnected"); - } - } - } - } - } - } - - pub fn handle_next_packet(&mut self) -> Result { - match self.psb.tc_rx.try_recv() { + pub fn handle_next_packet(&mut self) -> Result { + return match self.psb.tc_rx.try_recv() { Ok((addr, token)) => { - if self.handle_one_tc(addr, token) { - return Ok(PacketHandlerResult::PingRequestHandled); - } else { - return Ok(PacketHandlerResult::CustomSubservice); + if self.handle_one_tc(addr, token)? { + return Ok(PusPacketHandlerResult::RequestHandled); } + Ok(PusPacketHandlerResult::CustomSubservice(token)) } - Err(e) => { - match e { - TryRecvError::Empty => return Ok(PacketHandlerResult::Empty), - TryRecvError::Disconnected => { - // TODO: Replace panic by something cleaner - panic!("PusService17Handler: Sender disconnected"); - } - } - } - } + Err(e) => match e { + TryRecvError::Empty => Ok(PusPacketHandlerResult::Empty), + TryRecvError::Disconnected => Err(PusPacketHandlingError::QueueDisconnected), + }, + }; } pub fn handle_one_tc( &mut self, addr: StoreAddr, token: VerificationToken, - ) -> bool { - let time_provider = TimeProvider::from_now_with_u16_days().unwrap(); - // TODO: Better error handling + ) -> Result { { // Keep locked section as short as possible. - let mut tc_pool = self.psb.tc_store.write().unwrap(); + let mut tc_pool = self + .psb + .tc_store + .write() + .map_err(|e| PusPacketHandlingError::RwGuardError(format!("{e}")))?; let tc_guard = tc_pool.read_with_guard(addr); - let tc_raw = tc_guard.read().unwrap(); + let tc_raw = tc_guard.read().expect("Reading pool guard failed"); self.psb.pus_buf[0..tc_raw.len()].copy_from_slice(tc_raw); } - let (tc, tc_size) = PusTc::from_bytes(&self.psb.pus_buf).unwrap(); - // TODO: Robustness: Check that service is 17 + let (tc, tc_size) = PusTc::from_bytes(&self.psb.pus_buf)?; + if tc.service() != 17 { + return Err(PusPacketHandlingError::WrongService(tc.service())); + } if tc.subservice() == 1 { - info!("Received PUS ping command TC[17,1]"); - info!("Sending ping reply PUS TM[17,2]"); + let time_provider = TimeProvider::from_now_with_u16_days()?; + // Can not fail, buffer is large enough. time_provider .write_to_bytes(&mut self.psb.stamp_buf) .unwrap(); - let start_token = self + let result = self .psb .verification_handler - .start_success(token, Some(&self.psb.stamp_buf)) - .expect("Error sending start success"); + .start_success(token, Some(&self.psb.stamp_buf)); + let start_token = if result.is_err() { + error!("Could not send start success verification"); + None + } else { + Some(result.unwrap()) + }; // Sequence count will be handled centrally in TM funnel. let ping_reply = self.psb @@ -138,29 +175,19 @@ impl PusService17TestHandler { self.psb .tm_tx .send(addr) - .expect("Sending TM to TM funnel failed"); - self.psb - .verification_handler - .completion_success(start_token, Some(&self.psb.stamp_buf)) - .expect("Error sending completion success"); - self.psb.handled_tcs += 1; - true + .map_err(|e| PusPacketHandlingError::TmSendError(format!("{e}")))?; + if let Some(start_token) = start_token { + if self + .psb + .verification_handler + .completion_success(start_token, Some(&self.psb.stamp_buf)) + .is_err() + { + error!("Could not send completion success verification"); + } + } + return Ok(true); } - false - // TODO: How to handle invalid subservice? - // TODO: How do we handle custom code like this? Custom subservice handler via trait? - // if tc.subservice() == 128 { - // info!("Generating test event"); - // self.event_sender - // .send((TEST_EVENT.into(), None)) - // .expect("Sending test event failed"); - // let start_token = - // verification_handler - // .start_success(token, Some(&stamp_buf)) - // .expect("Error sending start success"); - // verification_handler - // .completion_success(start_token, Some(&stamp_buf)) - // .expect("Error sending completion success"); - // + Ok(false) } }