sat-rs/satrs-example/src/main.rs

426 lines
17 KiB
Rust
Raw Normal View History

2022-09-03 13:47:25 +02:00
mod ccsds;
2022-12-19 17:03:26 +01:00
mod hk;
2023-02-15 22:30:32 +01:00
mod logging;
2022-08-29 01:33:32 +02:00
mod pus;
2022-12-19 17:03:26 +01:00
mod requests;
2022-08-29 01:33:32 +02:00
mod tmtc;
2023-02-15 22:30:32 +01:00
use log::{info, warn};
use std::collections::hash_map::Entry;
2023-02-15 22:30:32 +01:00
2023-02-14 15:53:14 +01:00
use crate::hk::AcsHkIds;
2023-02-15 22:30:32 +01:00
use crate::logging::setup_logger;
2023-07-05 14:25:51 +02:00
use crate::pus::event::Pus5Wrapper;
2023-07-05 11:58:43 +02:00
use crate::pus::scheduler::Pus11Wrapper;
2023-07-05 11:25:23 +02:00
use crate::pus::test::Service17CustomWrapper;
2023-07-04 18:51:54 +02:00
use crate::pus::PusTcMpscRouter;
2022-12-22 09:26:00 +01:00
use crate::requests::{Request, RequestWithToken};
use crate::tmtc::{
2023-07-04 15:17:43 +02:00
core_tmtc_task, OtherArgs, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, PUS_APID,
};
2022-11-20 19:54:14 +01:00
use satrs_core::event_man::{
EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider,
};
2022-11-20 19:54:14 +01:00
use satrs_core::events::EventU32;
2023-02-14 15:53:14 +01:00
use satrs_core::hk::HkRequest;
2023-02-27 17:00:21 +01:00
use satrs_core::pool::{LocalPool, PoolCfg};
2022-11-20 19:54:14 +01:00
use satrs_core::pus::event_man::{
2022-11-13 21:07:16 +01:00
DefaultPusMgmtBackendProvider, EventReporter, EventRequest, EventRequestWithToken,
PusEventDispatcher,
};
2023-07-05 14:25:51 +02:00
use satrs_core::pus::event_srv::PusService5EventHandler;
2023-02-04 15:35:18 +01:00
use satrs_core::pus::hk::Subservice as HkSubservice;
2023-07-05 11:58:43 +02:00
use satrs_core::pus::scheduler::PusScheduler;
use satrs_core::pus::scheduler_srv::PusService11SchedHandler;
2023-07-05 11:25:23 +02:00
use satrs_core::pus::test::PusService17TestHandler;
2022-11-21 10:28:31 +01:00
use satrs_core::pus::verification::{
MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender,
};
2023-07-03 00:42:20 +02:00
use satrs_core::pus::MpscTmtcInStoreSender;
use satrs_core::seq_count::{SeqCountProviderSimple, SequenceCountProviderCore};
use satrs_core::spacepackets::ecss::{PusPacket, SerializablePusPacket};
2023-01-10 17:16:57 +01:00
use satrs_core::spacepackets::{
time::cds::TimeProvider,
time::TimeWriter,
tm::{PusTm, PusTmSecondaryHeader},
SequenceFlags, SpHeader,
};
2023-07-05 11:25:23 +02:00
use satrs_core::tmtc::tm_helper::SharedTmStore;
2023-02-27 13:44:24 +01:00
use satrs_core::tmtc::AddressableId;
2023-07-05 11:25:23 +02:00
use satrs_example::{RequestTargetId, OBSW_SERVER_ADDR, SERVER_PORT};
2022-12-19 17:03:26 +01:00
use std::collections::HashMap;
2022-08-29 01:33:32 +02:00
use std::net::{IpAddr, SocketAddr};
2022-12-21 09:47:27 +01:00
use std::sync::mpsc::{channel, TryRecvError};
2023-07-05 11:25:23 +02:00
use std::sync::{Arc, RwLock};
2022-08-29 01:33:32 +02:00
use std::thread;
2022-12-20 15:33:00 +01:00
use std::time::Duration;
2022-08-29 01:33:32 +02:00
fn main() {
2023-02-15 22:30:32 +01:00
setup_logger().expect("setting up logging with fern failed");
2022-09-03 13:47:25 +02:00
println!("Running OBSW example");
let tm_pool = LocalPool::new(PoolCfg::new(vec![
(30, 32),
(15, 64),
(15, 128),
(15, 256),
(15, 1024),
(15, 2048),
]));
2023-07-04 15:17:43 +02:00
let tm_store = SharedTmStore::new(Arc::new(RwLock::new(Box::new(tm_pool))));
let tm_store_event = tm_store.clone();
let tc_pool = LocalPool::new(PoolCfg::new(vec![
(30, 32),
(15, 64),
(15, 128),
(15, 256),
(15, 1024),
(15, 2048),
]));
let tc_store = TcStore {
pool: Arc::new(RwLock::new(Box::new(tc_pool))),
};
2022-12-22 09:15:59 +01:00
let seq_count_provider = SeqCountProviderSimple::new();
let mut msg_counter_map: HashMap<u8, u16> = HashMap::new();
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let (tc_source_tx, tc_source_rx) = channel();
2022-11-13 21:07:16 +01:00
let (tm_funnel_tx, tm_funnel_rx) = channel();
let (tm_server_tx, tm_server_rx) = channel();
let verif_sender = MpscVerifSender::new(
0,
"verif_sender",
2023-07-04 15:17:43 +02:00
tm_store.backing_pool(),
tm_funnel_tx.clone(),
);
let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8).unwrap();
// Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter.
2022-12-22 09:15:59 +01:00
let verif_reporter = VerificationReporterWithSender::new(&verif_cfg, Box::new(verif_sender));
let mut reporter_event_handler = verif_reporter.clone();
let mut reporter_aocs = verif_reporter.clone();
2022-11-13 21:07:16 +01:00
// Create event handling components
// These sender handles are used to send event requests, for example to enable or disable
// certain events
2022-11-13 21:07:16 +01:00
let (event_request_tx, event_request_rx) = channel::<EventRequestWithToken>();
// The sender handle is the primary sender handle for all components which want to create events.
// The event manager will receive the RX handle to receive all the events.
let (event_sender, event_man_rx) = channel();
let event_recv = MpscEventReceiver::<EventU32>::new(event_man_rx);
2023-07-04 21:13:26 +02:00
let test_srv_event_sender = event_sender.clone();
let mut event_man = EventManagerWithMpscQueue::new(Box::new(event_recv));
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
// telemetry for each event.
let event_reporter = EventReporter::new(PUS_APID, 128).unwrap();
let pus_tm_backend = DefaultPusMgmtBackendProvider::<EventU32>::default();
2022-11-13 21:07:16 +01:00
let mut pus_event_dispatcher =
PusEventDispatcher::new(event_reporter, Box::new(pus_tm_backend));
let (pus_event_man_tx, pus_event_man_rx) = channel();
let pus_event_man_send_provider = MpscEventU32SendProvider::new(1, pus_event_man_tx);
event_man.subscribe_all(pus_event_man_send_provider.id());
event_man.add_sender(pus_event_man_send_provider);
2022-11-13 21:07:16 +01:00
// Some request are targetable. This map is used to retrieve sender handles based on a target ID.
2022-12-19 17:03:26 +01:00
let mut request_map = HashMap::new();
2022-12-21 10:23:32 +01:00
let (acs_thread_tx, acs_thread_rx) = channel::<RequestWithToken>();
2022-12-19 17:03:26 +01:00
request_map.insert(RequestTargetId::AcsSubsystem as u32, acs_thread_tx);
2023-07-05 15:12:03 +02:00
let tc_source_wrapper = PusTcSource {
2023-07-04 18:51:54 +02:00
tc_store: tc_store.clone(),
tc_source: tc_source_tx,
};
// Create clones here to allow moving the values
let core_args = OtherArgs {
sock_addr,
2023-07-04 18:51:54 +02:00
verif_reporter: verif_reporter.clone(),
2022-11-13 21:07:16 +01:00
event_sender,
2022-12-19 17:03:26 +01:00
request_map,
2022-11-13 21:07:16 +01:00
};
let tc_args = TcArgs {
2023-07-05 15:12:03 +02:00
tc_source: tc_source_wrapper.clone(),
tc_receiver: tc_source_rx,
};
let tm_args = TmArgs {
tm_store: tm_store.clone(),
tm_sink_sender: tm_funnel_tx.clone(),
tm_server_rx,
};
2022-11-20 18:45:11 +01:00
2022-12-22 09:15:59 +01:00
let aocs_to_funnel = tm_funnel_tx.clone();
let mut aocs_tm_store = tm_store.clone();
2023-07-04 18:51:54 +02:00
let (pus_test_tx, pus_test_rx) = channel();
let (pus_event_tx, pus_event_rx) = channel();
let (pus_sched_tx, pus_sched_rx) = channel();
let (pus_hk_tx, pus_hk_rx) = channel();
let (pus_action_tx, pus_action_rx) = channel();
let pus_router = PusTcMpscRouter {
test_service_receiver: pus_test_tx,
event_service_receiver: pus_event_tx,
sched_service_receiver: pus_sched_tx,
hk_service_receiver: pus_hk_tx,
action_service_receiver: pus_action_tx,
};
2023-07-05 11:25:23 +02:00
let pus17_handler = PusService17TestHandler::new(
2023-07-04 18:51:54 +02:00
pus_test_rx,
tc_store.pool.clone(),
tm_funnel_tx.clone(),
tm_store.clone(),
2023-07-04 22:26:41 +02:00
PUS_APID,
2023-07-05 11:58:43 +02:00
verif_reporter.clone(),
2023-07-04 18:51:54 +02:00
);
2023-07-05 11:58:43 +02:00
let mut pus_17_wrapper = Service17CustomWrapper {
2023-07-04 21:13:26 +02:00
pus17_handler,
test_srv_event_sender,
};
2023-07-05 11:58:43 +02:00
let scheduler = PusScheduler::new_with_current_init_time(Duration::from_secs(5))
.expect("Creating PUS Scheduler failed");
2023-07-05 14:25:51 +02:00
let pus_11_handler = PusService11SchedHandler::new(
2023-07-05 11:58:43 +02:00
pus_sched_rx,
tc_store.pool.clone(),
tm_funnel_tx.clone(),
tm_store.clone(),
PUS_APID,
2023-07-05 14:25:51 +02:00
verif_reporter.clone(),
2023-07-05 11:58:43 +02:00
scheduler,
);
2023-07-05 15:12:03 +02:00
let mut pus_11_wrapper = Pus11Wrapper {
pus_11_handler,
tc_source_wrapper,
};
2023-07-05 14:25:51 +02:00
let pus_5_handler = PusService5EventHandler::new(
pus_event_rx,
tc_store.pool.clone(),
tm_funnel_tx.clone(),
tm_store.clone(),
PUS_APID,
verif_reporter,
event_request_tx,
);
let mut pus_5_wrapper = Pus5Wrapper { pus_5_handler };
2023-07-04 15:17:43 +02:00
2023-02-15 22:30:32 +01:00
info!("Starting TMTC task");
let jh0 = thread::Builder::new()
.name("TMTC".to_string())
.spawn(move || {
2023-07-04 18:51:54 +02:00
core_tmtc_task(core_args, tc_args, tm_args, pus_router);
})
.unwrap();
2022-08-29 01:33:32 +02:00
2023-02-15 22:30:32 +01:00
info!("Starting TM funnel task");
let jh1 = thread::Builder::new()
.name("TM Funnel".to_string())
.spawn(move || {
let mut tm_buf: [u8; 2048] = [0; 2048];
let tm_funnel = TmFunnel {
tm_server_tx,
tm_funnel_rx,
};
loop {
if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally write
2023-07-05 17:32:00 +02:00
// it back with the updated CRC.
// We could theoretically manipulate the counters and the CRC directly
// in place as an optimization, but I don't think this is necessary..
let shared_pool = tm_store.backing_pool();
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
let tm_raw = pool_guard
.modify(&addr)
.expect("Reading TM from pool failed");
tm_buf[0..tm_raw.len()].copy_from_slice(&tm_raw);
let (mut tm, size) =
PusTm::from_bytes(&tm_buf, 7).expect("Creating TM from raw slice failed");
tm.sp_header.set_apid(PUS_APID);
tm.sp_header
.set_seq_count(seq_count_provider.get_and_increment());
let entry = msg_counter_map.entry(tm.service()).or_insert(0);
tm.sec_header.msg_counter = *entry;
2023-07-05 19:11:09 +02:00
if *entry == u16::MAX {
*entry = 0;
} else {
*entry += 1;
}
tm.calc_crc_on_serialization = true;
tm.write_to_bytes(tm_raw)
.expect("Writing PUS TM back failed");
tm_funnel
.tm_server_tx
.send(addr)
.expect("Sending TM to server failed");
}
2022-08-29 01:33:32 +02:00
}
})
.unwrap();
2023-02-15 22:30:32 +01:00
info!("Starting event handling task");
let jh2 = thread::Builder::new()
.name("Event".to_string())
.spawn(move || {
let mut timestamp: [u8; 7] = [0; 7];
2023-07-04 15:17:43 +02:00
let mut sender = MpscTmtcInStoreSender::new(
1,
"event_sender",
tm_store_event.backing_pool(),
2023-07-04 15:17:43 +02:00
tm_funnel_tx,
);
let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
let mut report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
reporter_event_handler
2023-07-05 14:25:51 +02:00
.completion_success(event_req.token.try_into().unwrap(), Some(timestamp))
.expect("Sending completion success failed");
};
loop {
// handle event requests
if let Ok(event_req) = event_request_rx.try_recv() {
match event_req.request {
EventRequest::Enable(event) => {
pus_event_dispatcher
.enable_tm_for_event(&event)
.expect("Enabling TM failed");
update_time(&mut time_provider, &mut timestamp);
report_completion(event_req, &timestamp);
}
EventRequest::Disable(event) => {
pus_event_dispatcher
.disable_tm_for_event(&event)
.expect("Disabling TM failed");
update_time(&mut time_provider, &mut timestamp);
report_completion(event_req, &timestamp);
}
2022-11-13 21:07:16 +01:00
}
}
// Perform the event routing.
event_man
.try_event_handling()
.expect("event handling failed");
// Perform the generation of PUS event packets
if let Ok((event, _param)) = pus_event_man_rx.try_recv() {
update_time(&mut time_provider, &mut timestamp);
pus_event_dispatcher
.generate_pus_event_tm_generic(&mut sender, &timestamp, event, None)
.expect("Sending TM as event failed");
}
thread::sleep(Duration::from_millis(400));
2022-11-13 21:07:16 +01:00
}
})
.unwrap();
2023-02-15 22:30:32 +01:00
info!("Starting AOCS thread");
let jh3 = thread::Builder::new()
.name("AOCS".to_string())
.spawn(move || {
let mut timestamp: [u8; 7] = [0; 7];
let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
loop {
match acs_thread_rx.try_recv() {
Ok(request) => {
2023-02-27 13:44:24 +01:00
info!(
"ACS thread: Received HK request {:?}",
request.targeted_request
);
update_time(&mut time_provider, &mut timestamp);
2023-02-27 13:44:24 +01:00
match request.targeted_request.request {
Request::HkRequest(hk_req) => match hk_req {
2023-02-27 13:44:24 +01:00
HkRequest::OneShot(unique_id) => {
let target = request.targeted_request.target_id;
assert_eq!(target, RequestTargetId::AcsSubsystem as u32);
if request.targeted_request.target_id
== AcsHkIds::TestMgmSet as u32
{
let mut sp_header = SpHeader::tm(
PUS_APID,
SequenceFlags::Unsegmented,
0,
0,
)
.unwrap();
let sec_header = PusTmSecondaryHeader::new_simple(
3,
HkSubservice::TmHkPacket as u8,
&timestamp,
);
let mut buf: [u8; 8] = [0; 8];
2023-02-27 13:44:24 +01:00
let addressable_id = AddressableId {
target_id: target,
unique_id,
};
addressable_id.write_to_be_bytes(&mut buf).unwrap();
let pus_tm = PusTm::new(
&mut sp_header,
sec_header,
Some(&buf),
true,
);
let addr = aocs_tm_store.add_pus_tm(&pus_tm);
aocs_to_funnel.send(addr).expect("Sending HK TM failed");
}
2022-12-22 09:26:00 +01:00
}
HkRequest::Enable(_) => {}
HkRequest::Disable(_) => {}
HkRequest::ModifyCollectionInterval(_, _) => {}
},
Request::ModeRequest(_mode_req) => {
warn!("mode request handling not implemented yet")
2022-12-22 09:26:00 +01:00
}
2023-02-15 11:19:23 +01:00
}
let started_token = reporter_aocs
2023-02-27 13:44:24 +01:00
.start_success(request.token, Some(&timestamp))
.expect("Sending start success failed");
reporter_aocs
.completion_success(started_token, Some(&timestamp))
.expect("Sending completion success failed");
2022-12-22 09:26:00 +01:00
}
Err(e) => match e {
TryRecvError::Empty => {}
TryRecvError::Disconnected => {
warn!("ACS thread: Message Queue TX disconnected!")
}
},
2022-12-21 09:47:27 +01:00
}
thread::sleep(Duration::from_millis(500));
2022-12-21 10:23:32 +01:00
}
})
.unwrap();
2022-12-20 15:33:00 +01:00
2023-07-04 15:17:43 +02:00
info!("Starting PUS handler thread");
let jh4 = thread::Builder::new()
2023-07-04 18:51:54 +02:00
.name("PUS".to_string())
2023-07-04 21:13:26 +02:00
.spawn(move || loop {
2023-07-05 15:12:03 +02:00
pus_11_wrapper.release_tcs();
loop {
let mut all_queues_empty = true;
let mut is_srv_finished = |srv_handler_finished: bool| {
if !srv_handler_finished {
all_queues_empty = false;
}
};
is_srv_finished(pus_17_wrapper.handle_next_packet());
is_srv_finished(pus_11_wrapper.handle_next_packet());
is_srv_finished(pus_5_wrapper.handle_next_packet());
if all_queues_empty {
break;
2023-07-05 11:58:43 +02:00
}
2023-07-04 18:51:54 +02:00
}
2023-07-05 15:12:03 +02:00
thread::sleep(Duration::from_millis(200));
2023-07-04 18:51:54 +02:00
})
.unwrap();
2022-08-29 01:33:32 +02:00
jh0.join().expect("Joining UDP TMTC server thread failed");
jh1.join().expect("Joining TM Funnel thread failed");
jh2.join().expect("Joining Event Manager thread failed");
2022-12-20 15:33:00 +01:00
jh3.join().expect("Joining AOCS thread failed");
2023-07-04 15:17:43 +02:00
jh4.join().expect("Joining PUS handler thread failed");
2022-08-29 01:33:32 +02:00
}
2022-11-13 21:07:16 +01:00
2022-12-08 15:26:49 +01:00
pub fn update_time(time_provider: &mut TimeProvider, timestamp: &mut [u8]) {
2022-11-13 21:07:16 +01:00
time_provider
.update_from_now()
.expect("Could not get current time");
time_provider
.write_to_bytes(timestamp)
.expect("Writing timestamp failed");
}