Major example update

- Increased example modularization by moving the majority
  of the application logic into dedicated modules
- Added a new `dyn_tmtc` feature for the satrs-example which is used
  to configure the heap as the backing store for TMTC packets instead
  of static stores.
- Added dedicated satrs-example chapter in satrs-book
Robin Müller 2024-02-07 18:10:47 +01:00
parent 28da48ca6e
commit 0fd70c08c2
Signed by: muellerr
GPG Key ID: A649FB78196E3849
26 changed files with 1916 additions and 745 deletions


@@ -2,9 +2,16 @@
- [Introduction](./introduction.md)
- [Design](./design.md)
# Basic concepts and components
- [Communication with Space Systems](./communication.md)
- [Working with Constrained Systems](./constrained-systems.md)
- [Actions](./actions.md)
- [Modes and Health](./modes-and-health.md)
- [Housekeeping Data](./housekeeping.md)
- [Events](./events.md)
# Example project
- [The satrs-example application](./example.md)

satrs-book/src/example.md (new file, 30 lines)

@@ -0,0 +1,30 @@
# sat-rs Example Application
The `sat-rs` framework includes a monolithic example application which can be found inside
the [`satrs-example`](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example)
subdirectory of the repository. The primary purpose of this example application is to show how
the various components of the sat-rs framework could be used as part of a larger on-board
software application.
## Structure of the example project
The example project contains components which could also be expected to be part of a production
on-board software (OBSW).
1. A UDP and a TCP server to receive telecommands and poll telemetry from. These servers might be
optional components for an OBSW, used only during the development phase on ground. The TCP
server parses space packets by using the CCSDS space packet ID as the packet start delimiter.
2. A PUS service stack which exposes some functionality conformant with the ECSS PUS standard. This
currently includes the following services:
- Service 1 for telecommand verification.
- Service 3 for housekeeping telemetry handling.
- Service 5 for management and downlink of on-board events.
- Service 8 for handling on-board actions.
- Service 11 for scheduling telecommands to be released at a specific time.
- Service 17 for test purposes (pings).
3. An event manager component which handles the event IPC mechanism.
4. A TC source component which demultiplexes and routes telecommands based on parameters like
the packet APID or the PUS service and subservice type. A minimal sketch of this routing idea
follows the list.
5. A TM sink component which is the target of all generated telemetry and forwards it to downlink
handlers like the UDP and TCP servers.
6. An AOCS example task which can also process some PUS commands.
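
The routing mentioned in item 4 boils down to a map from target identifiers to channel senders.
The following is a minimal, hypothetical sketch of that idea; the real example uses
`TargetIdWithApid` keys and `RequestWithToken` messages (see the `main.rs` diff further below):

```rust
use std::collections::HashMap;
use std::sync::mpsc;

type TargetId = u32;

// Forward a demultiplexed telecommand to the component owning the target ID.
fn route_tc(
    request_map: &HashMap<TargetId, mpsc::Sender<Vec<u8>>>,
    target: TargetId,
    tc: Vec<u8>,
) -> Result<(), String> {
    match request_map.get(&target) {
        Some(sender) => sender.send(tc).map_err(|e| e.to_string()),
        None => Err(format!("no handler registered for target ID {target}")),
    }
}

fn main() {
    let (acs_tx, acs_rx) = mpsc::channel();
    let mut request_map = HashMap::new();
    request_map.insert(1, acs_tx); // hypothetical ACS subsystem target ID
    route_tc(&request_map, 1, vec![0x18, 0x02]).unwrap();
    assert_eq!(acs_rx.recv().unwrap(), vec![0x18, 0x02]);
}
```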


@@ -73,10 +73,11 @@ features = ["all"]
optional = true
[dependencies.spacepackets]
version = "0.8.1"
version = "0.9.0"
default-features = false
# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
# rev = "297cfad22637d3b07a1b27abe56d9a607b5b82a7"
# branch = "main"
[dependencies.cobs]
git = "https://github.com/robamu/cobs.rs.git"


@@ -1,13 +1,13 @@
//! Task scheduling module
use alloc::string::String;
use bus::BusReader;
use std::boxed::Box;
use std::error::Error;
use std::sync::mpsc::TryRecvError;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
use std::vec;
use std::vec::Vec;
use std::{io, thread};
#[derive(Debug, PartialEq, Eq)]
pub enum OpResult {
@@ -34,47 +34,49 @@ pub trait Executable: Send {
/// # Arguments
///
/// * `executable`: Executable task
/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks
/// * `op_code`: Operation code which is passed to the executable task [operation call][Executable::periodic_op]
/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks.
/// If [None] is passed, no sleeping will be performed.
/// * `op_code`: Operation code which is passed to the executable task
/// [operation call][Executable::periodic_op]
/// * `termination`: Optional termination handler which can cancel threads with a broadcast
pub fn exec_sched_single<
T: Executable<Error = E> + Send + 'static + ?Sized,
E: Error + Send + 'static,
>(
pub fn exec_sched_single<T: Executable<Error = E> + Send + 'static + ?Sized, E: Send + 'static>(
mut executable: Box<T>,
task_freq: Option<Duration>,
op_code: i32,
mut termination: Option<BusReader<()>>,
) -> JoinHandle<Result<OpResult, E>> {
) -> Result<JoinHandle<Result<OpResult, E>>, io::Error> {
let mut cycle_count = 0;
thread::spawn(move || loop {
if let Some(ref mut terminator) = termination {
match terminator.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => {
return Ok(OpResult::Ok);
}
Err(TryRecvError::Empty) => (),
}
}
match executable.exec_type() {
ExecutionType::OneShot => {
executable.periodic_op(op_code)?;
return Ok(OpResult::Ok);
}
ExecutionType::Infinite => {
executable.periodic_op(op_code)?;
}
ExecutionType::Cycles(cycles) => {
executable.periodic_op(op_code)?;
cycle_count += 1;
if cycle_count == cycles {
return Ok(OpResult::Ok);
thread::Builder::new()
.name(String::from(executable.task_name()))
.spawn(move || loop {
if let Some(ref mut terminator) = termination {
match terminator.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => {
return Ok(OpResult::Ok);
}
Err(TryRecvError::Empty) => (),
}
}
}
let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified"));
thread::sleep(freq);
})
match executable.exec_type() {
ExecutionType::OneShot => {
executable.periodic_op(op_code)?;
return Ok(OpResult::Ok);
}
ExecutionType::Infinite => {
executable.periodic_op(op_code)?;
}
ExecutionType::Cycles(cycles) => {
executable.periodic_op(op_code)?;
cycle_count += 1;
if cycle_count == cycles {
return Ok(OpResult::Ok);
}
}
}
if let Some(freq) = task_freq {
thread::sleep(freq);
}
})
}
/// This function allows executing multiple tasks as long as the tasks implement the
@@ -86,55 +88,56 @@ pub fn exec_sched_single<
/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks
/// * `op_code`: Operation code which is passed to the executable task [operation call][Executable::periodic_op]
/// * `termination`: Optional termination handler which can cancel threads with a broadcast
pub fn exec_sched_multi<
T: Executable<Error = E> + Send + 'static + ?Sized,
E: Error + Send + 'static,
>(
pub fn exec_sched_multi<T: Executable<Error = E> + Send + 'static + ?Sized, E: Send + 'static>(
task_name: &'static str,
mut executable_vec: Vec<Box<T>>,
task_freq: Option<Duration>,
op_code: i32,
mut termination: Option<BusReader<()>>,
) -> JoinHandle<Result<OpResult, E>> {
) -> Result<JoinHandle<Result<OpResult, E>>, io::Error> {
let mut cycle_counts = vec![0; executable_vec.len()];
let mut removal_flags = vec![false; executable_vec.len()];
thread::spawn(move || loop {
if let Some(ref mut terminator) = termination {
match terminator.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => {
removal_flags.iter_mut().for_each(|x| *x = true);
thread::Builder::new()
.name(String::from(task_name))
.spawn(move || loop {
if let Some(ref mut terminator) = termination {
match terminator.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => {
removal_flags.iter_mut().for_each(|x| *x = true);
}
Err(TryRecvError::Empty) => (),
}
Err(TryRecvError::Empty) => (),
}
}
for (idx, executable) in executable_vec.iter_mut().enumerate() {
match executable.exec_type() {
ExecutionType::OneShot => {
executable.periodic_op(op_code)?;
removal_flags[idx] = true;
}
ExecutionType::Infinite => {
executable.periodic_op(op_code)?;
}
ExecutionType::Cycles(cycles) => {
executable.periodic_op(op_code)?;
cycle_counts[idx] += 1;
if cycle_counts[idx] == cycles {
for (idx, executable) in executable_vec.iter_mut().enumerate() {
match executable.exec_type() {
ExecutionType::OneShot => {
executable.periodic_op(op_code)?;
removal_flags[idx] = true;
}
ExecutionType::Infinite => {
executable.periodic_op(op_code)?;
}
ExecutionType::Cycles(cycles) => {
executable.periodic_op(op_code)?;
cycle_counts[idx] += 1;
if cycle_counts[idx] == cycles {
removal_flags[idx] = true;
}
}
}
}
}
let mut removal_iter = removal_flags.iter();
executable_vec.retain(|_| !*removal_iter.next().unwrap());
removal_iter = removal_flags.iter();
cycle_counts.retain(|_| !*removal_iter.next().unwrap());
removal_flags.retain(|&i| !i);
if executable_vec.is_empty() {
return Ok(OpResult::Ok);
}
let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified"));
thread::sleep(freq);
})
let mut removal_iter = removal_flags.iter();
executable_vec.retain(|_| !*removal_iter.next().unwrap());
removal_iter = removal_flags.iter();
cycle_counts.retain(|_| !*removal_iter.next().unwrap());
removal_flags.retain(|&i| !i);
if executable_vec.is_empty() {
return Ok(OpResult::Ok);
}
let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified"));
thread::sleep(freq);
})
}
#[cfg(test)]
@@ -294,7 +297,8 @@ mod tests {
Some(Duration::from_millis(100)),
expected_op_code,
None,
);
)
.expect("thread creation failed");
let thread_res = jhandle.join().expect("One Shot Task failed");
assert!(thread_res.is_ok());
assert_eq!(thread_res.unwrap(), OpResult::Ok);
@@ -319,7 +323,8 @@ mod tests {
Some(Duration::from_millis(100)),
op_code_inducing_failure,
None,
);
)
.expect("thread creation failed");
let thread_res = jhandle.join().expect("One Shot Task failed");
assert!(thread_res.is_err());
let error = thread_res.unwrap_err();
@@ -356,11 +361,13 @@ mod tests {
assert_eq!(task.task_name(), ONE_SHOT_TASK_NAME);
}
let jhandle = exec_sched_multi(
"multi-task-name",
task_vec,
Some(Duration::from_millis(100)),
expected_op_code,
None,
);
)
.expect("thread creation failed");
let thread_res = jhandle.join().expect("One Shot Task failed");
assert!(thread_res.is_ok());
assert_eq!(thread_res.unwrap(), OpResult::Ok);
@@ -386,7 +393,8 @@ mod tests {
Some(Duration::from_millis(100)),
expected_op_code,
None,
);
)
.expect("thread creation failed");
let thread_res = jh.join().expect("Cycles Task failed");
assert!(thread_res.is_ok());
let data = shared.lock().expect("Locking Mutex failed");
@@ -418,11 +426,13 @@ mod tests {
let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> =
vec![one_shot_task, cycled_task_0, cycled_task_1];
let jh = exec_sched_multi(
"multi-task-name",
task_vec,
Some(Duration::from_millis(100)),
expected_op_code,
None,
);
)
.expect("thread creation failed");
let thread_res = jh.join().expect("Cycles Task failed");
assert!(thread_res.is_ok());
let data = shared.lock().expect("Locking Mutex failed");
@@ -449,7 +459,8 @@ mod tests {
Some(Duration::from_millis(20)),
expected_op_code,
Some(terminator.add_rx()),
);
)
.expect("thread creation failed");
thread::sleep(Duration::from_millis(40));
terminator.broadcast(());
let thread_res = jh.join().expect("Periodic Task failed");
@ -485,11 +496,13 @@ mod tests {
let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> =
vec![cycled_task, periodic_task_0, periodic_task_1];
let jh = exec_sched_multi(
"multi-task-name",
task_vec,
Some(Duration::from_millis(20)),
expected_op_code,
Some(terminator.add_rx()),
);
)
.expect("thread creation failed");
thread::sleep(Duration::from_millis(60));
terminator.broadcast(());
let thread_res = jh.join().expect("Periodic Task failed");
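
As a usage illustration for the reworked scheduler API in this file: `exec_sched_single` now
returns a `Result` because the named `thread::Builder::spawn` call is fallible, and passing
`None` as the task frequency skips sleeping entirely. The following self-contained sketch infers
the `Executable` trait signature (`exec_type`, `task_name`, `periodic_op`) from the diff above,
so details may differ:

```rust
use std::time::Duration;

// Hypothetical one-shot task. The trait/enum names come from this file; the
// exact method signatures are inferred from the diff and may differ.
struct PingTask;

impl Executable for PingTask {
    type Error = std::io::Error;

    fn exec_type(&self) -> ExecutionType {
        ExecutionType::OneShot
    }

    fn task_name(&self) -> &'static str {
        "PING_TASK"
    }

    fn periodic_op(&mut self, _op_code: i32) -> Result<OpResult, Self::Error> {
        println!("ping");
        Ok(OpResult::Ok)
    }
}

fn run_ping_task() {
    // The join handle is now wrapped in a Result because thread::Builder::spawn
    // (used internally to create the named thread) can fail.
    let jhandle = exec_sched_single(
        Box::new(PingTask),
        Some(Duration::from_millis(100)), // sleep time between cycles
        0,                                // op code forwarded to periodic_op
        None,                             // no termination bus reader
    )
    .expect("thread creation failed");
    let task_res = jhandle.join().expect("task thread panicked");
    assert_eq!(task_res.unwrap(), OpResult::Ok);
}
```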


@@ -318,10 +318,31 @@ mod alloc_mod {
/// [Clone].
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub trait EcssTmSender: EcssTmSenderCore + Downcast + DynClone {}
pub trait EcssTmSender: EcssTmSenderCore + Downcast + DynClone {
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast(&self) -> &dyn EcssTmSenderCore;
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast_mut(&mut self) -> &mut dyn EcssTmSenderCore;
}
/// Blanket implementation for all types which implement [EcssTmSenderCore] and are clonable.
impl<T> EcssTmSender for T where T: EcssTmSenderCore + Clone + 'static {}
impl<T> EcssTmSender for T
where
T: EcssTmSenderCore + Clone + 'static,
{
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast(&self) -> &dyn EcssTmSenderCore {
self
}
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast_mut(&mut self) -> &mut dyn EcssTmSenderCore {
self
}
}
dyn_clone::clone_trait_object!(EcssTmSender);
impl_downcast!(EcssTmSender);
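
The `upcast` and `upcast_mut` helpers added above work around the lack of stable trait upcasting
coercion (rust-lang issue 65991): a `&dyn EcssTmSender` cannot be implicitly coerced to
`&dyn EcssTmSenderCore`. A standalone sketch of the pattern with made-up trait names:

```rust
trait Super {
    fn send(&self);
}

trait Sub: Super {
    fn upcast(&self) -> &dyn Super;
}

// Blanket impl: every sized implementor can hand out its own &dyn Super.
impl<T: Super + 'static> Sub for T {
    fn upcast(&self) -> &dyn Super {
        self
    }
}

struct Sender;

impl Super for Sender {
    fn send(&self) {
        println!("sending");
    }
}

fn takes_super(s: &dyn Super) {
    s.send();
}

fn main() {
    let s: &dyn Sub = &Sender;
    // `takes_super(s)` would require trait upcasting coercion; the explicit
    // method works on stable Rust:
    takes_super(s.upcast());
}
```

The new `events.rs` module further below relies on exactly this when it passes its boxed sender
to `generate_pus_event_tm_generic` via `self.tm_sender.upcast_mut()`.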


@@ -26,3 +26,7 @@ path = "../satrs-core"
[dependencies.satrs-mib]
# version = "0.1.0-alpha.1"
path = "../satrs-mib"
[features]
dyn_tmtc = []
default = ["dyn_tmtc"]


@@ -3,10 +3,32 @@ sat-rs example
This crate contains an example application which simulates an on-board software.
It uses various components provided by the sat-rs framework to do this. As such, it shows how
a more complex real on-board software could be built from these components.
The application opens a UDP server on port 7301 to receive telecommands.
a more complex real on-board software could be built from these components. It is recommended to
read the dedicated
[example chapters](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/example.html) inside
the sat-rs book.
You can run the application using `cargo run`. The `simpleclient` binary target sends a
The application opens a UDP and a TCP server on port 7301 to receive telecommands.
You can run the application using `cargo run`.
# Features
The example has the `dyn_tmtc` feature which is enabled by default. With this feature enabled,
TMTC packets are exchanged using the heap as the backing memory instead of pre-allocated static
stores.
You can run the application without this feature using
```sh
cargo run --no-default-features
```
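
Internally, the example selects one of two main functions at compile time based on this feature,
mirroring the `main.rs` code shown later in this commit:

```rust
fn main() {
    setup_logger().expect("setting up logging with fern failed");
    println!("Running OBSW example");
    // Static pools back the TMTC stores when the feature is disabled ...
    #[cfg(not(feature = "dyn_tmtc"))]
    static_tmtc_pool_main();
    // ... and the heap is used when the feature is enabled (the default).
    #[cfg(feature = "dyn_tmtc")]
    dyn_tmtc_pool_main();
}
```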
# Interacting with the sat-rs example
## Simple Client
The `simpleclient` binary target sends a
ping telecommand and then verifies the telemetry generated by the example application.
It can be run like this:
@@ -17,7 +39,7 @@ cargo run --bin simpleclient
This repository also contains a more complex client using the
[Python tmtccmd](https://github.com/robamu-org/tmtccmd) module.
# <a id="tmtccmd"></a> Using the tmtccmd Python client
## <a id="tmtccmd"></a> Using the tmtccmd Python client
The python client requires a valid installation of the
[tmtccmd package](https://github.com/robamu-org/tmtccmd).
@@ -46,31 +68,7 @@ as Python code. For example, you can use the following command to send a ping li
the `simpleclient`:
```sh
./main.py -s test -o ping
./main.py -p /test/ping
```
You can also simply call the script without any arguments to view a list of services (`-s` flag)
and corresponding op codes (`-o` flag) for each service.
# Structure of the example project
The example project contains components which could also be expected to be part of a production
On-Board Software.
1. A UDP and TCP server to receive telecommands and poll telemetry from. This might be an optional
component for an OBSW which is only used during the development phase on ground. The TCP
server parses space packets by using the CCSDS space packet ID as the packet start delimiter.
2. A PUS service stack which exposes some functionality conformant with the ECSS PUS service. This
currently includes the following services:
- Service 1 for telecommand verification.
- Service 3 for housekeeping telemetry handling.
- Service 5 for management and downlink of on-board events.
- Service 8 for handling on-board actions.
- Service 11 for scheduling telecommands to be released at a specific time.
- Service 17 for test purposes (pings)
3. An event manager component which handles the event IPC mechanism.
4. A TC source component which demultiplexes and routes telecommands based on parameters like
packet APID or PUS service and subservice type.
5. A TM sink sink component which is the target of all sent telemetry and sends it to downlink
handlers like the UDP and TCP server.
6. An AOCS example task which can also process some PUS commands.
You can also simply call the script without any arguments to view the command tree.

satrs-example/src/acs.rs (new file, 117 lines)

@@ -0,0 +1,117 @@
use std::sync::mpsc::{self, TryRecvError};
use log::{info, warn};
use satrs_core::pus::verification::VerificationReporterWithSender;
use satrs_core::pus::{EcssTmSender, PusTmWrapper};
use satrs_core::spacepackets::ecss::hk::Subservice as HkSubservice;
use satrs_core::{
hk::HkRequest,
spacepackets::{
ecss::tm::{PusTmCreator, PusTmSecondaryHeader},
time::cds::{DaysLen16Bits, TimeProvider},
SequenceFlags, SpHeader,
},
};
use satrs_example::config::{RequestTargetId, PUS_APID};
use crate::{
hk::{AcsHkIds, HkUniqueId},
requests::{Request, RequestWithToken},
update_time,
};
pub struct AcsTask {
timestamp: [u8; 7],
time_provider: TimeProvider<DaysLen16Bits>,
verif_reporter: VerificationReporterWithSender,
tm_sender: Box<dyn EcssTmSender>,
request_rx: mpsc::Receiver<RequestWithToken>,
}
impl AcsTask {
pub fn new(
tm_sender: impl EcssTmSender,
request_rx: mpsc::Receiver<RequestWithToken>,
verif_reporter: VerificationReporterWithSender,
) -> Self {
Self {
timestamp: [0; 7],
time_provider: TimeProvider::new_with_u16_days(0, 0),
verif_reporter,
tm_sender: Box::new(tm_sender),
request_rx,
}
}
fn handle_hk_request(&mut self, target_id: u32, unique_id: u32) {
assert_eq!(target_id, RequestTargetId::AcsSubsystem as u32);
if unique_id == AcsHkIds::TestMgmSet as u32 {
let mut sp_header = SpHeader::tm(PUS_APID, SequenceFlags::Unsegmented, 0, 0).unwrap();
let sec_header = PusTmSecondaryHeader::new_simple(
3,
HkSubservice::TmHkPacket as u8,
&self.timestamp,
);
let mut buf: [u8; 8] = [0; 8];
let hk_id = HkUniqueId::new(target_id, unique_id);
hk_id.write_to_be_bytes(&mut buf).unwrap();
let pus_tm = PusTmCreator::new(&mut sp_header, sec_header, &buf, true);
self.tm_sender
.send_tm(PusTmWrapper::Direct(pus_tm))
.expect("Sending HK TM failed");
}
// TODO: Verification failure for invalid unique IDs.
}
pub fn try_reading_one_request(&mut self) -> bool {
match self.request_rx.try_recv() {
Ok(request) => {
info!(
"ACS thread: Received HK request {:?}",
request.targeted_request
);
match request.targeted_request.request {
Request::Hk(hk_req) => match hk_req {
HkRequest::OneShot(unique_id) => self.handle_hk_request(
request.targeted_request.target_id_with_apid.target_id(),
unique_id,
),
HkRequest::Enable(_) => {}
HkRequest::Disable(_) => {}
HkRequest::ModifyCollectionInterval(_, _) => {}
},
Request::Mode(_mode_req) => {
warn!("mode request handling not implemented yet")
}
Request::Action(_action_req) => {
warn!("action request handling not implemented yet")
}
}
let started_token = self
.verif_reporter
.start_success(request.token, Some(&self.timestamp))
.expect("Sending start success failed");
self.verif_reporter
.completion_success(started_token, Some(&self.timestamp))
.expect("Sending completion success failed");
true
}
Err(e) => match e {
TryRecvError::Empty => false,
TryRecvError::Disconnected => {
warn!("ACS thread: Message Queue TX disconnected!");
false
}
},
}
}
pub fn periodic_operation(&mut self) {
update_time(&mut self.time_provider, &mut self.timestamp);
loop {
if !self.try_reading_one_request() {
break;
}
}
}
}


@@ -5,7 +5,7 @@ use satrs_core::{
spacepackets::ecss::{PusPacket, WritablePusPacket},
spacepackets::SpHeader,
};
use satrs_example::{OBSW_SERVER_ADDR, SERVER_PORT};
use satrs_example::config::{OBSW_SERVER_ADDR, SERVER_PORT};
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::time::Duration;


@@ -1,15 +1,22 @@
use crate::tmtc::{MpscStoreAndSendError, PusTcSource};
use satrs_core::pus::ReceivesEcssPusTc;
use satrs_core::spacepackets::{CcsdsPacket, SpHeader};
use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc};
use satrs_example::PUS_APID;
use satrs_example::config::PUS_APID;
#[derive(Clone)]
pub struct CcsdsReceiver {
pub tc_source: PusTcSource,
pub struct CcsdsReceiver<
TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone,
E,
> {
pub tc_source: TcSource,
}
impl CcsdsPacketHandler for CcsdsReceiver {
type Error = MpscStoreAndSendError;
impl<
TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone + 'static,
E: 'static,
> CcsdsPacketHandler for CcsdsReceiver<TcSource, E>
{
type Error = E;
fn valid_apids(&self) -> &'static [u16] {
&[PUS_APID]
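
This hunk makes `CcsdsReceiver` generic over its TC source so the same receiver works with both
the shared-pool provider and the heap-backed provider. The following standalone sketch (with
simplified, made-up names; the real bounds involve `ReceivesCcsdsTc` and `ReceivesEcssPusTc`)
shows the pattern of tying the generic source and its error type together:

```rust
trait ReceivesTc {
    type Error;
    fn pass_tc(&mut self, raw: &[u8]) -> Result<(), Self::Error>;
}

// The receiver is generic over any clonable TC source; the error type
// parameter E is constrained through the associated type binding.
#[derive(Clone)]
struct Receiver<TcSource: ReceivesTc<Error = E> + Clone, E> {
    tc_source: TcSource,
}

#[derive(Clone, Default)]
struct VecBackedSource(Vec<Vec<u8>>);

impl ReceivesTc for VecBackedSource {
    type Error = ();
    fn pass_tc(&mut self, raw: &[u8]) -> Result<(), Self::Error> {
        self.0.push(raw.to_vec());
        Ok(())
    }
}

fn main() {
    let mut receiver: Receiver<VecBackedSource, ()> = Receiver {
        tc_source: VecBackedSource::default(),
    };
    receiver.tc_source.pass_tc(&[0x18, 0x02]).unwrap();
}
```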

satrs-example/src/config.rs (new file, 142 lines)

@@ -0,0 +1,142 @@
use satrs_core::res_code::ResultU16;
use satrs_mib::res_code::ResultU16Info;
use satrs_mib::resultcode;
use std::net::Ipv4Addr;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use satrs_core::{
events::{EventU32TypedSev, SeverityInfo},
pool::{StaticMemoryPool, StaticPoolConfig},
};
pub const PUS_APID: u16 = 0x02;
#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)]
pub enum CustomPusServiceId {
Mode = 200,
Health = 201,
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum RequestTargetId {
AcsSubsystem = 1,
}
pub const AOCS_APID: u16 = 1;
#[derive(Debug)]
pub enum GroupId {
Tmtc = 0,
Hk = 1,
}
pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
pub const SERVER_PORT: u16 = 7301;
pub const TEST_EVENT: EventU32TypedSev<SeverityInfo> =
EventU32TypedSev::<SeverityInfo>::const_new(0, 0);
pub mod tmtc_err {
use super::*;
#[resultcode]
pub const INVALID_PUS_SERVICE: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 0);
#[resultcode]
pub const INVALID_PUS_SUBSERVICE: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 1);
#[resultcode]
pub const PUS_SERVICE_NOT_IMPLEMENTED: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 2);
#[resultcode]
pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 3);
#[resultcode(
info = "Not enough data inside the TC application data field. Optionally includes: \
8 bytes of failure data containing 2 failure parameters, \
P1 (u32 big endian): Expected data length, P2: Found data length"
)]
pub const NOT_ENOUGH_APP_DATA: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 2);
pub const TMTC_RESULTS: &[ResultU16Info] = &[
INVALID_PUS_SERVICE_EXT,
INVALID_PUS_SUBSERVICE_EXT,
NOT_ENOUGH_APP_DATA_EXT,
];
}
pub mod hk_err {
use super::*;
#[resultcode]
pub const TARGET_ID_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 0);
#[resultcode]
pub const UNIQUE_ID_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 1);
#[resultcode]
pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 2);
#[resultcode]
pub const COLLECTION_INTERVAL_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 3);
}
#[allow(clippy::enum_variant_names)]
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum TmSenderId {
PusVerification = 0,
PusTest = 1,
PusEvent = 2,
PusHk = 3,
PusAction = 4,
PusSched = 5,
AllEvents = 6,
AcsSubsystem = 7,
}
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum TcReceiverId {
PusTest = 1,
PusEvent = 2,
PusHk = 3,
PusAction = 4,
PusSched = 5,
}
pub mod pool {
use super::*;
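// Assumption (not stated in the diff): each (N, S) tuple below configures
// one subpool with N memory blocks of S bytes each.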
pub fn create_static_pools() -> (StaticMemoryPool, StaticMemoryPool) {
(
StaticMemoryPool::new(StaticPoolConfig::new(vec![
(30, 32),
(15, 64),
(15, 128),
(15, 256),
(15, 1024),
(15, 2048),
])),
StaticMemoryPool::new(StaticPoolConfig::new(vec![
(30, 32),
(15, 64),
(15, 128),
(15, 256),
(15, 1024),
(15, 2048),
])),
)
}
pub fn create_sched_tc_pool() -> StaticMemoryPool {
StaticMemoryPool::new(StaticPoolConfig::new(vec![
(30, 32),
(15, 64),
(15, 128),
(15, 256),
(15, 1024),
(15, 2048),
]))
}
}
pub mod tasks {
pub const FREQ_MS_UDP_TMTC: u64 = 200;
pub const FREQ_MS_EVENT_HANDLING: u64 = 400;
pub const FREQ_MS_AOCS: u64 = 500;
pub const FREQ_MS_PUS_STACK: u64 = 200;
}

satrs-example/src/events.rs (new file, 187 lines)

@@ -0,0 +1,187 @@
use std::sync::mpsc::{self, SendError};
use satrs_core::{
event_man::{
EventManager, EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider,
SendEventProvider,
},
events::EventU32,
params::Params,
pus::{
event_man::{
DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
PusEventDispatcher,
},
verification::{TcStateStarted, VerificationReporterWithSender, VerificationToken},
EcssTmSender,
},
spacepackets::time::cds::{self, TimeProvider},
};
use satrs_example::config::PUS_APID;
use crate::update_time;
pub type MpscEventManager = EventManager<SendError<(EventU32, Option<Params>)>>;
pub struct PusEventHandler {
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
pus_event_dispatcher: PusEventDispatcher<(), EventU32>,
pus_event_man_rx: mpsc::Receiver<(EventU32, Option<Params>)>,
tm_sender: Box<dyn EcssTmSender>,
time_provider: TimeProvider,
timestamp: [u8; 7],
verif_handler: VerificationReporterWithSender,
}
impl PusEventHandler {
pub fn new(
verif_handler: VerificationReporterWithSender,
event_manager: &mut MpscEventManager,
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
tm_sender: impl EcssTmSender,
) -> Self {
let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel();
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
// telemetry for each event.
let event_reporter = EventReporter::new(PUS_APID, 128).unwrap();
let pus_tm_backend = DefaultPusMgmtBackendProvider::<EventU32>::default();
let pus_event_dispatcher =
PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend));
let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx);
event_manager.subscribe_all(pus_event_man_send_provider.id());
event_manager.add_sender(pus_event_man_send_provider);
Self {
event_request_rx,
pus_event_dispatcher,
pus_event_man_rx,
time_provider: cds::TimeProvider::new_with_u16_days(0, 0),
timestamp: [0; 7],
verif_handler,
tm_sender: Box::new(tm_sender),
}
}
pub fn handle_event_requests(&mut self) {
let report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
let started_token: VerificationToken<TcStateStarted> = event_req
.token
.try_into()
.expect("expected start verification token");
self.verif_handler
.completion_success(started_token, Some(timestamp))
.expect("Sending completion success failed");
};
// handle event requests
if let Ok(event_req) = self.event_request_rx.try_recv() {
match event_req.request {
EventRequest::Enable(event) => {
self.pus_event_dispatcher
.enable_tm_for_event(&event)
.expect("Enabling TM failed");
update_time(&mut self.time_provider, &mut self.timestamp);
report_completion(event_req, &self.timestamp);
}
EventRequest::Disable(event) => {
self.pus_event_dispatcher
.disable_tm_for_event(&event)
.expect("Disabling TM failed");
update_time(&mut self.time_provider, &mut self.timestamp);
report_completion(event_req, &self.timestamp);
}
}
}
}
pub fn generate_pus_event_tm(&mut self) {
// Perform the generation of PUS event packets
if let Ok((event, _param)) = self.pus_event_man_rx.try_recv() {
update_time(&mut self.time_provider, &mut self.timestamp);
self.pus_event_dispatcher
.generate_pus_event_tm_generic(
self.tm_sender.upcast_mut(),
&self.timestamp,
event,
None,
)
.expect("Sending TM as event failed");
}
}
}
pub struct EventManagerWrapper {
event_manager: MpscEventManager,
event_sender: mpsc::Sender<(EventU32, Option<Params>)>,
}
impl EventManagerWrapper {
pub fn new() -> Self {
// The sender handle is the primary sender handle for all components which want to create events.
// The event manager will receive the RX handle to receive all the events.
let (event_sender, event_man_rx) = mpsc::channel();
let event_recv = MpscEventReceiver::<EventU32>::new(event_man_rx);
Self {
event_manager: EventManagerWithMpscQueue::new(Box::new(event_recv)),
event_sender,
}
}
pub fn clone_event_sender(&self) -> mpsc::Sender<(EventU32, Option<Params>)> {
self.event_sender.clone()
}
pub fn event_manager(&mut self) -> &mut MpscEventManager {
&mut self.event_manager
}
pub fn try_event_routing(&mut self) {
// Perform the event routing.
self.event_manager
.try_event_handling()
.expect("event handling failed");
}
}
pub struct EventHandler {
pub event_man_wrapper: EventManagerWrapper,
pub pus_event_handler: PusEventHandler,
}
impl EventHandler {
pub fn new(
tm_sender: impl EcssTmSender,
verif_handler: VerificationReporterWithSender,
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
) -> Self {
let mut event_man_wrapper = EventManagerWrapper::new();
let pus_event_handler = PusEventHandler::new(
verif_handler,
event_man_wrapper.event_manager(),
event_request_rx,
tm_sender,
);
Self {
event_man_wrapper,
pus_event_handler,
}
}
pub fn clone_event_sender(&self) -> mpsc::Sender<(EventU32, Option<Params>)> {
self.event_man_wrapper.clone_event_sender()
}
#[allow(dead_code)]
pub fn event_manager(&mut self) -> &mut MpscEventManager {
self.event_man_wrapper.event_manager()
}
pub fn periodic_operation(&mut self) {
self.pus_event_handler.handle_event_requests();
self.event_man_wrapper.try_event_routing();
self.pus_event_handler.generate_pus_event_tm();
}
}


@@ -1,17 +1,12 @@
use derive_new::new;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use satrs_core::events::{EventU32TypedSev, SeverityInfo};
use satrs_core::objects::ObjectId;
use satrs_core::spacepackets::ecss::tc::IsPusTelecommand;
use satrs_core::spacepackets::ecss::PusPacket;
use satrs_core::spacepackets::{ByteConversionError, CcsdsPacket};
use satrs_core::tmtc::TargetId;
use std::fmt;
use std::net::Ipv4Addr;
use thiserror::Error;
use satrs_mib::res_code::{ResultU16, ResultU16Info};
use satrs_mib::resultcode;
pub mod config;
pub type Apid = u16;
@@ -62,96 +57,3 @@ impl TargetIdWithApid {
})
}
}
pub const PUS_APID: u16 = 0x02;
#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)]
pub enum CustomPusServiceId {
Mode = 200,
Health = 201,
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum RequestTargetId {
AcsSubsystem = 1,
}
pub const AOCS_APID: u16 = 1;
pub const ACS_OBJECT_ID: ObjectId = ObjectId {
id: RequestTargetId::AcsSubsystem as u32,
name: "ACS_SUBSYSTEM",
};
#[derive(Debug)]
pub enum GroupId {
Tmtc = 0,
Hk = 1,
}
pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
pub const SERVER_PORT: u16 = 7301;
pub const TEST_EVENT: EventU32TypedSev<SeverityInfo> =
EventU32TypedSev::<SeverityInfo>::const_new(0, 0);
pub mod tmtc_err {
use super::*;
#[resultcode]
pub const INVALID_PUS_SERVICE: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 0);
#[resultcode]
pub const INVALID_PUS_SUBSERVICE: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 1);
#[resultcode]
pub const PUS_SERVICE_NOT_IMPLEMENTED: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 2);
#[resultcode]
pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 3);
#[resultcode(
info = "Not enough data inside the TC application data field. Optionally includes: \
8 bytes of failure data containing 2 failure parameters, \
P1 (u32 big endian): Expected data length, P2: Found data length"
)]
pub const NOT_ENOUGH_APP_DATA: ResultU16 = ResultU16::const_new(GroupId::Tmtc as u8, 2);
pub const TMTC_RESULTS: &[ResultU16Info] = &[
INVALID_PUS_SERVICE_EXT,
INVALID_PUS_SUBSERVICE_EXT,
NOT_ENOUGH_APP_DATA_EXT,
];
}
pub mod hk_err {
use super::*;
#[resultcode]
pub const TARGET_ID_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 0);
#[resultcode]
pub const UNIQUE_ID_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 1);
#[resultcode]
pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 2);
#[resultcode]
pub const COLLECTION_INTERVAL_MISSING: ResultU16 = ResultU16::const_new(GroupId::Hk as u8, 3);
}
#[allow(clippy::enum_variant_names)]
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum TmSenderId {
PusVerification = 0,
PusTest = 1,
PusEvent = 2,
PusHk = 3,
PusAction = 4,
PusSched = 5,
AllEvents = 6,
}
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum TcReceiverId {
PusTest = 1,
PusEvent = 2,
PusHk = 3,
PusAction = 4,
PusSched = 5,
}


@@ -1,152 +1,95 @@
mod acs;
mod ccsds;
mod events;
mod hk;
mod logger;
mod pus;
mod requests;
mod tcp;
mod tm_funnel;
mod tmtc;
mod udp;
use log::{info, warn};
use crate::events::EventHandler;
use crate::pus::stack::PusStack;
use crate::tm_funnel::{TmFunnelDynamic, TmFunnelStatic};
use log::info;
use pus::test::create_test_service_dynamic;
use satrs_core::hal::std::tcp_server::ServerConfig;
use satrs_core::hal::std::udp_server::UdpTcServer;
use satrs_example::config::pool::{create_sched_tc_pool, create_static_pools};
use satrs_example::config::tasks::{
FREQ_MS_AOCS, FREQ_MS_EVENT_HANDLING, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC,
};
use satrs_example::config::{RequestTargetId, TmSenderId, OBSW_SERVER_ADDR, PUS_APID, SERVER_PORT};
use tmtc::PusTcSourceProviderDynamic;
use udp::DynamicUdpTmHandler;
use crate::acs::AcsTask;
use crate::ccsds::CcsdsReceiver;
use crate::hk::{AcsHkIds, HkUniqueId};
use crate::logger::setup_logger;
use crate::pus::action::{Pus8Wrapper, PusService8ActionHandler};
use crate::pus::event::Pus5Wrapper;
use crate::pus::hk::{Pus3Wrapper, PusService3HkHandler};
use crate::pus::scheduler::Pus11Wrapper;
use crate::pus::test::Service17CustomWrapper;
use crate::pus::action::{create_action_service_dynamic, create_action_service_static};
use crate::pus::event::{create_event_service_dynamic, create_event_service_static};
use crate::pus::hk::{create_hk_service_dynamic, create_hk_service_static};
use crate::pus::scheduler::{create_scheduler_service_dynamic, create_scheduler_service_static};
use crate::pus::test::create_test_service_static;
use crate::pus::{PusReceiver, PusTcMpscRouter};
use crate::requests::{Request, RequestWithToken};
use crate::requests::RequestWithToken;
use crate::tcp::{SyncTcpTmSource, TcpTask};
use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask};
use crate::udp::UdpTmtcServer;
use satrs_core::event_man::{
EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider,
};
use satrs_core::events::EventU32;
use satrs_core::hk::HkRequest;
use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig};
use satrs_core::pus::event_man::{
DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
PusEventDispatcher,
};
use satrs_core::pus::event_srv::PusService5EventHandler;
use satrs_core::pus::hk::Subservice as HkSubservice;
use satrs_core::pus::scheduler::PusScheduler;
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
use satrs_core::pus::test::PusService17TestHandler;
use satrs_core::pus::verification::{
TcStateStarted, VerificationReporterCfg, VerificationReporterWithSender, VerificationToken,
};
use satrs_core::pus::{
EcssTcInSharedStoreConverter, MpscTcReceiver, MpscTmInStoreSender, PusServiceHelper,
};
use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore};
use satrs_core::spacepackets::ecss::tm::{PusTmCreator, PusTmZeroCopyWriter};
use satrs_core::spacepackets::{
ecss::tm::PusTmSecondaryHeader, time::cds::TimeProvider, time::TimeWriter, SequenceFlags,
SpHeader,
use crate::tmtc::{
PusTcSourceProviderSharedPool, SharedTcPool, TcArgs, TmArgs, TmtcTaskDynamic, TmtcTaskStatic,
};
use crate::udp::{StaticUdpTmHandler, UdpTmtcServer};
use satrs_core::pus::event_man::EventRequestWithToken;
use satrs_core::pus::verification::{VerificationReporterCfg, VerificationReporterWithSender};
use satrs_core::pus::{EcssTmSender, MpscTmAsVecSender, MpscTmInStoreSender};
use satrs_core::spacepackets::{time::cds::TimeProvider, time::TimeWriter};
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::tmtc::{CcsdsDistributor, TargetId};
use satrs_core::ChannelId;
use satrs_example::{
RequestTargetId, TargetIdWithApid, TcReceiverId, TmSenderId, OBSW_SERVER_ADDR, PUS_APID,
SERVER_PORT,
};
use satrs_example::TargetIdWithApid;
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::mpsc::{channel, TryRecvError};
use std::sync::mpsc::{self, channel};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
fn main() {
setup_logger().expect("setting up logging with fern failed");
println!("Running OBSW example");
let tm_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![
(30, 32),
(15, 64),
(15, 128),
(15, 256),
(15, 1024),
(15, 2048),
]));
fn create_verification_reporter(verif_sender: impl EcssTmSender) -> VerificationReporterWithSender {
let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap();
// Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter.
VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender))
}
#[allow(dead_code)]
fn static_tmtc_pool_main() {
let (tm_pool, tc_pool) = create_static_pools();
let shared_tm_store = SharedTmStore::new(tm_pool);
let tm_store_event = shared_tm_store.clone();
let tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![
(30, 32),
(15, 64),
(15, 128),
(15, 256),
(15, 1024),
(15, 2048),
]));
let tc_store = TcStore {
let shared_tc_pool = SharedTcPool {
pool: Arc::new(RwLock::new(tc_pool)),
};
let sched_tc_pool = StaticMemoryPool::new(StaticPoolConfig::new(vec![
(30, 32),
(15, 64),
(15, 128),
(15, 256),
(15, 1024),
(15, 2048),
]));
let seq_count_provider = CcsdsSimpleSeqCountProvider::new();
let mut msg_counter_map: HashMap<u8, u16> = HashMap::new();
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let (tc_source_tx, tc_source_rx) = channel();
let (tm_funnel_tx, tm_funnel_rx) = channel();
let (tm_server_tx, tm_server_rx) = channel();
let verif_sender = MpscTmInStoreSender::new(
// Every software component which needs to generate verification telemetry, receives a cloned
// verification reporter.
let verif_reporter = create_verification_reporter(MpscTmInStoreSender::new(
TmSenderId::PusVerification as ChannelId,
"verif_sender",
shared_tm_store.clone(),
tm_funnel_tx.clone(),
);
let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap();
// Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter.
let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender));
let reporter_event_handler = verif_reporter.clone();
let reporter_aocs = verif_reporter.clone();
// Create event handling components
// These sender handles are used to send event requests, for example to enable or disable
// certain events
let (event_request_tx, event_request_rx) = channel::<EventRequestWithToken>();
// The sender handle is the primary sender handle for all components which want to create events.
// The event manager will receive the RX handle to receive all the events.
let (event_sender, event_man_rx) = channel();
let event_recv = MpscEventReceiver::<EventU32>::new(event_man_rx);
let test_srv_event_sender = event_sender;
let mut event_man = EventManagerWithMpscQueue::new(Box::new(event_recv));
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
// telemetry for each event.
let event_reporter = EventReporter::new(PUS_APID, 128).unwrap();
let pus_tm_backend = DefaultPusMgmtBackendProvider::<EventU32>::default();
let mut pus_event_dispatcher =
PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend));
let (pus_event_man_tx, pus_event_man_rx) = channel();
let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx);
event_man.subscribe_all(pus_event_man_send_provider.id());
event_man.add_sender(pus_event_man_send_provider);
));
let acs_target_id = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId);
let (acs_thread_tx, acs_thread_rx) = channel::<RequestWithToken>();
// Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = HashMap::new();
let (acs_thread_tx, acs_thread_rx) = channel::<RequestWithToken>();
let target_apid = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId);
request_map.insert(target_apid, acs_thread_tx);
request_map.insert(acs_target_id, acs_thread_tx);
let tc_source_wrapper = PusTcSource {
tc_store: tc_store.clone(),
let tc_source_wrapper = PusTcSourceProviderSharedPool {
shared_pool: shared_tc_pool.clone(),
tc_source: tc_source_tx,
};
@@ -161,8 +104,23 @@ fn main() {
tm_udp_server_rx: tm_server_rx,
};
let aocs_tm_funnel = tm_funnel_tx.clone();
let aocs_tm_store = shared_tm_store.clone();
// Create event handling components
// These sender handles are used to send event requests, for example to enable or disable
// certain events.
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation.
let mut event_handler = EventHandler::new(
MpscTmInStoreSender::new(
TmSenderId::AllEvents as ChannelId,
"ALL_EVENTS_TX",
shared_tm_store.clone(),
tm_funnel_tx.clone(),
),
verif_reporter.clone(),
event_request_rx,
);
let (pus_test_tx, pus_test_rx) = channel();
let (pus_event_tx, pus_event_rx) = channel();
@@ -176,134 +134,102 @@ fn main() {
hk_service_receiver: pus_hk_tx,
action_service_receiver: pus_action_tx,
};
let test_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusTest as ChannelId,
"PUS_17_TM_SENDER",
let pus_test_service = create_test_service_static(
shared_tm_store.clone(),
tm_funnel_tx.clone(),
);
let test_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusTest as ChannelId,
"PUS_17_TC_RECV",
verif_reporter.clone(),
shared_tc_pool.pool.clone(),
event_handler.clone_event_sender(),
pus_test_rx,
);
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
Box::new(test_srv_receiver),
Box::new(test_srv_tm_sender),
PUS_APID,
let pus_scheduler_service = create_scheduler_service_static(
shared_tm_store.clone(),
tm_funnel_tx.clone(),
verif_reporter.clone(),
EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048),
));
let mut pus_17_wrapper = Service17CustomWrapper {
pus17_handler,
test_srv_event_sender,
};
let sched_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusSched as ChannelId,
"PUS_11_TM_SENDER",
shared_tm_store.clone(),
tm_funnel_tx.clone(),
);
let sched_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusSched as ChannelId,
"PUS_11_TC_RECV",
pus_sched_rx,
);
let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5))
.expect("Creating PUS Scheduler failed");
let pus_11_handler = PusService11SchedHandler::new(
PusServiceHelper::new(
Box::new(sched_srv_receiver),
Box::new(sched_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048),
),
scheduler,
);
let mut pus_11_wrapper = Pus11Wrapper {
pus_11_handler,
sched_tc_pool,
tc_source_wrapper,
};
let event_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusEvent as ChannelId,
"PUS_5_TM_SENDER",
pus_sched_rx,
create_sched_tc_pool(),
);
let pus_event_service = create_event_service_static(
shared_tm_store.clone(),
tm_funnel_tx.clone(),
);
let event_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusEvent as ChannelId,
"PUS_5_TC_RECV",
verif_reporter.clone(),
shared_tc_pool.pool.clone(),
pus_event_rx,
);
let pus_5_handler = PusService5EventHandler::new(
PusServiceHelper::new(
Box::new(event_srv_receiver),
Box::new(event_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048),
),
event_request_tx,
);
let mut pus_5_wrapper = Pus5Wrapper { pus_5_handler };
let action_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusAction as ChannelId,
"PUS_8_TM_SENDER",
let pus_action_service = create_action_service_static(
shared_tm_store.clone(),
tm_funnel_tx.clone(),
);
let action_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusAction as ChannelId,
"PUS_8_TC_RECV",
pus_action_rx,
);
let pus_8_handler = PusService8ActionHandler::new(
Box::new(action_srv_receiver),
Box::new(action_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048),
shared_tc_pool.pool.clone(),
pus_action_rx,
request_map.clone(),
);
let mut pus_8_wrapper = Pus8Wrapper { pus_8_handler };
let hk_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusHk as ChannelId,
"PUS_3_TM_SENDER",
let pus_hk_service = create_hk_service_static(
shared_tm_store.clone(),
tm_funnel_tx.clone(),
);
let hk_srv_receiver =
MpscTcReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_8_TC_RECV", pus_hk_rx);
let pus_3_handler = PusService3HkHandler::new(
Box::new(hk_srv_receiver),
Box::new(hk_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInSharedStoreConverter::new(tc_store.pool.clone(), 2048),
shared_tc_pool.pool.clone(),
pus_hk_rx,
request_map,
);
let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler };
let mut pus_stack = PusStack::new(
pus_hk_service,
pus_event_service,
pus_action_service,
pus_scheduler_service,
pus_test_service,
);
let ccsds_receiver = CcsdsReceiver {
tc_source: tc_args.tc_source.clone(),
};
let mut tmtc_task = TmtcTask::new(tc_args, PusReceiver::new(verif_reporter, pus_router));
let mut tmtc_task = TmtcTaskStatic::new(
tc_args,
PusReceiver::new(verif_reporter.clone(), pus_router),
);
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone()));
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor))
.expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server,
tm_rx: tm_args.tm_udp_server_rx,
tm_store: tm_args.tm_store.clone_backing_pool(),
tm_handler: StaticUdpTmHandler {
tm_rx: tm_args.tm_udp_server_rx,
tm_store: tm_args.tm_store.clone_backing_pool(),
},
};
let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
let sync_tm_tcp_source = SyncTcpTmSource::new(200);
let mut tcp_server = TcpTask::new(
tcp_server_cfg,
sync_tm_tcp_source.clone(),
tcp_ccsds_distributor,
)
.expect("tcp server creation failed");
let mut acs_task = AcsTask::new(
MpscTmInStoreSender::new(
TmSenderId::AcsSubsystem as ChannelId,
"ACS_TASK_SENDER",
shared_tm_store.clone(),
tm_funnel_tx.clone(),
),
acs_thread_rx,
verif_reporter,
);
let mut tm_funnel = TmFunnelStatic::new(
shared_tm_store,
sync_tm_tcp_source,
tm_funnel_rx,
tm_server_tx,
);
info!("Starting TMTC and UDP task");
let jh_udp_tmtc = thread::Builder::new()
.name("TMTC and UDP".to_string())
@@ -312,20 +238,11 @@ fn main() {
loop {
udp_tmtc_server.periodic_operation();
tmtc_task.periodic_operation();
thread::sleep(Duration::from_millis(400));
thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC));
}
})
.unwrap();
let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
let mut sync_tm_tcp_source = SyncTcpTmSource::new(200);
let mut tcp_server = TcpTask::new(
tcp_server_cfg,
sync_tm_tcp_source.clone(),
tcp_ccsds_distributor,
)
.expect("tcp server creation failed");
info!("Starting TCP task");
let jh_tcp = thread::Builder::new()
.name("TCP".to_string())
@@ -338,225 +255,271 @@ fn main() {
.unwrap();
info!("Starting TM funnel task");
let jh1 = thread::Builder::new()
let jh_tm_funnel = thread::Builder::new()
.name("TM Funnel".to_string())
.spawn(move || {
let tm_funnel = TmFunnel {
tm_server_tx,
tm_funnel_rx,
};
loop {
if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let shared_pool = shared_tm_store.clone_backing_pool();
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
let tm_raw = pool_guard
.modify(&addr)
.expect("Reading TM from pool failed");
let mut zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw)
.expect("Creating TM zero copy writer failed");
zero_copy_writer.set_apid(PUS_APID);
zero_copy_writer.set_seq_count(seq_count_provider.get_and_increment());
let entry = msg_counter_map
.entry(zero_copy_writer.service())
.or_insert(0);
zero_copy_writer.set_msg_count(*entry);
if *entry == u16::MAX {
*entry = 0;
} else {
*entry += 1;
}
// This operation has to come last!
zero_copy_writer.finish();
tm_funnel
.tm_server_tx
.send(addr)
.expect("Sending TM to server failed");
sync_tm_tcp_source.add_tm(tm_raw);
}
}
.spawn(move || loop {
tm_funnel.operation();
})
.unwrap();
info!("Starting event handling task");
let jh2 = thread::Builder::new()
let jh_event_handling = thread::Builder::new()
.name("Event".to_string())
.spawn(move || {
let mut timestamp: [u8; 7] = [0; 7];
let mut sender = MpscTmInStoreSender::new(
TmSenderId::AllEvents as ChannelId,
"ALL_EVENTS_TX",
tm_store_event.clone(),
tm_funnel_tx,
);
let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
let report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
let started_token: VerificationToken<TcStateStarted> = event_req
.token
.try_into()
.expect("expected start verification token");
reporter_event_handler
.completion_success(started_token, Some(timestamp))
.expect("Sending completion success failed");
};
loop {
// handle event requests
if let Ok(event_req) = event_request_rx.try_recv() {
match event_req.request {
EventRequest::Enable(event) => {
pus_event_dispatcher
.enable_tm_for_event(&event)
.expect("Enabling TM failed");
update_time(&mut time_provider, &mut timestamp);
report_completion(event_req, &timestamp);
}
EventRequest::Disable(event) => {
pus_event_dispatcher
.disable_tm_for_event(&event)
.expect("Disabling TM failed");
update_time(&mut time_provider, &mut timestamp);
report_completion(event_req, &timestamp);
}
}
}
// Perform the event routing.
event_man
.try_event_handling()
.expect("event handling failed");
// Perform the generation of PUS event packets
if let Ok((event, _param)) = pus_event_man_rx.try_recv() {
update_time(&mut time_provider, &mut timestamp);
pus_event_dispatcher
.generate_pus_event_tm_generic(&mut sender, &timestamp, event, None)
.expect("Sending TM as event failed");
}
thread::sleep(Duration::from_millis(400));
}
.spawn(move || loop {
event_handler.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
})
.unwrap();
info!("Starting AOCS thread");
let jh3 = thread::Builder::new()
let jh_aocs = thread::Builder::new()
.name("AOCS".to_string())
.spawn(move || {
let mut timestamp: [u8; 7] = [0; 7];
let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
loop {
// TODO: Move this into a separate function/task/module..
match acs_thread_rx.try_recv() {
Ok(request) => {
info!(
"ACS thread: Received HK request {:?}",
request.targeted_request
);
update_time(&mut time_provider, &mut timestamp);
match request.targeted_request.request {
Request::Hk(hk_req) => match hk_req {
HkRequest::OneShot(unique_id) => {
// TODO: We should check whether the unique ID is even valid.
let target = request.targeted_request.target_id;
assert_eq!(
target.target_id(),
RequestTargetId::AcsSubsystem as u32
);
if request.targeted_request.target_id.target
== AcsHkIds::TestMgmSet as u32
{
let mut sp_header = SpHeader::tm(
PUS_APID,
SequenceFlags::Unsegmented,
0,
0,
)
.unwrap();
let sec_header = PusTmSecondaryHeader::new_simple(
3,
HkSubservice::TmHkPacket as u8,
&timestamp,
);
let mut buf: [u8; 8] = [0; 8];
let hk_id = HkUniqueId::new(target.target_id(), unique_id);
hk_id.write_to_be_bytes(&mut buf).unwrap();
let pus_tm = PusTmCreator::new(
&mut sp_header,
sec_header,
&buf,
true,
);
let addr = aocs_tm_store
.add_pus_tm(&pus_tm)
.expect("Adding PUS TM failed");
aocs_tm_funnel.send(addr).expect("Sending HK TM failed");
}
}
HkRequest::Enable(_) => {}
HkRequest::Disable(_) => {}
HkRequest::ModifyCollectionInterval(_, _) => {}
},
Request::Mode(_mode_req) => {
warn!("mode request handling not implemented yet")
}
Request::Action(_action_req) => {
warn!("action request handling not implemented yet")
}
}
let started_token = reporter_aocs
.start_success(request.token, Some(&timestamp))
.expect("Sending start success failed");
reporter_aocs
.completion_success(started_token, Some(&timestamp))
.expect("Sending completion success failed");
}
Err(e) => match e {
TryRecvError::Empty => {}
TryRecvError::Disconnected => {
warn!("ACS thread: Message Queue TX disconnected!")
}
},
}
thread::sleep(Duration::from_millis(500));
}
.spawn(move || loop {
acs_task.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_AOCS));
})
.unwrap();
info!("Starting PUS handler thread");
let jh4 = thread::Builder::new()
let jh_pus_handler = thread::Builder::new()
.name("PUS".to_string())
.spawn(move || loop {
pus_11_wrapper.release_tcs();
loop {
let mut all_queues_empty = true;
let mut is_srv_finished = |srv_handler_finished: bool| {
if !srv_handler_finished {
all_queues_empty = false;
}
};
is_srv_finished(pus_17_wrapper.handle_next_packet());
is_srv_finished(pus_11_wrapper.handle_next_packet());
is_srv_finished(pus_5_wrapper.handle_next_packet());
is_srv_finished(pus_8_wrapper.handle_next_packet());
is_srv_finished(pus_3_wrapper.handle_next_packet());
if all_queues_empty {
break;
}
}
thread::sleep(Duration::from_millis(200));
pus_stack.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
})
.unwrap();
jh_udp_tmtc
.join()
.expect("Joining UDP TMTC server thread failed");
jh_tcp
.join()
.expect("Joining TCP TMTC server thread failed");
jh1.join().expect("Joining TM Funnel thread failed");
jh2.join().expect("Joining Event Manager thread failed");
jh3.join().expect("Joining AOCS thread failed");
jh4.join().expect("Joining PUS handler thread failed");
jh_tm_funnel
.join()
.expect("Joining TM Funnel thread failed");
jh_event_handling
.join()
.expect("Joining Event Manager thread failed");
jh_aocs.join().expect("Joining AOCS thread failed");
jh_pus_handler
.join()
.expect("Joining PUS handler thread failed");
}
#[allow(dead_code)]
fn dyn_tmtc_pool_main() {
let (tc_source_tx, tc_source_rx) = channel();
let (tm_funnel_tx, tm_funnel_rx) = channel();
let (tm_server_tx, tm_server_rx) = channel();
// Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter.
let verif_reporter = create_verification_reporter(MpscTmAsVecSender::new(
TmSenderId::PusVerification as ChannelId,
"verif_sender",
tm_funnel_tx.clone(),
));
let acs_target_id = TargetIdWithApid::new(PUS_APID, RequestTargetId::AcsSubsystem as TargetId);
let (acs_thread_tx, acs_thread_rx) = channel::<RequestWithToken>();
// Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = HashMap::new();
request_map.insert(acs_target_id, acs_thread_tx);
let tc_source = PusTcSourceProviderDynamic(tc_source_tx);
// Create event handling components
// These sender handles are used to send event requests, for example to enable or disable
// certain events.
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation.
let mut event_handler = EventHandler::new(
MpscTmAsVecSender::new(
TmSenderId::AllEvents as ChannelId,
"ALL_EVENTS_TX",
tm_funnel_tx.clone(),
),
verif_reporter.clone(),
event_request_rx,
);
let (pus_test_tx, pus_test_rx) = channel();
let (pus_event_tx, pus_event_rx) = channel();
let (pus_sched_tx, pus_sched_rx) = channel();
let (pus_hk_tx, pus_hk_rx) = channel();
let (pus_action_tx, pus_action_rx) = channel();
let pus_router = PusTcMpscRouter {
test_service_receiver: pus_test_tx,
event_service_receiver: pus_event_tx,
sched_service_receiver: pus_sched_tx,
hk_service_receiver: pus_hk_tx,
action_service_receiver: pus_action_tx,
};
let pus_test_service = create_test_service_dynamic(
tm_funnel_tx.clone(),
verif_reporter.clone(),
event_handler.clone_event_sender(),
pus_test_rx,
);
let pus_scheduler_service = create_scheduler_service_dynamic(
tm_funnel_tx.clone(),
verif_reporter.clone(),
tc_source.0.clone(),
pus_sched_rx,
create_sched_tc_pool(),
);
let pus_event_service = create_event_service_dynamic(
tm_funnel_tx.clone(),
verif_reporter.clone(),
pus_event_rx,
event_request_tx,
);
let pus_action_service = create_action_service_dynamic(
tm_funnel_tx.clone(),
verif_reporter.clone(),
pus_action_rx,
request_map.clone(),
);
let pus_hk_service = create_hk_service_dynamic(
tm_funnel_tx.clone(),
verif_reporter.clone(),
pus_hk_rx,
request_map,
);
let mut pus_stack = PusStack::new(
pus_hk_service,
pus_event_service,
pus_action_service,
pus_scheduler_service,
pus_test_service,
);
let ccsds_receiver = CcsdsReceiver { tc_source };
let mut tmtc_task = TmtcTaskDynamic::new(
tc_source_rx,
PusReceiver::new(verif_reporter.clone(), pus_router),
);
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone()));
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor))
.expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server,
tm_handler: DynamicUdpTmHandler {
tm_rx: tm_server_rx,
},
};
let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
let sync_tm_tcp_source = SyncTcpTmSource::new(200);
let mut tcp_server = TcpTask::new(
tcp_server_cfg,
sync_tm_tcp_source.clone(),
tcp_ccsds_distributor,
)
.expect("tcp server creation failed");
let mut acs_task = AcsTask::new(
MpscTmAsVecSender::new(
TmSenderId::AcsSubsystem as ChannelId,
"ACS_TASK_SENDER",
tm_funnel_tx.clone(),
),
acs_thread_rx,
verif_reporter,
);
let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx);
info!("Starting TMTC and UDP task");
let jh_udp_tmtc = thread::Builder::new()
.name("TMTC and UDP".to_string())
.spawn(move || {
info!("Running UDP server on port {SERVER_PORT}");
loop {
udp_tmtc_server.periodic_operation();
tmtc_task.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC));
}
})
.unwrap();
info!("Starting TCP task");
let jh_tcp = thread::Builder::new()
.name("TCP".to_string())
.spawn(move || {
info!("Running TCP server on port {SERVER_PORT}");
loop {
tcp_server.periodic_operation();
}
})
.unwrap();
info!("Starting TM funnel task");
let jh_tm_funnel = thread::Builder::new()
.name("TM Funnel".to_string())
.spawn(move || loop {
tm_funnel.operation();
})
.unwrap();
info!("Starting event handling task");
let jh_event_handling = thread::Builder::new()
.name("Event".to_string())
.spawn(move || loop {
event_handler.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
})
.unwrap();
info!("Starting AOCS thread");
let jh_aocs = thread::Builder::new()
.name("AOCS".to_string())
.spawn(move || loop {
acs_task.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_AOCS));
})
.unwrap();
info!("Starting PUS handler thread");
let jh_pus_handler = thread::Builder::new()
.name("PUS".to_string())
.spawn(move || loop {
pus_stack.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
})
.unwrap();
jh_udp_tmtc
.join()
.expect("Joining UDP TMTC server thread failed");
jh_tcp
.join()
.expect("Joining TCP TMTC server thread failed");
jh_tm_funnel
.join()
.expect("Joining TM Funnel thread failed");
jh_event_handling
.join()
.expect("Joining Event Manager thread failed");
jh_aocs.join().expect("Joining AOCS thread failed");
jh_pus_handler
.join()
.expect("Joining PUS handler thread failed");
}
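In the dynamic variant above, every stage of the TMTC path exchanges heap-allocated byte vectors through plain mpsc channels instead of store addresses into a shared pool. The following standalone sketch (hypothetical names, std only, not the actual application code) condenses that data flow:

use std::sync::mpsc;
use std::thread;

// Producers send owned packet buffers; the funnel forwards them to the
// downlink server without any shared memory pool involved.
fn main() {
    let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel::<Vec<u8>>();
    let (tm_server_tx, tm_server_rx) = mpsc::channel::<Vec<u8>>();
    let producer = thread::spawn(move || {
        tm_funnel_tx
            .send(vec![0x08, 0x11, 0x01])
            .expect("sending TM failed");
    });
    // Funnel stage: receive from the producer side, forward to the server side.
    if let Ok(tm) = tm_funnel_rx.recv() {
        tm_server_tx.send(tm).expect("forwarding TM failed");
    }
    println!("server received {} TM byte(s)", tm_server_rx.recv().unwrap().len());
    producer.join().unwrap();
}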
fn main() {
setup_logger().expect("setting up logging with fern failed");
println!("Running OBSW example");
#[cfg(not(feature = "dyn_tmtc"))]
static_tmtc_pool_main();
#[cfg(feature = "dyn_tmtc")]
dyn_tmtc_pool_main();
}
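The `dyn_tmtc` feature decides at compile time which of the two entry points is built. A minimal standalone sketch of this cfg-based dispatch, assuming only that a crate feature with this name exists:

// Compiled when the dyn_tmtc feature is disabled (the default).
#[cfg(not(feature = "dyn_tmtc"))]
fn tmtc_main() {
    println!("static pools back the TMTC stores");
}

// Compiled when building with --features dyn_tmtc.
#[cfg(feature = "dyn_tmtc")]
fn tmtc_main() {
    println!("heap-allocated buffers back the TMTC stores");
}

fn main() {
    tmtc_main();
}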
pub fn update_time(time_provider: &mut TimeProvider, timestamp: &mut [u8]) {
@ -1,17 +1,79 @@
use crate::requests::{ActionRequest, Request, RequestWithToken};
use log::{error, warn};
use satrs_core::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs_core::pus::verification::{
FailParams, TcStateAccepted, VerificationReporterWithSender, VerificationToken,
};
use satrs_core::pus::{
EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcReceiver, EcssTmSender,
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
EcssTcReceiver, EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender,
PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHelper,
};
use satrs_core::spacepackets::ecss::tc::PusTcReader;
use satrs_core::spacepackets::ecss::PusPacket;
use satrs_example::{tmtc_err, TargetIdWithApid};
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::ChannelId;
use satrs_example::config::{tmtc_err, TcReceiverId, TmSenderId, PUS_APID};
use satrs_example::TargetIdWithApid;
use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::mpsc::{self, Sender};
pub fn create_action_service_static(
shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender,
tc_pool: SharedStaticMemoryPool,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
request_map: HashMap<TargetIdWithApid, mpsc::Sender<RequestWithToken>>,
) -> Pus8Wrapper<EcssTcInSharedStoreConverter> {
let action_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusAction as ChannelId,
"PUS_8_TM_SENDER",
shared_tm_store.clone(),
tm_funnel_tx.clone(),
);
let action_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusAction as ChannelId,
"PUS_8_TC_RECV",
pus_action_rx,
);
let pus_8_handler = PusService8ActionHandler::new(
Box::new(action_srv_receiver),
Box::new(action_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInSharedStoreConverter::new(tc_pool.clone(), 2048),
request_map.clone(),
);
Pus8Wrapper { pus_8_handler }
}
pub fn create_action_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
request_map: HashMap<TargetIdWithApid, mpsc::Sender<RequestWithToken>>,
) -> Pus8Wrapper<EcssTcInVecConverter> {
let action_srv_tm_sender = MpscTmAsVecSender::new(
TmSenderId::PusAction as ChannelId,
"PUS_8_TM_SENDER",
tm_funnel_tx.clone(),
);
let action_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusAction as ChannelId,
"PUS_8_TC_RECV",
pus_action_rx,
);
let pus_8_handler = PusService8ActionHandler::new(
Box::new(action_srv_receiver),
Box::new(action_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInVecConverter::default(),
request_map.clone(),
);
Pus8Wrapper { pus_8_handler }
}
pub struct PusService8ActionHandler<TcInMemConverter: EcssTcInMemConverter> {
service_helper: PusServiceHelper<TcInMemConverter>,
@ -141,11 +203,11 @@ impl<TcInMemConverter: EcssTcInMemConverter> PusService8ActionHandler<TcInMemCon
}
}
pub struct Pus8Wrapper {
pub(crate) pus_8_handler: PusService8ActionHandler<EcssTcInSharedStoreConverter>,
pub struct Pus8Wrapper<TcInMemConverter: EcssTcInMemConverter> {
pub(crate) pus_8_handler: PusService8ActionHandler<TcInMemConverter>,
}
impl Pus8Wrapper {
impl<TcInMemConverter: EcssTcInMemConverter> Pus8Wrapper<TcInMemConverter> {
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_8_handler.handle_one_tc() {
Ok(result) => match result {
@ -1,12 +1,85 @@
use log::{error, warn};
use satrs_core::pus::event_srv::PusService5EventHandler;
use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult};
use std::sync::mpsc;
pub struct Pus5Wrapper {
pub pus_5_handler: PusService5EventHandler<EcssTcInSharedStoreConverter>,
use log::{error, warn};
use satrs_core::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs_core::pus::event_man::EventRequestWithToken;
use satrs_core::pus::event_srv::PusService5EventHandler;
use satrs_core::pus::verification::VerificationReporterWithSender;
use satrs_core::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender, PusPacketHandlerResult,
PusServiceHelper,
};
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::ChannelId;
use satrs_example::config::{TcReceiverId, TmSenderId, PUS_APID};
pub fn create_event_service_static(
shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender,
tc_pool: SharedStaticMemoryPool,
pus_event_rx: mpsc::Receiver<EcssTcAndToken>,
event_request_tx: mpsc::Sender<EventRequestWithToken>,
) -> Pus5Wrapper<EcssTcInSharedStoreConverter> {
let event_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusEvent as ChannelId,
"PUS_5_TM_SENDER",
shared_tm_store.clone(),
tm_funnel_tx.clone(),
);
let event_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusEvent as ChannelId,
"PUS_5_TC_RECV",
pus_event_rx,
);
let pus_5_handler = PusService5EventHandler::new(
PusServiceHelper::new(
Box::new(event_srv_receiver),
Box::new(event_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInSharedStoreConverter::new(tc_pool.clone(), 2048),
),
event_request_tx,
);
Pus5Wrapper { pus_5_handler }
}
impl Pus5Wrapper {
pub fn create_event_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender,
pus_event_rx: mpsc::Receiver<EcssTcAndToken>,
event_request_tx: mpsc::Sender<EventRequestWithToken>,
) -> Pus5Wrapper<EcssTcInVecConverter> {
let event_srv_tm_sender = MpscTmAsVecSender::new(
TmSenderId::PusEvent as ChannelId,
"PUS_5_TM_SENDER",
tm_funnel_tx,
);
let event_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusEvent as ChannelId,
"PUS_5_TC_RECV",
pus_event_rx,
);
let pus_5_handler = PusService5EventHandler::new(
PusServiceHelper::new(
Box::new(event_srv_receiver),
Box::new(event_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInVecConverter::default(),
),
event_request_tx,
);
Pus5Wrapper { pus_5_handler }
}
pub struct Pus5Wrapper<TcInMemConverter: EcssTcInMemConverter> {
pub pus_5_handler: PusService5EventHandler<TcInMemConverter>,
}
impl<TcInMemConverter: EcssTcInMemConverter> Pus5Wrapper<TcInMemConverter> {
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_5_handler.handle_one_tc() {
Ok(result) => match result {
@ -1,15 +1,73 @@
use crate::requests::{Request, RequestWithToken};
use log::{error, warn};
use satrs_core::hk::{CollectionIntervalFactor, HkRequest};
use satrs_core::pus::verification::{FailParams, StdVerifReporterWithSender};
use satrs_core::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs_core::pus::verification::{
FailParams, StdVerifReporterWithSender, VerificationReporterWithSender,
};
use satrs_core::pus::{
EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcReceiver, EcssTmSender,
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
EcssTcReceiver, EcssTmSender, MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender,
PusPacketHandlerResult, PusPacketHandlingError, PusServiceBase, PusServiceHelper,
};
use satrs_core::spacepackets::ecss::{hk, PusPacket};
use satrs_example::{hk_err, tmtc_err, TargetIdWithApid};
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::ChannelId;
use satrs_example::config::{hk_err, tmtc_err, TcReceiverId, TmSenderId, PUS_APID};
use satrs_example::TargetIdWithApid;
use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::mpsc::{self, Sender};
pub fn create_hk_service_static(
shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender,
tc_pool: SharedStaticMemoryPool,
pus_hk_rx: mpsc::Receiver<EcssTcAndToken>,
request_map: HashMap<TargetIdWithApid, mpsc::Sender<RequestWithToken>>,
) -> Pus3Wrapper<EcssTcInSharedStoreConverter> {
let hk_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusHk as ChannelId,
"PUS_3_TM_SENDER",
shared_tm_store.clone(),
tm_funnel_tx.clone(),
);
let hk_srv_receiver =
        MpscTcReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_3_TC_RECV", pus_hk_rx);
let pus_3_handler = PusService3HkHandler::new(
Box::new(hk_srv_receiver),
Box::new(hk_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInSharedStoreConverter::new(tc_pool, 2048),
request_map,
);
Pus3Wrapper { pus_3_handler }
}
pub fn create_hk_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender,
pus_hk_rx: mpsc::Receiver<EcssTcAndToken>,
request_map: HashMap<TargetIdWithApid, mpsc::Sender<RequestWithToken>>,
) -> Pus3Wrapper<EcssTcInVecConverter> {
let hk_srv_tm_sender = MpscTmAsVecSender::new(
TmSenderId::PusHk as ChannelId,
"PUS_3_TM_SENDER",
tm_funnel_tx.clone(),
);
let hk_srv_receiver =
        MpscTcReceiver::new(TcReceiverId::PusHk as ChannelId, "PUS_3_TC_RECV", pus_hk_rx);
let pus_3_handler = PusService3HkHandler::new(
Box::new(hk_srv_receiver),
Box::new(hk_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInVecConverter::default(),
request_map,
);
Pus3Wrapper { pus_3_handler }
}
pub struct PusService3HkHandler<TcInMemConverter: EcssTcInMemConverter> {
psb: PusServiceHelper<TcInMemConverter>,
@ -147,11 +205,11 @@ impl<TcInMemConverter: EcssTcInMemConverter> PusService3HkHandler<TcInMemConvert
}
}
pub struct Pus3Wrapper {
pub(crate) pus_3_handler: PusService3HkHandler<EcssTcInSharedStoreConverter>,
pub struct Pus3Wrapper<TcInMemConverter: EcssTcInMemConverter> {
pub(crate) pus_3_handler: PusService3HkHandler<TcInMemConverter>,
}
impl Pus3Wrapper {
impl<TcInMemConverter: EcssTcInMemConverter> Pus3Wrapper<TcInMemConverter> {
pub fn handle_next_packet(&mut self) -> bool {
match self.pus_3_handler.handle_one_tc() {
Ok(result) => match result {
@ -6,13 +6,14 @@ use satrs_core::spacepackets::ecss::tc::PusTcReader;
use satrs_core::spacepackets::ecss::PusServiceId;
use satrs_core::spacepackets::time::cds::TimeProvider;
use satrs_core::spacepackets::time::TimeWriter;
use satrs_example::{tmtc_err, CustomPusServiceId};
use satrs_example::config::{tmtc_err, CustomPusServiceId};
use std::sync::mpsc::Sender;
pub mod action;
pub mod event;
pub mod hk;
pub mod scheduler;
pub mod stack;
pub mod test;
pub struct PusTcMpscRouter {
@ -1,36 +1,66 @@
use crate::tmtc::PusTcSource;
use std::sync::mpsc;
use std::time::Duration;
use log::{error, info, warn};
use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool};
use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StoreAddr};
use satrs_core::pus::scheduler::{PusScheduler, TcInfo};
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
use satrs_core::pus::{EcssTcInSharedStoreConverter, PusPacketHandlerResult};
use satrs_core::pus::verification::VerificationReporterWithSender;
use satrs_core::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInSharedStoreConverter, EcssTcInVecConverter,
MpscTcReceiver, MpscTmAsVecSender, MpscTmInStoreSender, PusPacketHandlerResult,
PusServiceHelper,
};
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::ChannelId;
use satrs_example::config::{TcReceiverId, TmSenderId, PUS_APID};
pub struct Pus11Wrapper {
pub pus_11_handler: PusService11SchedHandler<EcssTcInSharedStoreConverter, PusScheduler>,
pub sched_tc_pool: StaticMemoryPool,
pub tc_source_wrapper: PusTcSource,
use crate::tmtc::PusTcSourceProviderSharedPool;
pub trait TcReleaser {
fn release(&mut self, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
}
impl Pus11Wrapper {
pub fn release_tcs(&mut self) {
let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool {
if enabled {
// Transfer TC from scheduler TC pool to shared TC pool.
let released_tc_addr = self
.tc_source_wrapper
.tc_store
.pool
.write()
.expect("locking pool failed")
.add(tc)
.expect("adding TC to shared pool failed");
impl TcReleaser for PusTcSourceProviderSharedPool {
fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool {
if enabled {
// Transfer TC from scheduler TC pool to shared TC pool.
let released_tc_addr = self
.shared_pool
.pool
.write()
.expect("locking pool failed")
.add(tc)
.expect("adding TC to shared pool failed");
self.tc_source
.send(released_tc_addr)
.expect("sending TC to TC source failed");
}
true
}
}
self.tc_source_wrapper
.tc_source
.send(released_tc_addr)
.expect("sending TC to TC source failed");
}
true
impl TcReleaser for mpsc::Sender<Vec<u8>> {
fn release(&mut self, enabled: bool, _info: &TcInfo, tc: &[u8]) -> bool {
if enabled {
// Send released TC to centralized TC source.
self.send(tc.to_vec())
.expect("sending TC to TC source failed");
}
true
}
}
pub struct Pus11Wrapper<TcInMemConverter: EcssTcInMemConverter> {
pub pus_11_handler: PusService11SchedHandler<TcInMemConverter, PusScheduler>,
pub sched_tc_pool: StaticMemoryPool,
pub tc_releaser: Box<dyn TcReleaser + Send>,
}
impl<TcInMemConverter: EcssTcInMemConverter> Pus11Wrapper<TcInMemConverter> {
pub fn release_tcs(&mut self) {
let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool {
self.tc_releaser.release(enabled, info, tc)
};
self.pus_11_handler
@ -71,3 +101,77 @@ impl Pus11Wrapper {
false
}
}
pub fn create_scheduler_service_static(
shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender,
tc_releaser: PusTcSourceProviderSharedPool,
pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,
sched_tc_pool: StaticMemoryPool,
) -> Pus11Wrapper<EcssTcInSharedStoreConverter> {
let sched_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusSched as ChannelId,
"PUS_11_TM_SENDER",
shared_tm_store.clone(),
tm_funnel_tx.clone(),
);
let sched_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusSched as ChannelId,
"PUS_11_TC_RECV",
pus_sched_rx,
);
let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5))
.expect("Creating PUS Scheduler failed");
let pus_11_handler = PusService11SchedHandler::new(
PusServiceHelper::new(
Box::new(sched_srv_receiver),
Box::new(sched_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInSharedStoreConverter::new(tc_releaser.clone_backing_pool(), 2048),
),
scheduler,
);
Pus11Wrapper {
pus_11_handler,
sched_tc_pool,
tc_releaser: Box::new(tc_releaser),
}
}
pub fn create_scheduler_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender,
tc_source_sender: mpsc::Sender<Vec<u8>>,
pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,
sched_tc_pool: StaticMemoryPool,
) -> Pus11Wrapper<EcssTcInVecConverter> {
let sched_srv_tm_sender = MpscTmAsVecSender::new(
TmSenderId::PusSched as ChannelId,
"PUS_11_TM_SENDER",
tm_funnel_tx,
);
let sched_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusSched as ChannelId,
"PUS_11_TC_RECV",
pus_sched_rx,
);
let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5))
.expect("Creating PUS Scheduler failed");
let pus_11_handler = PusService11SchedHandler::new(
PusServiceHelper::new(
Box::new(sched_srv_receiver),
Box::new(sched_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInVecConverter::default(),
),
scheduler,
);
Pus11Wrapper {
pus_11_handler,
sched_tc_pool,
tc_releaser: Box::new(tc_source_sender),
}
}
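The `TcReleaser` trait introduced in this file decouples `Pus11Wrapper` from the TC backing store: the shared-pool provider and the plain `mpsc::Sender<Vec<u8>>` both implement it. A hedged sketch of a further, purely hypothetical implementation (reusing the `TcReleaser` trait and the `TcInfo` import from this module) shows how little is needed to plug in another release target, for example in tests:

// Hypothetical releaser which only counts released TCs.
pub struct CountingReleaser {
    pub released: u32,
}

impl TcReleaser for CountingReleaser {
    fn release(&mut self, enabled: bool, _info: &TcInfo, _tc: &[u8]) -> bool {
        if enabled {
            self.released += 1;
        }
        true
    }
}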
@ -0,0 +1,52 @@
use satrs_core::pus::EcssTcInMemConverter;
use super::{
action::Pus8Wrapper, event::Pus5Wrapper, hk::Pus3Wrapper, scheduler::Pus11Wrapper,
test::Service17CustomWrapper,
};
pub struct PusStack<TcInMemConverter: EcssTcInMemConverter> {
event_srv: Pus5Wrapper<TcInMemConverter>,
hk_srv: Pus3Wrapper<TcInMemConverter>,
action_srv: Pus8Wrapper<TcInMemConverter>,
schedule_srv: Pus11Wrapper<TcInMemConverter>,
test_srv: Service17CustomWrapper<TcInMemConverter>,
}
impl<TcInMemConverter: EcssTcInMemConverter> PusStack<TcInMemConverter> {
pub fn new(
hk_srv: Pus3Wrapper<TcInMemConverter>,
event_srv: Pus5Wrapper<TcInMemConverter>,
action_srv: Pus8Wrapper<TcInMemConverter>,
schedule_srv: Pus11Wrapper<TcInMemConverter>,
test_srv: Service17CustomWrapper<TcInMemConverter>,
) -> Self {
Self {
event_srv,
action_srv,
schedule_srv,
test_srv,
hk_srv,
}
}
pub fn periodic_operation(&mut self) {
self.schedule_srv.release_tcs();
loop {
let mut all_queues_empty = true;
let mut is_srv_finished = |srv_handler_finished: bool| {
if !srv_handler_finished {
all_queues_empty = false;
}
};
is_srv_finished(self.test_srv.handle_next_packet());
is_srv_finished(self.schedule_srv.handle_next_packet());
is_srv_finished(self.event_srv.handle_next_packet());
is_srv_finished(self.action_srv.handle_next_packet());
is_srv_finished(self.hk_srv.handle_next_packet());
if all_queues_empty {
break;
}
}
}
}
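`periodic_operation` first releases due TCs and then drains all service queues until every handler reports an empty queue, so each cycle processes exactly the packets queued at that point. A standalone sketch of the same drain pattern, with closures standing in for the service handlers (hypothetical, illustration only):

// Each handler returns true once its queue is empty.
fn drain_until_idle(handlers: &mut [Box<dyn FnMut() -> bool>]) {
    loop {
        let mut all_empty = true;
        for handler in handlers.iter_mut() {
            if !handler() {
                all_empty = false;
            }
        }
        if all_empty {
            break;
        }
    }
}

fn main() {
    let mut remaining_packets = 3_u32;
    let mut handlers: Vec<Box<dyn FnMut() -> bool>> = vec![Box::new(move || {
        // Pretend to handle one packet per call, report true when done.
        remaining_packets = remaining_packets.saturating_sub(1);
        remaining_packets == 0
    })];
    drain_until_idle(&mut handlers);
    println!("all service queues drained");
}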
@ -1,22 +1,89 @@
use log::{info, warn};
use satrs_core::params::Params;
use satrs_core::pool::{SharedStaticMemoryPool, StoreAddr};
use satrs_core::pus::test::PusService17TestHandler;
use satrs_core::pus::verification::FailParams;
use satrs_core::pus::{EcssTcInMemConverter, PusPacketHandlerResult};
use satrs_core::pus::verification::{FailParams, VerificationReporterWithSender};
use satrs_core::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender,
MpscTmInStoreSender, PusPacketHandlerResult, PusServiceHelper,
};
use satrs_core::spacepackets::ecss::tc::PusTcReader;
use satrs_core::spacepackets::ecss::PusPacket;
use satrs_core::spacepackets::time::cds::TimeProvider;
use satrs_core::spacepackets::time::TimeWriter;
use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::ChannelId;
use satrs_core::{events::EventU32, pus::EcssTcInSharedStoreConverter};
use satrs_example::{tmtc_err, TEST_EVENT};
use std::sync::mpsc::Sender;
use satrs_example::config::{tmtc_err, TcReceiverId, TmSenderId, PUS_APID, TEST_EVENT};
use std::sync::mpsc::{self, Sender};
pub struct Service17CustomWrapper {
pub pus17_handler: PusService17TestHandler<EcssTcInSharedStoreConverter>,
pub fn create_test_service_static(
shared_tm_store: SharedTmStore,
tm_funnel_tx: mpsc::Sender<StoreAddr>,
verif_reporter: VerificationReporterWithSender,
tc_pool: SharedStaticMemoryPool,
event_sender: mpsc::Sender<(EventU32, Option<Params>)>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> Service17CustomWrapper<EcssTcInSharedStoreConverter> {
let test_srv_tm_sender = MpscTmInStoreSender::new(
TmSenderId::PusTest as ChannelId,
"PUS_17_TM_SENDER",
shared_tm_store.clone(),
tm_funnel_tx.clone(),
);
let test_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusTest as ChannelId,
"PUS_17_TC_RECV",
pus_test_rx,
);
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
Box::new(test_srv_receiver),
Box::new(test_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInSharedStoreConverter::new(tc_pool, 2048),
));
Service17CustomWrapper {
pus17_handler,
test_srv_event_sender: event_sender,
}
}
pub fn create_test_service_dynamic(
tm_funnel_tx: mpsc::Sender<Vec<u8>>,
verif_reporter: VerificationReporterWithSender,
event_sender: mpsc::Sender<(EventU32, Option<Params>)>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> Service17CustomWrapper<EcssTcInVecConverter> {
let test_srv_tm_sender = MpscTmAsVecSender::new(
TmSenderId::PusTest as ChannelId,
"PUS_17_TM_SENDER",
tm_funnel_tx.clone(),
);
let test_srv_receiver = MpscTcReceiver::new(
TcReceiverId::PusTest as ChannelId,
"PUS_17_TC_RECV",
pus_test_rx,
);
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
Box::new(test_srv_receiver),
Box::new(test_srv_tm_sender),
PUS_APID,
verif_reporter.clone(),
EcssTcInVecConverter::default(),
));
Service17CustomWrapper {
pus17_handler,
test_srv_event_sender: event_sender,
}
}
pub struct Service17CustomWrapper<TcInMemConverter: EcssTcInMemConverter> {
pub pus17_handler: PusService17TestHandler<TcInMemConverter>,
pub test_srv_event_sender: Sender<(EventU32, Option<Params>)>,
}
impl Service17CustomWrapper {
impl<TcInMemConverter: EcssTcInMemConverter> Service17CustomWrapper<TcInMemConverter> {
pub fn handle_next_packet(&mut self) -> bool {
let res = self.pus17_handler.handle_one_tc();
if res.is_err() {
@ -22,7 +22,7 @@ pub enum Request {
#[derive(Clone, Eq, PartialEq, Debug, new)]
pub struct TargetedRequest {
pub(crate) target_id: TargetIdWithApid,
pub(crate) target_id_with_apid: TargetIdWithApid,
pub(crate) request: Request,
}
@ -9,9 +9,7 @@ use satrs_core::{
spacepackets::PacketId,
tmtc::{CcsdsDistributor, CcsdsError, TmPacketSourceCore},
};
use satrs_example::PUS_APID;
use crate::tmtc::MpscStoreAndSendError;
use satrs_example::config::PUS_APID;
pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)];
@ -71,20 +69,21 @@ impl TmPacketSourceCore for SyncTcpTmSource {
}
}
pub struct TcpTask {
pub struct TcpTask<MpscErrorType: 'static> {
server: TcpSpacepacketsServer<
(),
CcsdsError<MpscStoreAndSendError>,
CcsdsError<MpscErrorType>,
SyncTcpTmSource,
CcsdsDistributor<MpscStoreAndSendError>,
CcsdsDistributor<MpscErrorType>,
>,
phantom: std::marker::PhantomData<MpscErrorType>,
}
impl TcpTask {
impl<MpscErrorType: 'static + core::fmt::Debug> TcpTask<MpscErrorType> {
pub fn new(
cfg: ServerConfig,
tm_source: SyncTcpTmSource,
tc_receiver: CcsdsDistributor<MpscStoreAndSendError>,
tc_receiver: CcsdsDistributor<MpscErrorType>,
) -> Result<Self, std::io::Error> {
Ok(Self {
server: TcpSpacepacketsServer::new(
@ -93,6 +92,7 @@ impl TcpTask {
tc_receiver,
Box::new(PACKET_ID_LOOKUP),
)?,
phantom: std::marker::PhantomData,
})
}
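`TcpTask` is now generic over the mpsc error type, so the same task code serves both the shared-pool and the Vec-based TC source providers. A condensed standalone sketch of the underlying pattern with hypothetical names; here `PhantomData` anchors a type parameter which only shows up in method signatures:

use std::marker::PhantomData;

// A task generic over an error type that is never stored directly.
struct GenericTask<E: 'static> {
    name: &'static str,
    phantom: PhantomData<E>,
}

impl<E: 'static + core::fmt::Debug> GenericTask<E> {
    fn new(name: &'static str) -> Self {
        Self {
            name,
            phantom: PhantomData,
        }
    }

    fn report_error(&self, error: E) {
        println!("task {}: error {:?}", self.name, error);
    }
}

fn main() {
    let task = GenericTask::<std::io::Error>::new("tcp");
    task.report_error(std::io::Error::new(std::io::ErrorKind::Other, "demo"));
}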
@ -0,0 +1,145 @@
use std::{
collections::HashMap,
sync::mpsc::{Receiver, Sender},
};
use satrs_core::{
pool::{PoolProviderMemInPlace, StoreAddr},
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
spacepackets::{
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
time::cds::MIN_CDS_FIELD_LEN,
CcsdsPacket,
},
tmtc::tm_helper::SharedTmStore,
};
use crate::tcp::SyncTcpTmSource;
#[derive(Default)]
pub struct CcsdsSeqCounterMap {
apid_seq_counter_map: HashMap<u16, CcsdsSimpleSeqCountProvider>,
}
impl CcsdsSeqCounterMap {
pub fn get_and_increment(&mut self, apid: u16) -> u16 {
self.apid_seq_counter_map
.entry(apid)
.or_default()
.get_and_increment()
}
}
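`get_and_increment` lazily creates one sequence count provider per APID, so every APID gets its own contiguous CCSDS sequence count. A standalone sketch of the same pattern with a plain `u16` counter standing in for `CcsdsSimpleSeqCountProvider` (hypothetical):

use std::collections::HashMap;

#[derive(Default)]
struct SeqCounters(HashMap<u16, u16>);

impl SeqCounters {
    fn get_and_increment(&mut self, apid: u16) -> u16 {
        let ctr = self.0.entry(apid).or_default();
        let current = *ctr;
        // CCSDS sequence counts are 14 bits wide, so wrap at 2^14.
        *ctr = (*ctr + 1) % (1 << 14);
        current
    }
}

fn main() {
    let mut counters = SeqCounters::default();
    assert_eq!(counters.get_and_increment(0x02), 0);
    assert_eq!(counters.get_and_increment(0x02), 1);
    // A different APID starts with its own fresh counter.
    assert_eq!(counters.get_and_increment(0x03), 0);
}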
pub struct TmFunnelCommon {
seq_counter_map: CcsdsSeqCounterMap,
msg_counter_map: HashMap<u8, u16>,
sync_tm_tcp_source: SyncTcpTmSource,
}
impl TmFunnelCommon {
pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self {
Self {
seq_counter_map: Default::default(),
msg_counter_map: Default::default(),
sync_tm_tcp_source,
}
}
    // Applies common packet processing operations for PUS TM packets. This includes setting
    // the sequence counter and message counter and finalizing the packet CRC.
fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) {
// zero_copy_writer.set_apid(PUS_APID);
zero_copy_writer.set_seq_count(
self.seq_counter_map
.get_and_increment(zero_copy_writer.apid()),
);
let entry = self
.msg_counter_map
.entry(zero_copy_writer.service())
.or_insert(0);
zero_copy_writer.set_msg_count(*entry);
if *entry == u16::MAX {
*entry = 0;
} else {
*entry += 1;
}
// This operation has to come last!
zero_copy_writer.finish();
}
}
pub struct TmFunnelStatic {
common: TmFunnelCommon,
shared_tm_store: SharedTmStore,
tm_funnel_rx: Receiver<StoreAddr>,
tm_server_tx: Sender<StoreAddr>,
}
impl TmFunnelStatic {
pub fn new(
shared_tm_store: SharedTmStore,
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: Receiver<StoreAddr>,
tm_server_tx: Sender<StoreAddr>,
) -> Self {
Self {
common: TmFunnelCommon::new(sync_tm_tcp_source),
shared_tm_store,
tm_funnel_rx,
tm_server_tx,
}
}
pub fn operation(&mut self) {
if let Ok(addr) = self.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let shared_pool = self.shared_tm_store.clone_backing_pool();
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
let tm_raw = pool_guard
.modify(&addr)
.expect("Reading TM from pool failed");
let zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
self.tm_server_tx
.send(addr)
.expect("Sending TM to server failed");
self.common.sync_tm_tcp_source.add_tm(tm_raw);
}
}
}
pub struct TmFunnelDynamic {
common: TmFunnelCommon,
tm_funnel_rx: Receiver<Vec<u8>>,
tm_server_tx: Sender<Vec<u8>>,
}
impl TmFunnelDynamic {
pub fn new(
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: Receiver<Vec<u8>>,
tm_server_tx: Sender<Vec<u8>>,
) -> Self {
Self {
common: TmFunnelCommon::new(sync_tm_tcp_source),
tm_funnel_rx,
tm_server_tx,
}
}
pub fn operation(&mut self) {
if let Ok(mut tm) = self.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
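            // The buffer is cloned because it is both forwarded to the TM server
            // and copied into the synchronized TCP TM source below.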
self.tm_server_tx
.send(tm.clone())
.expect("Sending TM to server failed");
self.common.sync_tm_tcp_source.add_tm(&tm);
}
}
}
@ -1,7 +1,7 @@
use log::warn;
use satrs_core::pus::{EcssTcAndToken, ReceivesEcssPusTc};
use satrs_core::spacepackets::SpHeader;
use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError};
use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError};
use thiserror::Error;
use crate::pus::PusReceiver;
@ -18,13 +18,13 @@ pub struct TmArgs {
}
pub struct TcArgs {
pub tc_source: PusTcSource,
pub tc_source: PusTcSourceProviderSharedPool,
pub tc_receiver: Receiver<StoreAddr>,
}
impl TcArgs {
#[allow(dead_code)]
fn split(self) -> (PusTcSource, Receiver<StoreAddr>) {
fn split(self) -> (PusTcSourceProviderSharedPool, Receiver<StoreAddr>) {
(self.tc_source, self.tc_receiver)
}
}
@ -40,11 +40,11 @@ pub enum MpscStoreAndSendError {
}
#[derive(Clone)]
pub struct TcStore {
pub struct SharedTcPool {
pub pool: SharedStaticMemoryPool,
}
impl TcStore {
impl SharedTcPool {
pub fn add_pus_tc(&mut self, pus_tc: &PusTcReader) -> Result<StoreAddr, StoreError> {
let mut pg = self.pool.write().expect("error locking TC store");
let (addr, buf) = pg.free_element(pus_tc.len_packed())?;
@ -53,32 +53,34 @@ impl TcStore {
}
}
pub struct TmFunnel {
pub tm_funnel_rx: Receiver<StoreAddr>,
pub tm_server_tx: Sender<StoreAddr>,
}
#[derive(Clone)]
pub struct PusTcSource {
pub struct PusTcSourceProviderSharedPool {
pub tc_source: Sender<StoreAddr>,
pub tc_store: TcStore,
pub shared_pool: SharedTcPool,
}
impl ReceivesEcssPusTc for PusTcSource {
impl PusTcSourceProviderSharedPool {
#[allow(dead_code)]
pub fn clone_backing_pool(&self) -> SharedStaticMemoryPool {
self.shared_pool.pool.clone()
}
}
impl ReceivesEcssPusTc for PusTcSourceProviderSharedPool {
type Error = MpscStoreAndSendError;
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
let addr = self.tc_store.add_pus_tc(pus_tc)?;
let addr = self.shared_pool.add_pus_tc(pus_tc)?;
self.tc_source.send(addr)?;
Ok(())
}
}
impl ReceivesCcsdsTc for PusTcSource {
impl ReceivesCcsdsTc for PusTcSourceProviderSharedPool {
type Error = MpscStoreAndSendError;
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut pool = self.tc_store.pool.write().expect("locking pool failed");
let mut pool = self.shared_pool.pool.write().expect("locking pool failed");
let addr = pool.add(tc_raw)?;
drop(pool);
self.tc_source.send(addr)?;
@ -86,13 +88,35 @@ impl ReceivesCcsdsTc for PusTcSource {
}
}
pub struct TmtcTask {
// Newtype wrapper; the necessary traits can not be implemented for the MPSC sender directly
// because of orphan rules.
#[derive(Clone)]
pub struct PusTcSourceProviderDynamic(pub Sender<Vec<u8>>);
impl ReceivesEcssPusTc for PusTcSourceProviderDynamic {
type Error = SendError<Vec<u8>>;
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
self.0.send(pus_tc.raw_data().to_vec())?;
Ok(())
}
}
impl ReceivesCcsdsTc for PusTcSourceProviderDynamic {
type Error = mpsc::SendError<Vec<u8>>;
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.0.send(tc_raw.to_vec())?;
Ok(())
}
}
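The wrapped sender can then be handed to any component expecting these receiver traits. A standalone sketch of the newtype workaround itself; `PassRaw` is a local stand-in for a foreign trait, since in the real code both the trait (from satrs_core) and `mpsc::Sender` (from std) are foreign, which is what triggers the orphan rules:

use std::sync::mpsc;

trait PassRaw {
    fn pass_raw(&mut self, raw: &[u8]);
}

// Local wrapper around the foreign sender type.
struct SenderNewtype(pub mpsc::Sender<Vec<u8>>);

impl PassRaw for SenderNewtype {
    fn pass_raw(&mut self, raw: &[u8]) {
        self.0.send(raw.to_vec()).expect("receiver dropped");
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut provider = SenderNewtype(tx);
    provider.pass_raw(&[0x17, 0x01]);
    assert_eq!(rx.recv().unwrap(), vec![0x17, 0x01]);
}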
pub struct TmtcTaskStatic {
tc_args: TcArgs,
tc_buf: [u8; 4096],
pus_receiver: PusReceiver,
}
impl TmtcTask {
impl TmtcTaskStatic {
pub fn new(tc_args: TcArgs, pus_receiver: PusReceiver) -> Self {
Self {
tc_args,
@ -111,7 +135,7 @@ impl TmtcTask {
let pool = self
.tc_args
.tc_source
.tc_store
.shared_pool
.pool
.read()
.expect("locking tc pool failed");
@ -146,3 +170,50 @@ impl TmtcTask {
}
}
}
pub struct TmtcTaskDynamic {
pub tc_receiver: Receiver<Vec<u8>>,
pus_receiver: PusReceiver,
}
impl TmtcTaskDynamic {
pub fn new(tc_receiver: Receiver<Vec<u8>>, pus_receiver: PusReceiver) -> Self {
Self {
tc_receiver,
pus_receiver,
}
}
pub fn periodic_operation(&mut self) {
self.poll_tc();
}
pub fn poll_tc(&mut self) -> bool {
match self.tc_receiver.try_recv() {
Ok(tc) => match PusTcReader::new(&tc) {
Ok((pus_tc, _)) => {
self.pus_receiver
.handle_tc_packet(
satrs_core::pus::TcInMemory::Vec(tc.clone()),
pus_tc.service(),
&pus_tc,
)
.ok();
true
}
Err(e) => {
warn!("error creating PUS TC from raw data: {e}");
warn!("raw data: {:x?}", tc);
true
}
},
Err(e) => match e {
TryRecvError::Empty => false,
TryRecvError::Disconnected => {
warn!("tmtc thread: sender disconnected");
false
}
},
}
}
}
@ -1,4 +1,7 @@
use std::{net::SocketAddr, sync::mpsc::Receiver};
use std::{
net::{SocketAddr, UdpSocket},
sync::mpsc::Receiver,
};
use log::{info, warn};
use satrs_core::{
@ -7,45 +10,17 @@ use satrs_core::{
tmtc::CcsdsError,
};
use crate::tmtc::MpscStoreAndSendError;
pub trait UdpTmHandler {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr);
}
pub struct UdpTmtcServer {
pub udp_tc_server: UdpTcServer<CcsdsError<MpscStoreAndSendError>>,
pub struct StaticUdpTmHandler {
pub tm_rx: Receiver<StoreAddr>,
pub tm_store: SharedStaticMemoryPool,
}
impl UdpTmtcServer {
pub fn periodic_operation(&mut self) {
while self.poll_tc_server() {}
if let Some(recv_addr) = self.udp_tc_server.last_sender() {
self.send_tm_to_udp_client(&recv_addr);
}
}
fn poll_tc_server(&mut self) -> bool {
match self.udp_tc_server.try_recv_tc() {
Ok(_) => true,
Err(e) => match e {
ReceiveResult::ReceiverError(e) => match e {
CcsdsError::ByteConversionError(e) => {
warn!("packet error: {e:?}");
true
}
CcsdsError::CustomError(e) => {
warn!("mpsc store and send error {e:?}");
true
}
},
ReceiveResult::IoError(e) => {
warn!("IO error {e}");
false
}
ReceiveResult::NothingReceived => false,
},
}
}
fn send_tm_to_udp_client(&mut self, recv_addr: &SocketAddr) {
impl UdpTmHandler for StaticUdpTmHandler {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, &recv_addr: &SocketAddr) {
while let Ok(addr) = self.tm_rx.try_recv() {
let store_lock = self.tm_store.write();
if store_lock.is_err() {
@ -67,10 +42,181 @@ impl UdpTmtcServer {
} else {
info!("Sending PUS TM");
}
let result = self.udp_tc_server.socket.send_to(buf, recv_addr);
let result = socket.send_to(buf, recv_addr);
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
}
}
}
pub struct DynamicUdpTmHandler {
pub tm_rx: Receiver<Vec<u8>>,
}
impl UdpTmHandler for DynamicUdpTmHandler {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) {
while let Ok(tm) = self.tm_rx.try_recv() {
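            // PUS TM packets carry the service type at byte offset 7 and the
            // subservice at offset 8 (6 byte CCSDS primary header plus the first
            // PUS secondary header byte).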
if tm.len() > 9 {
let service = tm[7];
let subservice = tm[8];
info!("Sending PUS TM[{service},{subservice}]")
} else {
info!("Sending PUS TM");
}
let result = socket.send_to(&tm, recv_addr);
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
}
}
}
pub struct UdpTmtcServer<TmHandler: UdpTmHandler, SendError> {
pub udp_tc_server: UdpTcServer<CcsdsError<SendError>>,
pub tm_handler: TmHandler,
}
impl<TmHandler: UdpTmHandler, SendError: core::fmt::Debug + 'static>
UdpTmtcServer<TmHandler, SendError>
{
pub fn periodic_operation(&mut self) {
while self.poll_tc_server() {}
if let Some(recv_addr) = self.udp_tc_server.last_sender() {
self.tm_handler
.send_tm_to_udp_client(&self.udp_tc_server.socket, &recv_addr);
}
}
fn poll_tc_server(&mut self) -> bool {
match self.udp_tc_server.try_recv_tc() {
Ok(_) => true,
Err(e) => match e {
ReceiveResult::ReceiverError(e) => match e {
CcsdsError::ByteConversionError(e) => {
warn!("packet error: {e:?}");
true
}
CcsdsError::CustomError(e) => {
warn!("mpsc custom error {e:?}");
true
}
},
ReceiveResult::IoError(e) => {
warn!("IO error {e}");
false
}
ReceiveResult::NothingReceived => false,
},
}
}
}
#[cfg(test)]
mod tests {
use std::{
collections::VecDeque,
net::IpAddr,
sync::{Arc, Mutex},
};
use satrs_core::{
spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket},
SpHeader,
},
tmtc::ReceivesTcCore,
};
use satrs_example::config::{OBSW_SERVER_ADDR, PUS_APID};
use super::*;
#[derive(Default, Debug, Clone)]
pub struct TestReceiver {
tc_vec: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl ReceivesTcCore for TestReceiver {
type Error = CcsdsError<()>;
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.tc_vec.lock().unwrap().push_back(tc_raw.to_vec());
Ok(())
}
}
#[derive(Default, Debug, Clone)]
pub struct TestTmHandler {
addrs_to_send_to: Arc<Mutex<VecDeque<SocketAddr>>>,
}
impl UdpTmHandler for TestTmHandler {
fn send_tm_to_udp_client(&mut self, _socket: &UdpSocket, recv_addr: &SocketAddr) {
self.addrs_to_send_to.lock().unwrap().push_back(*recv_addr);
}
}
#[test]
fn test_basic() {
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
let test_receiver = TestReceiver::default();
let tc_queue = test_receiver.tc_vec.clone();
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap();
let tm_handler = TestTmHandler::default();
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let mut udp_dyn_server = UdpTmtcServer {
udp_tc_server,
tm_handler,
};
udp_dyn_server.periodic_operation();
assert!(tc_queue.lock().unwrap().is_empty());
assert!(tm_handler_calls.lock().unwrap().is_empty());
}
#[test]
fn test_transactions() {
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
let test_receiver = TestReceiver::default();
let tc_queue = test_receiver.tc_vec.clone();
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap();
let server_addr = udp_tc_server.socket.local_addr().unwrap();
let tm_handler = TestTmHandler::default();
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let mut udp_dyn_server = UdpTmtcServer {
udp_tc_server,
tm_handler,
};
let mut sph = SpHeader::tc_unseg(PUS_APID, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true)
.to_vec()
.unwrap();
let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed");
let client_addr = client.local_addr().unwrap();
client.connect(server_addr).unwrap();
client.send(&ping_tc).unwrap();
udp_dyn_server.periodic_operation();
{
let mut tc_queue = tc_queue.lock().unwrap();
assert!(!tc_queue.is_empty());
let received_tc = tc_queue.pop_front().unwrap();
assert_eq!(received_tc, ping_tc);
}
{
let mut tm_handler_calls = tm_handler_calls.lock().unwrap();
assert!(!tm_handler_calls.is_empty());
assert_eq!(tm_handler_calls.len(), 1);
let received_addr = tm_handler_calls.pop_front().unwrap();
assert_eq!(received_addr, client_addr);
}
udp_dyn_server.periodic_operation();
assert!(tc_queue.lock().unwrap().is_empty());
// Still tries to send to the same client.
{
let mut tm_handler_calls = tm_handler_calls.lock().unwrap();
assert!(!tm_handler_calls.is_empty());
assert_eq!(tm_handler_calls.len(), 1);
let received_addr = tm_handler_calls.pop_front().unwrap();
assert_eq!(received_addr, client_addr);
}
}
}