From 78c54cf617443d828f7d5126a1f2f064b0769137 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 21 Dec 2022 19:50:31 +0100 Subject: [PATCH 1/7] add Send requirement on traits --- satrs-core/src/hal/host/udp_server.rs | 4 ++++ satrs-core/src/tmtc/ccsds_distrib.rs | 5 ++++- satrs-core/src/tmtc/mod.rs | 2 +- satrs-core/src/tmtc/pus_distrib.rs | 6 ++++-- satrs-example/src/main.rs | 1 + 5 files changed, 14 insertions(+), 4 deletions(-) diff --git a/satrs-core/src/hal/host/udp_server.rs b/satrs-core/src/hal/host/udp_server.rs index 97acb64..84a40b7 100644 --- a/satrs-core/src/hal/host/udp_server.rs +++ b/satrs-core/src/hal/host/udp_server.rs @@ -140,6 +140,8 @@ mod tests { use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::vec::Vec; + fn is_send(_: &T) {} + #[derive(Default)] struct PingReceiver { pub sent_cmds: VecDeque>, @@ -161,8 +163,10 @@ mod tests { let mut buf = [0; 32]; let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777); let ping_receiver = PingReceiver::default(); + is_send(&ping_receiver); let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver)) .expect("Creating UDP TMTC server failed"); + is_send(&udp_tc_server); let mut sph = SpHeader::tc_unseg(0x02, 0, 0).unwrap(); let pus_tc = PusTc::new_simple(&mut sph, 17, 1, None, true); let len = pus_tc diff --git a/satrs-core/src/tmtc/ccsds_distrib.rs b/satrs-core/src/tmtc/ccsds_distrib.rs index 8934a90..6ff51a5 100644 --- a/satrs-core/src/tmtc/ccsds_distrib.rs +++ b/satrs-core/src/tmtc/ccsds_distrib.rs @@ -99,7 +99,7 @@ use spacepackets::{ByteConversionError, CcsdsPacket, SizeMissmatch, SpHeader}; /// This trait automatically implements the [downcast_rs::Downcast] to allow a more convenient API /// to cast trait objects back to their concrete type after the handler was passed to the /// distributor. -pub trait CcsdsPacketHandler: Downcast { +pub trait CcsdsPacketHandler: Downcast + Send { type Error; fn valid_apids(&self) -> &'static [u16]; @@ -199,6 +199,8 @@ pub(crate) mod tests { use std::sync::{Arc, Mutex}; use std::vec::Vec; + fn is_send(_: &T) {} + pub fn generate_ping_tc(buf: &mut [u8]) -> &[u8] { let mut sph = SpHeader::tc_unseg(0x002, 0x34, 0).unwrap(); let pus_tc = PusTc::new_simple(&mut sph, 17, 1, None, true); @@ -292,6 +294,7 @@ pub(crate) mod tests { unknown_packet_queue: unknown_packet_queue.clone(), }; let mut ccsds_distrib = CcsdsDistributor::new(Box::new(apid_handler)); + is_send(&ccsds_distrib); let mut test_buf: [u8; 32] = [0; 32]; let tc_slice = generate_ping_tc(test_buf.as_mut_slice()); diff --git a/satrs-core/src/tmtc/mod.rs b/satrs-core/src/tmtc/mod.rs index 0a31362..6d67887 100644 --- a/satrs-core/src/tmtc/mod.rs +++ b/satrs-core/src/tmtc/mod.rs @@ -48,7 +48,7 @@ impl AddressableId { /// This trait is implemented by both the [crate::tmtc::pus_distrib::PusDistributor] and the /// [crate::tmtc::ccsds_distrib::CcsdsDistributor] which allows to pass the respective packets in /// raw byte format into them. -pub trait ReceivesTc: Downcast { +pub trait ReceivesTc: Downcast + Send { type Error; fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error>; } diff --git a/satrs-core/src/tmtc/pus_distrib.rs b/satrs-core/src/tmtc/pus_distrib.rs index 206be08..ee13bb4 100644 --- a/satrs-core/src/tmtc/pus_distrib.rs +++ b/satrs-core/src/tmtc/pus_distrib.rs @@ -67,7 +67,7 @@ use spacepackets::ecss::{PusError, PusPacket}; use spacepackets::tc::PusTc; use spacepackets::SpHeader; -pub trait PusServiceProvider: Downcast { +pub trait PusServiceProvider: Downcast + Send { type Error; fn handle_pus_tc_packet( &mut self, @@ -147,6 +147,8 @@ mod tests { #[cfg(feature = "std")] use std::sync::{Arc, Mutex}; + fn is_send(_: &T) {} + struct PusHandlerSharedQueue { pub pus_queue: Arc)>>>, } @@ -263,7 +265,7 @@ mod tests { let pus_distrib = PusDistributor { service_provider: Box::new(pus_handler), }; - + is_send(&pus_distrib); let apid_handler = ApidHandlerShared { pus_distrib, handler_base, diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index f929798..fe7e88e 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -69,6 +69,7 @@ impl EcssTmSender for EventTmSender { self.sender.send(addr).map_err(EcssTmError::SendError) } } + fn main() { println!("Running OBSW example"); let pool_cfg = PoolCfg::new(vec![(8, 32), (4, 64), (2, 128)]); From 0b0a929a11e96dee8cad1ec99a76c0785272a6fc Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 21 Dec 2022 20:02:29 +0100 Subject: [PATCH 2/7] impl ReceivesTc for TC UDP server --- satrs-core/src/hal/host/udp_server.rs | 8 ++++++++ satrs-example/src/main.rs | 2 -- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/satrs-core/src/hal/host/udp_server.rs b/satrs-core/src/hal/host/udp_server.rs index 84a40b7..ce600d0 100644 --- a/satrs-core/src/hal/host/udp_server.rs +++ b/satrs-core/src/hal/host/udp_server.rs @@ -89,6 +89,14 @@ impl PartialEq for ReceiveResult { impl Eq for ReceiveResult {} +impl ReceivesTc for UdpTcServer { + type Error = E; + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.tc_receiver.pass_tc(tc_raw) + } +} + impl UdpTcServer { pub fn new( addr: A, diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index fe7e88e..fddd1b1 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -44,8 +44,6 @@ struct UdpTmtcServer { tm_store: SharedPool, } -unsafe impl Send for UdpTmtcServer {} - #[derive(Clone)] struct EventTmSender { store_helper: TmStore, From 49c5f3eda22f47b359dcd90488a1368234286898 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 21 Dec 2022 22:14:42 +0100 Subject: [PATCH 3/7] refactored example to support different tc sources --- satrs-core/src/pool.rs | 51 ++++++++++ satrs-example/src/ccsds.rs | 12 +-- satrs-example/src/main.rs | 82 +++++++++------ satrs-example/src/pus.rs | 6 +- satrs-example/src/tmtc.rs | 200 +++++++++++++++++++++++++++++++------ spacepackets | 2 +- 6 files changed, 281 insertions(+), 72 deletions(-) diff --git a/satrs-core/src/pool.rs b/satrs-core/src/pool.rs index ebc404e..a37e406 100644 --- a/satrs-core/src/pool.rs +++ b/satrs-core/src/pool.rs @@ -77,10 +77,13 @@ use alloc::format; use alloc::string::String; use alloc::vec; use alloc::vec::Vec; +use core::fmt::{Display, Formatter}; use delegate::delegate; #[cfg(feature = "std")] use std::boxed::Box; #[cfg(feature = "std")] +use std::error::Error; +#[cfg(feature = "std")] use std::sync::{Arc, RwLock}; type NumBlocks = u16; @@ -146,6 +149,22 @@ pub enum StoreIdError { InvalidPacketIdx(u16), } +impl Display for StoreIdError { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + match self { + StoreIdError::InvalidSubpool(pool) => { + write!(f, "invalid subpool, index: {}", pool) + } + StoreIdError::InvalidPacketIdx(packet_idx) => { + write!(f, "invalid packet index: {}", packet_idx) + } + } + } +} + +#[cfg(feature = "std")] +impl Error for StoreIdError {} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum StoreError { /// Requested data block is too large @@ -160,6 +179,38 @@ pub enum StoreError { InternalError(String), } +impl Display for StoreError { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + match self { + StoreError::DataTooLarge(size) => { + write!(f, "data to store with size {} is too large", size) + } + StoreError::StoreFull(u16) => { + write!(f, "store is too full. index for full subpool: {}", u16) + } + StoreError::InvalidStoreId(id_e, addr) => { + write!(f, "invalid store ID: {}, address: {:?}", id_e, addr) + } + StoreError::DataDoesNotExist(addr) => { + write!(f, "no data exists at address {:?}", addr) + } + StoreError::InternalError(e) => { + write!(f, "internal error: {}", e) + } + } + } +} + +#[cfg(feature = "std")] +impl Error for StoreError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + if let StoreError::InvalidStoreId(e, _) = self { + return Some(e); + } + None + } +} + pub trait PoolProvider { /// Add new data to the pool. The provider should attempt to reserve a memory block with the /// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can diff --git a/satrs-example/src/ccsds.rs b/satrs-example/src/ccsds.rs index cbaf5c7..75c4ae8 100644 --- a/satrs-example/src/ccsds.rs +++ b/satrs-example/src/ccsds.rs @@ -1,13 +1,13 @@ -use crate::tmtc::PUS_APID; -use satrs_core::tmtc::{CcsdsPacketHandler, PusDistributor, ReceivesCcsdsTc}; +use crate::tmtc::{MpscStoreAndSendError, PusTcSource, PUS_APID}; +use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; use spacepackets::{CcsdsPacket, SpHeader}; pub struct CcsdsReceiver { - pub pus_handler: PusDistributor<()>, + pub tc_source: PusTcSource, } impl CcsdsPacketHandler for CcsdsReceiver { - type Error = (); + type Error = MpscStoreAndSendError; fn valid_apids(&self) -> &'static [u16] { &[PUS_APID] @@ -19,9 +19,7 @@ impl CcsdsPacketHandler for CcsdsReceiver { tc_raw: &[u8], ) -> Result<(), Self::Error> { if sp_header.apid() == PUS_APID { - self.pus_handler - .pass_ccsds(sp_header, tc_raw) - .expect("Handling PUS packet failed"); + return self.tc_source.pass_ccsds(sp_header, tc_raw); } Ok(()) } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index fddd1b1..238c62e 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -5,13 +5,14 @@ mod requests; mod tmtc; use crate::requests::RequestWithToken; -use crate::tmtc::{core_tmtc_task, CoreTmtcArgs, TmStore, PUS_APID}; +use crate::tmtc::{ + core_tmtc_task, OtherArgs, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmStore, PUS_APID, +}; use satrs_core::event_man::{ EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, }; use satrs_core::events::EventU32; -use satrs_core::hal::host::udp_server::UdpTcServer; -use satrs_core::pool::{LocalPool, PoolCfg, SharedPool, StoreAddr}; +use satrs_core::pool::{LocalPool, PoolCfg, StoreAddr}; use satrs_core::pus::event_man::{ DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, PusEventDispatcher, @@ -21,7 +22,6 @@ use satrs_core::pus::verification::{ }; use satrs_core::pus::{EcssTmError, EcssTmSender}; use satrs_core::seq_count::SimpleSeqCountProvider; -use satrs_core::tmtc::CcsdsError; use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT}; use spacepackets::time::cds::TimeProvider; use spacepackets::time::TimeWriter; @@ -33,17 +33,6 @@ use std::sync::{mpsc, Arc, RwLock}; use std::thread; use std::time::Duration; -struct TmFunnel { - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, -} - -struct UdpTmtcServer { - udp_tc_server: UdpTcServer>, - tm_rx: mpsc::Receiver, - tm_store: SharedPool, -} - #[derive(Clone)] struct EventTmSender { store_helper: TmStore, @@ -70,16 +59,33 @@ impl EcssTmSender for EventTmSender { fn main() { println!("Running OBSW example"); - let pool_cfg = PoolCfg::new(vec![(8, 32), (4, 64), (2, 128)]); - let tm_pool = LocalPool::new(pool_cfg); - let tm_store: SharedPool = Arc::new(RwLock::new(Box::new(tm_pool))); - let tm_store_helper = TmStore { - pool: tm_store.clone(), + let tm_pool = LocalPool::new(PoolCfg::new(vec![ + (30, 32), + (15, 64), + (15, 128), + (15, 256), + (15, 1024), + (15, 2048), + ])); + let tm_store = TmStore { + pool: Arc::new(RwLock::new(Box::new(tm_pool))), }; - let addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let tc_pool = LocalPool::new(PoolCfg::new(vec![ + (30, 32), + (15, 64), + (15, 128), + (15, 256), + (15, 1024), + (15, 2048), + ])); + let tc_store = TcStore { + pool: Arc::new(RwLock::new(Box::new(tc_pool))), + }; + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let (tc_source_tx, tc_source_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); - let sender = MpscVerifSender::new(tm_store.clone(), tm_funnel_tx.clone()); + let sender = MpscVerifSender::new(tm_store.pool.clone(), tm_funnel_tx.clone()); let verif_cfg = VerificationReporterCfg::new( PUS_APID, #[allow(clippy::box_default)] @@ -89,7 +95,7 @@ fn main() { 8, ) .unwrap(); - let reporter_with_sender_0 = VerificationReporterWithSender::new(&verif_cfg, Box::new(sender)); + let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(sender)); // Create event handling components let (event_request_tx, event_request_rx) = channel::(); @@ -102,26 +108,40 @@ fn main() { PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend)); let (pus_event_man_tx, pus_event_man_rx) = channel(); let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx); - let mut reporter_event_handler = reporter_with_sender_0.clone(); - let mut reporter_aocs = reporter_with_sender_0.clone(); + let mut reporter_event_handler = verif_reporter.clone(); + let mut reporter_aocs = verif_reporter.clone(); event_man.subscribe_all(pus_event_man_send_provider.id()); let mut request_map = HashMap::new(); let (acs_thread_tx, acs_thread_rx) = channel::(); request_map.insert(RequestTargetId::AcsSubsystem as u32, acs_thread_tx); - // Create clones here to allow move for thread 0 - let core_args = CoreTmtcArgs { - tm_store: tm_store_helper.clone(), - tm_sender: tm_funnel_tx.clone(), + let tc_source = PusTcSource { + tc_store, + tc_source: tc_source_tx, + }; + + // Create clones here to allow moving the values + let core_args = OtherArgs { + sock_addr, + verif_reporter, event_sender, event_request_tx, request_map, }; + let tc_args = TcArgs { + tc_source, + tc_receiver: tc_source_rx, + }; + let tm_args = TmArgs { + tm_store: tm_store.clone(), + tm_sink_sender: tm_funnel_tx.clone(), + tm_server_rx, + }; println!("Starting TMTC task"); let jh0 = thread::spawn(move || { - core_tmtc_task(core_args, tm_server_rx, addr, reporter_with_sender_0); + core_tmtc_task(core_args, tc_args, tm_args); }); println!("Starting TM funnel task"); @@ -143,7 +163,7 @@ fn main() { println!("Starting event handling task"); let jh2 = thread::spawn(move || { let mut timestamp: [u8; 7] = [0; 7]; - let mut sender = EventTmSender::new(tm_store_helper, tm_funnel_tx); + let mut sender = EventTmSender::new(tm_store, tm_funnel_tx); let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { reporter_event_handler diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index 49dc6bc..d0480f4 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -1,6 +1,6 @@ use crate::hk::{CollectionIntervalFactor, HkRequest}; use crate::requests::{Request, RequestWithToken}; -use crate::tmtc::TmStore; +use crate::tmtc::{PusTcSource, TmStore}; use satrs_core::events::EventU32; use satrs_core::pool::StoreAddr; use satrs_core::pus::event::Subservices; @@ -26,6 +26,8 @@ pub struct PusReceiver { pub tm_tx: Sender, pub tm_store: TmStore, pub verif_reporter: StdVerifReporterWithSender, + #[allow(dead_code)] + tc_source: PusTcSource, event_request_tx: Sender, request_map: HashMap>, stamper: TimeProvider, @@ -38,6 +40,7 @@ impl PusReceiver { tm_tx: Sender, tm_store: TmStore, verif_reporter: StdVerifReporterWithSender, + tc_source: PusTcSource, event_request_tx: Sender, request_map: HashMap>, ) -> Self { @@ -46,6 +49,7 @@ impl PusReceiver { tm_tx, tm_store, verif_reporter, + tc_source, event_request_tx, request_map, stamper: TimeProvider::new_with_u16_days(0, 0), diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index e63502a..32546cc 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -2,89 +2,225 @@ use satrs_core::events::EventU32; use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; use satrs_core::params::Params; use std::collections::HashMap; +use std::error::Error; +use std::fmt::{Display, Formatter}; use std::net::SocketAddr; -use std::sync::mpsc; -use std::sync::mpsc::Sender; +use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use std::thread; use std::time::Duration; use crate::ccsds::CcsdsReceiver; use crate::pus::PusReceiver; use crate::requests::RequestWithToken; -use crate::UdpTmtcServer; -use satrs_core::pool::{SharedPool, StoreAddr}; +use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pus::event_man::EventRequestWithToken; use satrs_core::pus::verification::StdVerifReporterWithSender; -use satrs_core::tmtc::{CcsdsDistributor, CcsdsError, PusDistributor}; +use satrs_core::tmtc::{ + CcsdsDistributor, CcsdsError, PusServiceProvider, ReceivesCcsdsTc, ReceivesEcssPusTc, +}; +use spacepackets::ecss::PusPacket; +use spacepackets::tc::PusTc; use spacepackets::tm::PusTm; +use spacepackets::SpHeader; pub const PUS_APID: u16 = 0x02; -pub struct CoreTmtcArgs { - pub tm_store: TmStore, - pub tm_sender: Sender, +pub struct OtherArgs { + pub sock_addr: SocketAddr, + pub verif_reporter: StdVerifReporterWithSender, pub event_sender: Sender<(EventU32, Option)>, pub event_request_tx: Sender, pub request_map: HashMap>, } +pub struct TmArgs { + pub tm_store: TmStore, + pub tm_sink_sender: Sender, + pub tm_server_rx: Receiver, +} + +pub struct TcArgs { + pub tc_source: PusTcSource, + pub tc_receiver: Receiver, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MpscStoreAndSendError { + StoreError(StoreError), + SendError(SendError), +} + +impl Display for MpscStoreAndSendError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + MpscStoreAndSendError::StoreError(s) => { + write!(f, "store error {}", s) + } + MpscStoreAndSendError::SendError(s) => { + write!(f, "send error {}", s) + } + } + } +} + +impl Error for MpscStoreAndSendError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + MpscStoreAndSendError::StoreError(s) => Some(s), + MpscStoreAndSendError::SendError(s) => Some(s), + } + } +} + +impl From for MpscStoreAndSendError { + fn from(value: StoreError) -> Self { + Self::StoreError(value) + } +} + +impl From> for MpscStoreAndSendError { + fn from(value: SendError) -> Self { + Self::SendError(value) + } +} + #[derive(Clone)] pub struct TmStore { pub pool: SharedPool, } +#[derive(Clone)] +pub struct TcStore { + pub pool: SharedPool, +} + impl TmStore { pub fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr { - let mut pg = self.pool.write().expect("Error locking TM store"); + let mut pg = self.pool.write().expect("error locking TM store"); let (addr, buf) = pg.free_element(pus_tm.len_packed()).expect("Store error"); pus_tm .write_to_bytes(buf) - .expect("Writing PUS TM to store failed"); + .expect("writing PUS TM to store failed"); addr } } -pub fn core_tmtc_task( - args: CoreTmtcArgs, - tm_server_rx: mpsc::Receiver, - addr: SocketAddr, - verif_reporter: StdVerifReporterWithSender, -) { - let pus_receiver = PusReceiver::new( +impl TcStore { + pub fn add_pus_tc(&mut self, pus_tc: &PusTc) -> Result { + let mut pg = self.pool.write().expect("error locking TC store"); + let (addr, buf) = pg.free_element(pus_tc.len_packed())?; + pus_tc + .write_to_bytes(buf) + .expect("writing PUS TC to store failed"); + Ok(addr) + } +} + +pub struct TmFunnel { + pub tm_funnel_rx: Receiver, + pub tm_server_tx: Sender, +} + +pub struct UdpTmtcServer { + udp_tc_server: UdpTcServer>, + tm_rx: Receiver, + tm_store: SharedPool, +} + +#[derive(Clone)] +pub struct PusTcSource { + pub tc_source: Sender, + pub tc_store: TcStore, +} + +impl ReceivesEcssPusTc for PusTcSource { + type Error = MpscStoreAndSendError; + + fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTc) -> Result<(), Self::Error> { + let addr = self.tc_store.add_pus_tc(pus_tc)?; + self.tc_source.send(addr)?; + Ok(()) + } +} + +impl ReceivesCcsdsTc for PusTcSource { + type Error = MpscStoreAndSendError; + + fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut pool = self.tc_store.pool.write().expect("locking pool failed"); + let addr = pool.add(tc_raw)?; + drop(pool); + self.tc_source.send(addr)?; + Ok(()) + } +} +pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) { + let mut pus_receiver = PusReceiver::new( PUS_APID, - args.tm_sender, - args.tm_store.clone(), - verif_reporter, + tm_args.tm_sink_sender, + tm_args.tm_store.clone(), + args.verif_reporter, + tc_args.tc_source.clone(), args.event_request_tx, args.request_map, ); - let pus_distributor = PusDistributor::new(Box::new(pus_receiver)); let ccsds_receiver = CcsdsReceiver { - pus_handler: pus_distributor, + tc_source: tc_args.tc_source.clone(), }; let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); - let udp_tc_server = UdpTcServer::new(addr, 2048, Box::new(ccsds_distributor)) + let udp_tc_server = UdpTcServer::new(args.sock_addr, 2048, Box::new(ccsds_distributor)) .expect("Creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, - tm_rx: tm_server_rx, - tm_store: args.tm_store.pool.clone(), + tm_rx: tm_args.tm_server_rx, + tm_store: tm_args.tm_store.pool.clone(), }; loop { - core_tmtc_loop(&mut udp_tmtc_server); + core_tmtc_loop(&mut udp_tmtc_server, &mut tc_args, &mut pus_receiver); thread::sleep(Duration::from_millis(400)); } } -fn core_tmtc_loop(udp_tmtc_server: &mut UdpTmtcServer) { - while core_tc_handling(udp_tmtc_server) {} +fn core_tmtc_loop( + udp_tmtc_server: &mut UdpTmtcServer, + tc_args: &mut TcArgs, + pus_receiver: &mut PusReceiver, +) { + while poll_tc_server(udp_tmtc_server) {} + match tc_args.tc_receiver.try_recv() { + Ok(addr) => { + let pool = tc_args + .tc_source + .tc_store + .pool + .read() + .expect("locking tc pool failed"); + let data = pool.read(&addr).expect("reading pool failed"); + match PusTc::from_bytes(data) { + Ok((pus_tc, _)) => { + pus_receiver + .handle_pus_tc_packet(pus_tc.service(), pus_tc.sp_header(), &pus_tc) + .ok(); + } + Err(e) => { + println!("error creating PUS TC from raw data: {}", e); + println!("raw data: {:x?}", data); + } + } + } + Err(e) => { + if let TryRecvError::Disconnected = e { + println!("tmtc thread: sender disconnected") + } + } + } if let Some(recv_addr) = udp_tmtc_server.udp_tc_server.last_sender() { core_tm_handling(udp_tmtc_server, &recv_addr); } } -fn core_tc_handling(udp_tmtc_server: &mut UdpTmtcServer) -> bool { +fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { match udp_tmtc_server.udp_tc_server.try_recv_tc() { Ok(_) => true, Err(e) => match e { @@ -112,9 +248,9 @@ fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) let mut store_lock = udp_tmtc_server .tm_store .write() - .expect("Locking TM store failed"); + .expect("locking TM store failed"); let pg = store_lock.read_with_guard(addr); - let buf = pg.read().expect("Error reading TM pool data"); + let buf = pg.read().expect("error reading TM pool data"); if buf.len() > 9 { let service = buf[7]; let subservice = buf[8]; @@ -126,6 +262,6 @@ fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) .udp_tc_server .socket .send_to(buf, recv_addr) - .expect("Sending TM failed"); + .expect("sending TM failed"); } } diff --git a/spacepackets b/spacepackets index 9b091e3..f8feb02 160000 --- a/spacepackets +++ b/spacepackets @@ -1 +1 @@ -Subproject commit 9b091e3a3a6f599b093c96327751bcf1bc911ca1 +Subproject commit f8feb027dee212702d1609433348784be3cb602b From 27c1a9850cbd6ed31e2196f4e47d5d7f2638d51a Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 22 Dec 2022 09:15:59 +0100 Subject: [PATCH 4/7] basic HK reply TM packing and handling --- satrs-example/pyclient/main.py | 9 +++++++++ satrs-example/src/main.rs | 23 ++++++++++++++++++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/satrs-example/pyclient/main.py b/satrs-example/pyclient/main.py index be63b2b..c06f9ed 100755 --- a/satrs-example/pyclient/main.py +++ b/satrs-example/pyclient/main.py @@ -142,6 +142,15 @@ class PusHandler(SpecificApidHandlerBase): self.verif_wrapper.log_to_console(tm_packet, res) self.verif_wrapper.log_to_file(tm_packet, res) dedicated_handler = True + if service == 3: + LOGGER.info("No handling for HK packets implemented") + LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") + pus_tm = PusTelemetry.unpack(packet) + if pus_tm.subservice == 25: + if len(pus_tm.source_data) < 8: + raise ValueError("No addressable ID in HK packet") + json_str = pus_tm.source_data[8:] + dedicated_handler = True if service == 5: tm_packet = Service5Tm.unpack(packet) if service == 17: diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 238c62e..fd3e3bf 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -17,6 +17,7 @@ use satrs_core::pus::event_man::{ DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, PusEventDispatcher, }; +use satrs_core::pus::hk::Subservice; use satrs_core::pus::verification::{ MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender, }; @@ -25,7 +26,8 @@ use satrs_core::seq_count::SimpleSeqCountProvider; use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT}; use spacepackets::time::cds::TimeProvider; use spacepackets::time::TimeWriter; -use spacepackets::tm::PusTm; +use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; +use spacepackets::{SequenceFlags, SpHeader}; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::mpsc::{channel, TryRecvError}; @@ -81,11 +83,12 @@ fn main() { let tc_store = TcStore { pool: Arc::new(RwLock::new(Box::new(tc_pool))), }; + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let (tc_source_tx, tc_source_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); - let sender = MpscVerifSender::new(tm_store.pool.clone(), tm_funnel_tx.clone()); + let verif_sender = MpscVerifSender::new(tm_store.pool.clone(), tm_funnel_tx.clone()); let verif_cfg = VerificationReporterCfg::new( PUS_APID, #[allow(clippy::box_default)] @@ -95,7 +98,7 @@ fn main() { 8, ) .unwrap(); - let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(sender)); + let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); // Create event handling components let (event_request_tx, event_request_rx) = channel::(); @@ -139,6 +142,9 @@ fn main() { tm_server_rx, }; + let aocs_to_funnel = tm_funnel_tx.clone(); + let mut aocs_tm_store = tm_store.clone(); + println!("Starting TMTC task"); let jh0 = thread::spawn(move || { core_tmtc_task(core_args, tc_args, tm_args); @@ -195,6 +201,7 @@ fn main() { .generate_pus_event_tm_generic(&mut sender, ×tamp, event, None) .expect("Sending TM as event failed"); } + thread::sleep(Duration::from_millis(400)); } }); @@ -207,6 +214,16 @@ fn main() { Ok(request) => { println!("ACS thread: Received HK request {:?}", request.0); update_time(&mut time_provider, &mut timestamp); + let mut sp_header = + SpHeader::tc(PUS_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); + let sec_header = PusTmSecondaryHeader::new_simple( + 3, + Subservice::TmHkPacket as u8, + ×tamp, + ); + let pus_tm = PusTm::new(&mut sp_header, sec_header, None, true); + let addr = aocs_tm_store.add_pus_tm(&pus_tm); + aocs_to_funnel.send(addr).expect("Sending HK TM failed"); let started_token = reporter_aocs .start_success(request.1, ×tamp) .expect("Sending start success failed"); From d69e03a675bb06c5f0cccb3b9dfe3747d6691560 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 22 Dec 2022 09:16:57 +0100 Subject: [PATCH 5/7] use correct TM ctor --- satrs-example/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index fd3e3bf..51c62ca 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -215,7 +215,7 @@ fn main() { println!("ACS thread: Received HK request {:?}", request.0); update_time(&mut time_provider, &mut timestamp); let mut sp_header = - SpHeader::tc(PUS_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); + SpHeader::tm(PUS_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); let sec_header = PusTmSecondaryHeader::new_simple( 3, Subservice::TmHkPacket as u8, From 4e450808b747e8a54a3da950b1ddfb05b5d2da3b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 22 Dec 2022 09:26:00 +0100 Subject: [PATCH 6/7] always supply addressable ID --- satrs-core/src/tmtc/mod.rs | 12 ++++++++++++ satrs-example/src/hk.rs | 11 ++++++----- satrs-example/src/main.rs | 39 +++++++++++++++++++++++++++----------- satrs-example/src/pus.rs | 7 ++++--- 4 files changed, 50 insertions(+), 19 deletions(-) diff --git a/satrs-core/src/tmtc/mod.rs b/satrs-core/src/tmtc/mod.rs index 6d67887..511099c 100644 --- a/satrs-core/src/tmtc/mod.rs +++ b/satrs-core/src/tmtc/mod.rs @@ -40,6 +40,18 @@ impl AddressableId { unique_id: u32::from_be_bytes(buf[4..8].try_into().unwrap()), }) } + + pub fn write_to_be_bytes(&self, buf: &mut [u8]) -> Result { + if buf.len() < 8 { + return Err(ByteConversionError::ToSliceTooSmall(SizeMissmatch { + found: buf.len(), + expected: 8, + })); + } + buf[0..4].copy_from_slice(&self.target_id.to_be_bytes()); + buf[4..8].copy_from_slice(&self.unique_id.to_be_bytes()); + Ok(8) + } } /// Generic trait for object which can receive any telecommands in form of a raw bytestream, with diff --git a/satrs-example/src/hk.rs b/satrs-example/src/hk.rs index 96cd37b..d27c0f0 100644 --- a/satrs-example/src/hk.rs +++ b/satrs-example/src/hk.rs @@ -1,4 +1,5 @@ -#![allow(dead_code)] +use satrs_core::tmtc::AddressableId; + pub type CollectionIntervalFactor = u32; #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -8,8 +9,8 @@ pub enum AcsHkIds { #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum HkRequest { - OneShot(u32), - Enable(u32), - Disable(u32), - ModifyCollectionInterval(CollectionIntervalFactor), + OneShot(AddressableId), + Enable(AddressableId), + Disable(AddressableId), + ModifyCollectionInterval(AddressableId, CollectionIntervalFactor), } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 51c62ca..eecb858 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -4,7 +4,8 @@ mod pus; mod requests; mod tmtc; -use crate::requests::RequestWithToken; +use crate::hk::{AcsHkIds, HkRequest}; +use crate::requests::{Request, RequestWithToken}; use crate::tmtc::{ core_tmtc_task, OtherArgs, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmStore, PUS_APID, }; @@ -214,16 +215,32 @@ fn main() { Ok(request) => { println!("ACS thread: Received HK request {:?}", request.0); update_time(&mut time_provider, &mut timestamp); - let mut sp_header = - SpHeader::tm(PUS_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); - let sec_header = PusTmSecondaryHeader::new_simple( - 3, - Subservice::TmHkPacket as u8, - ×tamp, - ); - let pus_tm = PusTm::new(&mut sp_header, sec_header, None, true); - let addr = aocs_tm_store.add_pus_tm(&pus_tm); - aocs_to_funnel.send(addr).expect("Sending HK TM failed"); + match request.0 { + Request::HkRequest(hk_req) => match hk_req { + HkRequest::OneShot(one_shot_req) => { + assert_eq!( + one_shot_req.unique_id, + RequestTargetId::AcsSubsystem as u32 + ); + if one_shot_req.unique_id == AcsHkIds::TestMgmSet as u32 { + let mut sp_header = + SpHeader::tm(PUS_APID, SequenceFlags::Unsegmented, 0, 0) + .unwrap(); + let sec_header = PusTmSecondaryHeader::new_simple( + 3, + Subservice::TmHkPacket as u8, + ×tamp, + ); + let pus_tm = PusTm::new(&mut sp_header, sec_header, None, true); + let addr = aocs_tm_store.add_pus_tm(&pus_tm); + aocs_to_funnel.send(addr).expect("Sending HK TM failed"); + } + } + HkRequest::Enable(_) => {} + HkRequest::Disable(_) => {} + HkRequest::ModifyCollectionInterval(_, _) => {} + }, + } let started_token = reporter_aocs .start_success(request.1, ×tamp) .expect("Sending start success failed"); diff --git a/satrs-example/src/pus.rs b/satrs-example/src/pus.rs index d0480f4..c7e1e40 100644 --- a/satrs-example/src/pus.rs +++ b/satrs-example/src/pus.rs @@ -171,11 +171,11 @@ impl PusReceiver { .unwrap_or_else(|_| panic!("Sending HK request {:?} failed", request)); }; if PusPacket::subservice(pus_tc) == hk::Subservice::TcEnableGeneration as u8 { - send_request(HkRequest::Enable(addressable_id.unique_id)); + send_request(HkRequest::Enable(addressable_id)); } else if PusPacket::subservice(pus_tc) == hk::Subservice::TcDisableGeneration as u8 { - send_request(HkRequest::Disable(addressable_id.unique_id)); + send_request(HkRequest::Disable(addressable_id)); } else if PusPacket::subservice(pus_tc) == hk::Subservice::TcGenerateOneShotHk as u8 { - send_request(HkRequest::OneShot(addressable_id.unique_id)); + send_request(HkRequest::OneShot(addressable_id)); } else if PusPacket::subservice(pus_tc) == hk::Subservice::TcModifyCollectionInterval as u8 { if user_data.len() < 12 { @@ -193,6 +193,7 @@ impl PusReceiver { return; } send_request(HkRequest::ModifyCollectionInterval( + addressable_id, CollectionIntervalFactor::from_be_bytes(user_data[8..12].try_into().unwrap()), )); } From e8706f3c690d42550ad8e2b15f6bac9094023689 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 22 Dec 2022 10:26:49 +0100 Subject: [PATCH 7/7] send addressable ID --- satrs-example/src/main.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index eecb858..14655b5 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -217,12 +217,9 @@ fn main() { update_time(&mut time_provider, &mut timestamp); match request.0 { Request::HkRequest(hk_req) => match hk_req { - HkRequest::OneShot(one_shot_req) => { - assert_eq!( - one_shot_req.unique_id, - RequestTargetId::AcsSubsystem as u32 - ); - if one_shot_req.unique_id == AcsHkIds::TestMgmSet as u32 { + HkRequest::OneShot(address) => { + assert_eq!(address.target_id, RequestTargetId::AcsSubsystem as u32); + if address.unique_id == AcsHkIds::TestMgmSet as u32 { let mut sp_header = SpHeader::tm(PUS_APID, SequenceFlags::Unsegmented, 0, 0) .unwrap(); @@ -231,7 +228,10 @@ fn main() { Subservice::TmHkPacket as u8, ×tamp, ); - let pus_tm = PusTm::new(&mut sp_header, sec_header, None, true); + let mut buf: [u8; 8] = [0; 8]; + address.write_to_be_bytes(&mut buf).unwrap(); + let pus_tm = + PusTm::new(&mut sp_header, sec_header, Some(&buf), true); let addr = aocs_tm_store.add_pus_tm(&pus_tm); aocs_to_funnel.send(addr).expect("Sending HK TM failed"); }