more complex app runs as well..
This commit is contained in:
parent
18978f807f
commit
ef58c54d27
47
Cargo.lock
generated
47
Cargo.lock
generated
@ -90,8 +90,10 @@ checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"android-tzdata",
|
"android-tzdata",
|
||||||
"iana-time-zone",
|
"iana-time-zone",
|
||||||
|
"js-sys",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
"serde",
|
"serde",
|
||||||
|
"wasm-bindgen",
|
||||||
"windows-targets 0.52.4",
|
"windows-targets 0.52.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -184,6 +186,12 @@ dependencies = [
|
|||||||
"allocator-api2",
|
"allocator-api2",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "heck"
|
||||||
|
version = "0.4.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hermit-abi"
|
name = "hermit-abi"
|
||||||
version = "0.3.9"
|
version = "0.3.9"
|
||||||
@ -232,6 +240,12 @@ dependencies = [
|
|||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "lazy_static"
|
||||||
|
version = "1.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.153"
|
version = "0.2.153"
|
||||||
@ -297,12 +311,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
|
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ops-sat-sw"
|
name = "ops-sat-rs"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"chrono",
|
||||||
"fern",
|
"fern",
|
||||||
|
"lazy_static",
|
||||||
"log",
|
"log",
|
||||||
"satrs",
|
"satrs",
|
||||||
|
"strum",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -360,6 +377,12 @@ dependencies = [
|
|||||||
"bitflags",
|
"bitflags",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rustversion"
|
||||||
|
version = "1.0.15"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "satrs"
|
name = "satrs"
|
||||||
version = "0.2.0-rc.0"
|
version = "0.2.0-rc.0"
|
||||||
@ -445,6 +468,28 @@ dependencies = [
|
|||||||
"zerocopy",
|
"zerocopy",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "strum"
|
||||||
|
version = "0.26.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29"
|
||||||
|
dependencies = [
|
||||||
|
"strum_macros",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "strum_macros"
|
||||||
|
version = "0.26.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946"
|
||||||
|
dependencies = [
|
||||||
|
"heck",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"rustversion",
|
||||||
|
"syn 2.0.58",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "syn"
|
name = "syn"
|
||||||
version = "1.0.109"
|
version = "1.0.109"
|
||||||
|
@ -7,7 +7,10 @@ edition = "2021"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
fern = "0.6"
|
fern = "0.6"
|
||||||
|
chrono = "0.4"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
lazy_static = "1"
|
||||||
|
strum = { version = "0.26", features = ["derive"] }
|
||||||
|
|
||||||
[dependencies.satrs]
|
[dependencies.satrs]
|
||||||
version = "0.2.0-rc.0"
|
version = "0.2.0-rc.0"
|
||||||
|
53
src/ccsds.rs
Normal file
53
src/ccsds.rs
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
use satrs::pus::ReceivesEcssPusTc;
|
||||||
|
use satrs::spacepackets::{CcsdsPacket, SpHeader};
|
||||||
|
use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc};
|
||||||
|
use satrs::ValidatorU16Id;
|
||||||
|
use ops_sat_rs::config::components::Apid;
|
||||||
|
use ops_sat_rs::config::APID_VALIDATOR;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct CcsdsReceiver<
|
||||||
|
TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone,
|
||||||
|
E,
|
||||||
|
> {
|
||||||
|
pub tc_source: TcSource,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<
|
||||||
|
TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone + 'static,
|
||||||
|
E: 'static,
|
||||||
|
> ValidatorU16Id for CcsdsReceiver<TcSource, E>
|
||||||
|
{
|
||||||
|
fn validate(&self, apid: u16) -> bool {
|
||||||
|
APID_VALIDATOR.contains(&apid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<
|
||||||
|
TcSource: ReceivesCcsdsTc<Error = E> + ReceivesEcssPusTc<Error = E> + Clone + 'static,
|
||||||
|
E: 'static,
|
||||||
|
> CcsdsPacketHandler for CcsdsReceiver<TcSource, E>
|
||||||
|
{
|
||||||
|
type Error = E;
|
||||||
|
|
||||||
|
fn handle_packet_with_valid_apid(
|
||||||
|
&mut self,
|
||||||
|
sp_header: &SpHeader,
|
||||||
|
tc_raw: &[u8],
|
||||||
|
) -> Result<(), Self::Error> {
|
||||||
|
if sp_header.apid() == Apid::Cfdp as u16 {
|
||||||
|
} else {
|
||||||
|
return self.tc_source.pass_ccsds(sp_header, tc_raw);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_packet_with_unknown_apid(
|
||||||
|
&mut self,
|
||||||
|
sp_header: &SpHeader,
|
||||||
|
_tc_raw: &[u8],
|
||||||
|
) -> Result<(), Self::Error> {
|
||||||
|
log::warn!("unknown APID 0x{:x?} detected", sp_header.apid());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
74
src/config.rs
Normal file
74
src/config.rs
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
use lazy_static::lazy_static;
|
||||||
|
use satrs::spacepackets::{PacketId, PacketType};
|
||||||
|
use std::{collections::HashSet, net::Ipv4Addr};
|
||||||
|
use strum::IntoEnumIterator;
|
||||||
|
|
||||||
|
pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
|
||||||
|
pub const SERVER_PORT: u16 = 7301;
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
pub static ref PACKET_ID_VALIDATOR: HashSet<PacketId> = {
|
||||||
|
let mut set = HashSet::new();
|
||||||
|
for id in components::Apid::iter() {
|
||||||
|
set.insert(PacketId::new(PacketType::Tc, true, id as u16));
|
||||||
|
}
|
||||||
|
set
|
||||||
|
};
|
||||||
|
pub static ref APID_VALIDATOR: HashSet<u16> = {
|
||||||
|
let mut set = HashSet::new();
|
||||||
|
for id in components::Apid::iter() {
|
||||||
|
set.insert(id as u16);
|
||||||
|
}
|
||||||
|
set
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod components {
|
||||||
|
use satrs::request::UniqueApidTargetId;
|
||||||
|
use strum::EnumIter;
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, PartialEq, Eq, EnumIter)]
|
||||||
|
pub enum Apid {
|
||||||
|
Sched = 1,
|
||||||
|
GenericPus = 2,
|
||||||
|
Cfdp = 4,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Component IDs for components with the PUS APID.
|
||||||
|
#[derive(Copy, Clone, PartialEq, Eq)]
|
||||||
|
pub enum PusId {
|
||||||
|
PusEventManagement = 0,
|
||||||
|
PusRouting = 1,
|
||||||
|
PusTest = 2,
|
||||||
|
PusAction = 3,
|
||||||
|
PusMode = 4,
|
||||||
|
PusHk = 5,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, PartialEq, Eq)]
|
||||||
|
pub enum AcsId {
|
||||||
|
Mgm0 = 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub const PUS_ACTION_SERVICE: UniqueApidTargetId =
|
||||||
|
UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusAction as u32);
|
||||||
|
pub const PUS_EVENT_MANAGEMENT: UniqueApidTargetId =
|
||||||
|
UniqueApidTargetId::new(Apid::GenericPus as u16, 0);
|
||||||
|
pub const PUS_ROUTING_SERVICE: UniqueApidTargetId =
|
||||||
|
UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusRouting as u32);
|
||||||
|
pub const PUS_TEST_SERVICE: UniqueApidTargetId =
|
||||||
|
UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusTest as u32);
|
||||||
|
pub const PUS_MODE_SERVICE: UniqueApidTargetId =
|
||||||
|
UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusMode as u32);
|
||||||
|
pub const PUS_HK_SERVICE: UniqueApidTargetId =
|
||||||
|
UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusHk as u32);
|
||||||
|
pub const PUS_SCHED_SERVICE: UniqueApidTargetId =
|
||||||
|
UniqueApidTargetId::new(Apid::Sched as u16, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod tasks {
|
||||||
|
pub const FREQ_MS_UDP_TMTC: u64 = 200;
|
||||||
|
pub const FREQ_MS_EVENT_HANDLING: u64 = 400;
|
||||||
|
pub const FREQ_MS_AOCS: u64 = 500;
|
||||||
|
pub const FREQ_MS_PUS_STACK: u64 = 200;
|
||||||
|
}
|
3
src/lib.rs
Normal file
3
src/lib.rs
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
use std::net::Ipv4Addr;
|
||||||
|
|
||||||
|
pub mod config;
|
17
src/logger.rs
Normal file
17
src/logger.rs
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
pub fn setup_logger() -> Result<(), fern::InitError> {
|
||||||
|
fern::Dispatch::new()
|
||||||
|
.format(|out, message, record| {
|
||||||
|
out.finish(format_args!(
|
||||||
|
"{}[{}][{}] {}",
|
||||||
|
chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"),
|
||||||
|
std::thread::current().name().expect("unnamed_thread"),
|
||||||
|
record.level(),
|
||||||
|
message
|
||||||
|
))
|
||||||
|
})
|
||||||
|
.level(log::LevelFilter::Debug)
|
||||||
|
.chain(std::io::stdout())
|
||||||
|
.chain(fern::log_file("output.log")?)
|
||||||
|
.apply()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
94
src/main.rs
94
src/main.rs
@ -1,7 +1,95 @@
|
|||||||
fn main() {
|
use std::{
|
||||||
println!("OPSSAT Rust experiment OBSW");
|
net::{IpAddr, SocketAddr},
|
||||||
|
sync::mpsc,
|
||||||
|
thread,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
use log::info;
|
||||||
|
use ops_sat_rs::config::{
|
||||||
|
tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT,
|
||||||
|
};
|
||||||
|
use satrs::{
|
||||||
|
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
|
||||||
|
tmtc::CcsdsDistributor,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
ccsds::CcsdsReceiver,
|
||||||
|
logger::setup_logger,
|
||||||
|
tcp::{SyncTcpTmSource, TcpTask},
|
||||||
|
tmtc::PusTcSourceProviderDynamic,
|
||||||
|
udp::{DynamicUdpTmHandler, UdpTmtcServer},
|
||||||
|
};
|
||||||
|
|
||||||
|
mod ccsds;
|
||||||
|
mod logger;
|
||||||
|
mod tcp;
|
||||||
|
mod tmtc;
|
||||||
|
mod udp;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
setup_logger().expect("setting up logging with fern failed");
|
||||||
|
println!("OPS-SAT Rust experiment OBSW");
|
||||||
|
|
||||||
|
let (tc_source_tx, tc_source_rx) = mpsc::channel();
|
||||||
|
let (tm_server_tx, tm_server_rx) = mpsc::channel();
|
||||||
|
|
||||||
|
let tc_source = PusTcSourceProviderDynamic(tc_source_tx);
|
||||||
|
|
||||||
|
let ccsds_receiver = CcsdsReceiver { tc_source };
|
||||||
|
|
||||||
|
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
|
||||||
|
let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone());
|
||||||
|
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor))
|
||||||
|
.expect("creating UDP TMTC server failed");
|
||||||
|
|
||||||
|
let mut udp_tmtc_server = UdpTmtcServer {
|
||||||
|
udp_tc_server,
|
||||||
|
tm_handler: DynamicUdpTmHandler {
|
||||||
|
tm_rx: tm_server_rx,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver);
|
||||||
|
let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
|
||||||
|
let sync_tm_tcp_source = SyncTcpTmSource::new(200);
|
||||||
|
let mut tcp_server = TcpTask::new(
|
||||||
|
tcp_server_cfg,
|
||||||
|
sync_tm_tcp_source.clone(),
|
||||||
|
tcp_ccsds_distributor,
|
||||||
|
PACKET_ID_VALIDATOR.clone(),
|
||||||
|
)
|
||||||
|
.expect("tcp server creation failed");
|
||||||
|
|
||||||
|
info!("Starting TMTC and UDP task");
|
||||||
|
let jh_udp_tmtc = thread::Builder::new()
|
||||||
|
.name("TMTC and UDP".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
info!("Running UDP server on port {SERVER_PORT}");
|
||||||
loop {
|
loop {
|
||||||
std::thread::sleep(std::time::Duration::from_millis(500));
|
udp_tmtc_server.periodic_operation();
|
||||||
|
// tmtc_task.periodic_operation();
|
||||||
|
thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC));
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
info!("Starting TCP task");
|
||||||
|
let jh_tcp = thread::Builder::new()
|
||||||
|
.name("TCP".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
info!("Running TCP server on port {SERVER_PORT}");
|
||||||
|
loop {
|
||||||
|
tcp_server.periodic_operation();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
jh_udp_tmtc
|
||||||
|
.join()
|
||||||
|
.expect("Joining UDP TMTC server thread failed");
|
||||||
|
jh_tcp
|
||||||
|
.join()
|
||||||
|
.expect("Joining TCP TMTC server thread failed");
|
||||||
}
|
}
|
||||||
|
127
src/tcp.rs
Normal file
127
src/tcp.rs
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
use std::{
|
||||||
|
collections::{HashSet, VecDeque},
|
||||||
|
sync::{Arc, Mutex},
|
||||||
|
};
|
||||||
|
|
||||||
|
use log::{info, warn};
|
||||||
|
use satrs::{
|
||||||
|
hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer},
|
||||||
|
pus::ReceivesEcssPusTc,
|
||||||
|
spacepackets::PacketId,
|
||||||
|
tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::ccsds::CcsdsReceiver;
|
||||||
|
|
||||||
|
#[derive(Default, Clone)]
|
||||||
|
pub struct SyncTcpTmSource {
|
||||||
|
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||||
|
max_packets_stored: usize,
|
||||||
|
pub silent_packet_overwrite: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SyncTcpTmSource {
|
||||||
|
pub fn new(max_packets_stored: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
tm_queue: Arc::default(),
|
||||||
|
max_packets_stored,
|
||||||
|
silent_packet_overwrite: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_tm(&mut self, tm: &[u8]) {
|
||||||
|
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
|
||||||
|
if tm_queue.len() > self.max_packets_stored {
|
||||||
|
if !self.silent_packet_overwrite {
|
||||||
|
warn!("TPC TM source is full, deleting oldest packet");
|
||||||
|
}
|
||||||
|
tm_queue.pop_front();
|
||||||
|
}
|
||||||
|
tm_queue.push_back(tm.to_vec());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TmPacketSourceCore for SyncTcpTmSource {
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
|
||||||
|
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
|
||||||
|
if !tm_queue.is_empty() {
|
||||||
|
let next_vec = tm_queue.front().unwrap();
|
||||||
|
if buffer.len() < next_vec.len() {
|
||||||
|
panic!(
|
||||||
|
"provided buffer too small, must be at least {} bytes",
|
||||||
|
next_vec.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let next_vec = tm_queue.pop_front().unwrap();
|
||||||
|
buffer[0..next_vec.len()].copy_from_slice(&next_vec);
|
||||||
|
if next_vec.len() > 9 {
|
||||||
|
let service = next_vec[7];
|
||||||
|
let subservice = next_vec[8];
|
||||||
|
info!("Sending PUS TM[{service},{subservice}]")
|
||||||
|
} else {
|
||||||
|
info!("Sending PUS TM");
|
||||||
|
}
|
||||||
|
return Ok(next_vec.len());
|
||||||
|
}
|
||||||
|
Ok(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type TcpServerType<TcSource, MpscErrorType> = TcpSpacepacketsServer<
|
||||||
|
(),
|
||||||
|
CcsdsError<MpscErrorType>,
|
||||||
|
SyncTcpTmSource,
|
||||||
|
CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
|
||||||
|
HashSet<PacketId>,
|
||||||
|
>;
|
||||||
|
|
||||||
|
pub struct TcpTask<
|
||||||
|
TcSource: ReceivesCcsdsTc<Error = MpscErrorType>
|
||||||
|
+ ReceivesEcssPusTc<Error = MpscErrorType>
|
||||||
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
|
MpscErrorType: 'static,
|
||||||
|
> {
|
||||||
|
server: TcpServerType<TcSource, MpscErrorType>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<
|
||||||
|
TcSource: ReceivesCcsdsTc<Error = MpscErrorType>
|
||||||
|
+ ReceivesEcssPusTc<Error = MpscErrorType>
|
||||||
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
|
MpscErrorType: 'static + core::fmt::Debug,
|
||||||
|
> TcpTask<TcSource, MpscErrorType>
|
||||||
|
{
|
||||||
|
pub fn new(
|
||||||
|
cfg: ServerConfig,
|
||||||
|
tm_source: SyncTcpTmSource,
|
||||||
|
tc_receiver: CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
|
||||||
|
packet_id_lookup: HashSet<PacketId>,
|
||||||
|
) -> Result<Self, std::io::Error> {
|
||||||
|
Ok(Self {
|
||||||
|
server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn periodic_operation(&mut self) {
|
||||||
|
loop {
|
||||||
|
let result = self.server.handle_next_connection();
|
||||||
|
match result {
|
||||||
|
Ok(conn_result) => {
|
||||||
|
info!(
|
||||||
|
"Served {} TMs and {} TCs for client {:?}",
|
||||||
|
conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("TCP server error: {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
81
src/tmtc.rs
Normal file
81
src/tmtc.rs
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
use satrs::{
|
||||||
|
pus::ReceivesEcssPusTc,
|
||||||
|
spacepackets::{ecss::tc::PusTcReader, SpHeader},
|
||||||
|
tmtc::ReceivesCcsdsTc,
|
||||||
|
};
|
||||||
|
use std::sync::mpsc::{self, SendError, Sender, TryRecvError};
|
||||||
|
|
||||||
|
// Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct PusTcSourceProviderDynamic(pub Sender<Vec<u8>>);
|
||||||
|
|
||||||
|
impl ReceivesEcssPusTc for PusTcSourceProviderDynamic {
|
||||||
|
type Error = SendError<Vec<u8>>;
|
||||||
|
|
||||||
|
fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> {
|
||||||
|
self.0.send(pus_tc.raw_data().to_vec())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReceivesCcsdsTc for PusTcSourceProviderDynamic {
|
||||||
|
type Error = mpsc::SendError<Vec<u8>>;
|
||||||
|
|
||||||
|
fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||||
|
self.0.send(tc_raw.to_vec())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TC source components where the heap is the backing memory of the received telecommands.
|
||||||
|
pub struct TcSourceTaskDynamic {
|
||||||
|
pub tc_receiver: mpsc::Receiver<Vec<u8>>,
|
||||||
|
// pus_receiver: PusReceiver<MpscTmAsVecSender>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TcSourceTaskDynamic {
|
||||||
|
pub fn new(
|
||||||
|
tc_receiver: mpsc::Receiver<Vec<u8>>,
|
||||||
|
// pus_receiver: PusReceiver<MpscTmAsVecSender>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
tc_receiver,
|
||||||
|
// pus_receiver,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn periodic_operation(&mut self) {
|
||||||
|
self.poll_tc();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn poll_tc(&mut self) -> bool {
|
||||||
|
match self.tc_receiver.try_recv() {
|
||||||
|
Ok(tc) => match PusTcReader::new(&tc) {
|
||||||
|
Ok((pus_tc, _)) => {
|
||||||
|
/*
|
||||||
|
self.pus_receiver
|
||||||
|
.handle_tc_packet(
|
||||||
|
satrs::pus::TcInMemory::Vec(tc.clone()),
|
||||||
|
pus_tc.service(),
|
||||||
|
&pus_tc,
|
||||||
|
)
|
||||||
|
.ok();
|
||||||
|
*/
|
||||||
|
true
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("error creating PUS TC from raw data: {e}");
|
||||||
|
log::warn!("raw data: {:x?}", tc);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => match e {
|
||||||
|
TryRecvError::Empty => false,
|
||||||
|
TryRecvError::Disconnected => {
|
||||||
|
log::warn!("tmtc thread: sender disconnected");
|
||||||
|
false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
185
src/udp.rs
Normal file
185
src/udp.rs
Normal file
@ -0,0 +1,185 @@
|
|||||||
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
|
use std::sync::mpsc;
|
||||||
|
|
||||||
|
use log::{info, warn};
|
||||||
|
use satrs::pus::{PusTmAsVec, PusTmInPool};
|
||||||
|
use satrs::{
|
||||||
|
hal::std::udp_server::{ReceiveResult, UdpTcServer},
|
||||||
|
pool::{PoolProviderWithGuards, SharedStaticMemoryPool},
|
||||||
|
tmtc::CcsdsError,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub trait UdpTmHandler {
|
||||||
|
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DynamicUdpTmHandler {
|
||||||
|
pub tm_rx: mpsc::Receiver<PusTmAsVec>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UdpTmHandler for DynamicUdpTmHandler {
|
||||||
|
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) {
|
||||||
|
while let Ok(tm) = self.tm_rx.try_recv() {
|
||||||
|
if tm.packet.len() > 9 {
|
||||||
|
let service = tm.packet[7];
|
||||||
|
let subservice = tm.packet[8];
|
||||||
|
info!("Sending PUS TM[{service},{subservice}]")
|
||||||
|
} else {
|
||||||
|
info!("Sending PUS TM");
|
||||||
|
}
|
||||||
|
let result = socket.send_to(&tm.packet, recv_addr);
|
||||||
|
if let Err(e) = result {
|
||||||
|
warn!("Sending TM with UDP socket failed: {e}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct UdpTmtcServer<TmHandler: UdpTmHandler, SendError> {
|
||||||
|
pub udp_tc_server: UdpTcServer<CcsdsError<SendError>>,
|
||||||
|
pub tm_handler: TmHandler,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TmHandler: UdpTmHandler, SendError: core::fmt::Debug + 'static>
|
||||||
|
UdpTmtcServer<TmHandler, SendError>
|
||||||
|
{
|
||||||
|
pub fn periodic_operation(&mut self) {
|
||||||
|
while self.poll_tc_server() {}
|
||||||
|
if let Some(recv_addr) = self.udp_tc_server.last_sender() {
|
||||||
|
self.tm_handler
|
||||||
|
.send_tm_to_udp_client(&self.udp_tc_server.socket, &recv_addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_tc_server(&mut self) -> bool {
|
||||||
|
match self.udp_tc_server.try_recv_tc() {
|
||||||
|
Ok(_) => true,
|
||||||
|
Err(e) => match e {
|
||||||
|
ReceiveResult::ReceiverError(e) => match e {
|
||||||
|
CcsdsError::ByteConversionError(e) => {
|
||||||
|
warn!("packet error: {e:?}");
|
||||||
|
true
|
||||||
|
}
|
||||||
|
CcsdsError::CustomError(e) => {
|
||||||
|
warn!("mpsc custom error {e:?}");
|
||||||
|
true
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ReceiveResult::IoError(e) => {
|
||||||
|
warn!("IO error {e}");
|
||||||
|
false
|
||||||
|
}
|
||||||
|
ReceiveResult::NothingReceived => false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::{
|
||||||
|
collections::VecDeque,
|
||||||
|
net::IpAddr,
|
||||||
|
sync::{Arc, Mutex},
|
||||||
|
};
|
||||||
|
|
||||||
|
use satrs::{
|
||||||
|
spacepackets::{
|
||||||
|
ecss::{tc::PusTcCreator, WritablePusPacket},
|
||||||
|
SpHeader,
|
||||||
|
},
|
||||||
|
tmtc::ReceivesTcCore,
|
||||||
|
};
|
||||||
|
use satrs_example::config::{components, OBSW_SERVER_ADDR};
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Default, Debug, Clone)]
|
||||||
|
pub struct TestReceiver {
|
||||||
|
tc_vec: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReceivesTcCore for TestReceiver {
|
||||||
|
type Error = CcsdsError<()>;
|
||||||
|
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||||
|
self.tc_vec.lock().unwrap().push_back(tc_raw.to_vec());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Debug, Clone)]
|
||||||
|
pub struct TestTmHandler {
|
||||||
|
addrs_to_send_to: Arc<Mutex<VecDeque<SocketAddr>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UdpTmHandler for TestTmHandler {
|
||||||
|
fn send_tm_to_udp_client(&mut self, _socket: &UdpSocket, recv_addr: &SocketAddr) {
|
||||||
|
self.addrs_to_send_to.lock().unwrap().push_back(*recv_addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_basic() {
|
||||||
|
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
|
||||||
|
let test_receiver = TestReceiver::default();
|
||||||
|
let tc_queue = test_receiver.tc_vec.clone();
|
||||||
|
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap();
|
||||||
|
let tm_handler = TestTmHandler::default();
|
||||||
|
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
|
||||||
|
let mut udp_dyn_server = UdpTmtcServer {
|
||||||
|
udp_tc_server,
|
||||||
|
tm_handler,
|
||||||
|
};
|
||||||
|
udp_dyn_server.periodic_operation();
|
||||||
|
assert!(tc_queue.lock().unwrap().is_empty());
|
||||||
|
assert!(tm_handler_calls.lock().unwrap().is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_transactions() {
|
||||||
|
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
|
||||||
|
let test_receiver = TestReceiver::default();
|
||||||
|
let tc_queue = test_receiver.tc_vec.clone();
|
||||||
|
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap();
|
||||||
|
let server_addr = udp_tc_server.socket.local_addr().unwrap();
|
||||||
|
let tm_handler = TestTmHandler::default();
|
||||||
|
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
|
||||||
|
let mut udp_dyn_server = UdpTmtcServer {
|
||||||
|
udp_tc_server,
|
||||||
|
tm_handler,
|
||||||
|
};
|
||||||
|
let sph = SpHeader::new_for_unseg_tc(components::Apid::GenericPus as u16, 0, 0);
|
||||||
|
let ping_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true)
|
||||||
|
.to_vec()
|
||||||
|
.unwrap();
|
||||||
|
let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed");
|
||||||
|
let client_addr = client.local_addr().unwrap();
|
||||||
|
client.connect(server_addr).unwrap();
|
||||||
|
client.send(&ping_tc).unwrap();
|
||||||
|
udp_dyn_server.periodic_operation();
|
||||||
|
{
|
||||||
|
let mut tc_queue = tc_queue.lock().unwrap();
|
||||||
|
assert!(!tc_queue.is_empty());
|
||||||
|
let received_tc = tc_queue.pop_front().unwrap();
|
||||||
|
assert_eq!(received_tc, ping_tc);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut tm_handler_calls = tm_handler_calls.lock().unwrap();
|
||||||
|
assert!(!tm_handler_calls.is_empty());
|
||||||
|
assert_eq!(tm_handler_calls.len(), 1);
|
||||||
|
let received_addr = tm_handler_calls.pop_front().unwrap();
|
||||||
|
assert_eq!(received_addr, client_addr);
|
||||||
|
}
|
||||||
|
udp_dyn_server.periodic_operation();
|
||||||
|
assert!(tc_queue.lock().unwrap().is_empty());
|
||||||
|
// Still tries to send to the same client.
|
||||||
|
{
|
||||||
|
let mut tm_handler_calls = tm_handler_calls.lock().unwrap();
|
||||||
|
assert!(!tm_handler_calls.is_empty());
|
||||||
|
assert_eq!(tm_handler_calls.len(), 1);
|
||||||
|
let received_addr = tm_handler_calls.pop_front().unwrap();
|
||||||
|
assert_eq!(received_addr, client_addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user