From 6dddfd5a709007fa779511c05fb9e2e70e3663cb Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sat, 13 Apr 2024 15:16:53 +0200 Subject: [PATCH 1/7] start with mio tcp client --- Cargo.lock | 1 + Cargo.toml | 1 + README.md | 1 + src/config.rs | 1 + src/interface/mod.rs | 5 +- src/interface/{tcp.rs => tcp_server.rs} | 0 src/interface/tcp_spp_client.rs | 83 +++++++++++++++ src/interface/{udp.rs => udp_server.rs} | 0 src/main.rs | 4 +- src/pus/mod.rs | 4 +- src/tmtc/ccsds.rs | 49 --------- src/tmtc/mod.rs | 99 +----------------- src/tmtc/tc_source.rs | 128 ++++++++++++++++++++++++ src/tmtc/{tm_funnel.rs => tm_sink.rs} | 2 +- 14 files changed, 225 insertions(+), 153 deletions(-) rename src/interface/{tcp.rs => tcp_server.rs} (100%) create mode 100644 src/interface/tcp_spp_client.rs rename src/interface/{udp.rs => udp_server.rs} (100%) delete mode 100644 src/tmtc/ccsds.rs create mode 100644 src/tmtc/tc_source.rs rename src/tmtc/{tm_funnel.rs => tm_sink.rs} (98%) diff --git a/Cargo.lock b/Cargo.lock index d9d0993..01d6b53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -483,6 +483,7 @@ dependencies = [ "fern", "lazy_static", "log", + "mio", "num_enum", "satrs", "satrs-mib", diff --git a/Cargo.toml b/Cargo.toml index 0b0fd40..3c4e1b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ strum = { version = "0.26", features = ["derive"] } thiserror = "1" derive-new = "0.6" num_enum = "0.7" +mio = "0.8" [dependencies.satrs] version = "0.2.0-rc.0" diff --git a/README.md b/README.md index c0b42a6..069dd95 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ appears to be a useful source for documentation. - [OBSW documents](https://opssat1.esoc.esa.int/projects/experimenter-information/dmsf?folder_id=7) - [Software Integration Process](https://opssat1.esoc.esa.int/dmsf/files/34/view) +- [SPP/TCP bridge](https://opssat1.esoc.esa.int/dmsf/files/65/view) - [Cross-compiling SEPP](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Cross-compiling_SEPP_application) - [TMTC infrastructure](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Live_TM_TC_data) - [Submitting an Experiment](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Building_and_submitting_your_application_to_ESOC) diff --git a/src/config.rs b/src/config.rs index ef391a7..78938bc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,6 +11,7 @@ pub const HOME_FOLER_EXPERIMENT: &str = "/home/exp278"; pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const SERVER_PORT: u16 = 7301; +pub const TCP_SPP_SERVER_PORT: u16 = 4096; pub const EXPERIMENT_ID: u32 = 278; pub const EXPERIMENT_APID: u16 = 1024 + EXPERIMENT_ID as u16; diff --git a/src/interface/mod.rs b/src/interface/mod.rs index cc4703d..6439a55 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -1,3 +1,4 @@ pub mod can; -pub mod tcp; -pub mod udp; +pub mod tcp_server; +pub mod tcp_spp_client; +pub mod udp_server; diff --git a/src/interface/tcp.rs b/src/interface/tcp_server.rs similarity index 100% rename from src/interface/tcp.rs rename to src/interface/tcp_server.rs diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs new file mode 100644 index 0000000..3108f9a --- /dev/null +++ b/src/interface/tcp_spp_client.rs @@ -0,0 +1,83 @@ +use std::io::{self, Read}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::mpsc; + +use mio::net::TcpStream; +use mio::{Events, Interest, Poll, Token}; +use ops_sat_rs::config::TCP_SPP_SERVER_PORT; +use satrs::spacepackets::ecss::CCSDS_HEADER_LEN; +use satrs::spacepackets::{CcsdsPacket, SpHeader}; + +pub struct TcpSppClient { + poll: Poll, + events: Events, + client: TcpStream, + read_buf: [u8; 4096], + tc_source_tx: mpsc::Sender>, +} + +impl TcpSppClient { + pub fn new() -> io::Result { + let poll = Poll::new()?; + let events = Events::with_capacity(128); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT); + let mut client = TcpStream::connect(addr)?; + poll.registry().register( + &mut client, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )?; + Ok(Self { + poll, + events, + client, + read_buf: [0; 4096], + }) + } + + pub fn periodic_operation(&mut self) -> io::Result<()> { + self.poll.poll(&mut self.events, None)?; + let events: Vec = self.events.iter().cloned().collect(); + for event in events { + if event.token() == Token(0) { + if event.is_readable() { + self.read_from_server()?; + } + if event.is_writable() { + // Read packets from a queue and send them here.. + // self.client.write_all(b"hello")?; + } + } + } + Ok(()) + } + + pub fn read_from_server(&mut self) -> io::Result<()> { + match self.client.read(&mut self.read_buf) { + Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe)), + Ok(n) => { + if n < CCSDS_HEADER_LEN + 1 { + log::warn!("received packet smaller than minimum expected packet size."); + log::debug!("{:?}", &self.read_buf[..n]); + return Ok(()); + } + // We already checked that the received size has the minimum expected size. + let (sp_header, data) = + SpHeader::from_be_bytes(&self.read_buf[..n]).expect("parsing SP header failed"); + let full_expected_packet_len = sp_header.total_len(); + // We received an incomplete frame? + if n < full_expected_packet_len { + log::warn!( + "received incomplete SPP, with detected packet length {} but read buffer length {}", + full_expected_packet_len, n + ); + return Ok(()); + } + self.tc_source_tx + .send(self.read_buf[0..full_expected_packet_len].to_vec()); + } + Err(e) => return Err(e), + } + Ok(()) + } +} diff --git a/src/interface/udp.rs b/src/interface/udp_server.rs similarity index 100% rename from src/interface/udp.rs rename to src/interface/udp_server.rs diff --git a/src/main.rs b/src/main.rs index db66e01..a03cba6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,8 +23,8 @@ use crate::tmtc::tm_funnel::TmFunnelDynamic; use crate::tmtc::TcSourceTaskDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ - interface::tcp::{SyncTcpTmSource, TcpTask}, - interface::udp::{DynamicUdpTmHandler, UdpTmtcServer}, + interface::tcp_server::{SyncTcpTmSource, TcpTask}, + interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, logger::setup_logger, tmtc::ccsds::CcsdsReceiver, tmtc::PusTcSourceProviderDynamic, diff --git a/src/pus/mod.rs b/src/pus/mod.rs index eda55d4..3731892 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -59,7 +59,7 @@ pub struct PusTcMpscRouter { // pub mode_tc_sender: Sender, } -pub struct PusReceiver { +pub struct PusTcDistributor { pub id: ComponentId, pub tm_sender: TmSender, pub verif_reporter: VerificationReporter, @@ -67,7 +67,7 @@ pub struct PusReceiver { stamp_helper: TimeStampHelper, } -impl PusReceiver { +impl PusTcDistributor { pub fn new(tm_sender: TmSender, pus_router: PusTcMpscRouter) -> Self { Self { id: PUS_ROUTING_SERVICE.raw(), diff --git a/src/tmtc/ccsds.rs b/src/tmtc/ccsds.rs deleted file mode 100644 index 97edf26..0000000 --- a/src/tmtc/ccsds.rs +++ /dev/null @@ -1,49 +0,0 @@ -use ops_sat_rs::config::EXPERIMENT_APID; -use satrs::pus::ReceivesEcssPusTc; -use satrs::spacepackets::{CcsdsPacket, SpHeader}; -use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; -use satrs::ValidatorU16Id; - -#[derive(Clone)] -pub struct CcsdsReceiver< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone, - E, -> { - pub tc_source: TcSource, -} - -impl< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, - E: 'static, - > ValidatorU16Id for CcsdsReceiver -{ - fn validate(&self, apid: u16) -> bool { - apid == EXPERIMENT_APID - } -} - -impl< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, - E: 'static, - > CcsdsPacketHandler for CcsdsReceiver -{ - type Error = E; - - fn handle_packet_with_valid_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error> { - self.tc_source.pass_ccsds(sp_header, tc_raw) - } - - fn handle_packet_with_unknown_apid( - &mut self, - sp_header: &SpHeader, - _tc_raw: &[u8], - ) -> Result<(), Self::Error> { - // TODO: Log event as telemetry or something similar? - log::warn!("unknown APID 0x{:x?} detected", sp_header.apid()); - Ok(()) - } -} diff --git a/src/tmtc/mod.rs b/src/tmtc/mod.rs index fc28a37..bfd24c5 100644 --- a/src/tmtc/mod.rs +++ b/src/tmtc/mod.rs @@ -1,97 +1,2 @@ -use crate::pus::PusReceiver; -use satrs::pool::{StoreAddr, StoreError}; -use satrs::pus::{EcssTcAndToken, MpscTmAsVecSender}; -use satrs::spacepackets::ecss::PusPacket; -use satrs::{ - pus::ReceivesEcssPusTc, - spacepackets::{ecss::tc::PusTcReader, SpHeader}, - tmtc::ReceivesCcsdsTc, -}; -use std::sync::mpsc::{self, SendError, Sender, TryRecvError}; -use thiserror::Error; - -pub mod ccsds; -pub mod tm_funnel; - -#[derive(Debug, Clone, PartialEq, Eq, Error)] -pub enum MpscStoreAndSendError { - #[error("Store error: {0}")] - Store(#[from] StoreError), - #[error("TC send error: {0}")] - TcSend(#[from] SendError), - #[error("TMTC send error: {0}")] - TmTcSend(#[from] SendError), -} - -// Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules. -#[derive(Clone)] -pub struct PusTcSourceProviderDynamic(pub Sender>); - -impl ReceivesEcssPusTc for PusTcSourceProviderDynamic { - type Error = SendError>; - - fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { - self.0.send(pus_tc.raw_data().to_vec())?; - Ok(()) - } -} - -impl ReceivesCcsdsTc for PusTcSourceProviderDynamic { - type Error = mpsc::SendError>; - - fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.0.send(tc_raw.to_vec())?; - Ok(()) - } -} - -// TC source components where the heap is the backing memory of the received telecommands. -pub struct TcSourceTaskDynamic { - pub tc_receiver: mpsc::Receiver>, - pus_receiver: PusReceiver, -} - -impl TcSourceTaskDynamic { - pub fn new( - tc_receiver: mpsc::Receiver>, - pus_receiver: PusReceiver, - ) -> Self { - Self { - tc_receiver, - pus_receiver, - } - } - - pub fn periodic_operation(&mut self) { - self.poll_tc(); - } - - pub fn poll_tc(&mut self) -> bool { - match self.tc_receiver.try_recv() { - Ok(tc) => match PusTcReader::new(&tc) { - Ok((pus_tc, _)) => { - self.pus_receiver - .handle_tc_packet( - satrs::pus::TcInMemory::Vec(tc.clone()), - pus_tc.service(), - &pus_tc, - ) - .ok(); - true - } - Err(e) => { - log::warn!("error creating PUS TC from raw data: {e}"); - log::warn!("raw data: {:x?}", tc); - true - } - }, - Err(e) => match e { - TryRecvError::Empty => false, - TryRecvError::Disconnected => { - log::warn!("tmtc thread: sender disconnected"); - false - } - }, - } - } -} +pub mod tc_source; +pub mod tm_sink; diff --git a/src/tmtc/tc_source.rs b/src/tmtc/tc_source.rs new file mode 100644 index 0000000..0371113 --- /dev/null +++ b/src/tmtc/tc_source.rs @@ -0,0 +1,128 @@ +use satrs::{pool::PoolProvider, tmtc::tc_helper::SharedTcPool}; +use std::sync::mpsc::{self, TryRecvError}; + +use satrs::{ + pool::StoreAddr, + pus::{MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded}, + spacepackets::ecss::{tc::PusTcReader, PusPacket}, +}; + +use crate::pus::PusTcDistributor; + +// TC source components where static pools are the backing memory of the received telecommands. +pub struct TcSourceTaskStatic { + shared_tc_pool: SharedTcPool, + tc_receiver: mpsc::Receiver, + tc_buf: [u8; 4096], + pus_receiver: PusTcDistributor, +} + +impl TcSourceTaskStatic { + pub fn new( + shared_tc_pool: SharedTcPool, + tc_receiver: mpsc::Receiver, + pus_receiver: PusTcDistributor, + ) -> Self { + Self { + shared_tc_pool, + tc_receiver, + tc_buf: [0; 4096], + pus_receiver, + } + } + + pub fn periodic_operation(&mut self) { + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + match self.tc_receiver.try_recv() { + Ok(addr) => { + let pool = self + .shared_tc_pool + .0 + .read() + .expect("locking tc pool failed"); + pool.read(&addr, &mut self.tc_buf) + .expect("reading pool failed"); + drop(pool); + match PusTcReader::new(&self.tc_buf) { + Ok((pus_tc, _)) => { + self.pus_receiver + .handle_tc_packet( + satrs::pus::TcInMemory::StoreAddr(addr), + pus_tc.service(), + &pus_tc, + ) + .ok(); + true + } + Err(e) => { + log::warn!("error creating PUS TC from raw data: {e}"); + log::warn!("raw data: {:x?}", self.tc_buf); + true + } + } + } + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + log::warn!("tmtc thread: sender disconnected"); + false + } + }, + } + } +} + +// TC source components where the heap is the backing memory of the received telecommands. +pub struct TcSourceTaskDynamic { + pub tc_receiver: mpsc::Receiver>, + pus_receiver: PusTcDistributor, +} + +impl TcSourceTaskDynamic { + pub fn new( + tc_receiver: mpsc::Receiver>, + pus_receiver: PusTcDistributor, + ) -> Self { + Self { + tc_receiver, + pus_receiver, + } + } + + pub fn periodic_operation(&mut self) { + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + // Right now, we only expect PUS packets. + match self.tc_receiver.try_recv() { + Ok(tc) => match PusTcReader::new(&tc) { + Ok((pus_tc, _)) => { + self.pus_receiver + .handle_tc_packet( + satrs::pus::TcInMemory::Vec(tc.clone()), + pus_tc.service(), + &pus_tc, + ) + .ok(); + true + } + Err(e) => { + log::warn!("error creating PUS TC from raw data: {e}"); + log::warn!("raw data: {:x?}", tc); + true + } + }, + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + log::warn!("tmtc thread: sender disconnected"); + false + } + }, + } + } +} diff --git a/src/tmtc/tm_funnel.rs b/src/tmtc/tm_sink.rs similarity index 98% rename from src/tmtc/tm_funnel.rs rename to src/tmtc/tm_sink.rs index a0ef7b1..7e4fe2e 100644 --- a/src/tmtc/tm_funnel.rs +++ b/src/tmtc/tm_sink.rs @@ -14,7 +14,7 @@ use satrs::{ }, }; -use crate::interface::tcp::SyncTcpTmSource; +use crate::interface::tcp_server::SyncTcpTmSource; #[derive(Default)] pub struct CcsdsSeqCounterMap { From df72676c0d921535e95f10c48eb9a7af424074c2 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 15 Apr 2024 12:16:01 +0200 Subject: [PATCH 2/7] cleaned up code and bumped sat-rs --- Cargo.lock | 101 +++++++++++++++------------- Cargo.toml | 4 +- src/config.rs | 9 +++ src/controller.rs | 6 +- src/interface/tcp_server.rs | 60 ++++++----------- src/interface/tcp_spp_client.rs | 77 +++++++++++++--------- src/interface/udp_server.rs | 99 ++++++++++++++-------------- src/main.rs | 31 ++++----- src/pus/action.rs | 42 ++++++------ src/pus/mod.rs | 52 ++++++++------- src/pus/test.rs | 5 +- src/requests.rs | 4 +- src/tmtc/tc_source.rs | 113 +++++--------------------------- src/tmtc/tm_sink.rs | 10 +-- 14 files changed, 273 insertions(+), 340 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 01d6b53..9b62799 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,9 +25,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.16" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" [[package]] name = "android-tzdata" @@ -138,9 +138,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "cc" -version = "1.0.92" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2678b2e3449475e95b0aa6f9b506a28e61b3dc8996592b983695e8ebb58a8b41" +checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" [[package]] name = "cfg-if" @@ -160,7 +160,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.4", + "windows-targets 0.52.5", ] [[package]] @@ -250,7 +250,7 @@ checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -464,7 +464,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -521,9 +521,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.79" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" +checksum = "a56dea16b0a29e94408b9aa5e2940a4eedbd128a1ba20e8f7ae60fd3d465af0e" dependencies = [ "unicode-ident", ] @@ -590,7 +590,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352" dependencies = [ "bus", "cobs", @@ -615,7 +615,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352" dependencies = [ "csv", "satrs-mib-codegen", @@ -627,17 +627,17 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352" dependencies = [ "serde", "spacepackets", @@ -671,7 +671,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -734,7 +734,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -750,9 +750,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.58" +version = "2.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44cfb93f38070beee36b3fef7d4f5a16f27751d94b187b666a5cc5e9b0d30687" +checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a" dependencies = [ "proc-macro2", "quote", @@ -776,7 +776,7 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -841,7 +841,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", "wasm-bindgen-shared", ] @@ -863,7 +863,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -880,7 +880,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.5", ] [[package]] @@ -898,7 +898,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.5", ] [[package]] @@ -918,17 +918,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" dependencies = [ - "windows_aarch64_gnullvm 0.52.4", - "windows_aarch64_msvc 0.52.4", - "windows_i686_gnu 0.52.4", - "windows_i686_msvc 0.52.4", - "windows_x86_64_gnu 0.52.4", - "windows_x86_64_gnullvm 0.52.4", - "windows_x86_64_msvc 0.52.4", + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", ] [[package]] @@ -939,9 +940,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" [[package]] name = "windows_aarch64_msvc" @@ -951,9 +952,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" [[package]] name = "windows_i686_gnu" @@ -963,9 +964,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" [[package]] name = "windows_i686_msvc" @@ -975,9 +982,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" [[package]] name = "windows_x86_64_gnu" @@ -987,9 +994,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" [[package]] name = "windows_x86_64_gnullvm" @@ -999,9 +1006,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" [[package]] name = "windows_x86_64_msvc" @@ -1011,9 +1018,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "winnow" @@ -1042,5 +1049,5 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] diff --git a/Cargo.toml b/Cargo.toml index 3c4e1b4..f1c0533 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,13 +19,13 @@ mio = "0.8" [dependencies.satrs] version = "0.2.0-rc.0" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" -branch = "main" +branch = "rework-tmtc-modules" features = ["test_util"] [dependencies.satrs-mib] version = "0.1.1" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" -branch = "main" +branch = "rework-tmtc-modules" [dev-dependencies] env_logger = "0.11" diff --git a/src/config.rs b/src/config.rs index 78938bc..12c334f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,6 @@ use lazy_static::lazy_static; use num_enum::{IntoPrimitive, TryFromPrimitive}; +use satrs::spacepackets::PacketId; use satrs_mib::res_code::ResultU16Info; use satrs_mib::resultcode; use std::env; @@ -14,6 +15,7 @@ pub const SERVER_PORT: u16 = 7301; pub const TCP_SPP_SERVER_PORT: u16 = 4096; pub const EXPERIMENT_ID: u32 = 278; pub const EXPERIMENT_APID: u16 = 1024 + EXPERIMENT_ID as u16; +pub const EXPERIMENT_PACKET_ID: PacketId = PacketId::new_for_tc(true, EXPERIMENT_APID); #[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] #[repr(u8)] @@ -104,6 +106,9 @@ pub mod components { PusAction = 4, PusMode = 5, PusHk = 6, + UdpServer = 7, + TcpServer = 8, + TcpSppClient = 9, } pub const CONTROLLER_ID: UniqueApidTargetId = @@ -120,6 +125,10 @@ pub mod components { UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusMode as u32); pub const PUS_HK_SERVICE: UniqueApidTargetId = UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusHk as u32); + pub const UDP_SERVER: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::UdpServer as u32); + pub const TCP_SERVER: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::TcpServer as u32); } pub mod tasks { diff --git a/src/controller.rs b/src/controller.rs index f69d76b..69c27c8 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -1,7 +1,7 @@ use num_enum::TryFromPrimitive; use satrs::{ action::ActionRequest, - pus::action::{ActionReplyVariant, PusActionReply}, + pus::action::{ActionReplyPus, ActionReplyVariant}, request::{GenericMessage, MessageMetadata}, }; use std::{ @@ -22,7 +22,7 @@ pub enum ActionId { pub struct ExperimentController { pub composite_request_rx: mpsc::Receiver>, - pub action_reply_tx: mpsc::Sender>, + pub action_reply_tx: mpsc::Sender>, pub stop_signal: Arc, home_path_stop_file: PathBuf, tmp_path_stop_file: PathBuf, @@ -31,7 +31,7 @@ pub struct ExperimentController { impl ExperimentController { pub fn new( composite_request_rx: mpsc::Receiver>, - action_reply_tx: mpsc::Sender>, + action_reply_tx: mpsc::Sender>, stop_signal: Arc, ) -> Self { let mut home_path_stop_file = PathBuf::new(); diff --git a/src/interface/tcp_server.rs b/src/interface/tcp_server.rs index f722dfa..52df8ea 100644 --- a/src/interface/tcp_server.rs +++ b/src/interface/tcp_server.rs @@ -1,6 +1,6 @@ use std::{ collections::VecDeque, - sync::{atomic::AtomicBool, Arc, Mutex}, + sync::{atomic::AtomicBool, mpsc, Arc, Mutex}, time::Duration, }; @@ -8,13 +8,12 @@ use log::{info, warn}; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use satrs::{ hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer}, - pus::ReceivesEcssPusTc, + pus::PacketAsVec, + queue::GenericSendError, spacepackets::PacketId, - tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore}, + tmtc::PacketSource, }; -use crate::tmtc::ccsds::CcsdsReceiver; - #[derive(Default, Clone)] pub struct SyncTcpTmSource { tm_queue: Arc>>>, @@ -43,7 +42,7 @@ impl SyncTcpTmSource { } } -impl TmPacketSourceCore for SyncTcpTmSource { +impl PacketSource for SyncTcpTmSource { type Error = (); fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { @@ -83,57 +82,38 @@ impl HandledConnectionHandler for ConnectionFinishedHandler { } } -pub type TcpServerType = TcpSpacepacketsServer< +pub type TcpServer = TcpSpacepacketsServer< SyncTcpTmSource, - CcsdsDistributor, MpscErrorType>, + mpsc::Sender, Vec, ConnectionFinishedHandler, (), - CcsdsError, + GenericSendError, >; -pub struct TcpTask< - TcSource: ReceivesCcsdsTc - + ReceivesEcssPusTc - + Clone - + Send - + 'static, - MpscErrorType: 'static, -> { - server: TcpServerType, -} +pub struct TcpTask(pub TcpServer); -impl< - TcSource: ReceivesCcsdsTc - + ReceivesEcssPusTc - + Clone - + Send - + 'static, - MpscErrorType: 'static + core::fmt::Debug, - > TcpTask -{ +impl TcpTask { pub fn new( cfg: ServerConfig, tm_source: SyncTcpTmSource, - tc_receiver: CcsdsDistributor, MpscErrorType>, + tc_sender: mpsc::Sender, packet_id_lookup: Vec, stop_signal: Arc, ) -> Result { - Ok(Self { - server: TcpSpacepacketsServer::new( - cfg, - tm_source, - tc_receiver, - packet_id_lookup, - ConnectionFinishedHandler::default(), - Some(stop_signal), - )?, - }) + Ok(Self(TcpSpacepacketsServer::new( + cfg, + tm_source, + tc_sender, + packet_id_lookup, + ConnectionFinishedHandler::default(), + Some(stop_signal), + )?)) } pub fn periodic_operation(&mut self) { let result = self - .server + .0 .handle_next_connection(Some(Duration::from_millis(STOP_CHECK_FREQUENCY))); match result { Ok(_conn_result) => (), diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 3108f9a..39ad72d 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -1,23 +1,37 @@ use std::io::{self, Read}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::mpsc; +use std::time::Duration; use mio::net::TcpStream; use mio::{Events, Interest, Poll, Token}; -use ops_sat_rs::config::TCP_SPP_SERVER_PORT; -use satrs::spacepackets::ecss::CCSDS_HEADER_LEN; -use satrs::spacepackets::{CcsdsPacket, SpHeader}; +use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; +use ops_sat_rs::config::{EXPERIMENT_PACKET_ID, TCP_SPP_SERVER_PORT}; +use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets; +use satrs::pus::PacketAsVec; +use satrs::queue::GenericSendError; +use satrs::ComponentId; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum PacketForwardingError { + #[error("send error: {0}")] + Send(#[from] GenericSendError), + #[error("io error: {0}")] + Io(#[from] io::Error), +} pub struct TcpSppClient { + id: ComponentId, poll: Poll, events: Events, client: TcpStream, read_buf: [u8; 4096], - tc_source_tx: mpsc::Sender>, + tc_source_tx: mpsc::Sender, } impl TcpSppClient { - pub fn new() -> io::Result { + pub fn new(id: ComponentId, tc_source_tx: mpsc::Sender) -> io::Result { let poll = Poll::new()?; let events = Events::with_capacity(128); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT); @@ -28,15 +42,20 @@ impl TcpSppClient { Interest::READABLE | Interest::WRITABLE, )?; Ok(Self { + id, poll, events, client, read_buf: [0; 4096], + tc_source_tx, }) } - pub fn periodic_operation(&mut self) -> io::Result<()> { - self.poll.poll(&mut self.events, None)?; + pub fn periodic_operation(&mut self) -> Result<(), PacketForwardingError> { + self.poll.poll( + &mut self.events, + Some(Duration::from_millis(STOP_CHECK_FREQUENCY)), + )?; let events: Vec = self.events.iter().cloned().collect(); for event in events { if event.token() == Token(0) { @@ -52,32 +71,28 @@ impl TcpSppClient { Ok(()) } - pub fn read_from_server(&mut self) -> io::Result<()> { + pub fn read_from_server(&mut self) -> Result<(), PacketForwardingError> { match self.client.read(&mut self.read_buf) { - Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe)), - Ok(n) => { - if n < CCSDS_HEADER_LEN + 1 { - log::warn!("received packet smaller than minimum expected packet size."); - log::debug!("{:?}", &self.read_buf[..n]); - return Ok(()); - } - // We already checked that the received size has the minimum expected size. - let (sp_header, data) = - SpHeader::from_be_bytes(&self.read_buf[..n]).expect("parsing SP header failed"); - let full_expected_packet_len = sp_header.total_len(); - // We received an incomplete frame? - if n < full_expected_packet_len { - log::warn!( - "received incomplete SPP, with detected packet length {} but read buffer length {}", - full_expected_packet_len, n - ); - return Ok(()); - } - self.tc_source_tx - .send(self.read_buf[0..full_expected_packet_len].to_vec()); - } - Err(e) => return Err(e), + Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()), + Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?, + Err(e) => return Err(e.into()), } Ok(()) } + + pub fn handle_read_bytstream( + &mut self, + read_bytes: usize, + ) -> Result<(), PacketForwardingError> { + let mut dummy = 0; + // This parser is able to deal with broken tail packets, but we ignore those for now.. + parse_buffer_for_ccsds_space_packets( + &mut self.read_buf[..read_bytes], + &[EXPERIMENT_PACKET_ID].as_slice(), + self.id, + &mut self.tc_source_tx, + &mut dummy, + )?; + Ok(()) + } } diff --git a/src/interface/udp_server.rs b/src/interface/udp_server.rs index 65193a9..d10292d 100644 --- a/src/interface/udp_server.rs +++ b/src/interface/udp_server.rs @@ -2,18 +2,18 @@ use std::net::{SocketAddr, UdpSocket}; use std::sync::mpsc; use log::{info, warn}; -use satrs::pus::PusTmAsVec; -use satrs::{ - hal::std::udp_server::{ReceiveResult, UdpTcServer}, - tmtc::CcsdsError, -}; +use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer}; +use satrs::pus::PacketAsVec; +use satrs::queue::GenericSendError; + +use crate::pus::HandlingStatus; pub trait UdpTmHandler { fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr); } pub struct DynamicUdpTmHandler { - pub tm_rx: mpsc::Receiver, + pub tm_rx: mpsc::Receiver, } impl UdpTmHandler for DynamicUdpTmHandler { @@ -34,42 +34,39 @@ impl UdpTmHandler for DynamicUdpTmHandler { } } -pub struct UdpTmtcServer { - pub udp_tc_server: UdpTcServer>, +pub struct UdpTmtcServer { + pub udp_tc_server: UdpTcServer, GenericSendError>, pub tm_handler: TmHandler, } -impl - UdpTmtcServer -{ +impl UdpTmtcServer { pub fn periodic_operation(&mut self) { - while self.poll_tc_server() {} + loop { + if self.poll_tc_server() == HandlingStatus::Empty { + break; + } + } if let Some(recv_addr) = self.udp_tc_server.last_sender() { self.tm_handler .send_tm_to_udp_client(&self.udp_tc_server.socket, &recv_addr); } } - fn poll_tc_server(&mut self) -> bool { + fn poll_tc_server(&mut self) -> HandlingStatus { match self.udp_tc_server.try_recv_tc() { - Ok(_) => true, - Err(e) => match e { - ReceiveResult::ReceiverError(e) => match e { - CcsdsError::ByteConversionError(e) => { - warn!("packet error: {e:?}"); - true + Ok(_) => HandlingStatus::HandledOne, + Err(e) => { + match e { + ReceiveResult::NothingReceived => (), + ReceiveResult::Io(io_error) => { + warn!("Error receiving TC from UDP server: {io_error}"); } - CcsdsError::CustomError(e) => { - warn!("mpsc custom error {e:?}"); - true + ReceiveResult::Send(send_error) => { + warn!("error sending TM to UDP client: {send_error}"); } - }, - ReceiveResult::IoError(e) => { - warn!("IO error {e}"); - false } - ReceiveResult::NothingReceived => false, - }, + HandlingStatus::Empty + } } } } @@ -79,29 +76,35 @@ mod tests { use std::{ collections::VecDeque, net::IpAddr, - sync::{Arc, Mutex}, + sync::{mpsc::TryRecvError, Arc, Mutex}, }; - use ops_sat_rs::config::{components, OBSW_SERVER_ADDR}; + use ops_sat_rs::config::{EXPERIMENT_APID, OBSW_SERVER_ADDR}; use satrs::{ spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, SpHeader, }, - tmtc::ReceivesTcCore, + tmtc::PacketSenderRaw, + ComponentId, }; use super::*; + const UDP_SERVER_ID: ComponentId = 0x05; + #[derive(Default, Debug, Clone)] pub struct TestReceiver { - tc_vec: Arc>>>, + tc_vec: Arc>>, } - impl ReceivesTcCore for TestReceiver { - type Error = CcsdsError<()>; - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.tc_vec.lock().unwrap().push_back(tc_raw.to_vec()); + impl PacketSenderRaw for TestReceiver { + type Error = (); + fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> { + self.tc_vec + .lock() + .unwrap() + .push_back(PacketAsVec::new(sender_id, packet.to_vec())); Ok(()) } } @@ -120,9 +123,8 @@ mod tests { #[test] fn test_basic() { let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); - let test_receiver = TestReceiver::default(); - let tc_queue = test_receiver.tc_vec.clone(); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap(); + let (tx, rx) = mpsc::channel(); + let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap(); let tm_handler = TestTmHandler::default(); let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); let mut udp_dyn_server = UdpTmtcServer { @@ -130,16 +132,14 @@ mod tests { tm_handler, }; udp_dyn_server.periodic_operation(); - assert!(tc_queue.lock().unwrap().is_empty()); - assert!(tm_handler_calls.lock().unwrap().is_empty()); + matches!(rx.try_recv(), Err(TryRecvError::Empty)); } #[test] fn test_transactions() { let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); - let test_receiver = TestReceiver::default(); - let tc_queue = test_receiver.tc_vec.clone(); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap(); + let (tx, rx) = mpsc::channel(); + let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap(); let server_addr = udp_tc_server.socket.local_addr().unwrap(); let tm_handler = TestTmHandler::default(); let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); @@ -147,7 +147,7 @@ mod tests { udp_tc_server, tm_handler, }; - let sph = SpHeader::new_for_unseg_tc(components::Apid::GenericPus as u16, 0, 0); + let sph = SpHeader::new_for_unseg_tc(EXPERIMENT_APID, 0, 0); let ping_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true) .to_vec() .unwrap(); @@ -157,10 +157,9 @@ mod tests { client.send(&ping_tc).unwrap(); udp_dyn_server.periodic_operation(); { - let mut tc_queue = tc_queue.lock().unwrap(); - assert!(!tc_queue.is_empty()); - let received_tc = tc_queue.pop_front().unwrap(); - assert_eq!(received_tc, ping_tc); + let packet_with_sender = rx.try_recv().unwrap(); + assert_eq!(packet_with_sender.packet, ping_tc); + matches!(rx.try_recv(), Err(TryRecvError::Empty)); } { @@ -171,7 +170,7 @@ mod tests { assert_eq!(received_addr, client_addr); } udp_dyn_server.periodic_operation(); - assert!(tc_queue.lock().unwrap().is_empty()); + matches!(rx.try_recv(), Err(TryRecvError::Empty)); // Still tries to send to the same client. { let mut tm_handler_calls = tm_handler_calls.lock().unwrap(); diff --git a/src/main.rs b/src/main.rs index a03cba6..67543c2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ use std::{ use log::info; use ops_sat_rs::config::{ - components::CONTROLLER_ID, + components::{CONTROLLER_ID, TCP_SERVER, UDP_SERVER}, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, EXPERIMENT_APID, }; @@ -15,19 +15,16 @@ use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT} use satrs::{ hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, spacepackets::PacketId, - tmtc::CcsdsDistributor, }; -use crate::pus::{PusReceiver, PusTcMpscRouter}; -use crate::tmtc::tm_funnel::TmFunnelDynamic; -use crate::tmtc::TcSourceTaskDynamic; +use crate::pus::{PusTcDistributor, PusTcMpscRouter}; +use crate::tmtc::tc_source::TcSourceTaskDynamic; +use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ interface::tcp_server::{SyncTcpTmSource, TcpTask}, interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, logger::setup_logger, - tmtc::ccsds::CcsdsReceiver, - tmtc::PusTcSourceProviderDynamic, }; use crate::{ pus::{action::create_action_service, stack::PusStack}, @@ -51,8 +48,6 @@ fn main() { let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); let (tm_server_tx, tm_server_rx) = mpsc::channel(); - let tc_source = PusTcSourceProviderDynamic(tc_source_tx); - let (pus_test_tx, pus_test_rx) = mpsc::channel(); // let (pus_event_tx, pus_event_rx) = mpsc::channel(); // let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); @@ -122,16 +117,13 @@ fn main() { // pus_mode_service, ); - let ccsds_receiver = CcsdsReceiver { tc_source }; - let mut tmtc_task = TcSourceTaskDynamic::new( tc_source_rx, - PusReceiver::new(tm_funnel_tx.clone(), pus_router), + PusTcDistributor::new(tm_funnel_tx.clone(), pus_router), ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone()); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) + let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source_tx.clone()) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, @@ -140,13 +132,18 @@ fn main() { }, }; - let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver); - let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let tcp_server_cfg = ServerConfig::new( + TCP_SERVER.id(), + sock_addr, + Duration::from_millis(400), + 4096, + 8192, + ); let sync_tm_tcp_source = SyncTcpTmSource::new(200); let mut tcp_server = TcpTask::new( tcp_server_cfg, sync_tm_tcp_source.clone(), - tcp_ccsds_distributor, + tc_source_tx.clone(), vec![PacketId::new_for_tc(true, EXPERIMENT_APID)], stop_signal.clone(), ) diff --git a/src/pus/action.rs b/src/pus/action.rs index 409f330..d4bf2d1 100644 --- a/src/pus/action.rs +++ b/src/pus/action.rs @@ -4,16 +4,16 @@ use ops_sat_rs::config::tmtc_err; use satrs::action::{ActionRequest, ActionRequestVariant}; use satrs::params::WritableToBeBytes; use satrs::pus::action::{ - ActionReplyVariant, ActivePusActionRequestStd, DefaultActiveActionRequestMap, PusActionReply, + ActionReplyPus, ActionReplyVariant, ActivePusActionRequestStd, DefaultActiveActionRequestMap, }; use satrs::pus::verification::{ FailParams, FailParamsWithStep, TcStateAccepted, TcStateStarted, VerificationReporter, VerificationReportingProvider, VerificationToken, }; use satrs::pus::{ - ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSenderCore, EcssTmtcError, - GenericConversionError, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, - PusTcToRequestConverter, PusTmAsVec, + ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, + GenericConversionError, PacketAsVec, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, + PusTcToRequestConverter, }; use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::spacepackets::ecss::tc::PusTcReader; @@ -40,13 +40,13 @@ impl Default for ActionReplyHandler { } } -impl PusReplyHandler for ActionReplyHandler { +impl PusReplyHandler for ActionReplyHandler { type Error = EcssTmtcError; fn handle_unrequested_reply( &mut self, - reply: &GenericMessage, - _tm_sender: &impl EcssTmSenderCore, + reply: &GenericMessage, + _tm_sender: &impl EcssTmSender, ) -> Result<(), Self::Error> { warn!("received unexpected reply for service 8: {reply:?}"); Ok(()) @@ -54,9 +54,9 @@ impl PusReplyHandler for ActionReplyH fn handle_reply( &mut self, - reply: &GenericMessage, + reply: &GenericMessage, active_request: &ActivePusActionRequestStd, - tm_sender: &(impl EcssTmSenderCore + ?Sized), + tm_sender: &(impl EcssTmSender + ?Sized), verification_handler: &impl VerificationReportingProvider, time_stamp: &[u8], ) -> Result { @@ -119,7 +119,7 @@ impl PusReplyHandler for ActionReplyH fn handle_request_timeout( &mut self, active_request: &ActivePusActionRequestStd, - tm_sender: &impl EcssTmSenderCore, + tm_sender: &impl EcssTmSender, verification_handler: &impl VerificationReportingProvider, time_stamp: &[u8], ) -> Result<(), Self::Error> { @@ -143,7 +143,7 @@ impl PusTcToRequestConverter for Actio &mut self, token: VerificationToken, tc: &PusTcReader, - tm_sender: &(impl EcssTmSenderCore + ?Sized), + tm_sender: &(impl EcssTmSender + ?Sized), verif_reporter: &impl VerificationReportingProvider, time_stamp: &[u8], ) -> Result<(ActivePusActionRequestStd, ActionRequest), Self::Error> { @@ -193,10 +193,10 @@ impl PusTcToRequestConverter for Actio } pub fn create_action_service( - tm_funnel_tx: mpsc::Sender, + tm_funnel_tx: mpsc::Sender, pus_action_rx: mpsc::Receiver, action_router: GenericRequestRouter, - reply_receiver: mpsc::Receiver>, + reply_receiver: mpsc::Receiver>, ) -> ActionServiceWrapper { let action_request_handler = PusTargetedRequestService::new( PusServiceHelper::new( @@ -225,7 +225,7 @@ pub struct ActionServiceWrapper { DefaultActiveActionRequestMap, ActivePusActionRequestStd, ActionRequest, - PusActionReply, + ActionReplyPus, >, } @@ -304,7 +304,7 @@ mod tests { DefaultActiveActionRequestMap, ActivePusActionRequestStd, ActionRequest, - PusActionReply, + ActionReplyPus, > { pub fn new_for_action(owner_id: ComponentId, target_id: ComponentId) -> Self { @@ -460,7 +460,7 @@ mod tests { if let CompositeRequest::Action(action_req) = req.message { assert_eq!(action_req.action_id, action_id); assert_eq!(action_req.variant, ActionRequestVariant::NoData); - let action_reply = PusActionReply::new(action_id, ActionReplyVariant::Completed); + let action_reply = ActionReplyPus::new(action_id, ActionReplyVariant::Completed); testbench .reply_tx .send(GenericMessage::new(req.requestor_info, action_reply)) @@ -580,7 +580,7 @@ mod tests { let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); let active_action_req = ActivePusActionRequestStd::new_from_common_req(action_id, active_req); - let reply = PusActionReply::new(action_id, ActionReplyVariant::Completed); + let reply = ActionReplyPus::new(action_id, ActionReplyVariant::Completed); let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); assert!(result.is_ok()); @@ -601,7 +601,7 @@ mod tests { let active_action_req = ActivePusActionRequestStd::new_from_common_req(action_id, active_req); let error_code = ResultU16::new(2, 3); - let reply = PusActionReply::new( + let reply = ActionReplyPus::new( action_id, ActionReplyVariant::CompletionFailed { error_code, @@ -628,7 +628,7 @@ mod tests { let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); let active_action_req = ActivePusActionRequestStd::new_from_common_req(action_id, active_req); - let reply = PusActionReply::new(action_id, ActionReplyVariant::StepSuccess { step: 1 }); + let reply = ActionReplyPus::new(action_id, ActionReplyVariant::StepSuccess { step: 1 }); let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); assert!(result.is_ok()); @@ -655,7 +655,7 @@ mod tests { let active_action_req = ActivePusActionRequestStd::new_from_common_req(action_id, active_req); let error_code = ResultU16::new(2, 3); - let reply = PusActionReply::new( + let reply = ActionReplyPus::new( action_id, ActionReplyVariant::StepFailed { error_code, @@ -685,7 +685,7 @@ mod tests { fn reply_handling_unrequested_reply() { let mut testbench = ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); - let action_reply = PusActionReply::new(5_u32, ActionReplyVariant::Completed); + let action_reply = ActionReplyPus::new(5_u32, ActionReplyVariant::Completed); let unrequested_reply = GenericMessage::new(MessageMetadata::new(10_u32, 15_u64), action_reply); // Right now this function does not do a lot. We simply check that it does not panic or do diff --git a/src/pus/mod.rs b/src/pus/mod.rs index 3731892..903c677 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -3,7 +3,6 @@ pub mod stack; pub mod test; use crate::requests::GenericRequestRouter; -use crate::tmtc::MpscStoreAndSendError; use log::warn; use ops_sat_rs::config::components::PUS_ROUTING_SERVICE; use ops_sat_rs::config::{tmtc_err, CustomPusServiceId}; @@ -14,15 +13,14 @@ use satrs::pus::verification::{ }; use satrs::pus::{ ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, - EcssTcInVecConverter, EcssTmSenderCore, EcssTmtcError, GenericConversionError, - GenericRoutingError, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, - PusPacketHandlingError, PusReplyHandler, PusRequestRouter, PusServiceHelper, - PusTcToRequestConverter, TcInMemory, + EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError, + MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError, + PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, }; -use satrs::queue::GenericReceiveError; +use satrs::queue::{GenericReceiveError, GenericSendError}; use satrs::request::{Apid, GenericMessage, MessageMetadata}; use satrs::spacepackets::ecss::tc::PusTcReader; -use satrs::spacepackets::ecss::PusServiceId; +use satrs::spacepackets::ecss::{PusPacket, PusServiceId}; use satrs::ComponentId; use std::fmt::Debug; use std::sync::mpsc::{self, Sender}; @@ -59,7 +57,7 @@ pub struct PusTcMpscRouter { // pub mode_tc_sender: Sender, } -pub struct PusTcDistributor { +pub struct PusTcDistributor { pub id: ComponentId, pub tm_sender: TmSender, pub verif_reporter: VerificationReporter, @@ -67,7 +65,7 @@ pub struct PusTcDistributor { stamp_helper: TimeStampHelper, } -impl PusTcDistributor { +impl PusTcDistributor { pub fn new(tm_sender: TmSender, pus_router: PusTcMpscRouter) -> Self { Self { id: PUS_ROUTING_SERVICE.raw(), @@ -83,25 +81,35 @@ impl PusTcDistributor { pub fn handle_tc_packet( &mut self, - tc_in_memory: TcInMemory, - service: u8, - pus_tc: &PusTcReader, - ) -> Result { - let init_token = self.verif_reporter.add_tc(pus_tc); + sender_id: ComponentId, + tc: Vec, + ) -> Result { + let pus_tc_result = PusTcReader::new(&tc); + if pus_tc_result.is_err() { + log::warn!( + "error creating PUS TC received from {}: {}", + sender_id, + pus_tc_result.unwrap_err() + ); + log::warn!("raw data: {:x?}", tc); + return Ok(PusPacketHandlerResult::RequestHandled); + } + let pus_tc = pus_tc_result.unwrap().0; + let init_token = self.verif_reporter.add_tc(&pus_tc); self.stamp_helper.update_from_now(); let accepted_token = self .verif_reporter .acceptance_success(&self.tm_sender, init_token, self.stamp_helper.stamp()) .expect("Acceptance success failure"); - let service = PusServiceId::try_from(service); + let service = PusServiceId::try_from(pus_tc.service()); match service { Ok(standard_service) => match standard_service { PusServiceId::Test => self.pus_router.test_tc_sender.send(EcssTcAndToken { - tc_in_memory, + tc_in_memory: tc.into(), token: Some(accepted_token.into()), })?, PusServiceId::Action => self.pus_router.action_tc_sender.send(EcssTcAndToken { - tc_in_memory, + tc_in_memory: tc.into(), token: Some(accepted_token.into()), })?, // PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken { @@ -436,7 +444,7 @@ where /// and also log the error. #[allow(dead_code)] pub fn generic_pus_request_timeout_handler( - sender: &(impl EcssTmSenderCore + ?Sized), + sender: &(impl EcssTmSender + ?Sized), active_request: &(impl ActiveRequestProvider + Debug), verification_handler: &impl VerificationReportingProvider, time_stamp: &[u8], @@ -460,7 +468,7 @@ pub(crate) mod tests { use std::time::Duration; use satrs::pus::test_util::TEST_COMPONENT_ID_0; - use satrs::pus::{MpscTmAsVecSender, PusTmAsVec, PusTmVariant}; + use satrs::pus::{MpscTmAsVecSender, PacketAsVec, PusTmVariant}; use satrs::request::RequestId; use satrs::{ pus::{ @@ -490,7 +498,7 @@ pub(crate) mod tests { pub id: ComponentId, pub verif_reporter: TestVerificationReporter, pub reply_handler: ReplyHandler, - pub tm_receiver: mpsc::Receiver, + pub tm_receiver: mpsc::Receiver, pub default_timeout: Duration, tm_sender: MpscTmAsVecSender, phantom: std::marker::PhantomData<(ActiveRequestInfo, Reply)>, @@ -590,7 +598,7 @@ pub(crate) mod tests { /// Dummy sender component which does nothing on the [Self::send_tm] call. /// /// Useful for unit tests. - impl EcssTmSenderCore for DummySender { + impl EcssTmSender for DummySender { fn send_tm(&self, _source_id: ComponentId, _tm: PusTmVariant) -> Result<(), EcssTmtcError> { Ok(()) } @@ -694,7 +702,7 @@ pub(crate) mod tests { ReplyType, >, pub request_id: Option, - pub tm_funnel_rx: mpsc::Receiver, + pub tm_funnel_rx: mpsc::Receiver, pub pus_packet_tx: mpsc::Sender, pub reply_tx: mpsc::Sender>, pub request_rx: mpsc::Receiver>, diff --git a/src/pus/test.rs b/src/pus/test.rs index 3f46fdd..bf3596d 100644 --- a/src/pus/test.rs +++ b/src/pus/test.rs @@ -7,7 +7,7 @@ use satrs::pus::test::PusService17TestHandler; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; use satrs::pus::{ EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, - PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, + PacketAsVec, PusPacketHandlerResult, PusServiceHelper, }; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusPacket; @@ -18,8 +18,7 @@ use std::sync::mpsc; use super::HandlingStatus; pub fn create_test_service( - tm_funnel_tx: mpsc::Sender, - // event_sender: mpsc::Sender, + tm_funnel_tx: mpsc::Sender, pus_test_rx: mpsc::Receiver, ) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( diff --git a/src/requests.rs b/src/requests.rs index 90ed366..67d40fc 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -10,7 +10,7 @@ use satrs::mode::ModeRequest; use satrs::pus::verification::{ FailParams, TcStateAccepted, VerificationReportingProvider, VerificationToken, }; -use satrs::pus::{ActiveRequestProvider, EcssTmSenderCore, GenericRoutingError, PusRequestRouter}; +use satrs::pus::{ActiveRequestProvider, EcssTmSender, GenericRoutingError, PusRequestRouter}; use satrs::queue::GenericSendError; use satrs::request::{GenericMessage, MessageMetadata, UniqueApidTargetId}; use satrs::spacepackets::ecss::tc::PusTcReader; @@ -49,7 +49,7 @@ impl GenericRequestRouter { active_request: &impl ActiveRequestProvider, tc: &PusTcReader, error: GenericRoutingError, - tm_sender: &(impl EcssTmSenderCore + ?Sized), + tm_sender: &(impl EcssTmSender + ?Sized), verif_reporter: &impl VerificationReportingProvider, time_stamp: &[u8], ) { diff --git a/src/tmtc/tc_source.rs b/src/tmtc/tc_source.rs index 0371113..d75004f 100644 --- a/src/tmtc/tc_source.rs +++ b/src/tmtc/tc_source.rs @@ -1,33 +1,23 @@ -use satrs::{pool::PoolProvider, tmtc::tc_helper::SharedTcPool}; use std::sync::mpsc::{self, TryRecvError}; -use satrs::{ - pool::StoreAddr, - pus::{MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded}, - spacepackets::ecss::{tc::PusTcReader, PusPacket}, -}; +use satrs::pus::{MpscTmAsVecSender, PacketAsVec}; use crate::pus::PusTcDistributor; -// TC source components where static pools are the backing memory of the received telecommands. -pub struct TcSourceTaskStatic { - shared_tc_pool: SharedTcPool, - tc_receiver: mpsc::Receiver, - tc_buf: [u8; 4096], - pus_receiver: PusTcDistributor, +// TC source components where the heap is the backing memory of the received telecommands. +pub struct TcSourceTaskDynamic { + pub tc_receiver: mpsc::Receiver, + pus_distrib: PusTcDistributor, } -impl TcSourceTaskStatic { +impl TcSourceTaskDynamic { pub fn new( - shared_tc_pool: SharedTcPool, - tc_receiver: mpsc::Receiver, - pus_receiver: PusTcDistributor, + tc_receiver: mpsc::Receiver, + pus_receiver: PusTcDistributor, ) -> Self { Self { - shared_tc_pool, tc_receiver, - tc_buf: [0; 4096], - pus_receiver, + pus_distrib: pus_receiver, } } @@ -36,33 +26,14 @@ impl TcSourceTaskStatic { } pub fn poll_tc(&mut self) -> bool { + // Right now, we only expect PUS packets. If any other protocols like CFDP are added at + // a later stage, we probably need to check for the APID before routing the packet. match self.tc_receiver.try_recv() { - Ok(addr) => { - let pool = self - .shared_tc_pool - .0 - .read() - .expect("locking tc pool failed"); - pool.read(&addr, &mut self.tc_buf) - .expect("reading pool failed"); - drop(pool); - match PusTcReader::new(&self.tc_buf) { - Ok((pus_tc, _)) => { - self.pus_receiver - .handle_tc_packet( - satrs::pus::TcInMemory::StoreAddr(addr), - pus_tc.service(), - &pus_tc, - ) - .ok(); - true - } - Err(e) => { - log::warn!("error creating PUS TC from raw data: {e}"); - log::warn!("raw data: {:x?}", self.tc_buf); - true - } - } + Ok(packet_with_sender) => { + self.pus_distrib + .handle_tc_packet(packet_with_sender.sender_id, packet_with_sender.packet) + .ok(); + true } Err(e) => match e { TryRecvError::Empty => false, @@ -74,55 +45,3 @@ impl TcSourceTaskStatic { } } } - -// TC source components where the heap is the backing memory of the received telecommands. -pub struct TcSourceTaskDynamic { - pub tc_receiver: mpsc::Receiver>, - pus_receiver: PusTcDistributor, -} - -impl TcSourceTaskDynamic { - pub fn new( - tc_receiver: mpsc::Receiver>, - pus_receiver: PusTcDistributor, - ) -> Self { - Self { - tc_receiver, - pus_receiver, - } - } - - pub fn periodic_operation(&mut self) { - self.poll_tc(); - } - - pub fn poll_tc(&mut self) -> bool { - // Right now, we only expect PUS packets. - match self.tc_receiver.try_recv() { - Ok(tc) => match PusTcReader::new(&tc) { - Ok((pus_tc, _)) => { - self.pus_receiver - .handle_tc_packet( - satrs::pus::TcInMemory::Vec(tc.clone()), - pus_tc.service(), - &pus_tc, - ) - .ok(); - true - } - Err(e) => { - log::warn!("error creating PUS TC from raw data: {e}"); - log::warn!("raw data: {:x?}", tc); - true - } - }, - Err(e) => match e { - TryRecvError::Empty => false, - TryRecvError::Disconnected => { - log::warn!("tmtc thread: sender disconnected"); - false - } - }, - } - } -} diff --git a/src/tmtc/tm_sink.rs b/src/tmtc/tm_sink.rs index 7e4fe2e..5e8b236 100644 --- a/src/tmtc/tm_sink.rs +++ b/src/tmtc/tm_sink.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, sync::mpsc, time::Duration}; use log::info; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; -use satrs::pus::PusTmAsVec; +use satrs::pus::PacketAsVec; use satrs::{ seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, spacepackets::{ @@ -76,16 +76,16 @@ impl TmFunnelCommon { pub struct TmFunnelDynamic { common: TmFunnelCommon, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::Sender, stop_signal: Arc, } impl TmFunnelDynamic { pub fn new( sync_tm_tcp_source: SyncTcpTmSource, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::Sender, stop_signal: Arc, ) -> Self { Self { From 8313a0b26c28dbf003fca4a9c5f5804e164bd398 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 15 Apr 2024 14:13:10 +0200 Subject: [PATCH 3/7] another small update --- Cargo.lock | 12 ++++++------ src/interface/tcp_server.rs | 3 +-- src/interface/tcp_spp_client.rs | 2 +- src/interface/udp_server.rs | 2 +- src/pus/action.rs | 3 ++- src/pus/mod.rs | 3 ++- src/pus/test.rs | 3 ++- src/tmtc/tc_source.rs | 12 ++++++------ src/tmtc/tm_sink.rs | 2 +- 9 files changed, 22 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9b62799..2bec391 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -150,9 +150,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.37" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ "android-tzdata", "iana-time-zone", @@ -590,7 +590,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" dependencies = [ "bus", "cobs", @@ -615,7 +615,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" dependencies = [ "csv", "satrs-mib-codegen", @@ -627,7 +627,7 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" dependencies = [ "proc-macro2", "quote", @@ -637,7 +637,7 @@ dependencies = [ [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" dependencies = [ "serde", "spacepackets", diff --git a/src/interface/tcp_server.rs b/src/interface/tcp_server.rs index 52df8ea..42b7b40 100644 --- a/src/interface/tcp_server.rs +++ b/src/interface/tcp_server.rs @@ -8,10 +8,9 @@ use log::{info, warn}; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use satrs::{ hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer}, - pus::PacketAsVec, queue::GenericSendError, spacepackets::PacketId, - tmtc::PacketSource, + tmtc::{PacketAsVec, PacketSource}, }; #[derive(Default, Clone)] diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 39ad72d..d8d6936 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -8,8 +8,8 @@ use mio::{Events, Interest, Poll, Token}; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use ops_sat_rs::config::{EXPERIMENT_PACKET_ID, TCP_SPP_SERVER_PORT}; use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets; -use satrs::pus::PacketAsVec; use satrs::queue::GenericSendError; +use satrs::tmtc::PacketAsVec; use satrs::ComponentId; use thiserror::Error; diff --git a/src/interface/udp_server.rs b/src/interface/udp_server.rs index d10292d..9a6cfb8 100644 --- a/src/interface/udp_server.rs +++ b/src/interface/udp_server.rs @@ -3,8 +3,8 @@ use std::sync::mpsc; use log::{info, warn}; use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer}; -use satrs::pus::PacketAsVec; use satrs::queue::GenericSendError; +use satrs::tmtc::PacketAsVec; use crate::pus::HandlingStatus; diff --git a/src/pus/action.rs b/src/pus/action.rs index d4bf2d1..cd2472f 100644 --- a/src/pus/action.rs +++ b/src/pus/action.rs @@ -12,12 +12,13 @@ use satrs::pus::verification::{ }; use satrs::pus::{ ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, - GenericConversionError, PacketAsVec, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, + GenericConversionError, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter, }; use satrs::request::{GenericMessage, UniqueApidTargetId}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket}; +use satrs::tmtc::PacketAsVec; use std::sync::mpsc; use std::time::Duration; diff --git a/src/pus/mod.rs b/src/pus/mod.rs index 903c677..bc323b0 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -468,8 +468,9 @@ pub(crate) mod tests { use std::time::Duration; use satrs::pus::test_util::TEST_COMPONENT_ID_0; - use satrs::pus::{MpscTmAsVecSender, PacketAsVec, PusTmVariant}; + use satrs::pus::{MpscTmAsVecSender, PusTmVariant}; use satrs::request::RequestId; + use satrs::tmtc::PacketAsVec; use satrs::{ pus::{ verification::test_util::TestVerificationReporter, ActivePusRequestStd, diff --git a/src/pus/test.rs b/src/pus/test.rs index bf3596d..8428be7 100644 --- a/src/pus/test.rs +++ b/src/pus/test.rs @@ -7,12 +7,13 @@ use satrs::pus::test::PusService17TestHandler; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; use satrs::pus::{ EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, - PacketAsVec, PusPacketHandlerResult, PusServiceHelper, + PusPacketHandlerResult, PusServiceHelper, }; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusPacket; use satrs::spacepackets::time::cds::CdsTime; use satrs::spacepackets::time::TimeWriter; +use satrs::tmtc::PacketAsVec; use std::sync::mpsc; use super::HandlingStatus; diff --git a/src/tmtc/tc_source.rs b/src/tmtc/tc_source.rs index d75004f..02cf5d0 100644 --- a/src/tmtc/tc_source.rs +++ b/src/tmtc/tc_source.rs @@ -1,8 +1,8 @@ use std::sync::mpsc::{self, TryRecvError}; -use satrs::pus::{MpscTmAsVecSender, PacketAsVec}; +use satrs::{pus::MpscTmAsVecSender, tmtc::PacketAsVec}; -use crate::pus::PusTcDistributor; +use crate::pus::{HandlingStatus, PusTcDistributor}; // TC source components where the heap is the backing memory of the received telecommands. pub struct TcSourceTaskDynamic { @@ -25,7 +25,7 @@ impl TcSourceTaskDynamic { self.poll_tc(); } - pub fn poll_tc(&mut self) -> bool { + pub fn poll_tc(&mut self) -> HandlingStatus { // Right now, we only expect PUS packets. If any other protocols like CFDP are added at // a later stage, we probably need to check for the APID before routing the packet. match self.tc_receiver.try_recv() { @@ -33,13 +33,13 @@ impl TcSourceTaskDynamic { self.pus_distrib .handle_tc_packet(packet_with_sender.sender_id, packet_with_sender.packet) .ok(); - true + HandlingStatus::HandledOne } Err(e) => match e { - TryRecvError::Empty => false, + TryRecvError::Empty => HandlingStatus::Empty, TryRecvError::Disconnected => { log::warn!("tmtc thread: sender disconnected"); - false + HandlingStatus::Empty } }, } diff --git a/src/tmtc/tm_sink.rs b/src/tmtc/tm_sink.rs index 5e8b236..9dfac6f 100644 --- a/src/tmtc/tm_sink.rs +++ b/src/tmtc/tm_sink.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, sync::mpsc, time::Duration}; use log::info; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; -use satrs::pus::PacketAsVec; +use satrs::tmtc::PacketAsVec; use satrs::{ seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, spacepackets::{ From 192e701785abcdffcf140b6adaffe9f7988b31f4 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 15 Apr 2024 16:42:48 +0200 Subject: [PATCH 4/7] MIO tcp client --- Cargo.lock | 8 +++--- src/config.rs | 2 ++ src/interface/tcp_spp_client.rs | 2 +- src/main.rs | 43 +++++++++++++++++++++++++++------ src/pus/mod.rs | 8 +++--- 5 files changed, 47 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2bec391..c83fe31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -590,7 +590,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" dependencies = [ "bus", "cobs", @@ -615,7 +615,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" dependencies = [ "csv", "satrs-mib-codegen", @@ -627,7 +627,7 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" dependencies = [ "proc-macro2", "quote", @@ -637,7 +637,7 @@ dependencies = [ [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#794094ae9fc948cc1575c48d2d57cfe0deb8c799" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" dependencies = [ "serde", "spacepackets", diff --git a/src/config.rs b/src/config.rs index 12c334f..4295e75 100644 --- a/src/config.rs +++ b/src/config.rs @@ -129,6 +129,8 @@ pub mod components { UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::UdpServer as u32); pub const TCP_SERVER: UniqueApidTargetId = UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::TcpServer as u32); + pub const TCP_SPP_CLIENT: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::TcpSppClient as u32); } pub mod tasks { diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index d8d6936..20c15ee 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -90,7 +90,7 @@ impl TcpSppClient { &mut self.read_buf[..read_bytes], &[EXPERIMENT_PACKET_ID].as_slice(), self.id, - &mut self.tc_source_tx, + &self.tc_source_tx, &mut dummy, )?; Ok(()) diff --git a/src/main.rs b/src/main.rs index 67543c2..eb669d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ use std::{ use log::info; use ops_sat_rs::config::{ - components::{CONTROLLER_ID, TCP_SERVER, UDP_SERVER}, + components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, EXPERIMENT_APID, }; @@ -17,7 +17,6 @@ use satrs::{ spacepackets::PacketId, }; -use crate::pus::{PusTcDistributor, PusTcMpscRouter}; use crate::tmtc::tc_source::TcSourceTaskDynamic; use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; @@ -26,6 +25,10 @@ use crate::{ interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, logger::setup_logger, }; +use crate::{ + interface::tcp_spp_client::TcpSppClient, + pus::{PusTcDistributor, PusTcMpscRouter}, +}; use crate::{ pus::{action::create_action_service, stack::PusStack}, requests::GenericRequestRouter, @@ -162,6 +165,9 @@ fn main() { stop_signal.clone(), ); + let mut tcp_spp_client = TcpSppClient::new(TCP_SPP_CLIENT.id(), tc_source_tx) + .expect("creating TCP SPP client failed"); + info!("Starting CTRL task"); let ctrl_stop_signal = stop_signal.clone(); let jh_ctrl_thread = thread::Builder::new() @@ -192,15 +198,33 @@ fn main() { }) .unwrap(); - let tcp_stop_signal = stop_signal.clone(); - info!("Starting TCP task"); - let jh_tcp = thread::Builder::new() - .name("ops-sat tcp".to_string()) + let tcp_server_stop_signal = stop_signal.clone(); + info!("Starting TCP server task"); + let jh_tcp_server = thread::Builder::new() + .name("ops-sat tcp-server".to_string()) .spawn(move || { info!("Running TCP server on port {SERVER_PORT}"); loop { tcp_server.periodic_operation(); - if tcp_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + if tcp_server_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + } + }) + .unwrap(); + + // We could also move this to the existing TCP server thread, but we would have to adapt + // the server code for this so we do not block anymore and we pause manually if both the client + // and server are IDLE and have nothing to do.. + let tcp_client_stop_signal = stop_signal.clone(); + info!("Starting TCP SPP client task"); + let jh_tcp_client = thread::Builder::new() + .name("ops-sat tcp-client".to_string()) + .spawn(move || { + info!("Running TCP SPP client"); + loop { + let _result = tcp_spp_client.periodic_operation(); + if tcp_client_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } } @@ -238,9 +262,12 @@ fn main() { jh_udp_tmtc .join() .expect("Joining UDP TMTC server thread failed"); - jh_tcp + jh_tcp_server .join() .expect("Joining TCP TMTC server thread failed"); + jh_tcp_client + .join() + .expect("Joining TCP TMTC client thread failed"); jh_tm_funnel .join() .expect("Joining TM Funnel thread failed"); diff --git a/src/pus/mod.rs b/src/pus/mod.rs index bc323b0..2f88a26 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -15,12 +15,13 @@ use satrs::pus::{ ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError, - PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, + PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, }; use satrs::queue::{GenericReceiveError, GenericSendError}; use satrs::request::{Apid, GenericMessage, MessageMetadata}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::{PusPacket, PusServiceId}; +use satrs::tmtc::PacketAsVec; use satrs::ComponentId; use std::fmt::Debug; use std::sync::mpsc::{self, Sender}; @@ -102,14 +103,15 @@ impl PusTcDistributor { .acceptance_success(&self.tm_sender, init_token, self.stamp_helper.stamp()) .expect("Acceptance success failure"); let service = PusServiceId::try_from(pus_tc.service()); + let tc_in_memory = TcInMemory::Vec(PacketAsVec::new(sender_id, tc)); match service { Ok(standard_service) => match standard_service { PusServiceId::Test => self.pus_router.test_tc_sender.send(EcssTcAndToken { - tc_in_memory: tc.into(), + tc_in_memory, token: Some(accepted_token.into()), })?, PusServiceId::Action => self.pus_router.action_tc_sender.send(EcssTcAndToken { - tc_in_memory: tc.into(), + tc_in_memory, token: Some(accepted_token.into()), })?, // PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken { From b359ff9d33e647b9f32189b05dfb11cf45d10c08 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 16 Apr 2024 09:59:31 +0200 Subject: [PATCH 5/7] updated TCP code --- Cargo.lock | 8 ++++---- Cargo.toml | 8 ++++++++ src/config.rs | 5 +++++ src/interface/mod.rs | 32 ++++++++++++++++++++++++++++++++ src/interface/tcp_server.rs | 10 ++++++---- src/interface/tcp_spp_client.rs | 22 +++++++++++++++++++--- src/main.rs | 14 ++++++-------- 7 files changed, 80 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c83fe31..570d8ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -590,7 +590,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" dependencies = [ "bus", "cobs", @@ -615,7 +615,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" dependencies = [ "csv", "satrs-mib-codegen", @@ -627,7 +627,7 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" dependencies = [ "proc-macro2", "quote", @@ -637,7 +637,7 @@ dependencies = [ [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#a077c32f3c977f79c9d165e8f3bff66f1d81a669" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" dependencies = [ "serde", "spacepackets", diff --git a/Cargo.toml b/Cargo.toml index f1c0533..fdfac9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,3 +29,11 @@ branch = "rework-tmtc-modules" [dev-dependencies] env_logger = "0.11" + +# I don't think we need insane performance. If anything, a small binary is easier to upload +# to the satellite. +[profile.release] +strip = true +opt-level = "z" # Optimize for size. +lto = true +codegen-units = 1 diff --git a/src/config.rs b/src/config.rs index 4295e75..4f0ad9e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -16,6 +16,11 @@ pub const TCP_SPP_SERVER_PORT: u16 = 4096; pub const EXPERIMENT_ID: u32 = 278; pub const EXPERIMENT_APID: u16 = 1024 + EXPERIMENT_ID as u16; pub const EXPERIMENT_PACKET_ID: PacketId = PacketId::new_for_tc(true, EXPERIMENT_APID); +pub const VALID_PACKET_ID_LIST: &[PacketId] = &[PacketId::new_for_tc(true, EXPERIMENT_APID)]; + +// TODO: Would be nice if this can be commanded as well.. +/// Can be enabled to print all SPP packets received from the SPP server on port 4096. +pub const SPP_CLIENT_WIRETAPPING_RX: bool = false; #[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] #[repr(u8)] diff --git a/src/interface/mod.rs b/src/interface/mod.rs index 6439a55..5da763d 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -1,4 +1,36 @@ +use derive_new::new; +use ops_sat_rs::config::SPP_CLIENT_WIRETAPPING_RX; +use satrs::{ + encoding::ccsds::{SpValidity, SpacePacketValidator}, + spacepackets::PacketId, +}; + pub mod can; pub mod tcp_server; pub mod tcp_spp_client; pub mod udp_server; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TcpComponent { + Server, + Client, +} + +#[derive(new, Clone)] +pub struct SimpleSpValidator { + component: TcpComponent, + valid_ids: Vec, +} + +impl SpacePacketValidator for SimpleSpValidator { + fn validate(&self, sp_header: &satrs::spacepackets::SpHeader, raw_buf: &[u8]) -> SpValidity { + if SPP_CLIENT_WIRETAPPING_RX && self.component == TcpComponent::Client { + log::debug!("sp header: {:?}", sp_header); + log::debug!("raw data: {:x?}", raw_buf); + } + if self.valid_ids.contains(&sp_header.packet_id) { + return SpValidity::Valid; + } + SpValidity::Ignore + } +} diff --git a/src/interface/tcp_server.rs b/src/interface/tcp_server.rs index 42b7b40..c3275c6 100644 --- a/src/interface/tcp_server.rs +++ b/src/interface/tcp_server.rs @@ -13,6 +13,8 @@ use satrs::{ tmtc::{PacketAsVec, PacketSource}, }; +use super::{SimpleSpValidator, TcpComponent}; + #[derive(Default, Clone)] pub struct SyncTcpTmSource { tm_queue: Arc>>>, @@ -84,7 +86,7 @@ impl HandledConnectionHandler for ConnectionFinishedHandler { pub type TcpServer = TcpSpacepacketsServer< SyncTcpTmSource, mpsc::Sender, - Vec, + SimpleSpValidator, ConnectionFinishedHandler, (), GenericSendError, @@ -97,14 +99,14 @@ impl TcpTask { cfg: ServerConfig, tm_source: SyncTcpTmSource, tc_sender: mpsc::Sender, - packet_id_lookup: Vec, + valid_ids: Vec, stop_signal: Arc, ) -> Result { Ok(Self(TcpSpacepacketsServer::new( cfg, tm_source, tc_sender, - packet_id_lookup, + SimpleSpValidator::new(TcpComponent::Server, valid_ids), ConnectionFinishedHandler::default(), Some(stop_signal), )?)) @@ -113,7 +115,7 @@ impl TcpTask { pub fn periodic_operation(&mut self) { let result = self .0 - .handle_next_connection(Some(Duration::from_millis(STOP_CHECK_FREQUENCY))); + .handle_all_connections(Some(Duration::from_millis(STOP_CHECK_FREQUENCY))); match result { Ok(_conn_result) => (), Err(e) => { diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 20c15ee..24bedb2 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -6,13 +6,16 @@ use std::time::Duration; use mio::net::TcpStream; use mio::{Events, Interest, Poll, Token}; use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; -use ops_sat_rs::config::{EXPERIMENT_PACKET_ID, TCP_SPP_SERVER_PORT}; +use ops_sat_rs::config::{SPP_CLIENT_WIRETAPPING_RX, TCP_SPP_SERVER_PORT}; use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets; use satrs::queue::GenericSendError; +use satrs::spacepackets::PacketId; use satrs::tmtc::PacketAsVec; use satrs::ComponentId; use thiserror::Error; +use super::{SimpleSpValidator, TcpComponent}; + #[derive(Debug, Error)] pub enum PacketForwardingError { #[error("send error: {0}")] @@ -28,10 +31,15 @@ pub struct TcpSppClient { client: TcpStream, read_buf: [u8; 4096], tc_source_tx: mpsc::Sender, + validator: SimpleSpValidator, } impl TcpSppClient { - pub fn new(id: ComponentId, tc_source_tx: mpsc::Sender) -> io::Result { + pub fn new( + id: ComponentId, + tc_source_tx: mpsc::Sender, + valid_ids: &'static [PacketId], + ) -> io::Result { let poll = Poll::new()?; let events = Events::with_capacity(128); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT); @@ -48,6 +56,7 @@ impl TcpSppClient { client, read_buf: [0; 4096], tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), }) } @@ -85,10 +94,17 @@ impl TcpSppClient { read_bytes: usize, ) -> Result<(), PacketForwardingError> { let mut dummy = 0; + if SPP_CLIENT_WIRETAPPING_RX { + log::debug!( + "received {} bytes on TCP client: {:x?}", + read_bytes, + &self.read_buf[..read_bytes] + ); + } // This parser is able to deal with broken tail packets, but we ignore those for now.. parse_buffer_for_ccsds_space_packets( &mut self.read_buf[..read_bytes], - &[EXPERIMENT_PACKET_ID].as_slice(), + &self.validator, self.id, &self.tc_source_tx, &mut dummy, diff --git a/src/main.rs b/src/main.rs index eb669d3..fb7bfc4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,13 +9,10 @@ use log::info; use ops_sat_rs::config::{ components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, - EXPERIMENT_APID, + VALID_PACKET_ID_LIST, }; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; -use satrs::{ - hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, - spacepackets::PacketId, -}; +use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}; use crate::tmtc::tc_source::TcSourceTaskDynamic; use crate::tmtc::tm_sink::TmFunnelDynamic; @@ -147,7 +144,7 @@ fn main() { tcp_server_cfg, sync_tm_tcp_source.clone(), tc_source_tx.clone(), - vec![PacketId::new_for_tc(true, EXPERIMENT_APID)], + VALID_PACKET_ID_LIST.to_vec(), stop_signal.clone(), ) .expect("tcp server creation failed"); @@ -165,8 +162,9 @@ fn main() { stop_signal.clone(), ); - let mut tcp_spp_client = TcpSppClient::new(TCP_SPP_CLIENT.id(), tc_source_tx) - .expect("creating TCP SPP client failed"); + let mut tcp_spp_client = + TcpSppClient::new(TCP_SPP_CLIENT.id(), tc_source_tx, VALID_PACKET_ID_LIST) + .expect("creating TCP SPP client failed"); info!("Starting CTRL task"); let ctrl_stop_signal = stop_signal.clone(); From b5e048a13b511782ec00df051aae0bc4f2bbdc8a Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 16 Apr 2024 10:05:27 +0200 Subject: [PATCH 6/7] start TM handling for TCP client --- src/interface/tcp_spp_client.rs | 3 +++ src/main.rs | 11 ++++++++--- src/tmtc/tm_sink.rs | 20 ++++++++++++++------ 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 24bedb2..894c693 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -30,6 +30,7 @@ pub struct TcpSppClient { events: Events, client: TcpStream, read_buf: [u8; 4096], + tm_tcp_client_rx: mpsc::Receiver, tc_source_tx: mpsc::Sender, validator: SimpleSpValidator, } @@ -38,6 +39,7 @@ impl TcpSppClient { pub fn new( id: ComponentId, tc_source_tx: mpsc::Sender, + tm_tcp_client_rx: mpsc::Receiver, valid_ids: &'static [PacketId], ) -> io::Result { let poll = Poll::new()?; @@ -55,6 +57,7 @@ impl TcpSppClient { events, client, read_buf: [0; 4096], + tm_tcp_client_rx, tc_source_tx, validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), }) diff --git a/src/main.rs b/src/main.rs index fb7bfc4..038cc4e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,6 +47,7 @@ fn main() { let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); let (tm_server_tx, tm_server_rx) = mpsc::channel(); + let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); let (pus_test_tx, pus_test_rx) = mpsc::channel(); // let (pus_event_tx, pus_event_rx) = mpsc::channel(); @@ -162,9 +163,13 @@ fn main() { stop_signal.clone(), ); - let mut tcp_spp_client = - TcpSppClient::new(TCP_SPP_CLIENT.id(), tc_source_tx, VALID_PACKET_ID_LIST) - .expect("creating TCP SPP client failed"); + let mut tcp_spp_client = TcpSppClient::new( + TCP_SPP_CLIENT.id(), + tc_source_tx, + tm_tcp_client_rx, + VALID_PACKET_ID_LIST, + ) + .expect("creating TCP SPP client failed"); info!("Starting CTRL task"); let ctrl_stop_signal = stop_signal.clone(); diff --git a/src/tmtc/tm_sink.rs b/src/tmtc/tm_sink.rs index 9dfac6f..642b106 100644 --- a/src/tmtc/tm_sink.rs +++ b/src/tmtc/tm_sink.rs @@ -77,7 +77,8 @@ impl TmFunnelCommon { pub struct TmFunnelDynamic { common: TmFunnelCommon, tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, + tm_udp_server_tx: mpsc::Sender, + tm_tcp_client_tx: mpsc::Sender, stop_signal: Arc, } @@ -85,13 +86,15 @@ impl TmFunnelDynamic { pub fn new( sync_tm_tcp_source: SyncTcpTmSource, tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, + tm_udp_server_tx: mpsc::Sender, + tm_tcp_client_tx: mpsc::Sender, stop_signal: Arc, ) -> Self { Self { common: TmFunnelCommon::new(sync_tm_tcp_source), tm_funnel_rx, - tm_server_tx, + tm_udp_server_tx, + tm_tcp_client_tx, stop_signal, } } @@ -110,9 +113,14 @@ impl TmFunnelDynamic { .expect("Creating TM zero copy writer failed"); self.common.apply_packet_processing(zero_copy_writer); self.common.sync_tm_tcp_source.add_tm(&tm.packet); - self.tm_server_tx - .send(tm) - .expect("Sending TM to server failed"); + let result = self.tm_udp_server_tx.send(tm); + if result.is_err() { + log::error!("TM UDP server has disconnected"); + } + let result = self.tm_tcp_client_tx.send(tm); + if result.is_err() { + log::error!("TM TCP client has disconnected"); + } if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { break; } From 8ce305491b4e6d2b5798ae0b3cc02c0f8924d0a5 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 16 Apr 2024 13:08:10 +0200 Subject: [PATCH 7/7] extend mio client to allow reconnection --- Cargo.lock | 8 +-- Cargo.toml | 4 +- src/interface/mod.rs | 2 +- src/interface/tcp_spp_client.rs | 102 +++++++++++++++++++++++++++----- src/main.rs | 7 ++- src/tmtc/tm_sink.rs | 2 +- 6 files changed, 100 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 570d8ac..4e5f4ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -590,7 +590,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "bus", "cobs", @@ -615,7 +615,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "csv", "satrs-mib-codegen", @@ -627,7 +627,7 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "proc-macro2", "quote", @@ -637,7 +637,7 @@ dependencies = [ [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#29c0961fab78a0e192e5fc918c7e07ccf20d39a6" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "serde", "spacepackets", diff --git a/Cargo.toml b/Cargo.toml index fdfac9a..a082321 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,13 +19,13 @@ mio = "0.8" [dependencies.satrs] version = "0.2.0-rc.0" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" -branch = "rework-tmtc-modules" +branch = "main" features = ["test_util"] [dependencies.satrs-mib] version = "0.1.1" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" -branch = "rework-tmtc-modules" +branch = "main" [dev-dependencies] env_logger = "0.11" diff --git a/src/interface/mod.rs b/src/interface/mod.rs index 5da763d..6b6038c 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -31,6 +31,6 @@ impl SpacePacketValidator for SimpleSpValidator { if self.valid_ids.contains(&sp_header.packet_id) { return SpValidity::Valid; } - SpValidity::Ignore + SpValidity::Skip } } diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs index 894c693..847f000 100644 --- a/src/interface/tcp_spp_client.rs +++ b/src/interface/tcp_spp_client.rs @@ -1,4 +1,4 @@ -use std::io::{self, Read}; +use std::io::{self, Read, Write}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::mpsc; use std::time::Duration; @@ -28,9 +28,11 @@ pub struct TcpSppClient { id: ComponentId, poll: Poll, events: Events, - client: TcpStream, + // Optional to allow periodic reconnection attempts on the TCP server. + client: Option, read_buf: [u8; 4096], tm_tcp_client_rx: mpsc::Receiver, + server_addr: SocketAddr, tc_source_tx: mpsc::Sender, validator: SimpleSpValidator, } @@ -42,28 +44,75 @@ impl TcpSppClient { tm_tcp_client_rx: mpsc::Receiver, valid_ids: &'static [PacketId], ) -> io::Result { - let poll = Poll::new()?; + let mut poll = Poll::new()?; let events = Events::with_capacity(128); - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT); - let mut client = TcpStream::connect(addr)?; - poll.registry().register( - &mut client, - Token(0), - Interest::READABLE | Interest::WRITABLE, - )?; + let server_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT); + let client = Self::attempt_connection(&mut poll, &server_addr); + if client.is_err() { + log::warn!( + "connection to TCP server {} failed: {}", + server_addr, + client.unwrap_err() + ); + return Ok(Self { + id, + poll, + events, + client: None, + read_buf: [0; 4096], + server_addr, + tm_tcp_client_rx, + tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), + }); + } Ok(Self { id, poll, events, - client, + client: Some(client.unwrap()), read_buf: [0; 4096], + server_addr, tm_tcp_client_rx, tc_source_tx, validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), }) } + pub fn attempt_connection(poll: &mut Poll, server_addr: &SocketAddr) -> io::Result { + let mut client = TcpStream::connect(*server_addr)?; + poll.registry().register( + &mut client, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )?; + Ok(client) + } + pub fn periodic_operation(&mut self) -> Result<(), PacketForwardingError> { + if self.client.is_some() { + return self.perform_regular_operation(); + } else { + let client_result = Self::attempt_connection(&mut self.poll, &self.server_addr); + match client_result { + Ok(client) => { + self.client = Some(client); + self.perform_regular_operation()?; + } + Err(ref e) => { + log::warn!( + "connection to TCP server {} failed: {}", + self.server_addr, + e + ); + } + } + } + Ok(()) + } + + pub fn perform_regular_operation(&mut self) -> Result<(), PacketForwardingError> { self.poll.poll( &mut self.events, Some(Duration::from_millis(STOP_CHECK_FREQUENCY)), @@ -75,8 +124,7 @@ impl TcpSppClient { self.read_from_server()?; } if event.is_writable() { - // Read packets from a queue and send them here.. - // self.client.write_all(b"hello")?; + self.write_to_server()?; } } } @@ -84,7 +132,11 @@ impl TcpSppClient { } pub fn read_from_server(&mut self) -> Result<(), PacketForwardingError> { - match self.client.read(&mut self.read_buf) { + let client = self + .client + .as_mut() + .expect("TCP stream invalid when it should not be"); + match client.read(&mut self.read_buf) { Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()), Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?, Err(e) => return Err(e.into()), @@ -92,6 +144,28 @@ impl TcpSppClient { Ok(()) } + pub fn write_to_server(&mut self) -> io::Result<()> { + let client = self + .client + .as_mut() + .expect("TCP stream invalid when it should not be"); + loop { + match self.tm_tcp_client_rx.try_recv() { + Ok(tm) => { + client.write_all(&tm.packet)?; + } + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::error!("TM sender to TCP client has disconnected"); + break; + } + }, + } + } + Ok(()) + } + pub fn handle_read_bytstream( &mut self, read_bytes: usize, diff --git a/src/main.rs b/src/main.rs index 038cc4e..afc0843 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,7 +46,7 @@ fn main() { let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); - let (tm_server_tx, tm_server_rx) = mpsc::channel(); + let (tm_tcp_server_tx, tm_tcp_server_rx) = mpsc::channel(); let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); let (pus_test_tx, pus_test_rx) = mpsc::channel(); @@ -129,7 +129,7 @@ fn main() { let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, tm_handler: DynamicUdpTmHandler { - tm_rx: tm_server_rx, + tm_rx: tm_tcp_server_rx, }, }; @@ -153,7 +153,8 @@ fn main() { let mut tm_funnel = TmFunnelDynamic::new( sync_tm_tcp_source, tm_funnel_rx, - tm_server_tx, + tm_tcp_server_tx, + tm_tcp_client_tx, stop_signal.clone(), ); diff --git a/src/tmtc/tm_sink.rs b/src/tmtc/tm_sink.rs index 642b106..fdb8043 100644 --- a/src/tmtc/tm_sink.rs +++ b/src/tmtc/tm_sink.rs @@ -113,7 +113,7 @@ impl TmFunnelDynamic { .expect("Creating TM zero copy writer failed"); self.common.apply_packet_processing(zero_copy_writer); self.common.sync_tm_tcp_source.add_tm(&tm.packet); - let result = self.tm_udp_server_tx.send(tm); + let result = self.tm_udp_server_tx.send(tm.clone()); if result.is_err() { log::error!("TM UDP server has disconnected"); }