Start adding stop logic #3

Merged
muellerr merged 6 commits from stop-logic into main 2024-04-13 10:56:45 +02:00
4 changed files with 84 additions and 19 deletions
Showing only changes of commit 710fc94384 - Show all commits

View File

@ -6,6 +6,8 @@ use satrs_mib::resultcode;
use std::{collections::HashSet, net::Ipv4Addr};
use strum::IntoEnumIterator;
pub const STOP_FILE_NAME: &str = "stop-experiment";
pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
pub const SERVER_PORT: u16 = 7301;
@ -124,4 +126,5 @@ pub mod tasks {
pub const FREQ_MS_EVENT_HANDLING: u64 = 400;
pub const FREQ_MS_AOCS: u64 = 500;
pub const FREQ_MS_PUS_STACK: u64 = 200;
pub const FREQ_MS_CTRL: u64 = 400;
}

View File

@ -82,6 +82,7 @@ mod tests {
sync::{Arc, Mutex},
};
use ops_sat_rs::config::{components, OBSW_SERVER_ADDR};
use satrs::{
spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket},
@ -89,7 +90,6 @@ mod tests {
},
tmtc::ReceivesTcCore,
};
use ops_sat_rs::config::{components, OBSW_SERVER_ADDR};
use super::*;

View File

@ -1,15 +1,18 @@
use std::{
net::{IpAddr, SocketAddr},
sync::mpsc,
sync::{atomic::AtomicBool, mpsc, Arc},
thread,
time::Duration,
};
use log::info;
use ops_sat_rs::config::tasks::FREQ_MS_PUS_STACK;
use ops_sat_rs::config::{
tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT,
};
use ops_sat_rs::config::{
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK},
STOP_FILE_NAME,
};
use satrs::{
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
tmtc::CcsdsDistributor,
@ -36,10 +39,11 @@ mod requests;
mod tm_funnel;
mod tmtc;
#[allow(dead_code)]
fn main() {
setup_logger().expect("setting up logging with fern failed");
println!("OPS-SAT Rust experiment OBSW");
println!("OPS-SAT Rust Experiment OBSW");
let stop_signal = Arc::new(AtomicBool::new(false));
let (tc_source_tx, tc_source_rx) = mpsc::channel();
let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel();
@ -152,6 +156,23 @@ fn main() {
})
.unwrap();
let ctrl_stop_signal = stop_signal.clone();
let jh_ctrl_thread = thread::Builder::new()
.name("CTRL".to_string())
.spawn(move || loop {
// TODO: Check stop file status regularly. If it exists, set the stop signal.
if std::path::Path::new(STOP_FILE_NAME).exists() {
log::warn!(
"Detected stop file name at {}. Initiating experiment shutdown",
STOP_FILE_NAME
);
ctrl_stop_signal.store(true, std::sync::atomic::Ordering::Relaxed);
}
thread::sleep(Duration::from_millis(FREQ_MS_CTRL));
})
.unwrap();
let tcp_stop_signal = stop_signal.clone();
info!("Starting TCP task");
let jh_tcp = thread::Builder::new()
.name("TCP".to_string())
@ -159,27 +180,42 @@ fn main() {
info!("Running TCP server on port {SERVER_PORT}");
loop {
tcp_server.periodic_operation();
if tcp_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
log::warn!("breaking0");
break;
}
}
})
.unwrap();
info!("Starting TM funnel task");
let funnel_stop_signal = stop_signal.clone();
let jh_tm_funnel = thread::Builder::new()
.name("TM Funnel".to_string())
.spawn(move || loop {
tm_funnel.operation();
if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
})
.unwrap();
info!("Starting PUS handler thread");
let pus_stop_signal = stop_signal.clone();
let jh_pus_handler = thread::Builder::new()
.name("PUS".to_string())
.spawn(move || loop {
pus_stack.periodic_operation();
if pus_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
})
.unwrap();
jh_ctrl_thread
.join()
.expect("Joining Controller thread failed");
jh_udp_tmtc
.join()
.expect("Joining UDP TMTC server thread failed");

View File

@ -1,7 +1,5 @@
use std::{
collections::HashMap,
sync::mpsc::{self},
};
use std::time::Instant;
use std::{collections::HashMap, sync::mpsc, time::Duration};
use log::info;
use satrs::pus::PusTmAsVec;
@ -76,6 +74,7 @@ impl TmFunnelCommon {
pub struct TmFunnelDynamic {
common: TmFunnelCommon,
last_ctrl_check: Instant,
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
tm_server_tx: mpsc::Sender<PusTmAsVec>,
}
@ -90,20 +89,47 @@ impl TmFunnelDynamic {
common: TmFunnelCommon::new(sync_tm_tcp_source),
tm_funnel_rx,
tm_server_tx,
last_ctrl_check: Instant::now(),
}
}
pub fn check_for_ctrl_check(&mut self) -> bool {
if Instant::now() - self.last_ctrl_check > Duration::from_millis(400) {
self.last_ctrl_check = Instant::now();
return true;
}
false
}
pub fn operation(&mut self) {
if let Ok(mut tm) = self.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
self.common.sync_tm_tcp_source.add_tm(&tm.packet);
self.tm_server_tx
.send(tm)
.expect("Sending TM to server failed");
loop {
match self.tm_funnel_rx.recv_timeout(Duration::from_millis(100)) {
Ok(mut tm) => {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let zero_copy_writer =
PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
self.common.sync_tm_tcp_source.add_tm(&tm.packet);
self.tm_server_tx
.send(tm)
.expect("Sending TM to server failed");
if self.check_for_ctrl_check() {
break;
}
}
Err(e) => match e {
mpsc::RecvTimeoutError::Timeout => {
if self.check_for_ctrl_check() {
break;
}
}
mpsc::RecvTimeoutError::Disconnected => {
log::warn!("All TM funnel senders have disconnected");
}
},
}
}
}
}