Add action service and controller component #5

Merged
muellerr merged 11 commits from add-action-service-controller-obj into main 2024-04-13 11:19:13 +02:00
7 changed files with 176 additions and 66 deletions
Showing only changes of commit 6b89f00d90 - Show all commits

46
Cargo.lock generated
View File

@ -79,7 +79,7 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648"
dependencies = [ dependencies = [
"windows-sys", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@ -89,7 +89,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7"
dependencies = [ dependencies = [
"anstyle", "anstyle",
"windows-sys", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@ -409,6 +409,18 @@ version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
[[package]]
name = "mio"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys 0.48.0",
]
[[package]] [[package]]
name = "nodrop" name = "nodrop"
version = "0.1.14" version = "0.1.14"
@ -517,9 +529,9 @@ dependencies = [
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.35" version = "1.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -577,7 +589,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]] [[package]]
name = "satrs" name = "satrs"
version = "0.2.0-rc.0" version = "0.2.0-rc.0"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e"
dependencies = [ dependencies = [
"bus", "bus",
"cobs", "cobs",
@ -587,6 +599,7 @@ dependencies = [
"downcast-rs", "downcast-rs",
"dyn-clone", "dyn-clone",
"hashbrown", "hashbrown",
"mio",
"num-traits", "num-traits",
"num_enum", "num_enum",
"paste", "paste",
@ -601,7 +614,7 @@ dependencies = [
[[package]] [[package]]
name = "satrs-mib" name = "satrs-mib"
version = "0.1.1" version = "0.1.1"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e"
dependencies = [ dependencies = [
"csv", "csv",
"satrs-mib-codegen", "satrs-mib-codegen",
@ -613,7 +626,7 @@ dependencies = [
[[package]] [[package]]
name = "satrs-mib-codegen" name = "satrs-mib-codegen"
version = "0.1.1" version = "0.1.1"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -623,7 +636,7 @@ dependencies = [
[[package]] [[package]]
name = "satrs-shared" name = "satrs-shared"
version = "0.1.3" version = "0.1.3"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#3375780e008506ba413be22d176aa567db04e09e"
dependencies = [ dependencies = [
"serde", "serde",
"spacepackets", "spacepackets",
@ -682,7 +695,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@ -800,6 +813,12 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]] [[package]]
name = "wasm-bindgen" name = "wasm-bindgen"
version = "0.2.92" version = "0.2.92"
@ -863,6 +882,15 @@ dependencies = [
"windows-targets 0.52.4", "windows-targets 0.52.4",
] ]
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets 0.48.5",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.52.0" version = "0.52.0"

View File

@ -1,8 +0,0 @@
{
"com_if": "tcp",
"tcpip_udp_ip_addr": "127.0.0.1",
"tcpip_udp_port": 7301,
"tcpip_udp_recv_max_size": 1500,
"tcpip_tcp_ip_addr": "127.0.0.1",
"tcpip_tcp_port": 7301
}

View File

@ -1,5 +1,4 @@
use ops_sat_rs::config::components::Apid; use ops_sat_rs::config::EXPERIMENT_APID;
use ops_sat_rs::config::APID_VALIDATOR;
use satrs::pus::ReceivesEcssPusTc; use satrs::pus::ReceivesEcssPusTc;
use satrs::spacepackets::{CcsdsPacket, SpHeader}; use satrs::spacepackets::{CcsdsPacket, SpHeader};
use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc};
@ -19,7 +18,7 @@ impl<
> ValidatorU16Id for CcsdsReceiver<TcSource, E> > ValidatorU16Id for CcsdsReceiver<TcSource, E>
{ {
fn validate(&self, apid: u16) -> bool { fn validate(&self, apid: u16) -> bool {
APID_VALIDATOR.contains(&apid) apid == EXPERIMENT_APID
} }
} }
@ -35,12 +34,8 @@ impl<
sp_header: &SpHeader, sp_header: &SpHeader,
tc_raw: &[u8], tc_raw: &[u8],
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
if sp_header.apid() == Apid::Cfdp as u16 {
} else {
return self.tc_source.pass_ccsds(sp_header, tc_raw); return self.tc_source.pass_ccsds(sp_header, tc_raw);
} }
Ok(())
}
fn handle_packet_with_unknown_apid( fn handle_packet_with_unknown_apid(
&mut self, &mut self,
@ -48,6 +43,7 @@ impl<
_tc_raw: &[u8], _tc_raw: &[u8],
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
log::warn!("unknown APID 0x{:x?} detected", sp_header.apid()); log::warn!("unknown APID 0x{:x?} detected", sp_header.apid());
// TODO: Log event or something similar?
Ok(()) Ok(())
} }
} }

View File

@ -3,6 +3,8 @@ use satrs_mib::res_code::ResultU16Info;
use satrs_mib::resultcode; use satrs_mib::resultcode;
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
pub const STOP_FILE_NAME: &str = "stop-experiment";
pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
pub const SERVER_PORT: u16 = 7301; pub const SERVER_PORT: u16 = 7301;
pub const EXPERIMENT_ID: u32 = 278; pub const EXPERIMENT_ID: u32 = 278;
@ -96,4 +98,7 @@ pub mod tasks {
pub const FREQ_MS_EVENT_HANDLING: u64 = 400; pub const FREQ_MS_EVENT_HANDLING: u64 = 400;
pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_AOCS: u64 = 500;
pub const FREQ_MS_PUS_STACK: u64 = 200; pub const FREQ_MS_PUS_STACK: u64 = 200;
pub const FREQ_MS_CTRL: u64 = 400;
pub const STOP_CHECK_FREQUENCY: u64 = 400;
} }

View File

@ -1,11 +1,13 @@
use std::{ use std::{
collections::{HashSet, VecDeque}, collections::{HashSet, VecDeque},
sync::{Arc, Mutex}, sync::{atomic::AtomicBool, Arc, Mutex},
time::Duration,
}; };
use log::{info, warn}; use log::{info, warn};
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
use satrs::{ use satrs::{
hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer},
pus::ReceivesEcssPusTc, pus::ReceivesEcssPusTc,
spacepackets::PacketId, spacepackets::PacketId,
tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore}, tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore},
@ -69,12 +71,25 @@ impl TmPacketSourceCore for SyncTcpTmSource {
} }
} }
#[derive(Default)]
pub struct ConnectionFinishedHandler {}
impl HandledConnectionHandler for ConnectionFinishedHandler {
fn handled_connection(&mut self, info: satrs::hal::std::tcp_server::HandledConnectionInfo) {
info!(
"Served {} TMs and {} TCs for client {:?}",
info.num_sent_tms, info.num_received_tcs, info.addr
);
}
}
pub type TcpServerType<TcSource, MpscErrorType> = TcpSpacepacketsServer< pub type TcpServerType<TcSource, MpscErrorType> = TcpSpacepacketsServer<
(),
CcsdsError<MpscErrorType>,
SyncTcpTmSource, SyncTcpTmSource,
CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>, CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
HashSet<PacketId>, HashSet<PacketId>,
ConnectionFinishedHandler,
(),
CcsdsError<MpscErrorType>,
>; >;
pub struct TcpTask< pub struct TcpTask<
@ -102,26 +117,29 @@ impl<
tm_source: SyncTcpTmSource, tm_source: SyncTcpTmSource,
tc_receiver: CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>, tc_receiver: CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
packet_id_lookup: Vec<PacketId>, packet_id_lookup: Vec<PacketId>,
stop_signal: Arc<AtomicBool>,
) -> Result<Self, std::io::Error> { ) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?, server: TcpSpacepacketsServer::new(
cfg,
tm_source,
tc_receiver,
packet_id_lookup,
ConnectionFinishedHandler::default(),
Some(stop_signal),
)?,
}) })
} }
pub fn periodic_operation(&mut self) { pub fn periodic_operation(&mut self) {
loop { let result = self
let result = self.server.handle_next_connection(); .server
.handle_next_connection(Some(Duration::from_millis(STOP_CHECK_FREQUENCY)));
match result { match result {
Ok(conn_result) => { Ok(_conn_result) => (),
info!(
"Served {} TMs and {} TCs for client {:?}",
conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr
);
}
Err(e) => { Err(e) => {
warn!("TCP server error: {e:?}"); warn!("TCP server error: {e:?}");
} }
} }
} }
}
} }

