diff --git a/satrs-book/src/SUMMARY.md b/satrs-book/src/SUMMARY.md index 0dfb982..fa0ca97 100644 --- a/satrs-book/src/SUMMARY.md +++ b/satrs-book/src/SUMMARY.md @@ -2,9 +2,16 @@ - [Introduction](./introduction.md) - [Design](./design.md) + +# Basic concepts and components + - [Communication with Space Systems](./communication.md) - [Working with Constrained Systems](./constrained-systems.md) - [Actions](./actions.md) - [Modes and Health](./modes-and-health.md) - [Housekeeping Data](./housekeeping.md) - [Events](./events.md) + +# Example project + +- [The satrs-example application](./example.md) diff --git a/satrs-book/src/example.md b/satrs-book/src/example.md new file mode 100644 index 0000000..db49cac --- /dev/null +++ b/satrs-book/src/example.md @@ -0,0 +1,30 @@ +# sat-rs Example Application + +The `sat-rs` framework includes a monolithic example application which can be found inside +the [`satrs-example`](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example) +subdirectory of the repository. The primary purpose of this example application is to show how +the various components of the sat-rs framework could be used as part of a larger on-board +software application. + +## Structure of the example project + +The example project contains components which could also be expected to be part of a production +On-Board Software. + +1. A UDP and TCP server to receive telecommands and poll telemetry from. This might be an optional + component for an OBSW which is only used during the development phase on ground. The TCP + server parses space packets by using the CCSDS space packet ID as the packet start delimiter. +2. A PUS service stack which exposes some functionality conformant with the ECSS PUS service. This + currently includes the following services: + - Service 1 for telecommand verification. + - Service 3 for housekeeping telemetry handling. + - Service 5 for management and downlink of on-board events. + - Service 8 for handling on-board actions. + - Service 11 for scheduling telecommands to be released at a specific time. + - Service 17 for test purposes (pings) +3. An event manager component which handles the event IPC mechanism. +4. A TC source component which demultiplexes and routes telecommands based on parameters like + packet APID or PUS service and subservice type. +5. A TM sink sink component which is the target of all sent telemetry and sends it to downlink + handlers like the UDP and TCP server. +6. An AOCS example task which can also process some PUS commands. diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index 488966c..e07b7a6 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -73,10 +73,11 @@ features = ["all"] optional = true [dependencies.spacepackets] -version = "0.8.1" +version = "0.9.0" default-features = false # git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" # rev = "297cfad22637d3b07a1b27abe56d9a607b5b82a7" +# branch = "main" [dependencies.cobs] git = "https://github.com/robamu/cobs.rs.git" diff --git a/satrs-core/src/executable.rs b/satrs-core/src/executable.rs index 77ed178..033876f 100644 --- a/satrs-core/src/executable.rs +++ b/satrs-core/src/executable.rs @@ -1,13 +1,13 @@ //! Task scheduling module +use alloc::string::String; use bus::BusReader; use std::boxed::Box; -use std::error::Error; use std::sync::mpsc::TryRecvError; -use std::thread; use std::thread::JoinHandle; use std::time::Duration; use std::vec; use std::vec::Vec; +use std::{io, thread}; #[derive(Debug, PartialEq, Eq)] pub enum OpResult { @@ -34,47 +34,49 @@ pub trait Executable: Send { /// # Arguments /// /// * `executable`: Executable task -/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks -/// * `op_code`: Operation code which is passed to the executable task [operation call][Executable::periodic_op] +/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks. +/// If [None] is passed, no sleeping will be performed. +/// * `op_code`: Operation code which is passed to the executable task +/// [operation call][Executable::periodic_op] /// * `termination`: Optional termination handler which can cancel threads with a broadcast -pub fn exec_sched_single< - T: Executable + Send + 'static + ?Sized, - E: Error + Send + 'static, ->( +pub fn exec_sched_single + Send + 'static + ?Sized, E: Send + 'static>( mut executable: Box, task_freq: Option, op_code: i32, mut termination: Option>, -) -> JoinHandle> { +) -> Result>, io::Error> { let mut cycle_count = 0; - thread::spawn(move || loop { - if let Some(ref mut terminator) = termination { - match terminator.try_recv() { - Ok(_) | Err(TryRecvError::Disconnected) => { - return Ok(OpResult::Ok); - } - Err(TryRecvError::Empty) => (), - } - } - match executable.exec_type() { - ExecutionType::OneShot => { - executable.periodic_op(op_code)?; - return Ok(OpResult::Ok); - } - ExecutionType::Infinite => { - executable.periodic_op(op_code)?; - } - ExecutionType::Cycles(cycles) => { - executable.periodic_op(op_code)?; - cycle_count += 1; - if cycle_count == cycles { - return Ok(OpResult::Ok); + thread::Builder::new() + .name(String::from(executable.task_name())) + .spawn(move || loop { + if let Some(ref mut terminator) = termination { + match terminator.try_recv() { + Ok(_) | Err(TryRecvError::Disconnected) => { + return Ok(OpResult::Ok); + } + Err(TryRecvError::Empty) => (), } } - } - let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified")); - thread::sleep(freq); - }) + match executable.exec_type() { + ExecutionType::OneShot => { + executable.periodic_op(op_code)?; + return Ok(OpResult::Ok); + } + ExecutionType::Infinite => { + executable.periodic_op(op_code)?; + } + ExecutionType::Cycles(cycles) => { + executable.periodic_op(op_code)?; + cycle_count += 1; + if cycle_count == cycles { + return Ok(OpResult::Ok); + } + } + } + if let Some(freq) = task_freq { + thread::sleep(freq); + } + }) } /// This function allows executing multiple tasks as long as the tasks implement the @@ -86,55 +88,56 @@ pub fn exec_sched_single< /// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks /// * `op_code`: Operation code which is passed to the executable task [operation call][Executable::periodic_op] /// * `termination`: Optional termination handler which can cancel threads with a broadcast -pub fn exec_sched_multi< - T: Executable + Send + 'static + ?Sized, - E: Error + Send + 'static, ->( +pub fn exec_sched_multi + Send + 'static + ?Sized, E: Send + 'static>( + task_name: &'static str, mut executable_vec: Vec>, task_freq: Option, op_code: i32, mut termination: Option>, -) -> JoinHandle> { +) -> Result>, io::Error> { let mut cycle_counts = vec![0; executable_vec.len()]; let mut removal_flags = vec![false; executable_vec.len()]; - thread::spawn(move || loop { - if let Some(ref mut terminator) = termination { - match terminator.try_recv() { - Ok(_) | Err(TryRecvError::Disconnected) => { - removal_flags.iter_mut().for_each(|x| *x = true); + + thread::Builder::new() + .name(String::from(task_name)) + .spawn(move || loop { + if let Some(ref mut terminator) = termination { + match terminator.try_recv() { + Ok(_) | Err(TryRecvError::Disconnected) => { + removal_flags.iter_mut().for_each(|x| *x = true); + } + Err(TryRecvError::Empty) => (), } - Err(TryRecvError::Empty) => (), } - } - for (idx, executable) in executable_vec.iter_mut().enumerate() { - match executable.exec_type() { - ExecutionType::OneShot => { - executable.periodic_op(op_code)?; - removal_flags[idx] = true; - } - ExecutionType::Infinite => { - executable.periodic_op(op_code)?; - } - ExecutionType::Cycles(cycles) => { - executable.periodic_op(op_code)?; - cycle_counts[idx] += 1; - if cycle_counts[idx] == cycles { + for (idx, executable) in executable_vec.iter_mut().enumerate() { + match executable.exec_type() { + ExecutionType::OneShot => { + executable.periodic_op(op_code)?; removal_flags[idx] = true; } + ExecutionType::Infinite => { + executable.periodic_op(op_code)?; + } + ExecutionType::Cycles(cycles) => { + executable.periodic_op(op_code)?; + cycle_counts[idx] += 1; + if cycle_counts[idx] == cycles { + removal_flags[idx] = true; + } + } } } - } - let mut removal_iter = removal_flags.iter(); - executable_vec.retain(|_| !*removal_iter.next().unwrap()); - removal_iter = removal_flags.iter(); - cycle_counts.retain(|_| !*removal_iter.next().unwrap()); - removal_flags.retain(|&i| !i); - if executable_vec.is_empty() { - return Ok(OpResult::Ok); - } - let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified")); - thread::sleep(freq); - }) + let mut removal_iter = removal_flags.iter(); + executable_vec.retain(|_| !*removal_iter.next().unwrap()); + removal_iter = removal_flags.iter(); + cycle_counts.retain(|_| !*removal_iter.next().unwrap()); + removal_flags.retain(|&i| !i); + if executable_vec.is_empty() { + return Ok(OpResult::Ok); + } + let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified")); + thread::sleep(freq); + }) } #[cfg(test)] @@ -294,7 +297,8 @@ mod tests { Some(Duration::from_millis(100)), expected_op_code, None, - ); + ) + .expect("thread creation failed"); let thread_res = jhandle.join().expect("One Shot Task failed"); assert!(thread_res.is_ok()); assert_eq!(thread_res.unwrap(), OpResult::Ok); @@ -319,7 +323,8 @@ mod tests { Some(Duration::from_millis(100)), op_code_inducing_failure, None, - ); + ) + .expect("thread creation failed"); let thread_res = jhandle.join().expect("One Shot Task failed"); assert!(thread_res.is_err()); let error = thread_res.unwrap_err(); @@ -356,11 +361,13 @@ mod tests { assert_eq!(task.task_name(), ONE_SHOT_TASK_NAME); } let jhandle = exec_sched_multi( + "multi-task-name", task_vec, Some(Duration::from_millis(100)), expected_op_code, None, - ); + ) + .expect("thread creation failed"); let thread_res = jhandle.join().expect("One Shot Task failed"); assert!(thread_res.is_ok()); assert_eq!(thread_res.unwrap(), OpResult::Ok); @@ -386,7 +393,8 @@ mod tests { Some(Duration::from_millis(100)), expected_op_code, None, - ); + ) + .expect("thread creation failed"); let thread_res = jh.join().expect("Cycles Task failed"); assert!(thread_res.is_ok()); let data = shared.lock().expect("Locking Mutex failed"); @@ -418,11 +426,13 @@ mod tests { let task_vec: Vec>> = vec![one_shot_task, cycled_task_0, cycled_task_1]; let jh = exec_sched_multi( + "multi-task-name", task_vec, Some(Duration::from_millis(100)), expected_op_code, None, - ); + ) + .expect("thread creation failed"); let thread_res = jh.join().expect("Cycles Task failed"); assert!(thread_res.is_ok()); let data = shared.lock().expect("Locking Mutex failed"); @@ -449,7 +459,8 @@ mod tests { Some(Duration::from_millis(20)), expected_op_code, Some(terminator.add_rx()), - ); + ) + .expect("thread creation failed"); thread::sleep(Duration::from_millis(40)); terminator.broadcast(()); let thread_res = jh.join().expect("Periodic Task failed"); @@ -485,11 +496,13 @@ mod tests { let task_vec: Vec>> = vec![cycled_task, periodic_task_0, periodic_task_1]; let jh = exec_sched_multi( + "multi-task-name", task_vec, Some(Duration::from_millis(20)), expected_op_code, Some(terminator.add_rx()), - ); + ) + .expect("thread creation failed"); thread::sleep(Duration::from_millis(60)); terminator.broadcast(()); let thread_res = jh.join().expect("Periodic Task failed"); diff --git a/satrs-core/src/pus/mod.rs b/satrs-core/src/pus/mod.rs index 7c721ee..5c62375 100644 --- a/satrs-core/src/pus/mod.rs +++ b/satrs-core/src/pus/mod.rs @@ -318,10 +318,31 @@ mod alloc_mod { /// [Clone]. #[cfg(feature = "alloc")] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] - pub trait EcssTmSender: EcssTmSenderCore + Downcast + DynClone {} + pub trait EcssTmSender: EcssTmSenderCore + Downcast + DynClone { + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn EcssTmSenderCore; + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn EcssTmSenderCore; + } /// Blanket implementation for all types which implement [EcssTmSenderCore] and are clonable. - impl EcssTmSender for T where T: EcssTmSenderCore + Clone + 'static {} + impl EcssTmSender for T + where + T: EcssTmSenderCore + Clone + 'static, + { + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn EcssTmSenderCore { + self + } + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn EcssTmSenderCore { + self + } + } dyn_clone::clone_trait_object!(EcssTmSender); impl_downcast!(EcssTmSender); diff --git a/satrs-example/Cargo.toml b/satrs-example/Cargo.toml index c668cc5..e479656 100644 --- a/satrs-example/Cargo.toml +++ b/satrs-example/Cargo.toml @@ -26,3 +26,7 @@ path = "../satrs-core" [dependencies.satrs-mib] # version = "0.1.0-alpha.1" path = "../satrs-mib" + +[features] +dyn_tmtc = [] +default = ["dyn_tmtc"] diff --git a/satrs-example/README.md b/satrs-example/README.md index da5551f..3447a0d 100644 --- a/satrs-example/README.md +++ b/satrs-example/README.md @@ -3,10 +3,32 @@ sat-rs example This crate contains an example application which simulates an on-board software. It uses various components provided by the sat-rs framework to do this. As such, it shows how -a more complex real on-board software could be built from these components. -The application opens a UDP server on port 7301 to receive telecommands. +a more complex real on-board software could be built from these components. It is recommended to +read the dedicated +[example chapters](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/example.html) inside +the sat-rs book. -You can run the application using `cargo run`. The `simpleclient` binary target sends a +The application opens a UDP and a TCP server on port 7301 to receive telecommands. + +You can run the application using `cargo run`. + +# Features + +The example has the `dyn_tmtc` feature which is enabled by default. With this feature enabled, +TMTC packets are exchanged using the heap as the backing memory instead of pre-allocated static +stores. + +You can run the application without this feature using + +```sh +cargo run --no-default-features +``` + +# Interacting with the sat-rs example + +## Simple Client + +The `simpleclient` binary target sends a ping telecommand and then verifies the telemetry generated by the example application. It can be run like this: @@ -17,7 +39,7 @@ cargo run --bin simpleclient This repository also contains a more complex client using the [Python tmtccmd](https://github.com/robamu-org/tmtccmd) module. -# Using the tmtccmd Python client +## Using the tmtccmd Python client The python client requires a valid installation of the [tmtccmd package](https://github.com/robamu-org/tmtccmd). @@ -46,31 +68,7 @@ as Python code. For example, you can use the following command to send a ping li the `simpleclient`: ```sh -./main.py -s test -o ping +./main.py -p /test/ping ``` -You can also simply call the script without any arguments to view a list of services (`-s` flag) -and corresponding op codes (`-o` flag) for each service. - -# Structure of the example project - -The example project contains components which could also be expected to be part of a production -On-Board Software. - -1. A UDP and TCP server to receive telecommands and poll telemetry from. This might be an optional - component for an OBSW which is only used during the development phase on ground. The TCP - server parses space packets by using the CCSDS space packet ID as the packet start delimiter. -2. A PUS service stack which exposes some functionality conformant with the ECSS PUS service. This - currently includes the following services: - - Service 1 for telecommand verification. - - Service 3 for housekeeping telemetry handling. - - Service 5 for management and downlink of on-board events. - - Service 8 for handling on-board actions. - - Service 11 for scheduling telecommands to be released at a specific time. - - Service 17 for test purposes (pings) -3. An event manager component which handles the event IPC mechanism. -4. A TC source component which demultiplexes and routes telecommands based on parameters like - packet APID or PUS service and subservice type. -5. A TM sink sink component which is the target of all sent telemetry and sends it to downlink - handlers like the UDP and TCP server. -6. An AOCS example task which can also process some PUS commands. +You can also simply call the script without any arguments to view the command tree. diff --git a/satrs-example/src/acs.rs b/satrs-example/src/acs.rs new file mode 100644 index 0000000..9b4b3ee --- /dev/null +++ b/satrs-example/src/acs.rs @@ -0,0 +1,117 @@ +use std::sync::mpsc::{self, TryRecvError}; + +use log::{info, warn}; +use satrs_core::pus::verification::VerificationReporterWithSender; +use satrs_core::pus::{EcssTmSender, PusTmWrapper}; +use satrs_core::spacepackets::ecss::hk::Subservice as HkSubservice; +use satrs_core::{ + hk::HkRequest, + spacepackets::{ + ecss::tm::{PusTmCreator, PusTmSecondaryHeader}, + time::cds::{DaysLen16Bits, TimeProvider}, + SequenceFlags, SpHeader, + }, +}; +use satrs_example::config::{RequestTargetId, PUS_APID}; + +use crate::{ + hk::{AcsHkIds, HkUniqueId}, + requests::{Request, RequestWithToken}, + update_time, +}; + +pub struct AcsTask { + timestamp: [u8; 7], + time_provider: TimeProvider, + verif_reporter: VerificationReporterWithSender, + tm_sender: Box, + request_rx: mpsc::Receiver, +} + +impl AcsTask { + pub fn new( + tm_sender: impl EcssTmSender, + request_rx: mpsc::Receiver, + verif_reporter: VerificationReporterWithSender, + ) -> Self { + Self { + timestamp: [0; 7], + time_provider: TimeProvider::new_with_u16_days(0, 0), + verif_reporter, + tm_sender: Box::new(tm_sender), + request_rx, + } + } + + fn handle_hk_request(&mut self, target_id: u32, unique_id: u32) { + assert_eq!(target_id, RequestTargetId::AcsSubsystem as u32); + if unique_id == AcsHkIds::TestMgmSet as u32 { + let mut sp_header = SpHeader::tm(PUS_APID, SequenceFlags::Unsegmented, 0, 0).unwrap(); + let sec_header = PusTmSecondaryHeader::new_simple( + 3, + HkSubservice::TmHkPacket as u8, + &self.timestamp, + ); + let mut buf: [u8; 8] = [0; 8]; + let hk_id = HkUniqueId::new(target_id, unique_id); + hk_id.write_to_be_bytes(&mut buf).unwrap(); + let pus_tm = PusTmCreator::new(&mut sp_header, sec_header, &buf, true); + self.tm_sender + .send_tm(PusTmWrapper::Direct(pus_tm)) + .expect("Sending HK TM failed"); + } + // TODO: Verification failure for invalid unique IDs. + } + + pub fn try_reading_one_request(&mut self) -> bool { + match self.request_rx.try_recv() { + Ok(request) => { + info!( + "ACS thread: Received HK request {:?}", + request.targeted_request + ); + match request.targeted_request.request { + Request::Hk(hk_req) => match hk_req { + HkRequest::OneShot(unique_id) => self.handle_hk_request( + request.targeted_request.target_id_with_apid.target_id(), + unique_id, + ), + HkRequest::Enable(_) => {} + HkRequest::Disable(_) => {} + HkRequest::ModifyCollectionInterval(_, _) => {} + }, + Request::Mode(_mode_req) => { + warn!("mode request handling not implemented yet") + } + Request::Action(_action_req) => { + warn!("action request handling not implemented yet") + } + } + let started_token = self + .verif_reporter + .start_success(request.token, Some(&self.timestamp)) + .expect("Sending start success failed"); + self.verif_reporter + .completion_success(started_token, Some(&self.timestamp)) + .expect("Sending completion success failed"); + true + } + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + warn!("ACS thread: Message Queue TX disconnected!"); + false + } + }, + } + } + + pub fn periodic_operation(&mut self) { + update_time(&mut self.time_provider, &mut self.timestamp); + loop { + if !self.try_reading_one_request() { + break; + } + } + } +} diff --git a/satrs-example/src/bin/simpleclient.rs b/satrs-example/src/bin/simpleclient.rs index 4d0dab7..c64aa04 100644 --- a/satrs-example/src/bin/simpleclient.rs +++ b/satrs-example/src/bin/simpleclient.rs @@ -5,7 +5,7 @@ use satrs_core::{ spacepackets::ecss::{PusPacket, WritablePusPacket}, spacepackets::SpHeader, }; -use satrs_example::{OBSW_SERVER_ADDR, SERVER_PORT}; +use satrs_example::config::{OBSW_SERVER_ADDR, SERVER_PORT}; use std::net::{IpAddr, SocketAddr, UdpSocket}; use std::time::Duration; diff --git a/satrs-example/src/ccsds.rs b/satrs-example/src/ccsds.rs index d0616c9..0706912 100644 --- a/satrs-example/src/ccsds.rs +++ b/satrs-example/src/ccsds.rs @@ -1,15 +1,22 @@ -use crate::tmtc::{MpscStoreAndSendError, PusTcSource}; +use satrs_core::pus::ReceivesEcssPusTc; use satrs_core::spacepackets::{CcsdsPacket, SpHeader}; use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; -use satrs_example::PUS_APID; +use satrs_example::config::PUS_APID; #[derive(Clone)] -pub struct CcsdsReceiver { - pub tc_source: PusTcSource, +pub struct CcsdsReceiver< + TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone, + E, +> { + pub tc_source: TcSource, } -impl CcsdsPacketHandler for CcsdsReceiver { - type Error = MpscStoreAndSendError; +impl< + TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, + E: 'static, + > CcsdsPacketHandler for CcsdsReceiver +{ + type Error = E; fn valid_apids(&self) -> &'static [u16] { &[PUS_APID] diff --git a/satrs-example/src/config.rs b/satrs-example/src/config.rs new file mode 100644 index 0000000..a2fabd8 --- /dev/null +++ b/satrs-example/src/config.rs @@ -0,0 +1,142 @@ +use satrs_core::res_code::ResultU16; +use satrs_mib::res_code::ResultU16Info; +use satrs_mib::resultcode; +use std::net::Ipv4Addr; + +use num_enum::{IntoPrimitive, TryFromPrimitive}; +use satrs_core::{ + events::{EventU32TypedSev, SeverityInfo}, + pool::{StaticMemoryPool, StaticPoolConfig}, +}; + +pub const PUS_APID: u16 = 0x02; + +#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] +#[repr(u8)] +pub enum CustomPusServiceId { + Mode = 200, + Health = 201, +} + +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum RequestTargetId { + AcsSubsystem = 1, +} + +pub const AOCS_APID: u16 = 1; + +#[derive(Debug)] +pub enum GroupId { + Tmtc = 0, + Hk = 1, +} + +pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; +pub const SERVER_PORT: u16 = 7301; + +pub const TEST_EVENT: EventU32TypedSev = + EventU32TypedSev::::const_new(0, 0); + +pub mod tmtc_err { + + use super::*; + + #[resultcode] + pub const INVALID_PUS_SERVICE: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 0); + #[resultcode] + pub const INVALID_PUS_SUBSERVICE: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 1); + #[resultcode] + pub const PUS_SERVICE_NOT_IMPLEMENTED: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 2); + #[resultcode] + pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 3); + + #[resultcode( + info = "Not enough data inside the TC application data field. Optionally includes: \ + 8 bytes of failure data containing 2 failure parameters, \ + P1 (u32 big endian): Expected data length, P2: Found data length" + )] + pub const NOT_ENOUGH_APP_DATA: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 2); + + pub const TMTC_RESULTS: &[ResultU16Info] = &[ + INVALID_PUS_SERVICE_EXT, + INVALID_PUS_SUBSERVICE_EXT, + NOT_ENOUGH_APP_DATA_EXT, + ]; +} + +pub mod hk_err { + + use super::*; + + #[resultcode] + pub const TARGET_ID_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 0); + #[resultcode] + pub const UNIQUE_ID_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 1); + #[resultcode] + pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 2); + #[resultcode] + pub const COLLECTION_INTERVAL_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 3); +} + +#[allow(clippy::enum_variant_names)] +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum TmSenderId { + PusVerification = 0, + PusTest = 1, + PusEvent = 2, + PusHk = 3, + PusAction = 4, + PusSched = 5, + AllEvents = 6, + AcsSubsystem = 7, +} + +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum TcReceiverId { + PusTest = 1, + PusEvent = 2, + PusHk = 3, + PusAction = 4, + PusSched = 5, +} +pub mod pool { + use super::*; + pub fn create_static_pools() -> (StaticMemoryPool, StaticMemoryPool) { + ( + StaticMemoryPool::new(StaticPoolConfig::new(vec![ + (30, 32), + (15, 64), + (15, 128), + (15, 256), + (15, 1024), + (15, 2048), + ])), + StaticMemoryPool::new(StaticPoolConfig::new(vec![ + (30, 32), + (15, 64), + (15, 128), + (15, 256), + (15, 1024), + (15, 2048), + ])), + ) + } + + pub fn create_sched_tc_pool() -> StaticMemoryPool { + StaticMemoryPool::new(StaticPoolConfig::new(vec![ + (30, 32), + (15, 64), + (15, 128), + (15, 256), + (15, 1024), + (15, 2048), + ])) + } +} + +pub mod tasks { + pub const FREQ_MS_UDP_TMTC: u64 = 200; + pub const FREQ_MS_EVENT_HANDLING: u64 = 400; + pub const FREQ_MS_AOCS: u64 = 500; + pub const FREQ_MS_PUS_STACK: u64 = 200; +} diff --git a/satrs-example/src/events.rs b/satrs-example/src/events.rs new file mode 100644 index 0000000..aa2369b --- /dev/null +++ b/satrs-example/src/events.rs @@ -0,0 +1,187 @@ +use std::sync::mpsc::{self, SendError}; + +use satrs_core::{ + event_man::{ + EventManager, EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, + SendEventProvider, + }, + events::EventU32, + params::Params, + pus::{ + event_man::{ + DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, + PusEventDispatcher, + }, + verification::{TcStateStarted, VerificationReporterWithSender, VerificationToken}, + EcssTmSender, + }, + spacepackets::time::cds::{self, TimeProvider}, +}; +use satrs_example::config::PUS_APID; + +use crate::update_time; + +pub type MpscEventManager = EventManager)>>; + +pub struct PusEventHandler { + event_request_rx: mpsc::Receiver, + pus_event_dispatcher: PusEventDispatcher<(), EventU32>, + pus_event_man_rx: mpsc::Receiver<(EventU32, Option)>, + tm_sender: Box, + time_provider: TimeProvider, + timestamp: [u8; 7], + verif_handler: VerificationReporterWithSender, +} +/* +*/ + +impl PusEventHandler { + pub fn new( + verif_handler: VerificationReporterWithSender, + event_manager: &mut MpscEventManager, + event_request_rx: mpsc::Receiver, + tm_sender: impl EcssTmSender, + ) -> Self { + let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel(); + + // All events sent to the manager are routed to the PUS event manager, which generates PUS event + // telemetry for each event. + let event_reporter = EventReporter::new(PUS_APID, 128).unwrap(); + let pus_tm_backend = DefaultPusMgmtBackendProvider::::default(); + let pus_event_dispatcher = + PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend)); + let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx); + + event_manager.subscribe_all(pus_event_man_send_provider.id()); + event_manager.add_sender(pus_event_man_send_provider); + + Self { + event_request_rx, + pus_event_dispatcher, + pus_event_man_rx, + time_provider: cds::TimeProvider::new_with_u16_days(0, 0), + timestamp: [0; 7], + verif_handler, + tm_sender: Box::new(tm_sender), + } + } + + pub fn handle_event_requests(&mut self) { + let report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { + let started_token: VerificationToken = event_req + .token + .try_into() + .expect("expected start verification token"); + self.verif_handler + .completion_success(started_token, Some(timestamp)) + .expect("Sending completion success failed"); + }; + // handle event requests + if let Ok(event_req) = self.event_request_rx.try_recv() { + match event_req.request { + EventRequest::Enable(event) => { + self.pus_event_dispatcher + .enable_tm_for_event(&event) + .expect("Enabling TM failed"); + update_time(&mut self.time_provider, &mut self.timestamp); + report_completion(event_req, &self.timestamp); + } + EventRequest::Disable(event) => { + self.pus_event_dispatcher + .disable_tm_for_event(&event) + .expect("Disabling TM failed"); + update_time(&mut self.time_provider, &mut self.timestamp); + report_completion(event_req, &self.timestamp); + } + } + } + } + + pub fn generate_pus_event_tm(&mut self) { + // Perform the generation of PUS event packets + if let Ok((event, _param)) = self.pus_event_man_rx.try_recv() { + update_time(&mut self.time_provider, &mut self.timestamp); + self.pus_event_dispatcher + .generate_pus_event_tm_generic( + self.tm_sender.upcast_mut(), + &self.timestamp, + event, + None, + ) + .expect("Sending TM as event failed"); + } + } +} + +pub struct EventManagerWrapper { + event_manager: MpscEventManager, + event_sender: mpsc::Sender<(EventU32, Option)>, +} + +impl EventManagerWrapper { + pub fn new() -> Self { + // The sender handle is the primary sender handle for all components which want to create events. + // The event manager will receive the RX handle to receive all the events. + let (event_sender, event_man_rx) = mpsc::channel(); + let event_recv = MpscEventReceiver::::new(event_man_rx); + Self { + event_manager: EventManagerWithMpscQueue::new(Box::new(event_recv)), + event_sender, + } + } + + pub fn clone_event_sender(&self) -> mpsc::Sender<(EventU32, Option)> { + self.event_sender.clone() + } + + pub fn event_manager(&mut self) -> &mut MpscEventManager { + &mut self.event_manager + } + + pub fn try_event_routing(&mut self) { + // Perform the event routing. + self.event_manager + .try_event_handling() + .expect("event handling failed"); + } +} + +pub struct EventHandler { + pub event_man_wrapper: EventManagerWrapper, + pub pus_event_handler: PusEventHandler, +} + +impl EventHandler { + pub fn new( + tm_sender: impl EcssTmSender, + verif_handler: VerificationReporterWithSender, + event_request_rx: mpsc::Receiver, + ) -> Self { + let mut event_man_wrapper = EventManagerWrapper::new(); + let pus_event_handler = PusEventHandler::new( + verif_handler, + event_man_wrapper.event_manager(), + event_request_rx, + tm_sender, + ); + Self { + event_man_wrapper, + pus_event_handler, + } + } + + pub fn clone_event_sender(&self) -> mpsc::Sender<(EventU32, Option)> { + self.event_man_wrapper.clone_event_sender() + } + + #[allow(dead_code)] + pub fn event_manager(&mut self) -> &mut MpscEventManager { + self.event_man_wrapper.event_manager() + } + + pub fn periodic_operation(&mut self) { + self.pus_event_handler.handle_event_requests(); + self.event_man_wrapper.try_event_routing(); + self.pus_event_handler.generate_pus_event_tm(); + } +} diff --git a/satrs-example/src/lib.rs b/satrs-example/src/lib.rs index 75fc598..97157f4 100644 --- a/satrs-example/src/lib.rs +++ b/satrs-example/src/lib.rs @@ -1,17 +1,12 @@ use derive_new::new; -use num_enum::{IntoPrimitive, TryFromPrimitive}; -use satrs_core::events::{EventU32TypedSev, SeverityInfo}; -use satrs_core::objects::ObjectId; use satrs_core::spacepackets::ecss::tc::IsPusTelecommand; use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::spacepackets::{ByteConversionError, CcsdsPacket}; use satrs_core::tmtc::TargetId; use std::fmt; -use std::net::Ipv4Addr; use thiserror::Error; -use satrs_mib::res_code::{ResultU16, ResultU16Info}; -use satrs_mib::resultcode; +pub mod config; pub type Apid = u16; @@ -62,96 +57,3 @@ impl TargetIdWithApid { }) } } - -pub const PUS_APID: u16 = 0x02; - -#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] -#[repr(u8)] -pub enum CustomPusServiceId { - Mode = 200, - Health = 201, -} - -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -pub enum RequestTargetId { - AcsSubsystem = 1, -} - -pub const AOCS_APID: u16 = 1; - -pub const ACS_OBJECT_ID: ObjectId = ObjectId { - id: RequestTargetId::AcsSubsystem as u32, - name: "ACS_SUBSYSTEM", -}; - -#[derive(Debug)] -pub enum GroupId { - Tmtc = 0, - Hk = 1, -} - -pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; -pub const SERVER_PORT: u16 = 7301; - -pub const TEST_EVENT: EventU32TypedSev = - EventU32TypedSev::::const_new(0, 0); - -pub mod tmtc_err { - use super::*; - - #[resultcode] - pub const INVALID_PUS_SERVICE: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 0); - #[resultcode] - pub const INVALID_PUS_SUBSERVICE: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 1); - #[resultcode] - pub const PUS_SERVICE_NOT_IMPLEMENTED: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 2); - #[resultcode] - pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 3); - - #[resultcode( - info = "Not enough data inside the TC application data field. Optionally includes: \ - 8 bytes of failure data containing 2 failure parameters, \ - P1 (u32 big endian): Expected data length, P2: Found data length" - )] - pub const NOT_ENOUGH_APP_DATA: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 2); - - pub const TMTC_RESULTS: &[ResultU16Info] = &[ - INVALID_PUS_SERVICE_EXT, - INVALID_PUS_SUBSERVICE_EXT, - NOT_ENOUGH_APP_DATA_EXT, - ]; -} - -pub mod hk_err { - use super::*; - - #[resultcode] - pub const TARGET_ID_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 0); - #[resultcode] - pub const UNIQUE_ID_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 1); - #[resultcode] - pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 2); - #[resultcode] - pub const COLLECTION_INTERVAL_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 3); -} - -#[allow(clippy::enum_variant_names)] -#[derive(Copy, Clone, PartialEq, Eq)] -pub enum TmSenderId { - PusVerification = 0, - PusTest = 1, - PusEvent = 2, - PusHk = 3, - PusAction = 4, - PusSched = 5, - AllEvents = 6, -} - -#[derive(Copy, Clone, PartialEq, Eq)] -pub enum TcReceiverId { - PusTest = 1, - PusEvent = 2, - PusHk = 3, - PusAction = 4, - PusSched = 5, -} diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index ed1ef32..113103c 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -1,152 +1,95 @@ +mod acs; mod ccsds; +mod events; mod hk; mod logger; mod pus; mod requests; mod tcp; +mod tm_funnel; mod tmtc; mod udp; -use log::{info, warn}; +use crate::events::EventHandler; +use crate::pus::stack::PusStack; +use crate::tm_funnel::{TmFunnelDynamic, TmFunnelStatic}; +use log::info; +use pus::test::create_test_service_dynamic; use satrs_core::hal::std::tcp_server::ServerConfig; use satrs_core::hal::std::udp_server::UdpTcServer; +use satrs_example::config::pool::{create_sched_tc_pool, create_static_pools}; +use satrs_example::config::tasks::{ + FREQ_MS_AOCS, FREQ_MS_EVENT_HANDLING, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, +}; +use satrs_example::config::{RequestTargetId, TmSenderId, OBSW_SERVER_ADDR, PUS_APID, SERVER_PORT}; +use tmtc::PusTcSourceProviderDynamic; +use udp::DynamicUdpTmHandler; +use crate::acs::AcsTask; use crate::ccsds::CcsdsReceiver; -use crate::hk::{AcsHkIds, HkUniqueId}; use crate::logger::setup_logger; -use crate::pus::action::{Pus8Wrapper, PusService8ActionHandler}; -use crate::pus::event::Pus5Wrapper; -use crate::pus::hk::{Pus3Wrapper, PusService3HkHandler}; -use crate::pus::scheduler::Pus11Wrapper; -use crate::pus::test::Service17CustomWrapper; +use crate::pus::action::{create_action_service_dynamic, create_action_service_static}; +use crate::pus::event::{create_event_service_dynamic, create_event_service_static}; +use crate::pus::hk::{create_hk_service_dynamic, create_hk_service_static}; +use crate::pus::scheduler::{create_scheduler_service_dynamic, create_scheduler_service_static}; +use crate::pus::test::create_test_service_static; use crate::pus::{PusReceiver, PusTcMpscRouter}; -use crate::requests::{Request, RequestWithToken}; +use crate::requests::RequestWithToken; use crate::tcp::{SyncTcpTmSource, TcpTask}; -use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask}; -use crate::udp::UdpTmtcServer; -use satrs_core::event_man::{ - EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, -}; -use satrs_core::events::EventU32; -use satrs_core::hk::HkRequest; -use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig}; -use satrs_core::pus::event_man::{ - DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken, - PusEventDispatcher, -}; -use satrs_core::pus::event_srv::PusService5EventHandler; -use satrs_core::pus::hk::Subservice as HkSubservice; -use satrs_core::pus::scheduler::PusScheduler; -use satrs_core::pus::scheduler_srv::PusService11SchedHandler; -use satrs_core::pus::test::PusService17TestHandler; -use satrs_core::pus::verification::{ - TcStateStarted, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken, -}; -use satrs_core::pus::{ - EcssTcInSharedStoreConverter, MpscTcReceiver, MpscTmInStoreSender, PusServiceHelper, -}; -use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}; -use satrs_core::spacepackets::ecss::tm::{PusTmCreator, PusTmZeroCopyWriter}; -use satrs_core::spacepackets::{ - ecss::tm::PusTmSecondaryHeader, time::cds::TimeProvider, time::TimeWriter, SequenceFlags, - SpHeader, +use crate::tmtc::{ + PusTcSourceProviderSharedPool, SharedTcPool, TcArgs, TmArgs, TmtcTaskDynamic, TmtcTaskStatic, }; +use crate::udp::{StaticUdpTmHandler, UdpTmtcServer}; +use satrs_core::pus::event_man::EventRequestWithToken; +use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender}; +use satrs_core::pus::{EcssTmSender, MpscTmAsVecSender, MpscTmInStoreSender}; +use satrs_core::spacepackets::{time::cds::TimeProvider, time::TimeWriter}; use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::{CcsdsDistributor, TargetId}; use satrs_core::ChannelId; -use satrs_example::{ - RequestTargetId, TargetIdWithApid, TcReceiverId, TmSenderId, OBSW_SERVER_ADDR, PUS_APID, - SERVER_PORT, -}; +use satrs_example::TargetIdWithApid; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; -use std::sync::mpsc::{channel, TryRecvError}; +use std::sync::mpsc::{self, channel}; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; -fn main() { - setup_logger().expect("setting up logging with fern failed"); - println!("Running OBSW example"); - let tm_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ - (30, 32), - (15, 64), - (15, 128), - (15, 256), - (15, 1024), - (15, 2048), - ])); +fn create_verification_reporter(verif_sender: impl EcssTmSender) -> VerificationReporterWithSender { + let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap(); + // Every software component which needs to generate verification telemetry, gets a cloned + // verification reporter. + VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)) +} + +#[allow(dead_code)] +fn static_tmtc_pool_main() { + let (tm_pool, tc_pool) = create_static_pools(); let shared_tm_store = SharedTmStore::new(tm_pool); - let tm_store_event = shared_tm_store.clone(); - let tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ - (30, 32), - (15, 64), - (15, 128), - (15, 256), - (15, 1024), - (15, 2048), - ])); - let tc_store = TcStore { + let shared_tc_pool = SharedTcPool { pool: Arc::new(RwLock::new(tc_pool)), }; - let sched_tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![ - (30, 32), - (15, 64), - (15, 128), - (15, 256), - (15, 1024), - (15, 2048), - ])); - - let seq_count_provider = CcsdsSimpleSeqCountProvider::new(); - let mut msg_counter_map: HashMap = HashMap::new(); - let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let (tc_source_tx, tc_source_rx) = channel(); let (tm_funnel_tx, tm_funnel_rx) = channel(); let (tm_server_tx, tm_server_rx) = channel(); - let verif_sender = MpscTmInStoreSender::new( + + // Every software component which needs to generate verification telemetry, receives a cloned + // verification reporter. + let verif_reporter = create_verification_reporter(MpscTmInStoreSender::new( TmSenderId::PusVerification as ChannelId, "verif_sender", shared_tm_store.clone(), tm_funnel_tx.clone(), - ); - let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap(); - // Every software component which needs to generate verification telemetry, gets a cloned - // verification reporter. - let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender)); - let reporter_event_handler = verif_reporter.clone(); - let reporter_aocs = verif_reporter.clone(); - - // Create event handling components - // These sender handles are used to send event requests, for example to enable or disable - // certain events - let (event_request_tx, event_request_rx) = channel::(); - // The sender handle is the primary sender handle for all components which want to create events. - // The event manager will receive the RX handle to receive all the events. - let (event_sender, event_man_rx) = channel(); - let event_recv = MpscEventReceiver::::new(event_man_rx); - let test_srv_event_sender = event_sender; - let mut event_man = EventManagerWithMpscQueue::new(Box::new(event_recv)); - - // All events sent to the manager are routed to the PUS event manager, which generates PUS event - // telemetry for each event. - let event_reporter = EventReporter::new(PUS_APID, 128).unwrap(); - let pus_tm_backend = DefaultPusMgmtBackendProvider::::default(); - let mut pus_event_dispatcher = - PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend)); - let (pus_event_man_tx, pus_event_man_rx) = channel(); - let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx); - event_man.subscribe_all(pus_event_man_send_provider.id()); - event_man.add_sender(pus_event_man_send_provider); + )); + let acs_target_id = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId); + let (acs_thread_tx, acs_thread_rx) = channel::(); // Some request are targetable. This map is used to retrieve sender handles based on a target ID. let mut request_map = HashMap::new(); - let (acs_thread_tx, acs_thread_rx) = channel::(); - let target_apid = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId); - request_map.insert(target_apid, acs_thread_tx); + request_map.insert(acs_target_id, acs_thread_tx); - let tc_source_wrapper = PusTcSource { - tc_store: tc_store.clone(), + let tc_source_wrapper = PusTcSourceProviderSharedPool { + shared_pool: shared_tc_pool.clone(), tc_source: tc_source_tx, }; @@ -161,8 +104,23 @@ fn main() { tm_udp_server_rx: tm_server_rx, }; - let aocs_tm_funnel = tm_funnel_tx.clone(); - let aocs_tm_store = shared_tm_store.clone(); + // Create event handling components + // These sender handles are used to send event requests, for example to enable or disable + // certain events. + let (event_request_tx, event_request_rx) = mpsc::channel::(); + + // The event task is the core handler to perform the event routing and TM handling as specified + // in the sat-rs documentation. + let mut event_handler = EventHandler::new( + MpscTmInStoreSender::new( + TmSenderId::AllEvents as ChannelId, + "ALL_EVENTS_TX", + shared_tm_store.clone(), + tm_funnel_tx.clone(), + ), + verif_reporter.clone(), + event_request_rx, + ); let (pus_test_tx, pus_test_rx) = channel(); let (pus_event_tx, pus_event_rx) = channel(); @@ -176,134 +134,102 @@ fn main() { hk_service_receiver: pus_hk_tx, action_service_receiver: pus_action_tx, }; - let test_srv_tm_sender = MpscTmInStoreSender::new( - TmSenderId::PusTest as ChannelId, - "PUS_17_TM_SENDER", + let pus_test_service = create_test_service_static( shared_tm_store.clone(), tm_funnel_tx.clone(), - ); - let test_srv_receiver = MpscTcReceiver::new( - TcReceiverId::PusTest as ChannelId, - "PUS_17_TC_RECV", + verif_reporter.clone(), + shared_tc_pool.pool.clone(), + event_handler.clone_event_sender(), pus_test_rx, ); - let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( - Box::new(test_srv_receiver), - Box::new(test_srv_tm_sender), - PUS_APID, + let pus_scheduler_service = create_scheduler_service_static( + shared_tm_store.clone(), + tm_funnel_tx.clone(), verif_reporter.clone(), - EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), - )); - let mut pus_17_wrapper = Service17CustomWrapper { - pus17_handler, - test_srv_event_sender, - }; - - let sched_srv_tm_sender = MpscTmInStoreSender::new( - TmSenderId::PusSched as ChannelId, - "PUS_11_TM_SENDER", - shared_tm_store.clone(), - tm_funnel_tx.clone(), - ); - let sched_srv_receiver = MpscTcReceiver::new( - TcReceiverId::PusSched as ChannelId, - "PUS_11_TC_RECV", - pus_sched_rx, - ); - let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) - .expect("Creating PUS Scheduler failed"); - let pus_11_handler = PusService11SchedHandler::new( - PusServiceHelper::new( - Box::new(sched_srv_receiver), - Box::new(sched_srv_tm_sender), - PUS_APID, - verif_reporter.clone(), - EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), - ), - scheduler, - ); - let mut pus_11_wrapper = Pus11Wrapper { - pus_11_handler, - sched_tc_pool, tc_source_wrapper, - }; - - let event_srv_tm_sender = MpscTmInStoreSender::new( - TmSenderId::PusEvent as ChannelId, - "PUS_5_TM_SENDER", + pus_sched_rx, + create_sched_tc_pool(), + ); + let pus_event_service = create_event_service_static( shared_tm_store.clone(), tm_funnel_tx.clone(), - ); - let event_srv_receiver = MpscTcReceiver::new( - TcReceiverId::PusEvent as ChannelId, - "PUS_5_TC_RECV", + verif_reporter.clone(), + shared_tc_pool.pool.clone(), pus_event_rx, - ); - let pus_5_handler = PusService5EventHandler::new( - PusServiceHelper::new( - Box::new(event_srv_receiver), - Box::new(event_srv_tm_sender), - PUS_APID, - verif_reporter.clone(), - EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), - ), event_request_tx, ); - let mut pus_5_wrapper = Pus5Wrapper { pus_5_handler }; - - let action_srv_tm_sender = MpscTmInStoreSender::new( - TmSenderId::PusAction as ChannelId, - "PUS_8_TM_SENDER", + let pus_action_service = create_action_service_static( shared_tm_store.clone(), tm_funnel_tx.clone(), - ); - let action_srv_receiver = MpscTcReceiver::new( - TcReceiverId::PusAction as ChannelId, - "PUS_8_TC_RECV", - pus_action_rx, - ); - let pus_8_handler = PusService8ActionHandler::new( - Box::new(action_srv_receiver), - Box::new(action_srv_tm_sender), - PUS_APID, verif_reporter.clone(), - EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), + shared_tc_pool.pool.clone(), + pus_action_rx, request_map.clone(), ); - let mut pus_8_wrapper = Pus8Wrapper { pus_8_handler }; - - let hk_srv_tm_sender = MpscTmInStoreSender::new( - TmSenderId::PusHk as ChannelId, - "PUS_3_TM_SENDER", + let pus_hk_service = create_hk_service_static( shared_tm_store.clone(), tm_funnel_tx.clone(), - ); - let hk_srv_receiver = - MpscTcReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_8_TC_RECV", pus_hk_rx); - let pus_3_handler = PusService3HkHandler::new( - Box::new(hk_srv_receiver), - Box::new(hk_srv_tm_sender), - PUS_APID, verif_reporter.clone(), - EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048), + shared_tc_pool.pool.clone(), + pus_hk_rx, request_map, ); - let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler }; + let mut pus_stack = PusStack::new( + pus_hk_service, + pus_event_service, + pus_action_service, + pus_scheduler_service, + pus_test_service, + ); let ccsds_receiver = CcsdsReceiver { tc_source: tc_args.tc_source.clone(), }; - let mut tmtc_task = TmtcTask::new(tc_args, PusReceiver::new(verif_reporter, pus_router)); + let mut tmtc_task = TmtcTaskStatic::new( + tc_args, + PusReceiver::new(verif_reporter.clone(), pus_router), + ); + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, - tm_rx: tm_args.tm_udp_server_rx, - tm_store: tm_args.tm_store.clone_backing_pool(), + tm_handler: StaticUdpTmHandler { + tm_rx: tm_args.tm_udp_server_rx, + tm_store: tm_args.tm_store.clone_backing_pool(), + }, }; + let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let sync_tm_tcp_source = SyncTcpTmSource::new(200); + let mut tcp_server = TcpTask::new( + tcp_server_cfg, + sync_tm_tcp_source.clone(), + tcp_ccsds_distributor, + ) + .expect("tcp server creation failed"); + + let mut acs_task = AcsTask::new( + MpscTmInStoreSender::new( + TmSenderId::AcsSubsystem as ChannelId, + "ACS_TASK_SENDER", + shared_tm_store.clone(), + tm_funnel_tx.clone(), + ), + acs_thread_rx, + verif_reporter, + ); + + let mut tm_funnel = TmFunnelStatic::new( + shared_tm_store, + sync_tm_tcp_source, + tm_funnel_rx, + tm_server_tx, + ); + info!("Starting TMTC and UDP task"); let jh_udp_tmtc = thread::Builder::new() .name("TMTC and UDP".to_string()) @@ -312,20 +238,11 @@ fn main() { loop { udp_tmtc_server.periodic_operation(); tmtc_task.periodic_operation(); - thread::sleep(Duration::from_millis(400)); + thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); } }) .unwrap(); - let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); - let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); - let mut sync_tm_tcp_source = SyncTcpTmSource::new(200); - let mut tcp_server = TcpTask::new( - tcp_server_cfg, - sync_tm_tcp_source.clone(), - tcp_ccsds_distributor, - ) - .expect("tcp server creation failed"); info!("Starting TCP task"); let jh_tcp = thread::Builder::new() .name("TCP".to_string()) @@ -338,225 +255,271 @@ fn main() { .unwrap(); info!("Starting TM funnel task"); - let jh1 = thread::Builder::new() + let jh_tm_funnel = thread::Builder::new() .name("TM Funnel".to_string()) - .spawn(move || { - let tm_funnel = TmFunnel { - tm_server_tx, - tm_funnel_rx, - }; - loop { - if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() { - // Read the TM, set sequence counter and message counter, and finally update - // the CRC. - let shared_pool = shared_tm_store.clone_backing_pool(); - let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); - let tm_raw = pool_guard - .modify(&addr) - .expect("Reading TM from pool failed"); - let mut zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw) - .expect("Creating TM zero copy writer failed"); - zero_copy_writer.set_apid(PUS_APID); - zero_copy_writer.set_seq_count(seq_count_provider.get_and_increment()); - let entry = msg_counter_map - .entry(zero_copy_writer.service()) - .or_insert(0); - zero_copy_writer.set_msg_count(*entry); - if *entry == u16::MAX { - *entry = 0; - } else { - *entry += 1; - } - - // This operation has to come last! - zero_copy_writer.finish(); - tm_funnel - .tm_server_tx - .send(addr) - .expect("Sending TM to server failed"); - sync_tm_tcp_source.add_tm(tm_raw); - } - } + .spawn(move || loop { + tm_funnel.operation(); }) .unwrap(); info!("Starting event handling task"); - let jh2 = thread::Builder::new() + let jh_event_handling = thread::Builder::new() .name("Event".to_string()) - .spawn(move || { - let mut timestamp: [u8; 7] = [0; 7]; - let mut sender = MpscTmInStoreSender::new( - TmSenderId::AllEvents as ChannelId, - "ALL_EVENTS_TX", - tm_store_event.clone(), - tm_funnel_tx, - ); - let mut time_provider = TimeProvider::new_with_u16_days(0, 0); - let report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| { - let started_token: VerificationToken = event_req - .token - .try_into() - .expect("expected start verification token"); - reporter_event_handler - .completion_success(started_token, Some(timestamp)) - .expect("Sending completion success failed"); - }; - loop { - // handle event requests - if let Ok(event_req) = event_request_rx.try_recv() { - match event_req.request { - EventRequest::Enable(event) => { - pus_event_dispatcher - .enable_tm_for_event(&event) - .expect("Enabling TM failed"); - update_time(&mut time_provider, &mut timestamp); - report_completion(event_req, ×tamp); - } - EventRequest::Disable(event) => { - pus_event_dispatcher - .disable_tm_for_event(&event) - .expect("Disabling TM failed"); - update_time(&mut time_provider, &mut timestamp); - report_completion(event_req, ×tamp); - } - } - } - - // Perform the event routing. - event_man - .try_event_handling() - .expect("event handling failed"); - - // Perform the generation of PUS event packets - if let Ok((event, _param)) = pus_event_man_rx.try_recv() { - update_time(&mut time_provider, &mut timestamp); - pus_event_dispatcher - .generate_pus_event_tm_generic(&mut sender, ×tamp, event, None) - .expect("Sending TM as event failed"); - } - thread::sleep(Duration::from_millis(400)); - } + .spawn(move || loop { + event_handler.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING)); }) .unwrap(); info!("Starting AOCS thread"); - let jh3 = thread::Builder::new() + let jh_aocs = thread::Builder::new() .name("AOCS".to_string()) - .spawn(move || { - let mut timestamp: [u8; 7] = [0; 7]; - let mut time_provider = TimeProvider::new_with_u16_days(0, 0); - loop { - // TODO: Move this into a separate function/task/module.. - match acs_thread_rx.try_recv() { - Ok(request) => { - info!( - "ACS thread: Received HK request {:?}", - request.targeted_request - ); - update_time(&mut time_provider, &mut timestamp); - match request.targeted_request.request { - Request::Hk(hk_req) => match hk_req { - HkRequest::OneShot(unique_id) => { - // TODO: We should check whether the unique ID is even valid. - let target = request.targeted_request.target_id; - assert_eq!( - target.target_id(), - RequestTargetId::AcsSubsystem as u32 - ); - if request.targeted_request.target_id.target - == AcsHkIds::TestMgmSet as u32 - { - let mut sp_header = SpHeader::tm( - PUS_APID, - SequenceFlags::Unsegmented, - 0, - 0, - ) - .unwrap(); - let sec_header = PusTmSecondaryHeader::new_simple( - 3, - HkSubservice::TmHkPacket as u8, - ×tamp, - ); - let mut buf: [u8; 8] = [0; 8]; - let hk_id = HkUniqueId::new(target.target_id(), unique_id); - hk_id.write_to_be_bytes(&mut buf).unwrap(); - let pus_tm = PusTmCreator::new( - &mut sp_header, - sec_header, - &buf, - true, - ); - let addr = aocs_tm_store - .add_pus_tm(&pus_tm) - .expect("Adding PUS TM failed"); - aocs_tm_funnel.send(addr).expect("Sending HK TM failed"); - } - } - HkRequest::Enable(_) => {} - HkRequest::Disable(_) => {} - HkRequest::ModifyCollectionInterval(_, _) => {} - }, - Request::Mode(_mode_req) => { - warn!("mode request handling not implemented yet") - } - Request::Action(_action_req) => { - warn!("action request handling not implemented yet") - } - } - let started_token = reporter_aocs - .start_success(request.token, Some(×tamp)) - .expect("Sending start success failed"); - reporter_aocs - .completion_success(started_token, Some(×tamp)) - .expect("Sending completion success failed"); - } - Err(e) => match e { - TryRecvError::Empty => {} - TryRecvError::Disconnected => { - warn!("ACS thread: Message Queue TX disconnected!") - } - }, - } - thread::sleep(Duration::from_millis(500)); - } + .spawn(move || loop { + acs_task.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_AOCS)); }) .unwrap(); info!("Starting PUS handler thread"); - let jh4 = thread::Builder::new() + let jh_pus_handler = thread::Builder::new() .name("PUS".to_string()) .spawn(move || loop { - pus_11_wrapper.release_tcs(); - loop { - let mut all_queues_empty = true; - let mut is_srv_finished = |srv_handler_finished: bool| { - if !srv_handler_finished { - all_queues_empty = false; - } - }; - is_srv_finished(pus_17_wrapper.handle_next_packet()); - is_srv_finished(pus_11_wrapper.handle_next_packet()); - is_srv_finished(pus_5_wrapper.handle_next_packet()); - is_srv_finished(pus_8_wrapper.handle_next_packet()); - is_srv_finished(pus_3_wrapper.handle_next_packet()); - if all_queues_empty { - break; - } - } - thread::sleep(Duration::from_millis(200)); + pus_stack.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); }) .unwrap(); + jh_udp_tmtc .join() .expect("Joining UDP TMTC server thread failed"); jh_tcp .join() .expect("Joining TCP TMTC server thread failed"); - jh1.join().expect("Joining TM Funnel thread failed"); - jh2.join().expect("Joining Event Manager thread failed"); - jh3.join().expect("Joining AOCS thread failed"); - jh4.join().expect("Joining PUS handler thread failed"); + jh_tm_funnel + .join() + .expect("Joining TM Funnel thread failed"); + jh_event_handling + .join() + .expect("Joining Event Manager thread failed"); + jh_aocs.join().expect("Joining AOCS thread failed"); + jh_pus_handler + .join() + .expect("Joining PUS handler thread failed"); +} + +#[allow(dead_code)] +fn dyn_tmtc_pool_main() { + let (tc_source_tx, tc_source_rx) = channel(); + let (tm_funnel_tx, tm_funnel_rx) = channel(); + let (tm_server_tx, tm_server_rx) = channel(); + // Every software component which needs to generate verification telemetry, gets a cloned + // verification reporter. + let verif_reporter = create_verification_reporter(MpscTmAsVecSender::new( + TmSenderId::PusVerification as ChannelId, + "verif_sender", + tm_funnel_tx.clone(), + )); + + let acs_target_id = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId); + let (acs_thread_tx, acs_thread_rx) = channel::(); + // Some request are targetable. This map is used to retrieve sender handles based on a target ID. + let mut request_map = HashMap::new(); + request_map.insert(acs_target_id, acs_thread_tx); + + let tc_source = PusTcSourceProviderDynamic(tc_source_tx); + + // Create event handling components + // These sender handles are used to send event requests, for example to enable or disable + // certain events. + let (event_request_tx, event_request_rx) = mpsc::channel::(); + // The event task is the core handler to perform the event routing and TM handling as specified + // in the sat-rs documentation. + let mut event_handler = EventHandler::new( + MpscTmAsVecSender::new( + TmSenderId::AllEvents as ChannelId, + "ALL_EVENTS_TX", + tm_funnel_tx.clone(), + ), + verif_reporter.clone(), + event_request_rx, + ); + + let (pus_test_tx, pus_test_rx) = channel(); + let (pus_event_tx, pus_event_rx) = channel(); + let (pus_sched_tx, pus_sched_rx) = channel(); + let (pus_hk_tx, pus_hk_rx) = channel(); + let (pus_action_tx, pus_action_rx) = channel(); + let pus_router = PusTcMpscRouter { + test_service_receiver: pus_test_tx, + event_service_receiver: pus_event_tx, + sched_service_receiver: pus_sched_tx, + hk_service_receiver: pus_hk_tx, + action_service_receiver: pus_action_tx, + }; + + let pus_test_service = create_test_service_dynamic( + tm_funnel_tx.clone(), + verif_reporter.clone(), + event_handler.clone_event_sender(), + pus_test_rx, + ); + let pus_scheduler_service = create_scheduler_service_dynamic( + tm_funnel_tx.clone(), + verif_reporter.clone(), + tc_source.0.clone(), + pus_sched_rx, + create_sched_tc_pool(), + ); + + let pus_event_service = create_event_service_dynamic( + tm_funnel_tx.clone(), + verif_reporter.clone(), + pus_event_rx, + event_request_tx, + ); + let pus_action_service = create_action_service_dynamic( + tm_funnel_tx.clone(), + verif_reporter.clone(), + pus_action_rx, + request_map.clone(), + ); + let pus_hk_service = create_hk_service_dynamic( + tm_funnel_tx.clone(), + verif_reporter.clone(), + pus_hk_rx, + request_map, + ); + let mut pus_stack = PusStack::new( + pus_hk_service, + pus_event_service, + pus_action_service, + pus_scheduler_service, + pus_test_service, + ); + + let ccsds_receiver = CcsdsReceiver { tc_source }; + + let mut tmtc_task = TmtcTaskDynamic::new( + tc_source_rx, + PusReceiver::new(verif_reporter.clone(), pus_router), + ); + + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) + .expect("creating UDP TMTC server failed"); + let mut udp_tmtc_server = UdpTmtcServer { + udp_tc_server, + tm_handler: DynamicUdpTmHandler { + tm_rx: tm_server_rx, + }, + }; + + let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let sync_tm_tcp_source = SyncTcpTmSource::new(200); + let mut tcp_server = TcpTask::new( + tcp_server_cfg, + sync_tm_tcp_source.clone(), + tcp_ccsds_distributor, + ) + .expect("tcp server creation failed"); + + let mut acs_task = AcsTask::new( + MpscTmAsVecSender::new( + TmSenderId::AcsSubsystem as ChannelId, + "ACS_TASK_SENDER", + tm_funnel_tx.clone(), + ), + acs_thread_rx, + verif_reporter, + ); + let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); + + info!("Starting TMTC and UDP task"); + let jh_udp_tmtc = thread::Builder::new() + .name("TMTC and UDP".to_string()) + .spawn(move || { + info!("Running UDP server on port {SERVER_PORT}"); + loop { + udp_tmtc_server.periodic_operation(); + tmtc_task.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); + } + }) + .unwrap(); + + info!("Starting TCP task"); + let jh_tcp = thread::Builder::new() + .name("TCP".to_string()) + .spawn(move || { + info!("Running TCP server on port {SERVER_PORT}"); + loop { + tcp_server.periodic_operation(); + } + }) + .unwrap(); + + info!("Starting TM funnel task"); + let jh_tm_funnel = thread::Builder::new() + .name("TM Funnel".to_string()) + .spawn(move || loop { + tm_funnel.operation(); + }) + .unwrap(); + + info!("Starting event handling task"); + let jh_event_handling = thread::Builder::new() + .name("Event".to_string()) + .spawn(move || loop { + event_handler.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING)); + }) + .unwrap(); + + info!("Starting AOCS thread"); + let jh_aocs = thread::Builder::new() + .name("AOCS".to_string()) + .spawn(move || loop { + acs_task.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_AOCS)); + }) + .unwrap(); + + info!("Starting PUS handler thread"); + let jh_pus_handler = thread::Builder::new() + .name("PUS".to_string()) + .spawn(move || loop { + pus_stack.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); + }) + .unwrap(); + + jh_udp_tmtc + .join() + .expect("Joining UDP TMTC server thread failed"); + jh_tcp + .join() + .expect("Joining TCP TMTC server thread failed"); + jh_tm_funnel + .join() + .expect("Joining TM Funnel thread failed"); + jh_event_handling + .join() + .expect("Joining Event Manager thread failed"); + jh_aocs.join().expect("Joining AOCS thread failed"); + jh_pus_handler + .join() + .expect("Joining PUS handler thread failed"); +} + +fn main() { + setup_logger().expect("setting up logging with fern failed"); + println!("Running OBSW example"); + #[cfg(not(feature = "dyn_tmtc"))] + static_tmtc_pool_main(); + #[cfg(feature = "dyn_tmtc")] + dyn_tmtc_pool_main(); } pub fn update_time(time_provider: &mut TimeProvider, timestamp: &mut [u8]) { diff --git a/satrs-example/src/pus/action.rs b/satrs-example/src/pus/action.rs index 3b8cc8d..2c7afb2 100644 --- a/satrs-example/src/pus/action.rs +++ b/satrs-example/src/pus/action.rs @@ -1,17 +1,79 @@ use crate::requests::{ActionRequest, Request, RequestWithToken}; use log::{error, warn}; +use satrs_core::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs_core::pus::verification::{ FailParams, TcStateAccepted, VerificationReporterWithSender, VerificationToken, }; use satrs_core::pus::{ - EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcReceiver, EcssTmSender, + EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, + EcssTcReceiver, EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHelper, }; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; -use satrs_example::{tmtc_err, TargetIdWithApid}; +use satrs_core::tmtc::tm_helper::SharedTmStore; +use satrs_core::ChannelId; +use satrs_example::config::{tmtc_err, TcReceiverId, TmSenderId, PUS_APID}; +use satrs_example::TargetIdWithApid; use std::collections::HashMap; -use std::sync::mpsc::Sender; +use std::sync::mpsc::{self, Sender}; + +pub fn create_action_service_static( + shared_tm_store: SharedTmStore, + tm_funnel_tx: mpsc::Sender, + verif_reporter: VerificationReporterWithSender, + tc_pool: SharedStaticMemoryPool, + pus_action_rx: mpsc::Receiver, + request_map: HashMap>, +) -> Pus8Wrapper { + let action_srv_tm_sender = MpscTmInStoreSender::new( + TmSenderId::PusAction as ChannelId, + "PUS_8_TM_SENDER", + shared_tm_store.clone(), + tm_funnel_tx.clone(), + ); + let action_srv_receiver = MpscTcReceiver::new( + TcReceiverId::PusAction as ChannelId, + "PUS_8_TC_RECV", + pus_action_rx, + ); + let pus_8_handler = PusService8ActionHandler::new( + Box::new(action_srv_receiver), + Box::new(action_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInSharedStoreConverter::new(tc_pool.clone(), 2048), + request_map.clone(), + ); + Pus8Wrapper { pus_8_handler } +} + +pub fn create_action_service_dynamic( + tm_funnel_tx: mpsc::Sender>, + verif_reporter: VerificationReporterWithSender, + pus_action_rx: mpsc::Receiver, + request_map: HashMap>, +) -> Pus8Wrapper { + let action_srv_tm_sender = MpscTmAsVecSender::new( + TmSenderId::PusAction as ChannelId, + "PUS_8_TM_SENDER", + tm_funnel_tx.clone(), + ); + let action_srv_receiver = MpscTcReceiver::new( + TcReceiverId::PusAction as ChannelId, + "PUS_8_TC_RECV", + pus_action_rx, + ); + let pus_8_handler = PusService8ActionHandler::new( + Box::new(action_srv_receiver), + Box::new(action_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInVecConverter::default(), + request_map.clone(), + ); + Pus8Wrapper { pus_8_handler } +} pub struct PusService8ActionHandler { service_helper: PusServiceHelper, @@ -141,11 +203,11 @@ impl PusService8ActionHandler, +pub struct Pus8Wrapper { + pub(crate) pus_8_handler: PusService8ActionHandler, } -impl Pus8Wrapper { +impl Pus8Wrapper { pub fn handle_next_packet(&mut self) -> bool { match self.pus_8_handler.handle_one_tc() { Ok(result) => match result { diff --git a/satrs-example/src/pus/event.rs b/satrs-example/src/pus/event.rs index 08aa786..825c6fd 100644 --- a/satrs-example/src/pus/event.rs +++ b/satrs-example/src/pus/event.rs @@ -1,12 +1,85 @@ -use log::{error, warn}; -use satrs_core::pus::event_srv::PusService5EventHandler; -use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult}; +use std::sync::mpsc; -pub struct Pus5Wrapper { - pub pus_5_handler: PusService5EventHandler, +use log::{error, warn}; +use satrs_core::pool::{SharedStaticMemoryPool, StoreAddr}; +use satrs_core::pus::event_man::EventRequestWithToken; +use satrs_core::pus::event_srv::PusService5EventHandler; +use satrs_core::pus::verification::VerificationReporterWithSender; +use satrs_core::pus::{ + EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, + MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender, PusPacketHandlerResult, + PusServiceHelper, +}; +use satrs_core::tmtc::tm_helper::SharedTmStore; +use satrs_core::ChannelId; +use satrs_example::config::{TcReceiverId, TmSenderId, PUS_APID}; + +pub fn create_event_service_static( + shared_tm_store: SharedTmStore, + tm_funnel_tx: mpsc::Sender, + verif_reporter: VerificationReporterWithSender, + tc_pool: SharedStaticMemoryPool, + pus_event_rx: mpsc::Receiver, + event_request_tx: mpsc::Sender, +) -> Pus5Wrapper { + let event_srv_tm_sender = MpscTmInStoreSender::new( + TmSenderId::PusEvent as ChannelId, + "PUS_5_TM_SENDER", + shared_tm_store.clone(), + tm_funnel_tx.clone(), + ); + let event_srv_receiver = MpscTcReceiver::new( + TcReceiverId::PusEvent as ChannelId, + "PUS_5_TC_RECV", + pus_event_rx, + ); + let pus_5_handler = PusService5EventHandler::new( + PusServiceHelper::new( + Box::new(event_srv_receiver), + Box::new(event_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInSharedStoreConverter::new(tc_pool.clone(), 2048), + ), + event_request_tx, + ); + Pus5Wrapper { pus_5_handler } } -impl Pus5Wrapper { +pub fn create_event_service_dynamic( + tm_funnel_tx: mpsc::Sender>, + verif_reporter: VerificationReporterWithSender, + pus_event_rx: mpsc::Receiver, + event_request_tx: mpsc::Sender, +) -> Pus5Wrapper { + let event_srv_tm_sender = MpscTmAsVecSender::new( + TmSenderId::PusEvent as ChannelId, + "PUS_5_TM_SENDER", + tm_funnel_tx, + ); + let event_srv_receiver = MpscTcReceiver::new( + TcReceiverId::PusEvent as ChannelId, + "PUS_5_TC_RECV", + pus_event_rx, + ); + let pus_5_handler = PusService5EventHandler::new( + PusServiceHelper::new( + Box::new(event_srv_receiver), + Box::new(event_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInVecConverter::default(), + ), + event_request_tx, + ); + Pus5Wrapper { pus_5_handler } +} + +pub struct Pus5Wrapper { + pub pus_5_handler: PusService5EventHandler, +} + +impl Pus5Wrapper { pub fn handle_next_packet(&mut self) -> bool { match self.pus_5_handler.handle_one_tc() { Ok(result) => match result { diff --git a/satrs-example/src/pus/hk.rs b/satrs-example/src/pus/hk.rs index ef373c4..a40fd7a 100644 --- a/satrs-example/src/pus/hk.rs +++ b/satrs-example/src/pus/hk.rs @@ -1,15 +1,73 @@ use crate::requests::{Request, RequestWithToken}; use log::{error, warn}; use satrs_core::hk::{CollectionIntervalFactor, HkRequest}; -use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender}; +use satrs_core::pool::{SharedStaticMemoryPool, StoreAddr}; +use satrs_core::pus::verification::{ + FailParams, StdVerifReporterWithSender, VerificationReporterWithSender, +}; use satrs_core::pus::{ - EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcReceiver, EcssTmSender, + EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, + EcssTcReceiver, EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender, PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHelper, }; use satrs_core::spacepackets::ecss::{hk, PusPacket}; -use satrs_example::{hk_err, tmtc_err, TargetIdWithApid}; +use satrs_core::tmtc::tm_helper::SharedTmStore; +use satrs_core::ChannelId; +use satrs_example::config::{hk_err, tmtc_err, TcReceiverId, TmSenderId, PUS_APID}; +use satrs_example::TargetIdWithApid; use std::collections::HashMap; -use std::sync::mpsc::Sender; +use std::sync::mpsc::{self, Sender}; + +pub fn create_hk_service_static( + shared_tm_store: SharedTmStore, + tm_funnel_tx: mpsc::Sender, + verif_reporter: VerificationReporterWithSender, + tc_pool: SharedStaticMemoryPool, + pus_hk_rx: mpsc::Receiver, + request_map: HashMap>, +) -> Pus3Wrapper { + let hk_srv_tm_sender = MpscTmInStoreSender::new( + TmSenderId::PusHk as ChannelId, + "PUS_3_TM_SENDER", + shared_tm_store.clone(), + tm_funnel_tx.clone(), + ); + let hk_srv_receiver = + MpscTcReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_8_TC_RECV", pus_hk_rx); + let pus_3_handler = PusService3HkHandler::new( + Box::new(hk_srv_receiver), + Box::new(hk_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInSharedStoreConverter::new(tc_pool, 2048), + request_map, + ); + Pus3Wrapper { pus_3_handler } +} + +pub fn create_hk_service_dynamic( + tm_funnel_tx: mpsc::Sender>, + verif_reporter: VerificationReporterWithSender, + pus_hk_rx: mpsc::Receiver, + request_map: HashMap>, +) -> Pus3Wrapper { + let hk_srv_tm_sender = MpscTmAsVecSender::new( + TmSenderId::PusHk as ChannelId, + "PUS_3_TM_SENDER", + tm_funnel_tx.clone(), + ); + let hk_srv_receiver = + MpscTcReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_8_TC_RECV", pus_hk_rx); + let pus_3_handler = PusService3HkHandler::new( + Box::new(hk_srv_receiver), + Box::new(hk_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInVecConverter::default(), + request_map, + ); + Pus3Wrapper { pus_3_handler } +} pub struct PusService3HkHandler { psb: PusServiceHelper, @@ -147,11 +205,11 @@ impl PusService3HkHandler, +pub struct Pus3Wrapper { + pub(crate) pus_3_handler: PusService3HkHandler, } -impl Pus3Wrapper { +impl Pus3Wrapper { pub fn handle_next_packet(&mut self) -> bool { match self.pus_3_handler.handle_one_tc() { Ok(result) => match result { diff --git a/satrs-example/src/pus/mod.rs b/satrs-example/src/pus/mod.rs index a545b6b..3fe3cea 100644 --- a/satrs-example/src/pus/mod.rs +++ b/satrs-example/src/pus/mod.rs @@ -6,13 +6,14 @@ use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusServiceId; use satrs_core::spacepackets::time::cds::TimeProvider; use satrs_core::spacepackets::time::TimeWriter; -use satrs_example::{tmtc_err, CustomPusServiceId}; +use satrs_example::config::{tmtc_err, CustomPusServiceId}; use std::sync::mpsc::Sender; pub mod action; pub mod event; pub mod hk; pub mod scheduler; +pub mod stack; pub mod test; pub struct PusTcMpscRouter { diff --git a/satrs-example/src/pus/scheduler.rs b/satrs-example/src/pus/scheduler.rs index 75f3494..04d8d7a 100644 --- a/satrs-example/src/pus/scheduler.rs +++ b/satrs-example/src/pus/scheduler.rs @@ -1,36 +1,66 @@ -use crate::tmtc::PusTcSource; +use std::sync::mpsc; +use std::time::Duration; + use log::{error, info, warn}; -use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool}; +use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StoreAddr}; use satrs_core::pus::scheduler::{PusScheduler, TcInfo}; use satrs_core::pus::scheduler_srv::PusService11SchedHandler; -use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult}; +use satrs_core::pus::verification::VerificationReporterWithSender; +use satrs_core::pus::{ + EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter, + MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender, PusPacketHandlerResult, + PusServiceHelper, +}; +use satrs_core::tmtc::tm_helper::SharedTmStore; +use satrs_core::ChannelId; +use satrs_example::config::{TcReceiverId, TmSenderId, PUS_APID}; -pub struct Pus11Wrapper { - pub pus_11_handler: PusService11SchedHandler, - pub sched_tc_pool: StaticMemoryPool, - pub tc_source_wrapper: PusTcSource, +use crate::tmtc::PusTcSourceProviderSharedPool; + +pub trait TcReleaser { + fn release(&mut self, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool; } -impl Pus11Wrapper { - pub fn release_tcs(&mut self) { - let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool { - if enabled { - // Transfer TC from scheduler TC pool to shared TC pool. - let released_tc_addr = self - .tc_source_wrapper - .tc_store - .pool - .write() - .expect("locking pool failed") - .add(tc) - .expect("adding TC to shared pool failed"); +impl TcReleaser for PusTcSourceProviderSharedPool { + fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool { + if enabled { + // Transfer TC from scheduler TC pool to shared TC pool. + let released_tc_addr = self + .shared_pool + .pool + .write() + .expect("locking pool failed") + .add(tc) + .expect("adding TC to shared pool failed"); + self.tc_source + .send(released_tc_addr) + .expect("sending TC to TC source failed"); + } + true + } +} - self.tc_source_wrapper - .tc_source - .send(released_tc_addr) - .expect("sending TC to TC source failed"); - } - true +impl TcReleaser for mpsc::Sender> { + fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool { + if enabled { + // Send released TC to centralized TC source. + self.send(tc.to_vec()) + .expect("sending TC to TC source failed"); + } + true + } +} + +pub struct Pus11Wrapper { + pub pus_11_handler: PusService11SchedHandler, + pub sched_tc_pool: StaticMemoryPool, + pub tc_releaser: Box, +} + +impl Pus11Wrapper { + pub fn release_tcs(&mut self) { + let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool { + self.tc_releaser.release(enabled, info, tc) }; self.pus_11_handler @@ -71,3 +101,77 @@ impl Pus11Wrapper { false } } + +pub fn create_scheduler_service_static( + shared_tm_store: SharedTmStore, + tm_funnel_tx: mpsc::Sender, + verif_reporter: VerificationReporterWithSender, + tc_releaser: PusTcSourceProviderSharedPool, + pus_sched_rx: mpsc::Receiver, + sched_tc_pool: StaticMemoryPool, +) -> Pus11Wrapper { + let sched_srv_tm_sender = MpscTmInStoreSender::new( + TmSenderId::PusSched as ChannelId, + "PUS_11_TM_SENDER", + shared_tm_store.clone(), + tm_funnel_tx.clone(), + ); + let sched_srv_receiver = MpscTcReceiver::new( + TcReceiverId::PusSched as ChannelId, + "PUS_11_TC_RECV", + pus_sched_rx, + ); + let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) + .expect("Creating PUS Scheduler failed"); + let pus_11_handler = PusService11SchedHandler::new( + PusServiceHelper::new( + Box::new(sched_srv_receiver), + Box::new(sched_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInSharedStoreConverter::new(tc_releaser.clone_backing_pool(), 2048), + ), + scheduler, + ); + Pus11Wrapper { + pus_11_handler, + sched_tc_pool, + tc_releaser: Box::new(tc_releaser), + } +} + +pub fn create_scheduler_service_dynamic( + tm_funnel_tx: mpsc::Sender>, + verif_reporter: VerificationReporterWithSender, + tc_source_sender: mpsc::Sender>, + pus_sched_rx: mpsc::Receiver, + sched_tc_pool: StaticMemoryPool, +) -> Pus11Wrapper { + let sched_srv_tm_sender = MpscTmAsVecSender::new( + TmSenderId::PusSched as ChannelId, + "PUS_11_TM_SENDER", + tm_funnel_tx, + ); + let sched_srv_receiver = MpscTcReceiver::new( + TcReceiverId::PusSched as ChannelId, + "PUS_11_TC_RECV", + pus_sched_rx, + ); + let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5)) + .expect("Creating PUS Scheduler failed"); + let pus_11_handler = PusService11SchedHandler::new( + PusServiceHelper::new( + Box::new(sched_srv_receiver), + Box::new(sched_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInVecConverter::default(), + ), + scheduler, + ); + Pus11Wrapper { + pus_11_handler, + sched_tc_pool, + tc_releaser: Box::new(tc_source_sender), + } +} diff --git a/satrs-example/src/pus/stack.rs b/satrs-example/src/pus/stack.rs new file mode 100644 index 0000000..33bd5af --- /dev/null +++ b/satrs-example/src/pus/stack.rs @@ -0,0 +1,52 @@ +use satrs_core::pus::EcssTcInMemConverter; + +use super::{ + action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper, + test::Service17CustomWrapper, +}; + +pub struct PusStack { + event_srv: Pus5Wrapper, + hk_srv: Pus3Wrapper, + action_srv: Pus8Wrapper, + schedule_srv: Pus11Wrapper, + test_srv: Service17CustomWrapper, +} + +impl PusStack { + pub fn new( + hk_srv: Pus3Wrapper, + event_srv: Pus5Wrapper, + action_srv: Pus8Wrapper, + schedule_srv: Pus11Wrapper, + test_srv: Service17CustomWrapper, + ) -> Self { + Self { + event_srv, + action_srv, + schedule_srv, + test_srv, + hk_srv, + } + } + + pub fn periodic_operation(&mut self) { + self.schedule_srv.release_tcs(); + loop { + let mut all_queues_empty = true; + let mut is_srv_finished = |srv_handler_finished: bool| { + if !srv_handler_finished { + all_queues_empty = false; + } + }; + is_srv_finished(self.test_srv.handle_next_packet()); + is_srv_finished(self.schedule_srv.handle_next_packet()); + is_srv_finished(self.event_srv.handle_next_packet()); + is_srv_finished(self.action_srv.handle_next_packet()); + is_srv_finished(self.hk_srv.handle_next_packet()); + if all_queues_empty { + break; + } + } + } +} diff --git a/satrs-example/src/pus/test.rs b/satrs-example/src/pus/test.rs index a52d111..b00899c 100644 --- a/satrs-example/src/pus/test.rs +++ b/satrs-example/src/pus/test.rs @@ -1,22 +1,89 @@ use log::{info, warn}; use satrs_core::params::Params; +use satrs_core::pool::{SharedStaticMemoryPool, StoreAddr}; use satrs_core::pus::test::PusService17TestHandler; -use satrs_core::pus::verification::FailParams; -use satrs_core::pus::{EcssTcInMemConverter, PusPacketHandlerResult}; +use satrs_core::pus::verification::{FailParams, VerificationReporterWithSender}; +use satrs_core::pus::{ + EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, + MpscTmInStoreSender, PusPacketHandlerResult, PusServiceHelper, +}; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::spacepackets::time::cds::TimeProvider; use satrs_core::spacepackets::time::TimeWriter; +use satrs_core::tmtc::tm_helper::SharedTmStore; +use satrs_core::ChannelId; use satrs_core::{events::EventU32, pus::EcssTcInSharedStoreConverter}; -use satrs_example::{tmtc_err, TEST_EVENT}; -use std::sync::mpsc::Sender; +use satrs_example::config::{tmtc_err, TcReceiverId, TmSenderId, PUS_APID, TEST_EVENT}; +use std::sync::mpsc::{self, Sender}; -pub struct Service17CustomWrapper { - pub pus17_handler: PusService17TestHandler, +pub fn create_test_service_static( + shared_tm_store: SharedTmStore, + tm_funnel_tx: mpsc::Sender, + verif_reporter: VerificationReporterWithSender, + tc_pool: SharedStaticMemoryPool, + event_sender: mpsc::Sender<(EventU32, Option)>, + pus_test_rx: mpsc::Receiver, +) -> Service17CustomWrapper { + let test_srv_tm_sender = MpscTmInStoreSender::new( + TmSenderId::PusTest as ChannelId, + "PUS_17_TM_SENDER", + shared_tm_store.clone(), + tm_funnel_tx.clone(), + ); + let test_srv_receiver = MpscTcReceiver::new( + TcReceiverId::PusTest as ChannelId, + "PUS_17_TC_RECV", + pus_test_rx, + ); + let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( + Box::new(test_srv_receiver), + Box::new(test_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInSharedStoreConverter::new(tc_pool, 2048), + )); + Service17CustomWrapper { + pus17_handler, + test_srv_event_sender: event_sender, + } +} + +pub fn create_test_service_dynamic( + tm_funnel_tx: mpsc::Sender>, + verif_reporter: VerificationReporterWithSender, + event_sender: mpsc::Sender<(EventU32, Option)>, + pus_test_rx: mpsc::Receiver, +) -> Service17CustomWrapper { + let test_srv_tm_sender = MpscTmAsVecSender::new( + TmSenderId::PusTest as ChannelId, + "PUS_17_TM_SENDER", + tm_funnel_tx.clone(), + ); + let test_srv_receiver = MpscTcReceiver::new( + TcReceiverId::PusTest as ChannelId, + "PUS_17_TC_RECV", + pus_test_rx, + ); + let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( + Box::new(test_srv_receiver), + Box::new(test_srv_tm_sender), + PUS_APID, + verif_reporter.clone(), + EcssTcInVecConverter::default(), + )); + Service17CustomWrapper { + pus17_handler, + test_srv_event_sender: event_sender, + } +} + +pub struct Service17CustomWrapper { + pub pus17_handler: PusService17TestHandler, pub test_srv_event_sender: Sender<(EventU32, Option)>, } -impl Service17CustomWrapper { +impl Service17CustomWrapper { pub fn handle_next_packet(&mut self) -> bool { let res = self.pus17_handler.handle_one_tc(); if res.is_err() { diff --git a/satrs-example/src/requests.rs b/satrs-example/src/requests.rs index bcae8d7..031f0f4 100644 --- a/satrs-example/src/requests.rs +++ b/satrs-example/src/requests.rs @@ -22,7 +22,7 @@ pub enum Request { #[derive(Clone, Eq, PartialEq, Debug, new)] pub struct TargetedRequest { - pub(crate) target_id: TargetIdWithApid, + pub(crate) target_id_with_apid: TargetIdWithApid, pub(crate) request: Request, } diff --git a/satrs-example/src/tcp.rs b/satrs-example/src/tcp.rs index 8b7feff..c0934e0 100644 --- a/satrs-example/src/tcp.rs +++ b/satrs-example/src/tcp.rs @@ -9,9 +9,7 @@ use satrs_core::{ spacepackets::PacketId, tmtc::{CcsdsDistributor, CcsdsError, TmPacketSourceCore}, }; -use satrs_example::PUS_APID; - -use crate::tmtc::MpscStoreAndSendError; +use satrs_example::config::PUS_APID; pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)]; @@ -71,20 +69,21 @@ impl TmPacketSourceCore for SyncTcpTmSource { } } -pub struct TcpTask { +pub struct TcpTask { server: TcpSpacepacketsServer< (), - CcsdsError, + CcsdsError, SyncTcpTmSource, - CcsdsDistributor, + CcsdsDistributor, >, + phantom: std::marker::PhantomData, } -impl TcpTask { +impl TcpTask { pub fn new( cfg: ServerConfig, tm_source: SyncTcpTmSource, - tc_receiver: CcsdsDistributor, + tc_receiver: CcsdsDistributor, ) -> Result { Ok(Self { server: TcpSpacepacketsServer::new( @@ -93,6 +92,7 @@ impl TcpTask { tc_receiver, Box::new(PACKET_ID_LOOKUP), )?, + phantom: std::marker::PhantomData, }) } diff --git a/satrs-example/src/tm_funnel.rs b/satrs-example/src/tm_funnel.rs new file mode 100644 index 0000000..ba1ff32 --- /dev/null +++ b/satrs-example/src/tm_funnel.rs @@ -0,0 +1,145 @@ +use std::{ + collections::HashMap, + sync::mpsc::{Receiver, Sender}, +}; + +use satrs_core::{ + pool::{PoolProviderMemInPlace, StoreAddr}, + seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, + spacepackets::{ + ecss::{tm::PusTmZeroCopyWriter, PusPacket}, + time::cds::MIN_CDS_FIELD_LEN, + CcsdsPacket, + }, + tmtc::tm_helper::SharedTmStore, +}; + +use crate::tcp::SyncTcpTmSource; + +#[derive(Default)] +pub struct CcsdsSeqCounterMap { + apid_seq_counter_map: HashMap, +} +impl CcsdsSeqCounterMap { + pub fn get_and_increment(&mut self, apid: u16) -> u16 { + self.apid_seq_counter_map + .entry(apid) + .or_default() + .get_and_increment() + } +} + +pub struct TmFunnelCommon { + seq_counter_map: CcsdsSeqCounterMap, + msg_counter_map: HashMap, + sync_tm_tcp_source: SyncTcpTmSource, +} + +impl TmFunnelCommon { + pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self { + Self { + seq_counter_map: Default::default(), + msg_counter_map: Default::default(), + sync_tm_tcp_source, + } + } + + // Applies common packet processing operations for PUS TM packets. This includes setting + // a sequence counter + fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) { + // zero_copy_writer.set_apid(PUS_APID); + zero_copy_writer.set_seq_count( + self.seq_counter_map + .get_and_increment(zero_copy_writer.apid()), + ); + let entry = self + .msg_counter_map + .entry(zero_copy_writer.service()) + .or_insert(0); + zero_copy_writer.set_msg_count(*entry); + if *entry == u16::MAX { + *entry = 0; + } else { + *entry += 1; + } + + // This operation has to come last! + zero_copy_writer.finish(); + } +} + +pub struct TmFunnelStatic { + common: TmFunnelCommon, + shared_tm_store: SharedTmStore, + tm_funnel_rx: Receiver, + tm_server_tx: Sender, +} + +impl TmFunnelStatic { + pub fn new( + shared_tm_store: SharedTmStore, + sync_tm_tcp_source: SyncTcpTmSource, + tm_funnel_rx: Receiver, + tm_server_tx: Sender, + ) -> Self { + Self { + common: TmFunnelCommon::new(sync_tm_tcp_source), + shared_tm_store, + tm_funnel_rx, + tm_server_tx, + } + } + + pub fn operation(&mut self) { + if let Ok(addr) = self.tm_funnel_rx.recv() { + // Read the TM, set sequence counter and message counter, and finally update + // the CRC. + let shared_pool = self.shared_tm_store.clone_backing_pool(); + let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); + let tm_raw = pool_guard + .modify(&addr) + .expect("Reading TM from pool failed"); + let zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw, MIN_CDS_FIELD_LEN) + .expect("Creating TM zero copy writer failed"); + self.common.apply_packet_processing(zero_copy_writer); + self.tm_server_tx + .send(addr) + .expect("Sending TM to server failed"); + self.common.sync_tm_tcp_source.add_tm(tm_raw); + } + } +} + +pub struct TmFunnelDynamic { + common: TmFunnelCommon, + tm_funnel_rx: Receiver>, + tm_server_tx: Sender>, +} + +impl TmFunnelDynamic { + pub fn new( + sync_tm_tcp_source: SyncTcpTmSource, + tm_funnel_rx: Receiver>, + tm_server_tx: Sender>, + ) -> Self { + Self { + common: TmFunnelCommon::new(sync_tm_tcp_source), + tm_funnel_rx, + tm_server_tx, + } + } + + pub fn operation(&mut self) { + if let Ok(mut tm) = self.tm_funnel_rx.recv() { + // Read the TM, set sequence counter and message counter, and finally update + // the CRC. + let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm, MIN_CDS_FIELD_LEN) + .expect("Creating TM zero copy writer failed"); + self.common.apply_packet_processing(zero_copy_writer); + self.tm_server_tx + .send(tm.clone()) + .expect("Sending TM to server failed"); + self.common.sync_tm_tcp_source.add_tm(&tm); + } + } +} diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index b4a584d..39b4b3c 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,7 +1,7 @@ use log::warn; use satrs_core::pus::{EcssTcAndToken, ReceivesEcssPusTc}; use satrs_core::spacepackets::SpHeader; -use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; +use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError}; use thiserror::Error; use crate::pus::PusReceiver; @@ -18,13 +18,13 @@ pub struct TmArgs { } pub struct TcArgs { - pub tc_source: PusTcSource, + pub tc_source: PusTcSourceProviderSharedPool, pub tc_receiver: Receiver, } impl TcArgs { #[allow(dead_code)] - fn split(self) -> (PusTcSource, Receiver) { + fn split(self) -> (PusTcSourceProviderSharedPool, Receiver) { (self.tc_source, self.tc_receiver) } } @@ -40,11 +40,11 @@ pub enum MpscStoreAndSendError { } #[derive(Clone)] -pub struct TcStore { +pub struct SharedTcPool { pub pool: SharedStaticMemoryPool, } -impl TcStore { +impl SharedTcPool { pub fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result { let mut pg = self.pool.write().expect("error locking TC store"); let (addr, buf) = pg.free_element(pus_tc.len_packed())?; @@ -53,32 +53,34 @@ impl TcStore { } } -pub struct TmFunnel { - pub tm_funnel_rx: Receiver, - pub tm_server_tx: Sender, -} - #[derive(Clone)] -pub struct PusTcSource { +pub struct PusTcSourceProviderSharedPool { pub tc_source: Sender, - pub tc_store: TcStore, + pub shared_pool: SharedTcPool, } -impl ReceivesEcssPusTc for PusTcSource { +impl PusTcSourceProviderSharedPool { + #[allow(dead_code)] + pub fn clone_backing_pool(&self) -> SharedStaticMemoryPool { + self.shared_pool.pool.clone() + } +} + +impl ReceivesEcssPusTc for PusTcSourceProviderSharedPool { type Error = MpscStoreAndSendError; fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { - let addr = self.tc_store.add_pus_tc(pus_tc)?; + let addr = self.shared_pool.add_pus_tc(pus_tc)?; self.tc_source.send(addr)?; Ok(()) } } -impl ReceivesCcsdsTc for PusTcSource { +impl ReceivesCcsdsTc for PusTcSourceProviderSharedPool { type Error = MpscStoreAndSendError; fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - let mut pool = self.tc_store.pool.write().expect("locking pool failed"); + let mut pool = self.shared_pool.pool.write().expect("locking pool failed"); let addr = pool.add(tc_raw)?; drop(pool); self.tc_source.send(addr)?; @@ -86,13 +88,35 @@ impl ReceivesCcsdsTc for PusTcSource { } } -pub struct TmtcTask { +// Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules. +#[derive(Clone)] +pub struct PusTcSourceProviderDynamic(pub Sender>); + +impl ReceivesEcssPusTc for PusTcSourceProviderDynamic { + type Error = SendError>; + + fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { + self.0.send(pus_tc.raw_data().to_vec())?; + Ok(()) + } +} + +impl ReceivesCcsdsTc for PusTcSourceProviderDynamic { + type Error = mpsc::SendError>; + + fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.0.send(tc_raw.to_vec())?; + Ok(()) + } +} + +pub struct TmtcTaskStatic { tc_args: TcArgs, tc_buf: [u8; 4096], pus_receiver: PusReceiver, } -impl TmtcTask { +impl TmtcTaskStatic { pub fn new(tc_args: TcArgs, pus_receiver: PusReceiver) -> Self { Self { tc_args, @@ -111,7 +135,7 @@ impl TmtcTask { let pool = self .tc_args .tc_source - .tc_store + .shared_pool .pool .read() .expect("locking tc pool failed"); @@ -146,3 +170,50 @@ impl TmtcTask { } } } + +pub struct TmtcTaskDynamic { + pub tc_receiver: Receiver>, + pus_receiver: PusReceiver, +} + +impl TmtcTaskDynamic { + pub fn new(tc_receiver: Receiver>, pus_receiver: PusReceiver) -> Self { + Self { + tc_receiver, + pus_receiver, + } + } + + pub fn periodic_operation(&mut self) { + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + match self.tc_receiver.try_recv() { + Ok(tc) => match PusTcReader::new(&tc) { + Ok((pus_tc, _)) => { + self.pus_receiver + .handle_tc_packet( + satrs_core::pus::TcInMemory::Vec(tc.clone()), + pus_tc.service(), + &pus_tc, + ) + .ok(); + true + } + Err(e) => { + warn!("error creating PUS TC from raw data: {e}"); + warn!("raw data: {:x?}", tc); + true + } + }, + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + warn!("tmtc thread: sender disconnected"); + false + } + }, + } + } +} diff --git a/satrs-example/src/udp.rs b/satrs-example/src/udp.rs index 3853fd3..11f84b2 100644 --- a/satrs-example/src/udp.rs +++ b/satrs-example/src/udp.rs @@ -1,4 +1,7 @@ -use std::{net::SocketAddr, sync::mpsc::Receiver}; +use std::{ + net::{SocketAddr, UdpSocket}, + sync::mpsc::Receiver, +}; use log::{info, warn}; use satrs_core::{ @@ -7,45 +10,17 @@ use satrs_core::{ tmtc::CcsdsError, }; -use crate::tmtc::MpscStoreAndSendError; +pub trait UdpTmHandler { + fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr); +} -pub struct UdpTmtcServer { - pub udp_tc_server: UdpTcServer>, +pub struct StaticUdpTmHandler { pub tm_rx: Receiver, pub tm_store: SharedStaticMemoryPool, } -impl UdpTmtcServer { - pub fn periodic_operation(&mut self) { - while self.poll_tc_server() {} - if let Some(recv_addr) = self.udp_tc_server.last_sender() { - self.send_tm_to_udp_client(&recv_addr); - } - } - fn poll_tc_server(&mut self) -> bool { - match self.udp_tc_server.try_recv_tc() { - Ok(_) => true, - Err(e) => match e { - ReceiveResult::ReceiverError(e) => match e { - CcsdsError::ByteConversionError(e) => { - warn!("packet error: {e:?}"); - true - } - CcsdsError::CustomError(e) => { - warn!("mpsc store and send error {e:?}"); - true - } - }, - ReceiveResult::IoError(e) => { - warn!("IO error {e}"); - false - } - ReceiveResult::NothingReceived => false, - }, - } - } - - fn send_tm_to_udp_client(&mut self, recv_addr: &SocketAddr) { +impl UdpTmHandler for StaticUdpTmHandler { + fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, &recv_addr: &SocketAddr) { while let Ok(addr) = self.tm_rx.try_recv() { let store_lock = self.tm_store.write(); if store_lock.is_err() { @@ -67,10 +42,181 @@ impl UdpTmtcServer { } else { info!("Sending PUS TM"); } - let result = self.udp_tc_server.socket.send_to(buf, recv_addr); + let result = socket.send_to(buf, recv_addr); if let Err(e) = result { warn!("Sending TM with UDP socket failed: {e}") } } } } + +pub struct DynamicUdpTmHandler { + pub tm_rx: Receiver>, +} + +impl UdpTmHandler for DynamicUdpTmHandler { + fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) { + while let Ok(tm) = self.tm_rx.try_recv() { + if tm.len() > 9 { + let service = tm[7]; + let subservice = tm[8]; + info!("Sending PUS TM[{service},{subservice}]") + } else { + info!("Sending PUS TM"); + } + let result = socket.send_to(&tm, recv_addr); + if let Err(e) = result { + warn!("Sending TM with UDP socket failed: {e}") + } + } + } +} + +pub struct UdpTmtcServer { + pub udp_tc_server: UdpTcServer>, + pub tm_handler: TmHandler, +} + +impl + UdpTmtcServer +{ + pub fn periodic_operation(&mut self) { + while self.poll_tc_server() {} + if let Some(recv_addr) = self.udp_tc_server.last_sender() { + self.tm_handler + .send_tm_to_udp_client(&self.udp_tc_server.socket, &recv_addr); + } + } + + fn poll_tc_server(&mut self) -> bool { + match self.udp_tc_server.try_recv_tc() { + Ok(_) => true, + Err(e) => match e { + ReceiveResult::ReceiverError(e) => match e { + CcsdsError::ByteConversionError(e) => { + warn!("packet error: {e:?}"); + true + } + CcsdsError::CustomError(e) => { + warn!("mpsc custom error {e:?}"); + true + } + }, + ReceiveResult::IoError(e) => { + warn!("IO error {e}"); + false + } + ReceiveResult::NothingReceived => false, + }, + } + } +} + +#[cfg(test)] +mod tests { + use std::{ + collections::VecDeque, + net::IpAddr, + sync::{Arc, Mutex}, + }; + + use satrs_core::{ + spacepackets::{ + ecss::{tc::PusTcCreator, WritablePusPacket}, + SpHeader, + }, + tmtc::ReceivesTcCore, + }; + use satrs_example::config::{OBSW_SERVER_ADDR, PUS_APID}; + + use super::*; + + #[derive(Default, Debug, Clone)] + pub struct TestReceiver { + tc_vec: Arc>>>, + } + + impl ReceivesTcCore for TestReceiver { + type Error = CcsdsError<()>; + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.tc_vec.lock().unwrap().push_back(tc_raw.to_vec()); + Ok(()) + } + } + + #[derive(Default, Debug, Clone)] + pub struct TestTmHandler { + addrs_to_send_to: Arc>>, + } + + impl UdpTmHandler for TestTmHandler { + fn send_tm_to_udp_client(&mut self, _socket: &UdpSocket, recv_addr: &SocketAddr) { + self.addrs_to_send_to.lock().unwrap().push_back(*recv_addr); + } + } + + #[test] + fn test_basic() { + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); + let test_receiver = TestReceiver::default(); + let tc_queue = test_receiver.tc_vec.clone(); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap(); + let tm_handler = TestTmHandler::default(); + let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); + let mut udp_dyn_server = UdpTmtcServer { + udp_tc_server, + tm_handler, + }; + udp_dyn_server.periodic_operation(); + assert!(tc_queue.lock().unwrap().is_empty()); + assert!(tm_handler_calls.lock().unwrap().is_empty()); + } + + #[test] + fn test_transactions() { + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); + let test_receiver = TestReceiver::default(); + let tc_queue = test_receiver.tc_vec.clone(); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap(); + let server_addr = udp_tc_server.socket.local_addr().unwrap(); + let tm_handler = TestTmHandler::default(); + let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); + let mut udp_dyn_server = UdpTmtcServer { + udp_tc_server, + tm_handler, + }; + let mut sph = SpHeader::tc_unseg(PUS_APID, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true) + .to_vec() + .unwrap(); + let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed"); + let client_addr = client.local_addr().unwrap(); + client.connect(server_addr).unwrap(); + client.send(&ping_tc).unwrap(); + udp_dyn_server.periodic_operation(); + { + let mut tc_queue = tc_queue.lock().unwrap(); + assert!(!tc_queue.is_empty()); + let received_tc = tc_queue.pop_front().unwrap(); + assert_eq!(received_tc, ping_tc); + } + + { + let mut tm_handler_calls = tm_handler_calls.lock().unwrap(); + assert!(!tm_handler_calls.is_empty()); + assert_eq!(tm_handler_calls.len(), 1); + let received_addr = tm_handler_calls.pop_front().unwrap(); + assert_eq!(received_addr, client_addr); + } + udp_dyn_server.periodic_operation(); + assert!(tc_queue.lock().unwrap().is_empty()); + // Still tries to send to the same client. + { + let mut tm_handler_calls = tm_handler_calls.lock().unwrap(); + assert!(!tm_handler_calls.is_empty()); + assert_eq!(tm_handler_calls.len(), 1); + let received_addr = tm_handler_calls.pop_front().unwrap(); + assert_eq!(received_addr, client_addr); + } + } +}