Start adding stop logic #3
42
Cargo.lock
generated
42
Cargo.lock
generated
@ -317,6 +317,18 @@ version = "2.7.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
|
checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "mio"
|
||||||
|
version = "0.8.11"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"log",
|
||||||
|
"wasi",
|
||||||
|
"windows-sys 0.48.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nodrop"
|
name = "nodrop"
|
||||||
version = "0.1.14"
|
version = "0.1.14"
|
||||||
@ -424,9 +436,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "quote"
|
name = "quote"
|
||||||
version = "1.0.35"
|
version = "1.0.36"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef"
|
checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
]
|
]
|
||||||
@ -455,7 +467,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "satrs"
|
name = "satrs"
|
||||||
version = "0.2.0-rc.0"
|
version = "0.2.0-rc.0"
|
||||||
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f"
|
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bus",
|
"bus",
|
||||||
"cobs",
|
"cobs",
|
||||||
@ -465,6 +477,7 @@ dependencies = [
|
|||||||
"downcast-rs",
|
"downcast-rs",
|
||||||
"dyn-clone",
|
"dyn-clone",
|
||||||
"hashbrown",
|
"hashbrown",
|
||||||
|
"mio",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
"num_enum",
|
"num_enum",
|
||||||
"paste",
|
"paste",
|
||||||
@ -479,7 +492,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "satrs-mib"
|
name = "satrs-mib"
|
||||||
version = "0.1.1"
|
version = "0.1.1"
|
||||||
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f"
|
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"csv",
|
"csv",
|
||||||
"satrs-mib-codegen",
|
"satrs-mib-codegen",
|
||||||
@ -491,7 +504,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "satrs-mib-codegen"
|
name = "satrs-mib-codegen"
|
||||||
version = "0.1.1"
|
version = "0.1.1"
|
||||||
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f"
|
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
@ -501,7 +514,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "satrs-shared"
|
name = "satrs-shared"
|
||||||
version = "0.1.3"
|
version = "0.1.3"
|
||||||
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f"
|
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"spacepackets",
|
"spacepackets",
|
||||||
@ -560,7 +573,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
|
checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"windows-sys",
|
"windows-sys 0.52.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -672,6 +685,12 @@ version = "0.9.4"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
|
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wasi"
|
||||||
|
version = "0.11.0+wasi-snapshot-preview1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "wasm-bindgen"
|
name = "wasm-bindgen"
|
||||||
version = "0.2.92"
|
version = "0.2.92"
|
||||||
@ -735,6 +754,15 @@ dependencies = [
|
|||||||
"windows-targets 0.52.4",
|
"windows-targets 0.52.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows-sys"
|
||||||
|
version = "0.48.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
|
||||||
|
dependencies = [
|
||||||
|
"windows-targets 0.48.5",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows-sys"
|
name = "windows-sys"
|
||||||
version = "0.52.0"
|
version = "0.52.0"
|
||||||
|
@ -1,8 +0,0 @@
|
|||||||
{
|
|
||||||
"com_if": "tcp",
|
|
||||||
"tcpip_udp_ip_addr": "127.0.0.1",
|
|
||||||
"tcpip_udp_port": 7301,
|
|
||||||
"tcpip_udp_recv_max_size": 1500,
|
|
||||||
"tcpip_tcp_ip_addr": "127.0.0.1",
|
|
||||||
"tcpip_tcp_port": 7301
|
|
||||||
}
|
|
@ -6,6 +6,8 @@ use satrs_mib::resultcode;
|
|||||||
use std::{collections::HashSet, net::Ipv4Addr};
|
use std::{collections::HashSet, net::Ipv4Addr};
|
||||||
use strum::IntoEnumIterator;
|
use strum::IntoEnumIterator;
|
||||||
|
|
||||||
|
pub const STOP_FILE_NAME: &str = "stop-experiment";
|
||||||
|
|
||||||
pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
|
pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
|
||||||
pub const SERVER_PORT: u16 = 7301;
|
pub const SERVER_PORT: u16 = 7301;
|
||||||
|
|
||||||
@ -124,4 +126,7 @@ pub mod tasks {
|
|||||||
pub const FREQ_MS_EVENT_HANDLING: u64 = 400;
|
pub const FREQ_MS_EVENT_HANDLING: u64 = 400;
|
||||||
pub const FREQ_MS_AOCS: u64 = 500;
|
pub const FREQ_MS_AOCS: u64 = 500;
|
||||||
pub const FREQ_MS_PUS_STACK: u64 = 200;
|
pub const FREQ_MS_PUS_STACK: u64 = 200;
|
||||||
|
pub const FREQ_MS_CTRL: u64 = 400;
|
||||||
|
|
||||||
|
pub const STOP_CHECK_FREQUENCY: u64 = 400;
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,13 @@
|
|||||||
use std::{
|
use std::{
|
||||||
collections::{HashSet, VecDeque},
|
collections::{HashSet, VecDeque},
|
||||||
sync::{Arc, Mutex},
|
sync::{atomic::AtomicBool, Arc, Mutex},
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
|
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
|
||||||
use satrs::{
|
use satrs::{
|
||||||
hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer},
|
hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer},
|
||||||
pus::ReceivesEcssPusTc,
|
pus::ReceivesEcssPusTc,
|
||||||
spacepackets::PacketId,
|
spacepackets::PacketId,
|
||||||
tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore},
|
tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore},
|
||||||
@ -69,12 +71,25 @@ impl TmPacketSourceCore for SyncTcpTmSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct ConnectionFinishedHandler {}
|
||||||
|
|
||||||
|
impl HandledConnectionHandler for ConnectionFinishedHandler {
|
||||||
|
fn handled_connection(&mut self, info: satrs::hal::std::tcp_server::HandledConnectionInfo) {
|
||||||
|
info!(
|
||||||
|
"Served {} TMs and {} TCs for client {:?}",
|
||||||
|
info.num_sent_tms, info.num_received_tcs, info.addr
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub type TcpServerType<TcSource, MpscErrorType> = TcpSpacepacketsServer<
|
pub type TcpServerType<TcSource, MpscErrorType> = TcpSpacepacketsServer<
|
||||||
(),
|
|
||||||
CcsdsError<MpscErrorType>,
|
|
||||||
SyncTcpTmSource,
|
SyncTcpTmSource,
|
||||||
CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
|
CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
|
||||||
HashSet<PacketId>,
|
HashSet<PacketId>,
|
||||||
|
ConnectionFinishedHandler,
|
||||||
|
(),
|
||||||
|
CcsdsError<MpscErrorType>,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
pub struct TcpTask<
|
pub struct TcpTask<
|
||||||
@ -102,26 +117,29 @@ impl<
|
|||||||
tm_source: SyncTcpTmSource,
|
tm_source: SyncTcpTmSource,
|
||||||
tc_receiver: CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
|
tc_receiver: CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
|
||||||
packet_id_lookup: HashSet<PacketId>,
|
packet_id_lookup: HashSet<PacketId>,
|
||||||
|
stop_signal: Arc<AtomicBool>,
|
||||||
) -> Result<Self, std::io::Error> {
|
) -> Result<Self, std::io::Error> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?,
|
server: TcpSpacepacketsServer::new(
|
||||||
|
cfg,
|
||||||
|
tm_source,
|
||||||
|
tc_receiver,
|
||||||
|
packet_id_lookup,
|
||||||
|
ConnectionFinishedHandler::default(),
|
||||||
|
Some(stop_signal),
|
||||||
|
)?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn periodic_operation(&mut self) {
|
pub fn periodic_operation(&mut self) {
|
||||||
loop {
|
let result = self
|
||||||
let result = self.server.handle_next_connection();
|
.server
|
||||||
|
.handle_next_connection(Some(Duration::from_millis(STOP_CHECK_FREQUENCY)));
|
||||||
match result {
|
match result {
|
||||||
Ok(conn_result) => {
|
Ok(_conn_result) => (),
|
||||||
info!(
|
|
||||||
"Served {} TMs and {} TCs for client {:?}",
|
|
||||||
conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("TCP server error: {e:?}");
|
warn!("TCP server error: {e:?}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
64
src/main.rs
64
src/main.rs
@ -1,15 +1,18 @@
|
|||||||
use std::{
|
use std::{
|
||||||
net::{IpAddr, SocketAddr},
|
net::{IpAddr, SocketAddr},
|
||||||
sync::mpsc,
|
sync::{atomic::AtomicBool, mpsc, Arc},
|
||||||
thread,
|
thread,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use log::info;
|
use log::info;
|
||||||
use ops_sat_rs::config::tasks::FREQ_MS_PUS_STACK;
|
|
||||||
use ops_sat_rs::config::{
|
use ops_sat_rs::config::{
|
||||||
tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT,
|
tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT,
|
||||||
};
|
};
|
||||||
|
use ops_sat_rs::config::{
|
||||||
|
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK},
|
||||||
|
STOP_FILE_NAME,
|
||||||
|
};
|
||||||
use satrs::{
|
use satrs::{
|
||||||
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
|
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
|
||||||
tmtc::CcsdsDistributor,
|
tmtc::CcsdsDistributor,
|
||||||
@ -34,10 +37,11 @@ mod pus;
|
|||||||
mod requests;
|
mod requests;
|
||||||
mod tmtc;
|
mod tmtc;
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
fn main() {
|
fn main() {
|
||||||
setup_logger().expect("setting up logging with fern failed");
|
setup_logger().expect("setting up logging with fern failed");
|
||||||
println!("OPS-SAT Rust experiment OBSW");
|
println!("OPS-SAT Rust Experiment OBSW");
|
||||||
|
|
||||||
|
let stop_signal = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
let (tc_source_tx, tc_source_rx) = mpsc::channel();
|
let (tc_source_tx, tc_source_rx) = mpsc::channel();
|
||||||
let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel();
|
let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel();
|
||||||
@ -132,52 +136,94 @@ fn main() {
|
|||||||
sync_tm_tcp_source.clone(),
|
sync_tm_tcp_source.clone(),
|
||||||
tcp_ccsds_distributor,
|
tcp_ccsds_distributor,
|
||||||
PACKET_ID_VALIDATOR.clone(),
|
PACKET_ID_VALIDATOR.clone(),
|
||||||
|
stop_signal.clone(),
|
||||||
)
|
)
|
||||||
.expect("tcp server creation failed");
|
.expect("tcp server creation failed");
|
||||||
|
|
||||||
let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx);
|
let mut tm_funnel = TmFunnelDynamic::new(
|
||||||
|
sync_tm_tcp_source,
|
||||||
|
tm_funnel_rx,
|
||||||
|
tm_server_tx,
|
||||||
|
stop_signal.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
info!("Starting CTRL task");
|
||||||
|
let ctrl_stop_signal = stop_signal.clone();
|
||||||
|
let jh_ctrl_thread = thread::Builder::new()
|
||||||
|
.name("ops-sat ctrl".to_string())
|
||||||
|
.spawn(move || loop {
|
||||||
|
if std::path::Path::new(STOP_FILE_NAME).exists() {
|
||||||
|
log::warn!(
|
||||||
|
"Detected stop file name at {}. Initiating experiment shutdown",
|
||||||
|
STOP_FILE_NAME
|
||||||
|
);
|
||||||
|
ctrl_stop_signal.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
thread::sleep(Duration::from_millis(FREQ_MS_CTRL));
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
info!("Starting TMTC and UDP task");
|
info!("Starting TMTC and UDP task");
|
||||||
|
let tmtc_stop_signal = stop_signal.clone();
|
||||||
let jh_udp_tmtc = thread::Builder::new()
|
let jh_udp_tmtc = thread::Builder::new()
|
||||||
.name("TMTC and UDP".to_string())
|
.name("ops-sat tmtc-udp".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
info!("Running UDP server on port {SERVER_PORT}");
|
info!("Running UDP server on port {SERVER_PORT}");
|
||||||
loop {
|
loop {
|
||||||
udp_tmtc_server.periodic_operation();
|
udp_tmtc_server.periodic_operation();
|
||||||
tmtc_task.periodic_operation();
|
tmtc_task.periodic_operation();
|
||||||
|
if tmtc_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC));
|
thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC));
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
let tcp_stop_signal = stop_signal.clone();
|
||||||
info!("Starting TCP task");
|
info!("Starting TCP task");
|
||||||
let jh_tcp = thread::Builder::new()
|
let jh_tcp = thread::Builder::new()
|
||||||
.name("TCP".to_string())
|
.name("ops-sat tcp".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
info!("Running TCP server on port {SERVER_PORT}");
|
info!("Running TCP server on port {SERVER_PORT}");
|
||||||
loop {
|
loop {
|
||||||
tcp_server.periodic_operation();
|
tcp_server.periodic_operation();
|
||||||
|
if tcp_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
info!("Starting TM funnel task");
|
info!("Starting TM funnel task");
|
||||||
|
let funnel_stop_signal = stop_signal.clone();
|
||||||
let jh_tm_funnel = thread::Builder::new()
|
let jh_tm_funnel = thread::Builder::new()
|
||||||
.name("TM Funnel".to_string())
|
.name("ops-sat tm-funnel".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
tm_funnel.operation();
|
tm_funnel.operation();
|
||||||
|
if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
info!("Starting PUS handler thread");
|
info!("Starting PUS handler thread");
|
||||||
|
let pus_stop_signal = stop_signal.clone();
|
||||||
let jh_pus_handler = thread::Builder::new()
|
let jh_pus_handler = thread::Builder::new()
|
||||||
.name("PUS".to_string())
|
.name("ops-sat pus".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
pus_stack.periodic_operation();
|
pus_stack.periodic_operation();
|
||||||
|
if pus_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
|
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
jh_ctrl_thread
|
||||||
|
.join()
|
||||||
|
.expect("Joining Controller thread failed");
|
||||||
jh_udp_tmtc
|
jh_udp_tmtc
|
||||||
.join()
|
.join()
|
||||||
.expect("Joining UDP TMTC server thread failed");
|
.expect("Joining UDP TMTC server thread failed");
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
use std::{
|
use std::sync::atomic::AtomicBool;
|
||||||
collections::HashMap,
|
use std::sync::Arc;
|
||||||
sync::mpsc::{self},
|
use std::{collections::HashMap, sync::mpsc, time::Duration};
|
||||||
};
|
|
||||||
|
|
||||||
use log::info;
|
use log::info;
|
||||||
|
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
|
||||||
use satrs::pus::PusTmAsVec;
|
use satrs::pus::PusTmAsVec;
|
||||||
use satrs::{
|
use satrs::{
|
||||||
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
|
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
|
||||||
@ -78,6 +78,7 @@ pub struct TmFunnelDynamic {
|
|||||||
common: TmFunnelCommon,
|
common: TmFunnelCommon,
|
||||||
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
|
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
|
||||||
tm_server_tx: mpsc::Sender<PusTmAsVec>,
|
tm_server_tx: mpsc::Sender<PusTmAsVec>,
|
||||||
|
stop_signal: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TmFunnelDynamic {
|
impl TmFunnelDynamic {
|
||||||
@ -85,25 +86,49 @@ impl TmFunnelDynamic {
|
|||||||
sync_tm_tcp_source: SyncTcpTmSource,
|
sync_tm_tcp_source: SyncTcpTmSource,
|
||||||
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
|
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
|
||||||
tm_server_tx: mpsc::Sender<PusTmAsVec>,
|
tm_server_tx: mpsc::Sender<PusTmAsVec>,
|
||||||
|
stop_signal: Arc<AtomicBool>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
common: TmFunnelCommon::new(sync_tm_tcp_source),
|
common: TmFunnelCommon::new(sync_tm_tcp_source),
|
||||||
tm_funnel_rx,
|
tm_funnel_rx,
|
||||||
tm_server_tx,
|
tm_server_tx,
|
||||||
|
stop_signal,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn operation(&mut self) {
|
pub fn operation(&mut self) {
|
||||||
if let Ok(mut tm) = self.tm_funnel_rx.recv() {
|
loop {
|
||||||
|
match self
|
||||||
|
.tm_funnel_rx
|
||||||
|
.recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY))
|
||||||
|
{
|
||||||
|
Ok(mut tm) => {
|
||||||
// Read the TM, set sequence counter and message counter, and finally update
|
// Read the TM, set sequence counter and message counter, and finally update
|
||||||
// the CRC.
|
// the CRC.
|
||||||
let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN)
|
let zero_copy_writer =
|
||||||
|
PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN)
|
||||||
.expect("Creating TM zero copy writer failed");
|
.expect("Creating TM zero copy writer failed");
|
||||||
self.common.apply_packet_processing(zero_copy_writer);
|
self.common.apply_packet_processing(zero_copy_writer);
|
||||||
self.common.sync_tm_tcp_source.add_tm(&tm.packet);
|
self.common.sync_tm_tcp_source.add_tm(&tm.packet);
|
||||||
self.tm_server_tx
|
self.tm_server_tx
|
||||||
.send(tm)
|
.send(tm)
|
||||||
.expect("Sending TM to server failed");
|
.expect("Sending TM to server failed");
|
||||||
|
if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => match e {
|
||||||
|
mpsc::RecvTimeoutError::Timeout => {
|
||||||
|
if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mpsc::RecvTimeoutError::Disconnected => {
|
||||||
|
log::warn!("All TM funnel senders have disconnected");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user