View File

@ -1,12 +1,16 @@
use std::{ use std::{
net::{IpAddr, SocketAddr}, net::{IpAddr, SocketAddr},
sync::mpsc, sync::{atomic::AtomicBool, mpsc, Arc},
thread, thread,
time::Duration, time::Duration,
}; };
use log::info; use log::info;
use ops_sat_rs::config::{components::CONTROLLER_ID, tasks::FREQ_MS_PUS_STACK, EXPERIMENT_APID}; use ops_sat_rs::config::{
components::CONTROLLER_ID,
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK},
EXPERIMENT_APID, STOP_FILE_NAME,
};
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
use satrs::{ use satrs::{
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
@ -38,10 +42,11 @@ mod requests;
mod tm_funnel; mod tm_funnel;
mod tmtc; mod tmtc;
#[allow(dead_code)]
fn main() { fn main() {
setup_logger().expect("setting up logging with fern failed"); setup_logger().expect("setting up logging with fern failed");
println!("OPS-SAT Rust experiment OBSW"); println!("OPS-SAT Rust Experiment OBSW");
let stop_signal = Arc::new(AtomicBool::new(false));
let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tc_source_tx, tc_source_rx) = mpsc::channel();
let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel();
@ -143,53 +148,94 @@ fn main() {
sync_tm_tcp_source.clone(), sync_tm_tcp_source.clone(),
tcp_ccsds_distributor, tcp_ccsds_distributor,
vec![PacketId::new_for_tc(true, EXPERIMENT_APID)], vec![PacketId::new_for_tc(true, EXPERIMENT_APID)],
// PACKET_ID_VALIDATOR.clone(), stop_signal.clone(),
) )
.expect("tcp server creation failed"); .expect("tcp server creation failed");
let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); let mut tm_funnel = TmFunnelDynamic::new(
sync_tm_tcp_source,
tm_funnel_rx,
tm_server_tx,
stop_signal.clone(),
);
info!("Starting CTRL task");
let ctrl_stop_signal = stop_signal.clone();
let jh_ctrl_thread = thread::Builder::new()
.name("ops-sat ctrl".to_string())
.spawn(move || loop {
if std::path::Path::new(STOP_FILE_NAME).exists() {
log::warn!(
"Detected stop file name at {}. Initiating experiment shutdown",
STOP_FILE_NAME
);
ctrl_stop_signal.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
thread::sleep(Duration::from_millis(FREQ_MS_CTRL));
})
.unwrap();
info!("Starting TMTC and UDP task"); info!("Starting TMTC and UDP task");
let tmtc_stop_signal = stop_signal.clone();
let jh_udp_tmtc = thread::Builder::new() let jh_udp_tmtc = thread::Builder::new()
.name("TMTC and UDP".to_string()) .name("ops-sat tmtc-udp".to_string())
.spawn(move || { .spawn(move || {
info!("Running UDP server on port {SERVER_PORT}"); info!("Running UDP server on port {SERVER_PORT}");
loop { loop {
udp_tmtc_server.periodic_operation(); udp_tmtc_server.periodic_operation();
tmtc_task.periodic_operation(); tmtc_task.periodic_operation();
if tmtc_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC));
} }
}) })
.unwrap(); .unwrap();
let tcp_stop_signal = stop_signal.clone();
info!("Starting TCP task"); info!("Starting TCP task");
let jh_tcp = thread::Builder::new() let jh_tcp = thread::Builder::new()
.name("TCP".to_string()) .name("ops-sat tcp".to_string())
.spawn(move || { .spawn(move || {
info!("Running TCP server on port {SERVER_PORT}"); info!("Running TCP server on port {SERVER_PORT}");
loop { loop {
tcp_server.periodic_operation(); tcp_server.periodic_operation();
if tcp_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
} }
}) })
.unwrap(); .unwrap();
info!("Starting TM funnel task"); info!("Starting TM funnel task");
let funnel_stop_signal = stop_signal.clone();
let jh_tm_funnel = thread::Builder::new() let jh_tm_funnel = thread::Builder::new()
.name("TM Funnel".to_string()) .name("ops-sat tm-funnel".to_string())
.spawn(move || loop { .spawn(move || loop {
tm_funnel.operation(); tm_funnel.operation();
if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
}) })
.unwrap(); .unwrap();
info!("Starting PUS handler thread"); info!("Starting PUS handler thread");
let pus_stop_signal = stop_signal.clone();
let jh_pus_handler = thread::Builder::new() let jh_pus_handler = thread::Builder::new()
.name("PUS".to_string()) .name("ops-sat pus".to_string())
.spawn(move || loop { .spawn(move || loop {
pus_stack.periodic_operation(); pus_stack.periodic_operation();
if pus_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
}) })
.unwrap(); .unwrap();
jh_ctrl_thread
.join()
.expect("Joining Controller thread failed");
jh_udp_tmtc jh_udp_tmtc
.join() .join()
.expect("Joining UDP TMTC server thread failed"); .expect("Joining UDP TMTC server thread failed");

