From 6dddfd5a709007fa779511c05fb9e2e70e3663cb Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sat, 13 Apr 2024 15:16:53 +0200 Subject: [PATCH] start with mio tcp client --- Cargo.lock | 1 + Cargo.toml | 1 + README.md | 1 + src/config.rs | 1 + src/interface/mod.rs | 5 +- src/interface/{tcp.rs => tcp_server.rs} | 0 src/interface/tcp_spp_client.rs | 83 +++++++++++++++ src/interface/{udp.rs => udp_server.rs} | 0 src/main.rs | 4 +- src/pus/mod.rs | 4 +- src/tmtc/ccsds.rs | 49 --------- src/tmtc/mod.rs | 99 +----------------- src/tmtc/tc_source.rs | 128 ++++++++++++++++++++++++ src/tmtc/{tm_funnel.rs => tm_sink.rs} | 2 +- 14 files changed, 225 insertions(+), 153 deletions(-) rename src/interface/{tcp.rs => tcp_server.rs} (100%) create mode 100644 src/interface/tcp_spp_client.rs rename src/interface/{udp.rs => udp_server.rs} (100%) delete mode 100644 src/tmtc/ccsds.rs create mode 100644 src/tmtc/tc_source.rs rename src/tmtc/{tm_funnel.rs => tm_sink.rs} (98%) diff --git a/Cargo.lock b/Cargo.lock index d9d0993..01d6b53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -483,6 +483,7 @@ dependencies = [ "fern", "lazy_static", "log", + "mio", "num_enum", "satrs", "satrs-mib", diff --git a/Cargo.toml b/Cargo.toml index 0b0fd40..3c4e1b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ strum = { version = "0.26", features = ["derive"] } thiserror = "1" derive-new = "0.6" num_enum = "0.7" +mio = "0.8" [dependencies.satrs] version = "0.2.0-rc.0" diff --git a/README.md b/README.md index c0b42a6..069dd95 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ appears to be a useful source for documentation. - [OBSW documents](https://opssat1.esoc.esa.int/projects/experimenter-information/dmsf?folder_id=7) - [Software Integration Process](https://opssat1.esoc.esa.int/dmsf/files/34/view) +- [SPP/TCP bridge](https://opssat1.esoc.esa.int/dmsf/files/65/view) - [Cross-compiling SEPP](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Cross-compiling_SEPP_application) - [TMTC infrastructure](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Live_TM_TC_data) - [Submitting an Experiment](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Building_and_submitting_your_application_to_ESOC) diff --git a/src/config.rs b/src/config.rs index ef391a7..78938bc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,6 +11,7 @@ pub const HOME_FOLER_EXPERIMENT: &str = "/home/exp278"; pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const SERVER_PORT: u16 = 7301; +pub const TCP_SPP_SERVER_PORT: u16 = 4096; pub const EXPERIMENT_ID: u32 = 278; pub const EXPERIMENT_APID: u16 = 1024 + EXPERIMENT_ID as u16; diff --git a/src/interface/mod.rs b/src/interface/mod.rs index cc4703d..6439a55 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -1,3 +1,4 @@ pub mod can; -pub mod tcp; -pub mod udp; +pub mod tcp_server; +pub mod tcp_spp_client; +pub mod udp_server; diff --git a/src/interface/tcp.rs b/src/interface/tcp_server.rs similarity index 100% rename from src/interface/tcp.rs rename to src/interface/tcp_server.rs diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs new file mode 100644 index 0000000..3108f9a --- /dev/null +++ b/src/interface/tcp_spp_client.rs @@ -0,0 +1,83 @@ +use std::io::{self, Read}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::mpsc; + +use mio::net::TcpStream; +use mio::{Events, Interest, Poll, Token}; +use ops_sat_rs::config::TCP_SPP_SERVER_PORT; +use satrs::spacepackets::ecss::CCSDS_HEADER_LEN; +use satrs::spacepackets::{CcsdsPacket, SpHeader}; + +pub struct TcpSppClient { + poll: Poll, + events: Events, + client: TcpStream, + read_buf: [u8; 4096], + tc_source_tx: mpsc::Sender>, +} + +impl TcpSppClient { + pub fn new() -> io::Result { + let poll = Poll::new()?; + let events = Events::with_capacity(128); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT); + let mut client = TcpStream::connect(addr)?; + poll.registry().register( + &mut client, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )?; + Ok(Self { + poll, + events, + client, + read_buf: [0; 4096], + }) + } + + pub fn periodic_operation(&mut self) -> io::Result<()> { + self.poll.poll(&mut self.events, None)?; + let events: Vec = self.events.iter().cloned().collect(); + for event in events { + if event.token() == Token(0) { + if event.is_readable() { + self.read_from_server()?; + } + if event.is_writable() { + // Read packets from a queue and send them here.. + // self.client.write_all(b"hello")?; + } + } + } + Ok(()) + } + + pub fn read_from_server(&mut self) -> io::Result<()> { + match self.client.read(&mut self.read_buf) { + Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe)), + Ok(n) => { + if n < CCSDS_HEADER_LEN + 1 { + log::warn!("received packet smaller than minimum expected packet size."); + log::debug!("{:?}", &self.read_buf[..n]); + return Ok(()); + } + // We already checked that the received size has the minimum expected size. + let (sp_header, data) = + SpHeader::from_be_bytes(&self.read_buf[..n]).expect("parsing SP header failed"); + let full_expected_packet_len = sp_header.total_len(); + // We received an incomplete frame? + if n < full_expected_packet_len { + log::warn!( + "received incomplete SPP, with detected packet length {} but read buffer length {}", + full_expected_packet_len, n + ); + return Ok(()); + } + self.tc_source_tx + .send(self.read_buf[0..full_expected_packet_len].to_vec()); + } + Err(e) => return Err(e), + } + Ok(()) + } +} diff --git a/src/interface/udp.rs b/src/interface/udp_server.rs similarity index 100% rename from src/interface/udp.rs rename to src/interface/udp_server.rs diff --git a/src/main.rs b/src/main.rs index db66e01..a03cba6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,8 +23,8 @@ use crate::tmtc::tm_funnel::TmFunnelDynamic; use crate::tmtc::TcSourceTaskDynamic; use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ - interface::tcp::{SyncTcpTmSource, TcpTask}, - interface::udp::{DynamicUdpTmHandler, UdpTmtcServer}, + interface::tcp_server::{SyncTcpTmSource, TcpTask}, + interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, logger::setup_logger, tmtc::ccsds::CcsdsReceiver, tmtc::PusTcSourceProviderDynamic, diff --git a/src/pus/mod.rs b/src/pus/mod.rs index eda55d4..3731892 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -59,7 +59,7 @@ pub struct PusTcMpscRouter { // pub mode_tc_sender: Sender, } -pub struct PusReceiver { +pub struct PusTcDistributor { pub id: ComponentId, pub tm_sender: TmSender, pub verif_reporter: VerificationReporter, @@ -67,7 +67,7 @@ pub struct PusReceiver { stamp_helper: TimeStampHelper, } -impl PusReceiver { +impl PusTcDistributor { pub fn new(tm_sender: TmSender, pus_router: PusTcMpscRouter) -> Self { Self { id: PUS_ROUTING_SERVICE.raw(), diff --git a/src/tmtc/ccsds.rs b/src/tmtc/ccsds.rs deleted file mode 100644 index 97edf26..0000000 --- a/src/tmtc/ccsds.rs +++ /dev/null @@ -1,49 +0,0 @@ -use ops_sat_rs::config::EXPERIMENT_APID; -use satrs::pus::ReceivesEcssPusTc; -use satrs::spacepackets::{CcsdsPacket, SpHeader}; -use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; -use satrs::ValidatorU16Id; - -#[derive(Clone)] -pub struct CcsdsReceiver< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone, - E, -> { - pub tc_source: TcSource, -} - -impl< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, - E: 'static, - > ValidatorU16Id for CcsdsReceiver -{ - fn validate(&self, apid: u16) -> bool { - apid == EXPERIMENT_APID - } -} - -impl< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, - E: 'static, - > CcsdsPacketHandler for CcsdsReceiver -{ - type Error = E; - - fn handle_packet_with_valid_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error> { - self.tc_source.pass_ccsds(sp_header, tc_raw) - } - - fn handle_packet_with_unknown_apid( - &mut self, - sp_header: &SpHeader, - _tc_raw: &[u8], - ) -> Result<(), Self::Error> { - // TODO: Log event as telemetry or something similar? - log::warn!("unknown APID 0x{:x?} detected", sp_header.apid()); - Ok(()) - } -} diff --git a/src/tmtc/mod.rs b/src/tmtc/mod.rs index fc28a37..bfd24c5 100644 --- a/src/tmtc/mod.rs +++ b/src/tmtc/mod.rs @@ -1,97 +1,2 @@ -use crate::pus::PusReceiver; -use satrs::pool::{StoreAddr, StoreError}; -use satrs::pus::{EcssTcAndToken, MpscTmAsVecSender}; -use satrs::spacepackets::ecss::PusPacket; -use satrs::{ - pus::ReceivesEcssPusTc, - spacepackets::{ecss::tc::PusTcReader, SpHeader}, - tmtc::ReceivesCcsdsTc, -}; -use std::sync::mpsc::{self, SendError, Sender, TryRecvError}; -use thiserror::Error; - -pub mod ccsds; -pub mod tm_funnel; - -#[derive(Debug, Clone, PartialEq, Eq, Error)] -pub enum MpscStoreAndSendError { - #[error("Store error: {0}")] - Store(#[from] StoreError), - #[error("TC send error: {0}")] - TcSend(#[from] SendError), - #[error("TMTC send error: {0}")] - TmTcSend(#[from] SendError), -} - -// Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules. -#[derive(Clone)] -pub struct PusTcSourceProviderDynamic(pub Sender>); - -impl ReceivesEcssPusTc for PusTcSourceProviderDynamic { - type Error = SendError>; - - fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { - self.0.send(pus_tc.raw_data().to_vec())?; - Ok(()) - } -} - -impl ReceivesCcsdsTc for PusTcSourceProviderDynamic { - type Error = mpsc::SendError>; - - fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.0.send(tc_raw.to_vec())?; - Ok(()) - } -} - -// TC source components where the heap is the backing memory of the received telecommands. -pub struct TcSourceTaskDynamic { - pub tc_receiver: mpsc::Receiver>, - pus_receiver: PusReceiver, -} - -impl TcSourceTaskDynamic { - pub fn new( - tc_receiver: mpsc::Receiver>, - pus_receiver: PusReceiver, - ) -> Self { - Self { - tc_receiver, - pus_receiver, - } - } - - pub fn periodic_operation(&mut self) { - self.poll_tc(); - } - - pub fn poll_tc(&mut self) -> bool { - match self.tc_receiver.try_recv() { - Ok(tc) => match PusTcReader::new(&tc) { - Ok((pus_tc, _)) => { - self.pus_receiver - .handle_tc_packet( - satrs::pus::TcInMemory::Vec(tc.clone()), - pus_tc.service(), - &pus_tc, - ) - .ok(); - true - } - Err(e) => { - log::warn!("error creating PUS TC from raw data: {e}"); - log::warn!("raw data: {:x?}", tc); - true - } - }, - Err(e) => match e { - TryRecvError::Empty => false, - TryRecvError::Disconnected => { - log::warn!("tmtc thread: sender disconnected"); - false - } - }, - } - } -} +pub mod tc_source; +pub mod tm_sink; diff --git a/src/tmtc/tc_source.rs b/src/tmtc/tc_source.rs new file mode 100644 index 0000000..0371113 --- /dev/null +++ b/src/tmtc/tc_source.rs @@ -0,0 +1,128 @@ +use satrs::{pool::PoolProvider, tmtc::tc_helper::SharedTcPool}; +use std::sync::mpsc::{self, TryRecvError}; + +use satrs::{ + pool::StoreAddr, + pus::{MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded}, + spacepackets::ecss::{tc::PusTcReader, PusPacket}, +}; + +use crate::pus::PusTcDistributor; + +// TC source components where static pools are the backing memory of the received telecommands. +pub struct TcSourceTaskStatic { + shared_tc_pool: SharedTcPool, + tc_receiver: mpsc::Receiver, + tc_buf: [u8; 4096], + pus_receiver: PusTcDistributor, +} + +impl TcSourceTaskStatic { + pub fn new( + shared_tc_pool: SharedTcPool, + tc_receiver: mpsc::Receiver, + pus_receiver: PusTcDistributor, + ) -> Self { + Self { + shared_tc_pool, + tc_receiver, + tc_buf: [0; 4096], + pus_receiver, + } + } + + pub fn periodic_operation(&mut self) { + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + match self.tc_receiver.try_recv() { + Ok(addr) => { + let pool = self + .shared_tc_pool + .0 + .read() + .expect("locking tc pool failed"); + pool.read(&addr, &mut self.tc_buf) + .expect("reading pool failed"); + drop(pool); + match PusTcReader::new(&self.tc_buf) { + Ok((pus_tc, _)) => { + self.pus_receiver + .handle_tc_packet( + satrs::pus::TcInMemory::StoreAddr(addr), + pus_tc.service(), + &pus_tc, + ) + .ok(); + true + } + Err(e) => { + log::warn!("error creating PUS TC from raw data: {e}"); + log::warn!("raw data: {:x?}", self.tc_buf); + true + } + } + } + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + log::warn!("tmtc thread: sender disconnected"); + false + } + }, + } + } +} + +// TC source components where the heap is the backing memory of the received telecommands. +pub struct TcSourceTaskDynamic { + pub tc_receiver: mpsc::Receiver>, + pus_receiver: PusTcDistributor, +} + +impl TcSourceTaskDynamic { + pub fn new( + tc_receiver: mpsc::Receiver>, + pus_receiver: PusTcDistributor, + ) -> Self { + Self { + tc_receiver, + pus_receiver, + } + } + + pub fn periodic_operation(&mut self) { + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + // Right now, we only expect PUS packets. + match self.tc_receiver.try_recv() { + Ok(tc) => match PusTcReader::new(&tc) { + Ok((pus_tc, _)) => { + self.pus_receiver + .handle_tc_packet( + satrs::pus::TcInMemory::Vec(tc.clone()), + pus_tc.service(), + &pus_tc, + ) + .ok(); + true + } + Err(e) => { + log::warn!("error creating PUS TC from raw data: {e}"); + log::warn!("raw data: {:x?}", tc); + true + } + }, + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + log::warn!("tmtc thread: sender disconnected"); + false + } + }, + } + } +} diff --git a/src/tmtc/tm_funnel.rs b/src/tmtc/tm_sink.rs similarity index 98% rename from src/tmtc/tm_funnel.rs rename to src/tmtc/tm_sink.rs index a0ef7b1..7e4fe2e 100644 --- a/src/tmtc/tm_funnel.rs +++ b/src/tmtc/tm_sink.rs @@ -14,7 +14,7 @@ use satrs::{ }, }; -use crate::interface::tcp::SyncTcpTmSource; +use crate::interface::tcp_server::SyncTcpTmSource; #[derive(Default)] pub struct CcsdsSeqCounterMap {