diff --git a/satrs-core/src/hal/host/udp_server.rs b/satrs-core/src/hal/host/udp_server.rs index ca0a26e..a83f4f8 100644 --- a/satrs-core/src/hal/host/udp_server.rs +++ b/satrs-core/src/hal/host/udp_server.rs @@ -141,6 +141,7 @@ impl UdpTcServer { mod tests { use crate::hal::host::udp_server::{ReceiveResult, UdpTcServer}; use crate::tmtc::ReceivesTcCore; + use spacepackets::ecss::SerializablePusPacket; use spacepackets::tc::PusTc; use spacepackets::SpHeader; use std::boxed::Box; diff --git a/satrs-core/src/pus/event.rs b/satrs-core/src/pus/event.rs index 146986c..9bea69c 100644 --- a/satrs-core/src/pus/event.rs +++ b/satrs-core/src/pus/event.rs @@ -243,6 +243,7 @@ mod tests { use super::*; use crate::events::{EventU32, Severity}; use crate::pus::tests::CommonTmInfo; + use crate::pus::EcssSender; use crate::SenderId; use spacepackets::ByteConversionError; use std::collections::VecDeque; @@ -266,12 +267,15 @@ mod tests { pub service_queue: VecDeque, } - impl EcssTmSenderCore for TestSender { - type Error = (); - + impl EcssSender for TestSender { fn id(&self) -> SenderId { 0 } + } + + impl EcssTmSenderCore for TestSender { + type Error = (); + fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { assert!(tm.source_data().is_some()); let src_data = tm.source_data().unwrap(); @@ -413,7 +417,7 @@ mod tests { let err = reporter.event_info(sender, &time_stamp_empty, event, None); assert!(err.is_err()); let err = err.unwrap_err(); - if let EcssTmErrorWithSend::EcssTmError(EcssTmtcError::ByteConversionError( + if let EcssTmtcErrorWithSend::EcssTmtcError(EcssTmtcError::ByteConversionError( ByteConversionError::ToSliceTooSmall(missmatch), )) = err { diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index c74e587..1fd7a23 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -20,6 +20,7 @@ pub mod verification; #[cfg(feature = "alloc")] pub use alloc_mod::*; +use crate::pus::verification::TcStateToken; use crate::SenderId; #[cfg(feature = "std")] pub use std_mod::*; diff --git a/satrs-core/src/pus/scheduling.rs b/satrs-core/src/pus/scheduling.rs index 63d827f..cf3d8e0 100644 --- a/satrs-core/src/pus/scheduling.rs +++ b/satrs-core/src/pus/scheduling.rs @@ -620,6 +620,7 @@ impl PusScheduler { mod tests { use super::*; use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr, StoreError}; + use spacepackets::ecss::SerializablePusPacket; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::time::{cds, TimeWriter, UnixTimestamp}; use spacepackets::SpHeader; diff --git a/satrs-core/src/pus/verification.rs b/satrs-core/src/pus/verification.rs index 630afde..f9efd16 100644 --- a/satrs-core/src/pus/verification.rs +++ b/satrs-core/src/pus/verification.rs @@ -207,15 +207,19 @@ pub struct TcStateNone; pub struct TcStateAccepted; #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct TcStateStarted; +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct TcStateCompleted; impl WasAtLeastAccepted for TcStateAccepted {} impl WasAtLeastAccepted for TcStateStarted {} +impl WasAtLeastAccepted for TcStateCompleted {} #[derive(Debug, Eq, PartialEq)] pub enum TcStateToken { None(VerificationToken), Accepted(VerificationToken), Started(VerificationToken), + Completed(VerificationToken), } impl From> for TcStateToken { @@ -236,6 +240,12 @@ impl From> for TcStateToken { } } +impl From> for TcStateToken { + fn from(t: VerificationToken) -> Self { + TcStateToken::Completed(t) + } +} + impl VerificationToken { fn new(req_id: RequestId) -> VerificationToken { VerificationToken { @@ -1529,11 +1539,11 @@ mod tests { use crate::pool::{LocalPool, PoolCfg, SharedPool}; use crate::pus::tests::CommonTmInfo; use crate::pus::verification::{ - EcssTmError, EcssTmSenderCore, FailParams, FailParamsWithStep, MpscVerifSender, RequestId, - TcStateNone, VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, - VerificationToken, + EcssTmSenderCore, EcssTmtcError, FailParams, FailParamsWithStep, MpscVerifSender, + RequestId, TcStateNone, VerificationReporter, VerificationReporterCfg, + VerificationReporterWithSender, VerificationToken, }; - use crate::pus::EcssTmErrorWithSend; + use crate::pus::{EcssSender, EcssTmtcErrorWithSend}; use crate::seq_count::SeqCountProviderSimple; use crate::SenderId; use alloc::boxed::Box; @@ -1541,6 +1551,7 @@ mod tests { use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, EcssEnumeration, PusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tm::PusTm; + use spacepackets::util::UnsignedEnum; use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader}; use std::collections::VecDeque; use std::sync::{mpsc, Arc, RwLock}; @@ -1567,12 +1578,17 @@ mod tests { pub service_queue: VecDeque, } - impl EcssTmSenderCore for TestSender { - type Error = (); - + impl EcssSender for TestSender { fn id(&self) -> SenderId { 0 } + fn name(&self) -> &'static str { + "test_sender" + } + } + + impl EcssTmSenderCore for TestSender { + type Error = (); fn send_tm(&mut self, tm: PusTm) -> Result<(), Self::Error> { assert_eq!(PusPacket::service(&tm), 1); @@ -1595,10 +1611,6 @@ mod tests { }); Ok(()) } - - fn name(&self) -> &'static str { - "test_sender" - } } #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -1606,11 +1618,13 @@ mod tests { #[derive(Default, Clone)] struct FallibleSender {} - impl EcssTmSenderCore for FallibleSender { - type Error = DummyError; + impl EcssSender for FallibleSender { fn id(&self) -> SenderId { 0 } + } + impl EcssTmSenderCore for FallibleSender { + type Error = DummyError; fn send_tm(&mut self, _: PusTm) -> Result<(), Self::Error> { Err(DummyError {}) } @@ -1747,7 +1761,7 @@ mod tests { let err = res.unwrap_err(); assert_eq!(err.1, tok); match err.0 { - EcssTmErrorWithSend::SendError(e) => { + EcssTmtcErrorWithSend::SendError(e) => { assert_eq!(e, DummyError {}) } _ => panic!("{}", format!("Unexpected error {:?}", err.0)), @@ -1817,18 +1831,20 @@ mod tests { let err_with_token = res.unwrap_err(); assert_eq!(err_with_token.1, tok); match err_with_token.0 { - EcssTmErrorWithSend::EcssTmError(EcssTmtcError::ByteConversionError(e)) => match e { - ByteConversionError::ToSliceTooSmall(missmatch) => { - assert_eq!( - missmatch.expected, - fail_data.len() + RequestId::SIZE_AS_BYTES + fail_code.byte_width() - ); - assert_eq!(missmatch.found, b.rep().allowed_source_data_len()); + EcssTmtcErrorWithSend::EcssTmtcError(EcssTmtcError::ByteConversionError(e)) => { + match e { + ByteConversionError::ToSliceTooSmall(missmatch) => { + assert_eq!( + missmatch.expected, + fail_data.len() + RequestId::SIZE_AS_BYTES + fail_code.size() + ); + assert_eq!(missmatch.found, b.rep().allowed_source_data_len()); + } + _ => { + panic!("{}", format!("Unexpected error {:?}", e)) + } } - _ => { - panic!("{}", format!("Unexpected error {:?}", e)) - } - }, + } _ => { panic!("{}", format!("Unexpected error {:?}", err_with_token.0)) } @@ -2386,7 +2402,8 @@ mod tests { let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); let (verif_tx, verif_rx) = mpsc::channel(); - let sender = MpscVerifSender::new(shared_tm_pool.clone(), verif_tx); + let sender = + MpscVerifSender::new(0, "Verification Sender", shared_tm_pool.clone(), verif_tx); let cfg = VerificationReporterCfg::new( TEST_APID, Box::new(SeqCountProviderSimple::default()), diff --git a/satrs-core/src/tmtc/ccsds_distrib.rs b/satrs-core/src/tmtc/ccsds_distrib.rs index 1e45680..6cb4987 100644 --- a/satrs-core/src/tmtc/ccsds_distrib.rs +++ b/satrs-core/src/tmtc/ccsds_distrib.rs @@ -224,6 +224,7 @@ impl CcsdsDistributor { pub(crate) mod tests { use super::*; use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler}; + use spacepackets::ecss::SerializablePusPacket; use spacepackets::tc::PusTc; use spacepackets::CcsdsPacket; use std::collections::VecDeque; diff --git a/satrs-core/tests/pus_verification.rs b/satrs-core/tests/pus_verification.rs index 5a8fce2..035904b 100644 --- a/satrs-core/tests/pus_verification.rs +++ b/satrs-core/tests/pus_verification.rs @@ -8,7 +8,7 @@ pub mod crossbeam_test { VerificationReporterWithSender, }; use satrs_core::seq_count::SeqCountProviderSyncClonable; - use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket}; + use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket, SerializablePusPacket}; use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; use spacepackets::tm::PusTm; use spacepackets::SpHeader; diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus/mod.rs similarity index 86% rename from satrs-example/src/pus.rs rename to satrs-example/src/pus/mod.rs index efa04fd..96389e4 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus/mod.rs @@ -1,5 +1,5 @@ use crate::requests::{Request, RequestWithToken}; -use crate::tmtc::{PusTcSource, TmStore, PUS_APID}; +use crate::tmtc::{MpscStoreAndSendError, PusTcSource, TmStore, PUS_APID}; use log::{info, warn}; use satrs_core::events::EventU32; use satrs_core::hk::{CollectionIntervalFactor, HkRequest}; @@ -34,6 +34,8 @@ use std::convert::TryFrom; use std::rc::Rc; use std::sync::mpsc::{Receiver, Sender}; +pub mod test; + // pub trait PusTcRouter { // type Error; // fn route_pus_tc( @@ -45,14 +47,19 @@ use std::sync::mpsc::{Receiver, Sender}; // ); // } +pub enum PusTcWrapper<'tc> { + PusTc(&'tc PusTc<'tc>), + StoreAddr(StoreAddr), +} + pub type AcceptedTc = (StoreAddr, VerificationToken); pub struct PusTcMpscRouter { - test_service_receiver: MpscTmtcInStoreSender, - event_service_receiver: Sender, - sched_service_receiver: Sender, - hk_service_receiver: Sender, - action_service_receiver: Sender, + pub test_service_receiver: Sender, + pub event_service_receiver: Sender, + pub sched_service_receiver: Sender, + pub hk_service_receiver: Sender, + pub action_service_receiver: Sender, } // impl PusTcRouter for PusTcMpscRouter { @@ -146,7 +153,7 @@ impl PusTmArgs { // } pub struct PusTcArgs { - pub event_request_tx: Sender, + //pub event_request_tx: Sender, /// This routes all telecommands to their respective recipients pub pus_router: PusTcMpscRouter, /// Request routing helper. Maps targeted requests to their recipient. @@ -196,15 +203,13 @@ impl PusReceiver { } } -impl PusServiceProvider for PusReceiver { - type Error = (); - - fn handle_pus_tc_packet( +impl PusReceiver { + pub fn handle_tc_packet( &mut self, + store_addr: StoreAddr, service: u8, - _header: &SpHeader, pus_tc: &PusTc, - ) -> Result<(), Self::Error> { + ) -> Result<(), MpscStoreAndSendError> { let init_token = self.tm_args.verif_reporter.add_tc(pus_tc); self.stamp_helper.update_from_now(); let accepted_token = self @@ -219,20 +224,26 @@ impl PusServiceProvider for PusReceiver { .tc_args .pus_router .test_service_receiver - .send_tc(*pus_tc), - PusServiceId::Housekeeping => { - self.tc_args.pus_router.hk_service_receiver.send_tc(*pus_tc) - } //self.handle_hk_request(pus_tc, accepted_token), + .send((store_addr, accepted_token)) + .unwrap(), + PusServiceId::Housekeeping => self + .tc_args + .pus_router + .hk_service_receiver + .send((store_addr, accepted_token)) + .unwrap(), PusServiceId::Event => self .tc_args .pus_router .event_service_receiver - .send_tc(*pus_tc), //self.handle_event_request(pus_tc, accepted_token), + .send((store_addr, accepted_token)) + .unwrap(), PusServiceId::Scheduling => self .tc_args .pus_router .sched_service_receiver - .send_tc(*pus_tc), //self.handle_scheduled_tc(pus_tc, accepted_token), + .send((store_addr, accepted_token)) + .unwrap(), _ => self .tm_args .verif_reporter @@ -250,7 +261,7 @@ impl PusServiceProvider for PusReceiver { if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) { match custom_service { CustomPusServiceId::Mode => { - self.handle_mode_service(pus_tc, accepted_token) + //self.handle_mode_service(pus_tc, accepted_token) } CustomPusServiceId::Health => {} } @@ -272,6 +283,82 @@ impl PusServiceProvider for PusReceiver { Ok(()) } } +// impl PusServiceProvider for PusReceiver { +// type Error = (); +// +// fn handle_pus_tc_packet( +// &mut self, +// service: u8, +// _header: &SpHeader, +// pus_tc: &PusTc, +// ) -> Result<(), Self::Error> { +// let init_token = self.tm_args.verif_reporter.add_tc(pus_tc); +// self.stamp_helper.update_from_now(); +// let accepted_token = self +// .tm_args +// .vr() +// .acceptance_success(init_token, Some(self.stamp_helper.stamp())) +// .expect("Acceptance success failure"); +// let service = PusServiceId::try_from(service); +// match service { +// Ok(standard_service) => match standard_service { +// PusServiceId::Test => self +// .tc_args +// .pus_router +// .test_service_receiver +// .send_tc(*pus_tc), +// PusServiceId::Housekeeping => { +// self.tc_args.pus_router.hk_service_receiver.send_tc(*pus_tc) +// } //self.handle_hk_request(pus_tc, accepted_token), +// PusServiceId::Event => self +// .tc_args +// .pus_router +// .event_service_receiver +// .send_tc(*pus_tc), //self.handle_event_request(pus_tc, accepted_token), +// PusServiceId::Scheduling => self +// .tc_args +// .pus_router +// .sched_service_receiver +// .send_tc(*pus_tc), //self.handle_scheduled_tc(pus_tc, accepted_token), +// _ => self +// .tm_args +// .verif_reporter +// .start_failure( +// accepted_token, +// FailParams::new( +// Some(self.stamp_helper.stamp()), +// &tmtc_err::PUS_SERVICE_NOT_IMPLEMENTED, +// Some(&[standard_service as u8]), +// ), +// ) +// .expect("Start failure verification failed"), +// }, +// Err(e) => { +// if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) { +// match custom_service { +// CustomPusServiceId::Mode => { +// self.handle_mode_service(pus_tc, accepted_token) +// } +// CustomPusServiceId::Health => {} +// } +// } else { +// self.tm_args +// .verif_reporter +// .start_failure( +// accepted_token, +// FailParams::new( +// Some(self.stamp_helper.stamp()), +// &tmtc_err::INVALID_PUS_SUBSERVICE, +// Some(&[e.number]), +// ), +// ) +// .expect("Start failure verification failed") +// } +// } +// } +// Ok(()) +// } +// } // impl PusReceiver { // fn handle_test_service(&mut self, pus_tc: &PusTc, token: VerificationToken) { diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs new file mode 100644 index 0000000..4fcf8dc --- /dev/null +++ b/satrs-example/src/pus/test.rs @@ -0,0 +1,12 @@ +use crate::pus::AcceptedTc; +use satrs_core::pus::verification::StdVerifReporterWithSender; +use std::sync::mpsc::Receiver; + +struct PusService17Handler { + receiver: Receiver, + verification_handler: StdVerifReporterWithSender, +} + +impl PusService17Handler { + pub fn periodic_operation(&mut self) {} +} diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 0124cc2..0e05799 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -8,12 +8,13 @@ use std::error::Error; use std::fmt::{Display, Formatter}; use std::net::SocketAddr; use std::rc::Rc; +use std::sync::mpsc; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use std::thread; use std::time::Duration; use crate::ccsds::CcsdsReceiver; -use crate::pus::{PusReceiver, PusTcArgs, PusTmArgs}; +use crate::pus::{PusReceiver, PusTcArgs, PusTcMpscRouter, PusTmArgs}; use crate::requests::RequestWithToken; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pus::event_man::EventRequestWithToken; @@ -178,12 +179,21 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { verif_reporter: args.verif_reporter, seq_count_provider: args.seq_count_provider.clone(), }; + let (pus_test_tx, pus_tedt_rx) = mpsc::channel(); + let (pus_event_tx, pus_event_rx) = mpsc::channel(); + let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); + let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); + let (pus_action_tx, pus_action_rx) = mpsc::channel(); + let pus_router = PusTcMpscRouter { + test_service_receiver: pus_test_tx, + event_service_receiver: pus_event_tx, + sched_service_receiver: pus_sched_tx, + hk_service_receiver: pus_hk_tx, + action_service_receiver: pus_action_tx, + }; let pus_tc_args = PusTcArgs { - event_request_tx: args.event_request_tx, - request_map: args.request_map, - tc_source: tc_args.tc_source.clone(), + pus_router, event_sender: args.event_sender, - scheduler: sched_clone, }; let mut pus_receiver = PusReceiver::new(PUS_APID, pus_tm_args, pus_tc_args); @@ -266,7 +276,7 @@ fn core_tmtc_loop( match PusTc::from_bytes(tc_buf) { Ok((pus_tc, _)) => { pus_receiver - .handle_pus_tc_packet(pus_tc.service(), pus_tc.sp_header(), &pus_tc) + .handle_tc_packet(addr, pus_tc.service(), &pus_tc) .ok(); } Err(e) => {