View File

@ -1,9 +1,9 @@
use std::{ use std::sync::atomic::AtomicBool;
collections::HashMap, use std::sync::Arc;
sync::mpsc::{self}, use std::{collections::HashMap, sync::mpsc, time::Duration};
};
use log::info; use log::info;
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
use satrs::pus::PusTmAsVec; use satrs::pus::PusTmAsVec;
use satrs::{ use satrs::{
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
@ -78,6 +78,7 @@ pub struct TmFunnelDynamic {
common: TmFunnelCommon, common: TmFunnelCommon,
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>, tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
tm_server_tx: mpsc::Sender<PusTmAsVec>, tm_server_tx: mpsc::Sender<PusTmAsVec>,
stop_signal: Arc<AtomicBool>,
} }
impl TmFunnelDynamic { impl TmFunnelDynamic {
@ -85,25 +86,49 @@ impl TmFunnelDynamic {
sync_tm_tcp_source: SyncTcpTmSource, sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>, tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
tm_server_tx: mpsc::Sender<PusTmAsVec>, tm_server_tx: mpsc::Sender<PusTmAsVec>,
stop_signal: Arc<AtomicBool>,
) -> Self { ) -> Self {
Self { Self {
common: TmFunnelCommon::new(sync_tm_tcp_source), common: TmFunnelCommon::new(sync_tm_tcp_source),
tm_funnel_rx, tm_funnel_rx,
tm_server_tx, tm_server_tx,
stop_signal,
} }
} }
pub fn operation(&mut self) { pub fn operation(&mut self) {
if let Ok(mut tm) = self.tm_funnel_rx.recv() { loop {
match self
.tm_funnel_rx
.recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY))
{
Ok(mut tm) => {
// Read the TM, set sequence counter and message counter, and finally update // Read the TM, set sequence counter and message counter, and finally update
// the CRC. // the CRC.
let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN) let zero_copy_writer =
PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed"); .expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer); self.common.apply_packet_processing(zero_copy_writer);
self.common.sync_tm_tcp_source.add_tm(&tm.packet); self.common.sync_tm_tcp_source.add_tm(&tm.packet);
self.tm_server_tx self.tm_server_tx
.send(tm) .send(tm)
.expect("Sending TM to server failed"); .expect("Sending TM to server failed");
if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
}
Err(e) => match e {
mpsc::RecvTimeoutError::Timeout => {
if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
}
mpsc::RecvTimeoutError::Disconnected => {
log::warn!("All TM funnel senders have disconnected");
break;
}
},
}
} }
} }
} }