start cleaning up example app
CI: all checks were successful (Rust/sat-rs/pipeline/head)

Robin Müller 2024-02-05 16:42:10 +01:00
parent 8a5b81b67f
commit bb21345f01
Signed by: muellerr
GPG Key ID: A649FB78196E3849
6 changed files with 227 additions and 150 deletions

satrs-core/src/pool.rs

@@ -337,7 +337,7 @@ mod alloc_mod {
     }
 
     impl StaticPoolConfig {
-        pub fn new(cfg: Vec<(NumBlocks, usize)>) -> Self {
+        pub const fn new(cfg: Vec<(NumBlocks, usize)>) -> Self {
            StaticPoolConfig { cfg }
        }
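A side note on the change above: `StaticPoolConfig::new` becoming a `const fn` means the constructor can be evaluated at compile time. The following is a minimal standalone sketch, not part of the commit, assuming `NumBlocks` is a plain integer alias such as `u16` and mirroring only the shape of the real type:

// Hypothetical mirror of the real type, for illustration only.
pub type NumBlocks = u16;

pub struct StaticPoolConfig {
    cfg: Vec<(NumBlocks, usize)>,
}

impl StaticPoolConfig {
    pub const fn new(cfg: Vec<(NumBlocks, usize)>) -> Self {
        StaticPoolConfig { cfg }
    }
}

// `Vec::new()` is also const, so an empty configuration can now live in a
// const item; non-empty configs still need a runtime allocation.
const EMPTY_CFG: StaticPoolConfig = StaticPoolConfig::new(Vec::new());

fn main() {
    assert_eq!(EMPTY_CFG.cfg.len(), 0);
}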

satrs-example/src/acs.rs (new file, +112)

@@ -0,0 +1,112 @@
+use std::sync::mpsc::{self, TryRecvError};
+
+use log::{info, warn};
+use satrs_core::pool::StoreAddr;
+use satrs_core::pus::verification::VerificationReporterWithSender;
+use satrs_core::spacepackets::ecss::hk::Subservice as HkSubservice;
+use satrs_core::tmtc::tm_helper::SharedTmStore;
+use satrs_core::{
+    hk::HkRequest,
+    spacepackets::{
+        ecss::tm::{PusTmCreator, PusTmSecondaryHeader},
+        time::cds::{DaysLen16Bits, TimeProvider},
+        SequenceFlags, SpHeader,
+    },
+};
+use satrs_example::{RequestTargetId, PUS_APID};
+
+use crate::{
+    hk::{AcsHkIds, HkUniqueId},
+    requests::{Request, RequestWithToken},
+    update_time,
+};
+
+pub struct AcsTask {
+    timestamp: [u8; 7],
+    time_provider: TimeProvider<DaysLen16Bits>,
+    tm_store: SharedTmStore,
+    tm_funnel: mpsc::Sender<StoreAddr>,
+    request_rx: mpsc::Receiver<RequestWithToken>,
+    verif_reporter: VerificationReporterWithSender,
+}
+
+impl AcsTask {
+    pub fn new(
+        tm_store: SharedTmStore,
+        tm_funnel: mpsc::Sender<StoreAddr>,
+        request_rx: mpsc::Receiver<RequestWithToken>,
+        verif_reporter: VerificationReporterWithSender,
+    ) -> Self {
+        Self {
+            timestamp: [0; 7],
+            time_provider: TimeProvider::new_with_u16_days(0, 0),
+            tm_store,
+            tm_funnel,
+            request_rx,
+            verif_reporter,
+        }
+    }
+
+    fn handle_hk_request(&mut self, target_id: u32, unique_id: u32) {
+        assert_eq!(target_id, RequestTargetId::AcsSubsystem as u32);
+        if unique_id == AcsHkIds::TestMgmSet as u32 {
+            let mut sp_header = SpHeader::tm(PUS_APID, SequenceFlags::Unsegmented, 0, 0).unwrap();
+            let sec_header = PusTmSecondaryHeader::new_simple(
+                3,
+                HkSubservice::TmHkPacket as u8,
+                &self.timestamp,
+            );
+            let mut buf: [u8; 8] = [0; 8];
+            let hk_id = HkUniqueId::new(target_id, unique_id);
+            hk_id.write_to_be_bytes(&mut buf).unwrap();
+            let pus_tm = PusTmCreator::new(&mut sp_header, sec_header, &buf, true);
+            let addr = self
+                .tm_store
+                .add_pus_tm(&pus_tm)
+                .expect("Adding PUS TM failed");
+            self.tm_funnel.send(addr).expect("Sending HK TM failed");
+        }
+        // TODO: Verification failure for invalid unique IDs.
+    }
+
+    pub fn periodic_operation(&mut self) {
+        match self.request_rx.try_recv() {
+            Ok(request) => {
+                info!(
+                    "ACS thread: Received HK request {:?}",
+                    request.targeted_request
+                );
+                update_time(&mut self.time_provider, &mut self.timestamp);
+                match request.targeted_request.request {
+                    Request::Hk(hk_req) => match hk_req {
+                        HkRequest::OneShot(unique_id) => self.handle_hk_request(
+                            request.targeted_request.target_id_with_apid.target_id(),
+                            unique_id,
+                        ),
+                        HkRequest::Enable(_) => {}
+                        HkRequest::Disable(_) => {}
+                        HkRequest::ModifyCollectionInterval(_, _) => {}
+                    },
+                    Request::Mode(_mode_req) => {
+                        warn!("mode request handling not implemented yet")
+                    }
+                    Request::Action(_action_req) => {
+                        warn!("action request handling not implemented yet")
+                    }
+                }
+                let started_token = self
+                    .verif_reporter
+                    .start_success(request.token, Some(&self.timestamp))
+                    .expect("Sending start success failed");
+                self.verif_reporter
+                    .completion_success(started_token, Some(&self.timestamp))
+                    .expect("Sending completion success failed");
+            }
+            Err(e) => match e {
+                TryRecvError::Empty => {}
+                TryRecvError::Disconnected => {
+                    warn!("ACS thread: Message Queue TX disconnected!")
+                }
+            },
+        }
+    }
+}
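A note on the new task: `periodic_operation` handles at most one queued request per call and never blocks. The following standalone sketch, with hypothetical names and not part of the commit, shows the same `try_recv` polling pattern in isolation:

use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    tx.send(42).unwrap();
    // Same shape as AcsTask::periodic_operation: poll without blocking,
    // treat Empty as "nothing to do", and only warn on disconnection.
    for _ in 0..3 {
        match rx.try_recv() {
            Ok(req) => println!("handling request {req}"),
            Err(TryRecvError::Empty) => {}
            Err(TryRecvError::Disconnected) => {
                eprintln!("request sender disconnected");
                break;
            }
        }
        thread::sleep(Duration::from_millis(100));
    }
}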

satrs-example/src/main.rs

@@ -1,18 +1,21 @@
+mod acs;
 mod ccsds;
 mod hk;
 mod logger;
 mod pus;
 mod requests;
 mod tcp;
+mod tm_funnel;
 mod tmtc;
 mod udp;
 
-use log::{info, warn};
+use crate::tm_funnel::TmFunnel;
+use log::info;
 use satrs_core::hal::std::tcp_server::ServerConfig;
 use satrs_core::hal::std::udp_server::UdpTcServer;
 
+use crate::acs::AcsTask;
 use crate::ccsds::CcsdsReceiver;
 use crate::hk::{AcsHkIds, HkUniqueId};
 use crate::logger::setup_logger;
 use crate::pus::action::{Pus8Wrapper, PusService8ActionHandler};
 use crate::pus::event::Pus5Wrapper;
@@ -20,22 +23,20 @@ use crate::pus::hk::{Pus3Wrapper, PusService3HkHandler};
 use crate::pus::scheduler::Pus11Wrapper;
 use crate::pus::test::Service17CustomWrapper;
 use crate::pus::{PusReceiver, PusTcMpscRouter};
-use crate::requests::{Request, RequestWithToken};
+use crate::requests::RequestWithToken;
 use crate::tcp::{SyncTcpTmSource, TcpTask};
-use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask};
+use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmtcTask};
 use crate::udp::UdpTmtcServer;
 use satrs_core::event_man::{
     EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider,
 };
 use satrs_core::events::EventU32;
-use satrs_core::hk::HkRequest;
-use satrs_core::pool::{PoolProviderMemInPlace, StaticMemoryPool, StaticPoolConfig};
+use satrs_core::pool::{StaticMemoryPool, StaticPoolConfig};
 use satrs_core::pus::event_man::{
     DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
     PusEventDispatcher,
 };
 use satrs_core::pus::event_srv::PusService5EventHandler;
-use satrs_core::pus::hk::Subservice as HkSubservice;
 use satrs_core::pus::scheduler::PusScheduler;
 use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
 use satrs_core::pus::test::PusService17TestHandler;
@@ -45,12 +46,7 @@ use satrs_core::pus::verification::{
 use satrs_core::pus::{
     EcssTcInSharedStoreConverter, MpscTcReceiver, MpscTmInStoreSender, PusServiceHelper,
 };
-use satrs_core::seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore};
-use satrs_core::spacepackets::ecss::tm::{PusTmCreator, PusTmZeroCopyWriter};
-use satrs_core::spacepackets::{
-    ecss::tm::PusTmSecondaryHeader, time::cds::TimeProvider, time::TimeWriter, SequenceFlags,
-    SpHeader,
-};
+use satrs_core::spacepackets::{time::cds::TimeProvider, time::TimeWriter};
 use satrs_core::tmtc::tm_helper::SharedTmStore;
 use satrs_core::tmtc::{CcsdsDistributor, TargetId};
 use satrs_core::ChannelId;
@@ -60,7 +56,7 @@ use satrs_example::{
 };
 use std::collections::HashMap;
 use std::net::{IpAddr, SocketAddr};
-use std::sync::mpsc::{channel, TryRecvError};
+use std::sync::mpsc::channel;
 use std::sync::{Arc, RwLock};
 use std::thread;
 use std::time::Duration;
@@ -98,8 +94,6 @@ fn main() {
         (15, 2048),
     ]));
-    let seq_count_provider = CcsdsSimpleSeqCountProvider::new();
-    let mut msg_counter_map: HashMap<u8, u16> = HashMap::new();
     let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
     let (tc_source_tx, tc_source_rx) = channel();
     let (tm_funnel_tx, tm_funnel_rx) = channel();
@@ -161,9 +155,6 @@
         tm_udp_server_rx: tm_server_rx,
     };
 
-    let aocs_tm_funnel = tm_funnel_tx.clone();
-    let aocs_tm_store = shared_tm_store.clone();
-
     let (pus_test_tx, pus_test_rx) = channel();
     let (pus_event_tx, pus_event_rx) = channel();
     let (pus_sched_tx, pus_sched_rx) = channel();
@@ -304,6 +295,30 @@
         tm_store: tm_args.tm_store.clone_backing_pool(),
     };
 
+    let mut acs_task = AcsTask::new(
+        shared_tm_store.clone(),
+        tm_funnel_tx.clone(),
+        acs_thread_rx,
+        reporter_aocs,
+    );
+
+    let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
+    let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
+    let sync_tm_tcp_source = SyncTcpTmSource::new(200);
+    let mut tcp_server = TcpTask::new(
+        tcp_server_cfg,
+        sync_tm_tcp_source.clone(),
+        tcp_ccsds_distributor,
+    )
+    .expect("tcp server creation failed");
+
+    let mut tm_funnel = TmFunnel::new(
+        shared_tm_store,
+        sync_tm_tcp_source,
+        tm_funnel_rx,
+        tm_server_tx,
+    );
+
     info!("Starting TMTC and UDP task");
     let jh_udp_tmtc = thread::Builder::new()
         .name("TMTC and UDP".to_string())
@@ -317,15 +332,6 @@
         })
         .unwrap();
 
-    let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
-    let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
-    let mut sync_tm_tcp_source = SyncTcpTmSource::new(200);
-    let mut tcp_server = TcpTask::new(
-        tcp_server_cfg,
-        sync_tm_tcp_source.clone(),
-        tcp_ccsds_distributor,
-    )
-    .expect("tcp server creation failed");
     info!("Starting TCP task");
     let jh_tcp = thread::Builder::new()
         .name("TCP".to_string())
@@ -340,43 +346,8 @@
     info!("Starting TM funnel task");
     let jh1 = thread::Builder::new()
         .name("TM Funnel".to_string())
-        .spawn(move || {
-            let tm_funnel = TmFunnel {
-                tm_server_tx,
-                tm_funnel_rx,
-            };
-            loop {
-                if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() {
-                    // Read the TM, set sequence counter and message counter, and finally update
-                    // the CRC.
-                    let shared_pool = shared_tm_store.clone_backing_pool();
-                    let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
-                    let tm_raw = pool_guard
-                        .modify(&addr)
-                        .expect("Reading TM from pool failed");
-                    let mut zero_copy_writer = PusTmZeroCopyWriter::new(tm_raw)
-                        .expect("Creating TM zero copy writer failed");
-                    zero_copy_writer.set_apid(PUS_APID);
-                    zero_copy_writer.set_seq_count(seq_count_provider.get_and_increment());
-                    let entry = msg_counter_map
-                        .entry(zero_copy_writer.service())
-                        .or_insert(0);
-                    zero_copy_writer.set_msg_count(*entry);
-                    if *entry == u16::MAX {
-                        *entry = 0;
-                    } else {
-                        *entry += 1;
-                    }
-                    // This operation has to come last!
-                    zero_copy_writer.finish();
-                    tm_funnel
-                        .tm_server_tx
-                        .send(addr)
-                        .expect("Sending TM to server failed");
-                    sync_tm_tcp_source.add_tm(tm_raw);
-                }
-            }
+        .spawn(move || loop {
+            tm_funnel.operation();
         })
         .unwrap();
@@ -442,84 +413,9 @@
     info!("Starting AOCS thread");
     let jh3 = thread::Builder::new()
         .name("AOCS".to_string())
-        .spawn(move || {
-            let mut timestamp: [u8; 7] = [0; 7];
-            let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
-            loop {
-                // TODO: Move this into a separate function/task/module..
-                match acs_thread_rx.try_recv() {
-                    Ok(request) => {
-                        info!(
-                            "ACS thread: Received HK request {:?}",
-                            request.targeted_request
-                        );
-                        update_time(&mut time_provider, &mut timestamp);
-                        match request.targeted_request.request {
-                            Request::Hk(hk_req) => match hk_req {
-                                HkRequest::OneShot(unique_id) => {
-                                    // TODO: We should check whether the unique ID is even valid.
-                                    let target = request.targeted_request.target_id;
-                                    assert_eq!(
-                                        target.target_id(),
-                                        RequestTargetId::AcsSubsystem as u32
-                                    );
-                                    if request.targeted_request.target_id.target
-                                        == AcsHkIds::TestMgmSet as u32
-                                    {
-                                        let mut sp_header = SpHeader::tm(
-                                            PUS_APID,
-                                            SequenceFlags::Unsegmented,
-                                            0,
-                                            0,
-                                        )
-                                        .unwrap();
-                                        let sec_header = PusTmSecondaryHeader::new_simple(
-                                            3,
-                                            HkSubservice::TmHkPacket as u8,
-                                            &timestamp,
-                                        );
-                                        let mut buf: [u8; 8] = [0; 8];
-                                        let hk_id = HkUniqueId::new(target.target_id(), unique_id);
-                                        hk_id.write_to_be_bytes(&mut buf).unwrap();
-                                        let pus_tm = PusTmCreator::new(
-                                            &mut sp_header,
-                                            sec_header,
-                                            &buf,
-                                            true,
-                                        );
-                                        let addr = aocs_tm_store
-                                            .add_pus_tm(&pus_tm)
-                                            .expect("Adding PUS TM failed");
-                                        aocs_tm_funnel.send(addr).expect("Sending HK TM failed");
-                                    }
-                                }
-                                HkRequest::Enable(_) => {}
-                                HkRequest::Disable(_) => {}
-                                HkRequest::ModifyCollectionInterval(_, _) => {}
-                            },
-                            Request::Mode(_mode_req) => {
-                                warn!("mode request handling not implemented yet")
-                            }
-                            Request::Action(_action_req) => {
-                                warn!("action request handling not implemented yet")
-                            }
-                        }
-                        let started_token = reporter_aocs
-                            .start_success(request.token, Some(&timestamp))
-                            .expect("Sending start success failed");
-                        reporter_aocs
-                            .completion_success(started_token, Some(&timestamp))
-                            .expect("Sending completion success failed");
-                    }
-                    Err(e) => match e {
-                        TryRecvError::Empty => {}
-                        TryRecvError::Disconnected => {
-                            warn!("ACS thread: Message Queue TX disconnected!")
-                        }
-                    },
-                }
-                thread::sleep(Duration::from_millis(500));
-            }
+        .spawn(move || loop {
+            acs_task.periodic_operation();
+            thread::sleep(Duration::from_millis(500));
         })
         .unwrap();
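The net effect of the main.rs changes: the long inline thread closures are gone, and each thread now only drives a task object in a fixed-period loop. A generic sketch of that pattern, using a hypothetical `MyTask` rather than code from this commit:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

struct MyTask {
    rx: mpsc::Receiver<u32>,
}

impl MyTask {
    fn periodic_operation(&mut self) {
        while let Ok(value) = self.rx.try_recv() {
            println!("got {value}");
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut task = MyTask { rx };
    // Same spawn shape as the example app: the thread owns the task
    // and drives it in a fixed-period loop.
    let handle = thread::Builder::new()
        .name("demo task".to_string())
        .spawn(move || {
            for _ in 0..3 {
                task.periodic_operation();
                thread::sleep(Duration::from_millis(100));
            }
        })
        .unwrap();
    tx.send(1).unwrap();
    handle.join().unwrap();
}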

satrs-example/src/requests.rs

@@ -22,7 +22,7 @@ pub enum Request {
 #[derive(Clone, Eq, PartialEq, Debug, new)]
 pub struct TargetedRequest {
-    pub(crate) target_id: TargetIdWithApid,
+    pub(crate) target_id_with_apid: TargetIdWithApid,
     pub(crate) request: Request,
 }

satrs-example/src/tm_funnel.rs (new file, +74)

@@ -0,0 +1,74 @@
+use std::{
+    collections::HashMap,
+    sync::mpsc::{Receiver, Sender},
+};
+
+use satrs_core::{
+    pool::{PoolProviderMemInPlace, StoreAddr},
+    seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
+    spacepackets::ecss::tm::PusTmZeroCopyWriter,
+    tmtc::tm_helper::SharedTmStore,
+};
+use satrs_example::PUS_APID;
+
+use crate::tcp::SyncTcpTmSource;
+
+pub struct TmFunnel {
+    shared_tm_store: SharedTmStore,
+    seq_count_provider: CcsdsSimpleSeqCountProvider,
+    sync_tm_tcp_source: SyncTcpTmSource,
+    msg_counter_map: HashMap<u8, u16>,
+    tm_funnel_rx: Receiver<StoreAddr>,
+    tm_server_tx: Sender<StoreAddr>,
+}
+
+impl TmFunnel {
+    pub fn new(
+        shared_tm_store: SharedTmStore,
+        sync_tm_tcp_source: SyncTcpTmSource,
+        tm_funnel_rx: Receiver<StoreAddr>,
+        tm_server_tx: Sender<StoreAddr>,
+    ) -> Self {
+        Self {
+            shared_tm_store,
+            seq_count_provider: CcsdsSimpleSeqCountProvider::new(),
+            msg_counter_map: HashMap::new(),
+            sync_tm_tcp_source,
+            tm_funnel_rx,
+            tm_server_tx,
+        }
+    }
+
+    pub fn operation(&mut self) {
+        if let Ok(addr) = self.tm_funnel_rx.recv() {
+            // Read the TM, set sequence counter and message counter, and finally update
+            // the CRC.
+            let shared_pool = self.shared_tm_store.clone_backing_pool();
+            let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
+            let tm_raw = pool_guard
+                .modify(&addr)
+                .expect("Reading TM from pool failed");
+            let mut zero_copy_writer =
+                PusTmZeroCopyWriter::new(tm_raw).expect("Creating TM zero copy writer failed");
+            zero_copy_writer.set_apid(PUS_APID);
+            zero_copy_writer.set_seq_count(self.seq_count_provider.get_and_increment());
+            let entry = self
+                .msg_counter_map
+                .entry(zero_copy_writer.service())
+                .or_insert(0);
+            zero_copy_writer.set_msg_count(*entry);
+            if *entry == u16::MAX {
+                *entry = 0;
+            } else {
+                *entry += 1;
+            }
+            // This operation has to come last!
+            zero_copy_writer.finish();
+            self.tm_server_tx
+                .send(addr)
+                .expect("Sending TM to server failed");
+            self.sync_tm_tcp_source.add_tm(tm_raw);
+        }
+    }
+}
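For context on the new module: the funnel stamps a globally increasing CCSDS sequence count but keeps one message counter per PUS service. A minimal standalone sketch of the per-service counter, using a hypothetical helper and `wrapping_add`, which behaves the same as the explicit `u16::MAX` check above:

use std::collections::HashMap;

fn next_msg_count(counters: &mut HashMap<u8, u16>, service: u8) -> u16 {
    let entry = counters.entry(service).or_insert(0);
    let current = *entry;
    // Equivalent to the explicit `u16::MAX` check in TmFunnel::operation.
    *entry = entry.wrapping_add(1);
    current
}

fn main() {
    let mut counters = HashMap::new();
    assert_eq!(next_msg_count(&mut counters, 3), 0);
    assert_eq!(next_msg_count(&mut counters, 3), 1);
    // Each service advances independently.
    assert_eq!(next_msg_count(&mut counters, 17), 0);
}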

satrs-example/src/tmtc.rs

@@ -53,11 +53,6 @@ impl TcStore {
     }
 }
 
-pub struct TmFunnel {
-    pub tm_funnel_rx: Receiver<StoreAddr>,
-    pub tm_server_tx: Sender<StoreAddr>,
-}
-
 #[derive(Clone)]
 pub struct PusTcSource {
     pub tc_source: Sender<StoreAddr>,