From d21e98d2e5c49cb2cb42f50c052826ebfa95454e Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 26 Sep 2023 23:00:47 +0200 Subject: [PATCH 01/11] start adding tcp server --- satrs-example/Cargo.toml | 2 +- satrs-mib/Cargo.toml | 4 ++-- satrs-mib/codegen/Cargo.toml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/satrs-example/Cargo.toml b/satrs-example/Cargo.toml index 6e2bc16..0446f51 100644 --- a/satrs-example/Cargo.toml +++ b/satrs-example/Cargo.toml @@ -19,7 +19,7 @@ num_enum = "0.7" thiserror = "1" [dependencies.satrs-core] -# version = "0.1.0-alpha.0" +# version = "0.1.0-alpha.1" path = "../satrs-core" diff --git a/satrs-mib/Cargo.toml b/satrs-mib/Cargo.toml index a34d886..e901ebe 100644 --- a/satrs-mib/Cargo.toml +++ b/satrs-mib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "satrs-mib" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" edition = "2021" rust-version = "1.61" authors = ["Robin Mueller "] @@ -30,7 +30,7 @@ version = "0.1.0-alpha.1" [dependencies.satrs-mib-codegen] path = "codegen" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" [dependencies.serde] version = "1" diff --git a/satrs-mib/codegen/Cargo.toml b/satrs-mib/codegen/Cargo.toml index cba83eb..9a3887c 100644 --- a/satrs-mib/codegen/Cargo.toml +++ b/satrs-mib/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "satrs-mib-codegen" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" edition = "2021" description = "satrs-mib proc macro implementation" homepage = "https://egit.irs.uni-stuttgart.de/rust/sat-rs" From b13e9b59ac412332b2eed303791b3b28c3edd018 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 26 Sep 2023 23:02:30 +0200 Subject: [PATCH 02/11] bump mib versions --- satrs-mib/Cargo.toml | 4 ++-- satrs-mib/codegen/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/satrs-mib/Cargo.toml b/satrs-mib/Cargo.toml index a34d886..e901ebe 100644 --- a/satrs-mib/Cargo.toml +++ b/satrs-mib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "satrs-mib" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" edition = "2021" rust-version = "1.61" authors = ["Robin Mueller "] @@ -30,7 +30,7 @@ version = "0.1.0-alpha.1" [dependencies.satrs-mib-codegen] path = "codegen" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" [dependencies.serde] version = "1" diff --git a/satrs-mib/codegen/Cargo.toml b/satrs-mib/codegen/Cargo.toml index cba83eb..9a3887c 100644 --- a/satrs-mib/codegen/Cargo.toml +++ b/satrs-mib/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "satrs-mib-codegen" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" edition = "2021" description = "satrs-mib proc macro implementation" homepage = "https://egit.irs.uni-stuttgart.de/rust/sat-rs" From 7ca8d52368eb68ddcc16605d8625b8e9da3c23c7 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 26 Sep 2023 23:06:52 +0200 Subject: [PATCH 03/11] use explicit versions for sat-rs dependencies --- satrs-example/Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/satrs-example/Cargo.toml b/satrs-example/Cargo.toml index 0446f51..3ca175e 100644 --- a/satrs-example/Cargo.toml +++ b/satrs-example/Cargo.toml @@ -19,9 +19,9 @@ num_enum = "0.7" thiserror = "1" [dependencies.satrs-core] -# version = "0.1.0-alpha.1" -path = "../satrs-core" - +version = "0.1.0-alpha.1" +# path = "../satrs-core" [dependencies.satrs-mib] -path = "../satrs-mib" +version = "0.1.0-alpha.1" +# path = "../satrs-mib" From 5a3b9fb46b6ac574468f6d05285b6968f02c46c1 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 27 Sep 2023 00:21:03 +0200 Subject: [PATCH 04/11] why is this so problematic.. --- satrs-example/pyclient/requirements.txt | 2 +- satrs-example/src/main.rs | 27 ++- satrs-example/src/tmtc.rs | 233 +++++++++++------------- 3 files changed, 134 insertions(+), 128 deletions(-) diff --git a/satrs-example/pyclient/requirements.txt b/satrs-example/pyclient/requirements.txt index bcb3cb4..4f61463 100644 --- a/satrs-example/pyclient/requirements.txt +++ b/satrs-example/pyclient/requirements.txt @@ -1,2 +1,2 @@ -tmtccmd == 5.0.0rc0 +tmtccmd == 6.0.0 # -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 4053e50..4bfdbda 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -6,7 +6,9 @@ mod requests; mod tmtc; use log::{info, warn}; +use satrs_core::hal::std::udp_server::UdpTcServer; +use crate::ccsds::CcsdsReceiver; use crate::hk::AcsHkIds; use crate::logging::setup_logger; use crate::pus::action::{Pus8Wrapper, PusService8ActionHandler}; @@ -14,9 +16,9 @@ use crate::pus::event::Pus5Wrapper; use crate::pus::hk::{Pus3Wrapper, PusService3HkHandler}; use crate::pus::scheduler::Pus11Wrapper; use crate::pus::test::Service17CustomWrapper; -use crate::pus::PusTcMpscRouter; +use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::requests::{Request, RequestWithToken}; -use crate::tmtc::{core_tmtc_task, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel}; +use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask, UdpTmtcServer}; use satrs_core::event_man::{ EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, }; @@ -43,7 +45,7 @@ use satrs_core::spacepackets::{ SpHeader, }; use satrs_core::tmtc::tm_helper::SharedTmStore; -use satrs_core::tmtc::{AddressableId, TargetId}; +use satrs_core::tmtc::{AddressableId, CcsdsDistributor, TargetId}; use satrs_core::ChannelId; use satrs_example::{ RequestTargetId, TcReceiverId, TmSenderId, OBSW_SERVER_ADDR, PUS_APID, SERVER_PORT, @@ -266,11 +268,27 @@ fn main() { ); let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler }; + let ccsds_receiver = CcsdsReceiver { + tc_source: tc_args.tc_source.clone(), + }; + let mut tmtc_task = TmtcTask::new(tc_args, PusReceiver::new(verif_reporter, pus_router)); + + let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(ccsds_distributor)) + .expect("creating UDP TMTC server failed"); + let mut udp_tmtc_server = UdpTmtcServer { + udp_tc_server, + tm_rx: tm_args.tm_server_rx, + tm_store: tm_args.tm_store.clone_backing_pool(), + }; + info!("Starting TMTC task"); let jh0 = thread::Builder::new() .name("TMTC".to_string()) .spawn(move || { - core_tmtc_task(sock_addr, tc_args, tm_args, verif_reporter, pus_router); + udp_tmtc_server.periodic_operation(); + tmtc_task.periodic_operation(); + thread::sleep(Duration::from_millis(400)); }) .unwrap(); @@ -382,6 +400,7 @@ fn main() { let mut timestamp: [u8; 7] = [0; 7]; let mut time_provider = TimeProvider::new_with_u16_days(0, 0); loop { + // TODO: Move this into a separate thread.. match acs_thread_rx.try_recv() { Ok(request) => { info!( diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 5d2ea5e..8ab22a1 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,21 +1,18 @@ use log::{info, warn}; use satrs_core::hal::std::udp_server::{ReceiveResult, UdpTcServer}; +use satrs_core::pus::ReceivesEcssPusTc; +use satrs_core::spacepackets::SpHeader; use std::net::SocketAddr; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; -use std::thread; -use std::time::Duration; use thiserror::Error; -use crate::ccsds::CcsdsReceiver; -use crate::pus::{PusReceiver, PusTcMpscRouter}; +use crate::pus::PusReceiver; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; -use satrs_core::pus::verification::StdVerifReporterWithSender; -use satrs_core::pus::{ReceivesEcssPusTc, TcAddrWithToken}; +use satrs_core::pus::TcAddrWithToken; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; -use satrs_core::spacepackets::SpHeader; use satrs_core::tmtc::tm_helper::SharedTmStore; -use satrs_core::tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc}; +use satrs_core::tmtc::{CcsdsError, ReceivesCcsdsTc}; pub struct TmArgs { pub tm_store: SharedTmStore, @@ -65,9 +62,9 @@ pub struct TmFunnel { } pub struct UdpTmtcServer { - udp_tc_server: UdpTcServer>, - tm_rx: Receiver, - tm_store: SharedPool, + pub udp_tc_server: UdpTcServer>, + pub tm_rx: Receiver, + pub tm_store: SharedPool, } #[derive(Clone)] @@ -98,131 +95,121 @@ impl ReceivesCcsdsTc for PusTcSource { } } -pub fn core_tmtc_task( - socket_addr: SocketAddr, - mut tc_args: TcArgs, - tm_args: TmArgs, - verif_reporter: StdVerifReporterWithSender, - pus_router: PusTcMpscRouter, -) { - let mut pus_receiver = PusReceiver::new(verif_reporter, pus_router); - - let ccsds_receiver = CcsdsReceiver { - tc_source: tc_args.tc_source.clone(), - }; - - let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); - - let udp_tc_server = UdpTcServer::new(socket_addr, 2048, Box::new(ccsds_distributor)) - .expect("creating UDP TMTC server failed"); - - let mut udp_tmtc_server = UdpTmtcServer { - udp_tc_server, - tm_rx: tm_args.tm_server_rx, - tm_store: tm_args.tm_store.clone_backing_pool(), - }; - - let mut tc_buf: [u8; 4096] = [0; 4096]; - loop { - core_tmtc_loop( - &mut udp_tmtc_server, - &mut tc_args, - &mut tc_buf, - &mut pus_receiver, - ); - thread::sleep(Duration::from_millis(400)); - } +pub struct TmtcTask { + tc_args: TcArgs, + tc_buf: [u8; 4096], + pus_receiver: PusReceiver, } -fn core_tmtc_loop( - udp_tmtc_server: &mut UdpTmtcServer, - tc_args: &mut TcArgs, - tc_buf: &mut [u8], - pus_receiver: &mut PusReceiver, -) { - while poll_tc_server(udp_tmtc_server) {} - match tc_args.tc_receiver.try_recv() { - Ok(addr) => { - let pool = tc_args - .tc_source - .tc_store - .pool - .read() - .expect("locking tc pool failed"); - let data = pool.read(&addr).expect("reading pool failed"); - tc_buf[0..data.len()].copy_from_slice(data); - drop(pool); - match PusTcReader::new(tc_buf) { - Ok((pus_tc, _)) => { - pus_receiver - .handle_tc_packet(addr, pus_tc.service(), &pus_tc) - .ok(); - } - Err(e) => { - warn!("error creating PUS TC from raw data: {e}"); - warn!("raw data: {tc_buf:x?}"); - } - } - } - Err(e) => { - if let TryRecvError::Disconnected = e { - warn!("tmtc thread: sender disconnected") - } +impl TmtcTask { + pub fn new(tc_args: TcArgs, pus_receiver: PusReceiver) -> Self { + Self { + tc_args, + tc_buf: [0; 4096], + pus_receiver, } } - if let Some(recv_addr) = udp_tmtc_server.udp_tc_server.last_sender() { - core_tm_handling(udp_tmtc_server, &recv_addr); - } -} -fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { - match udp_tmtc_server.udp_tc_server.try_recv_tc() { - Ok(_) => true, - Err(e) => match e { - ReceiveResult::ReceiverError(e) => match e { - CcsdsError::ByteConversionError(e) => { - warn!("packet error: {e:?}"); - true + pub fn periodic_operation(&mut self) { + //while self.poll_tc() {} + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + match self.tc_args.tc_receiver.try_recv() { + Ok(addr) => { + let pool = self + .tc_args + .tc_source + .tc_store + .pool + .read() + .expect("locking tc pool failed"); + let data = pool.read(&addr).expect("reading pool failed"); + self.tc_buf[0..data.len()].copy_from_slice(data); + drop(pool); + match PusTcReader::new(&self.tc_buf) { + Ok((pus_tc, _)) => { + self.pus_receiver + .handle_tc_packet(addr, pus_tc.service(), &pus_tc) + .ok(); + true + } + Err(e) => { + warn!("error creating PUS TC from raw data: {e}"); + warn!("raw data: {:x?}", self.tc_buf); + true + } } - CcsdsError::CustomError(e) => { - warn!("mpsc store and send error {e:?}"); - true + } + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + warn!("tmtc thread: sender disconnected"); + false } }, - ReceiveResult::IoError(e) => { - warn!("IO error {e}"); - false - } - ReceiveResult::NothingReceived => false, - }, + } } } -fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) { - while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() { - let store_lock = udp_tmtc_server.tm_store.write(); - if store_lock.is_err() { - warn!("Locking TM store failed"); - continue; +impl UdpTmtcServer { + pub fn periodic_operation(&mut self) { + while self.poll_tc_server() {} + if let Some(recv_addr) = self.udp_tc_server.last_sender() { + self.send_tm_to_udp_client(&recv_addr); } - let mut store_lock = store_lock.unwrap(); - let pg = store_lock.read_with_guard(addr); - let read_res = pg.read(); - if read_res.is_err() { - warn!("Error reading TM pool data"); - continue; + } + + fn poll_tc_server(&mut self) -> bool { + match self.udp_tc_server.try_recv_tc() { + Ok(_) => true, + Err(e) => match e { + ReceiveResult::ReceiverError(e) => match e { + CcsdsError::ByteConversionError(e) => { + warn!("packet error: {e:?}"); + true + } + CcsdsError::CustomError(e) => { + warn!("mpsc store and send error {e:?}"); + true + } + }, + ReceiveResult::IoError(e) => { + warn!("IO error {e}"); + false + } + ReceiveResult::NothingReceived => false, + }, } - let buf = read_res.unwrap(); - if buf.len() > 9 { - let service = buf[7]; - let subservice = buf[8]; - info!("Sending PUS TM[{service},{subservice}]") - } else { - info!("Sending PUS TM"); - } - let result = udp_tmtc_server.udp_tc_server.socket.send_to(buf, recv_addr); - if let Err(e) = result { - warn!("Sending TM with UDP socket failed: {e}") + } + + fn send_tm_to_udp_client(&mut self, recv_addr: &SocketAddr) { + while let Ok(addr) = self.tm_rx.try_recv() { + let store_lock = self.tm_store.write(); + if store_lock.is_err() { + warn!("Locking TM store failed"); + continue; + } + let mut store_lock = store_lock.unwrap(); + let pg = store_lock.read_with_guard(addr); + let read_res = pg.read(); + if read_res.is_err() { + warn!("Error reading TM pool data"); + continue; + } + let buf = read_res.unwrap(); + if buf.len() > 9 { + let service = buf[7]; + let subservice = buf[8]; + info!("Sending PUS TM[{service},{subservice}]") + } else { + info!("Sending PUS TM"); + } + let result = self.udp_tc_server.socket.send_to(buf, recv_addr); + if let Err(e) = result { + warn!("Sending TM with UDP socket failed: {e}") + } } } } From 8f325138ff9d1307c705bd028ffecd6c52b32638 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 27 Sep 2023 00:25:50 +0200 Subject: [PATCH 05/11] found the bug --- satrs-example/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 4bfdbda..3a8664f 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -282,10 +282,10 @@ fn main() { tm_store: tm_args.tm_store.clone_backing_pool(), }; - info!("Starting TMTC task"); + info!("Starting TMTC and UDP task"); let jh0 = thread::Builder::new() - .name("TMTC".to_string()) - .spawn(move || { + .name("TMTC_UDP".to_string()) + .spawn(move || loop { udp_tmtc_server.periodic_operation(); tmtc_task.periodic_operation(); thread::sleep(Duration::from_millis(400)); @@ -400,7 +400,7 @@ fn main() { let mut timestamp: [u8; 7] = [0; 7]; let mut time_provider = TimeProvider::new_with_u16_days(0, 0); loop { - // TODO: Move this into a separate thread.. + // TODO: Move this into a separate function/task/module.. match acs_thread_rx.try_recv() { Ok(request) => { info!( From 6bee0f35ff725067ff2e425a22a3ddc3350e11cb Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 27 Sep 2023 14:28:42 +0200 Subject: [PATCH 06/11] add structure overview --- satrs-example/README.md | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/satrs-example/README.md b/satrs-example/README.md index 021ae67..75a067b 100644 --- a/satrs-example/README.md +++ b/satrs-example/README.md @@ -17,7 +17,7 @@ cargo run --bin simpleclient This repository also contains a more complex client using the [Python tmtccmd](https://github.com/robamu-org/tmtccmd) module. -# Using the tmtccmd Python client +# Using the tmtccmd Python client The python client requires a valid installation of the [tmtccmd package](https://github.com/robamu-org/tmtccmd). @@ -51,3 +51,24 @@ the `simpleclient`: You can also simply call the script without any arguments to view a list of services (`-s` flag) and corresponding op codes (`-o` flag) for each service. + +# Structure of the example project + +The example project contains components which could also be expected to be part of a production +On-Board Software. + +1. A UDP server to receive telecommands and poll telemetry from. This might be an optional + component for an OBSW which is only used during the development phase on ground. +2. A PUS service stack which exposes some functionality conformant with the ECSS PUS service. This + currently includes the following services: + - Service 1 for telecommand verification. + - Service 3 for housekeeping telemetry handling. + - Service 5 for management and downlink of on-board events. + - Service 8 for handling on-board actions. + - Service 17 for test purposes (pings) +3. An event manager component which handles the event IPC mechanism. +4. A TC source component which demultiplexes and routes telecommands based on parameters like + packet APID or PUS service and subservice type. +5. A TM sink sink component which is the target of all sent telemetry and sends it to downlink + handlers like the UDP and TCP server. +6. An AOCS example task which can also process some PUS commands. From 77c06718c92907da48d35d55c19eb46cfe0c57b6 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 27 Sep 2023 14:33:24 +0200 Subject: [PATCH 07/11] README update --- satrs-example/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/satrs-example/README.md b/satrs-example/README.md index 75a067b..5841a9d 100644 --- a/satrs-example/README.md +++ b/satrs-example/README.md @@ -65,6 +65,7 @@ On-Board Software. - Service 3 for housekeeping telemetry handling. - Service 5 for management and downlink of on-board events. - Service 8 for handling on-board actions. + - Service 11 for scheduling telecommands to be released at a specific time. - Service 17 for test purposes (pings) 3. An event manager component which handles the event IPC mechanism. 4. A TC source component which demultiplexes and routes telecommands based on parameters like From 47b794e12f4487d4c69d223b4b4b7c706c28a420 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 29 Sep 2023 12:38:57 +0200 Subject: [PATCH 08/11] smaller modules --- satrs-example/src/main.rs | 4 ++- satrs-example/src/tmtc.rs | 66 ---------------------------------- satrs-example/src/udp.rs | 76 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 67 deletions(-) create mode 100644 satrs-example/src/udp.rs diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 3a8664f..2f4c1d2 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -4,6 +4,7 @@ mod logging; mod pus; mod requests; mod tmtc; +mod udp; use log::{info, warn}; use satrs_core::hal::std::udp_server::UdpTcServer; @@ -18,7 +19,8 @@ use crate::pus::scheduler::Pus11Wrapper; use crate::pus::test::Service17CustomWrapper; use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::requests::{Request, RequestWithToken}; -use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask, UdpTmtcServer}; +use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask}; +use crate::udp::UdpTmtcServer; use satrs_core::event_man::{ EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, }; diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 8ab22a1..9b52367 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -61,11 +61,6 @@ pub struct TmFunnel { pub tm_server_tx: Sender, } -pub struct UdpTmtcServer { - pub udp_tc_server: UdpTcServer>, - pub tm_rx: Receiver, - pub tm_store: SharedPool, -} #[derive(Clone)] pub struct PusTcSource { @@ -152,64 +147,3 @@ impl TmtcTask { } } } - -impl UdpTmtcServer { - pub fn periodic_operation(&mut self) { - while self.poll_tc_server() {} - if let Some(recv_addr) = self.udp_tc_server.last_sender() { - self.send_tm_to_udp_client(&recv_addr); - } - } - - fn poll_tc_server(&mut self) -> bool { - match self.udp_tc_server.try_recv_tc() { - Ok(_) => true, - Err(e) => match e { - ReceiveResult::ReceiverError(e) => match e { - CcsdsError::ByteConversionError(e) => { - warn!("packet error: {e:?}"); - true - } - CcsdsError::CustomError(e) => { - warn!("mpsc store and send error {e:?}"); - true - } - }, - ReceiveResult::IoError(e) => { - warn!("IO error {e}"); - false - } - ReceiveResult::NothingReceived => false, - }, - } - } - - fn send_tm_to_udp_client(&mut self, recv_addr: &SocketAddr) { - while let Ok(addr) = self.tm_rx.try_recv() { - let store_lock = self.tm_store.write(); - if store_lock.is_err() { - warn!("Locking TM store failed"); - continue; - } - let mut store_lock = store_lock.unwrap(); - let pg = store_lock.read_with_guard(addr); - let read_res = pg.read(); - if read_res.is_err() { - warn!("Error reading TM pool data"); - continue; - } - let buf = read_res.unwrap(); - if buf.len() > 9 { - let service = buf[7]; - let subservice = buf[8]; - info!("Sending PUS TM[{service},{subservice}]") - } else { - info!("Sending PUS TM"); - } - let result = self.udp_tc_server.socket.send_to(buf, recv_addr); - if let Err(e) = result { - warn!("Sending TM with UDP socket failed: {e}") - } - } - } -} diff --git a/satrs-example/src/udp.rs b/satrs-example/src/udp.rs new file mode 100644 index 0000000..e3ca9f6 --- /dev/null +++ b/satrs-example/src/udp.rs @@ -0,0 +1,76 @@ +use std::{net::SocketAddr, sync::mpsc::Receiver}; + +use log::{info, warn}; +use satrs_core::{ + hal::std::udp_server::{ReceiveResult, UdpTcServer}, + pool::{SharedPool, StoreAddr}, + tmtc::CcsdsError, +}; + +use crate::tmtc::MpscStoreAndSendError; + +pub struct UdpTmtcServer { + pub udp_tc_server: UdpTcServer>, + pub tm_rx: Receiver, + pub tm_store: SharedPool, +} +impl UdpTmtcServer { + pub fn periodic_operation(&mut self) { + while self.poll_tc_server() {} + if let Some(recv_addr) = self.udp_tc_server.last_sender() { + self.send_tm_to_udp_client(&recv_addr); + } + } + + fn poll_tc_server(&mut self) -> bool { + match self.udp_tc_server.try_recv_tc() { + Ok(_) => true, + Err(e) => match e { + ReceiveResult::ReceiverError(e) => match e { + CcsdsError::ByteConversionError(e) => { + warn!("packet error: {e:?}"); + true + } + CcsdsError::CustomError(e) => { + warn!("mpsc store and send error {e:?}"); + true + } + }, + ReceiveResult::IoError(e) => { + warn!("IO error {e}"); + false + } + ReceiveResult::NothingReceived => false, + }, + } + } + + fn send_tm_to_udp_client(&mut self, recv_addr: &SocketAddr) { + while let Ok(addr) = self.tm_rx.try_recv() { + let store_lock = self.tm_store.write(); + if store_lock.is_err() { + warn!("Locking TM store failed"); + continue; + } + let mut store_lock = store_lock.unwrap(); + let pg = store_lock.read_with_guard(addr); + let read_res = pg.read(); + if read_res.is_err() { + warn!("Error reading TM pool data"); + continue; + } + let buf = read_res.unwrap(); + if buf.len() > 9 { + let service = buf[7]; + let subservice = buf[8]; + info!("Sending PUS TM[{service},{subservice}]") + } else { + info!("Sending PUS TM"); + } + let result = self.udp_tc_server.socket.send_to(buf, recv_addr); + if let Err(e) = result { + warn!("Sending TM with UDP socket failed: {e}") + } + } + } +} From 183aca32191de19ab584cb235d16981d044ef5fb Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 29 Sep 2023 14:11:03 +0200 Subject: [PATCH 09/11] TCP support working --- satrs-core/src/encoding/ccsds.rs | 12 ++ satrs-core/src/hal/std/tcp_cobs_server.rs | 2 +- .../src/hal/std/tcp_spacepackets_server.rs | 2 +- satrs-example/Cargo.toml | 4 +- satrs-example/pyclient/common.py | 4 + satrs-example/pyclient/main.py | 4 +- satrs-example/pyclient/tmtc_conf.json | 8 +- satrs-example/src/ccsds.rs | 1 + satrs-example/src/main.rs | 54 +++++++-- satrs-example/src/tcp.rs | 110 ++++++++++++++++++ satrs-example/src/tmtc.rs | 9 +- 11 files changed, 184 insertions(+), 26 deletions(-) create mode 100644 satrs-example/src/tcp.rs diff --git a/satrs-core/src/encoding/ccsds.rs b/satrs-core/src/encoding/ccsds.rs index ec7da4c..ecd4ff5 100644 --- a/satrs-core/src/encoding/ccsds.rs +++ b/satrs-core/src/encoding/ccsds.rs @@ -30,6 +30,12 @@ impl PacketIdLookup for [u16] { } } +impl PacketIdLookup for &[u16] { + fn validate(&self, packet_id: u16) -> bool { + self.binary_search(&packet_id).is_ok() + } +} + #[cfg(feature = "alloc")] impl PacketIdLookup for Vec { fn validate(&self, packet_id: u16) -> bool { @@ -49,6 +55,12 @@ impl PacketIdLookup for [PacketId] { } } +impl PacketIdLookup for &[PacketId] { + fn validate(&self, packet_id: u16) -> bool { + self.binary_search(&PacketId::from(packet_id)).is_ok() + } +} + /// This function parses a given buffer for tightly packed CCSDS space packets. It uses the /// [PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet and then /// uses the length field of the packet to extract CCSDS packets. diff --git a/satrs-core/src/hal/std/tcp_cobs_server.rs b/satrs-core/src/hal/std/tcp_cobs_server.rs index 4a22a8a..dc00c44 100644 --- a/satrs-core/src/hal/std/tcp_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_cobs_server.rs @@ -130,7 +130,7 @@ impl TcpTmtcInCobsServer { cfg: ServerConfig, tm_source: Box>, tc_receiver: Box>, - ) -> Result> { + ) -> Result { Ok(Self { generic_server: TcpTmtcGenericServer::new( cfg, diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index 42c61e9..c256578 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -113,7 +113,7 @@ impl TcpSpacepacketsServer tm_source: Box>, tc_receiver: Box>, packet_id_lookup: Box, - ) -> Result> { + ) -> Result { Ok(Self { generic_server: TcpTmtcGenericServer::new( cfg, diff --git a/satrs-example/Cargo.toml b/satrs-example/Cargo.toml index 3ca175e..3d61254 100644 --- a/satrs-example/Cargo.toml +++ b/satrs-example/Cargo.toml @@ -19,8 +19,8 @@ num_enum = "0.7" thiserror = "1" [dependencies.satrs-core] -version = "0.1.0-alpha.1" -# path = "../satrs-core" +# version = "0.1.0-alpha.1" +path = "../satrs-core" [dependencies.satrs-mib] version = "0.1.0-alpha.1" diff --git a/satrs-example/pyclient/common.py b/satrs-example/pyclient/common.py index 81df032..c2d0777 100644 --- a/satrs-example/pyclient/common.py +++ b/satrs-example/pyclient/common.py @@ -4,7 +4,11 @@ import dataclasses import enum import struct +from spacepackets.ecss.tc import PacketId, PacketType + EXAMPLE_PUS_APID = 0x02 +EXAMPLE_PUS_PACKET_ID_TM = PacketId(PacketType.TM, True, EXAMPLE_PUS_APID) +TM_PACKET_IDS = [EXAMPLE_PUS_PACKET_ID_TM] class EventSeverity(enum.IntEnum): diff --git a/satrs-example/pyclient/main.py b/satrs-example/pyclient/main.py index 39217e7..4f3c6c4 100755 --- a/satrs-example/pyclient/main.py +++ b/satrs-example/pyclient/main.py @@ -45,7 +45,7 @@ from tmtccmd.util.obj_id import ObjectIdDictT import pus_tc import tc_definitions -from common import EXAMPLE_PUS_APID, EventU32 +from common import EXAMPLE_PUS_APID, TM_PACKET_IDS, EventU32 _LOGGER = logging.getLogger() @@ -63,7 +63,7 @@ class SatRsConfigHook(HookBase): cfg = create_com_interface_cfg_default( com_if_key=com_if_key, json_cfg_path=self.cfg_path, - space_packet_ids=None, + space_packet_ids=TM_PACKET_IDS, ) return create_com_interface_default(cfg) diff --git a/satrs-example/pyclient/tmtc_conf.json b/satrs-example/pyclient/tmtc_conf.json index ab02100..f2c8afd 100644 --- a/satrs-example/pyclient/tmtc_conf.json +++ b/satrs-example/pyclient/tmtc_conf.json @@ -1,6 +1,8 @@ { - "com_if": "udp", + "com_if": "tcp", "tcpip_udp_ip_addr": "127.0.0.1", "tcpip_udp_port": 7301, - "tcpip_udp_recv_max_size": 1500 -} \ No newline at end of file + "tcpip_udp_recv_max_size": 1500, + "tcpip_tcp_ip_addr": "127.0.0.1", + "tcpip_tcp_port": 7301 +} diff --git a/satrs-example/src/ccsds.rs b/satrs-example/src/ccsds.rs index ef361f2..d0616c9 100644 --- a/satrs-example/src/ccsds.rs +++ b/satrs-example/src/ccsds.rs @@ -3,6 +3,7 @@ use satrs_core::spacepackets::{CcsdsPacket, SpHeader}; use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; use satrs_example::PUS_APID; +#[derive(Clone)] pub struct CcsdsReceiver { pub tc_source: PusTcSource, } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 2f4c1d2..c537ab2 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -3,10 +3,12 @@ mod hk; mod logging; mod pus; mod requests; +mod tcp; mod tmtc; mod udp; use log::{info, warn}; +use satrs_core::hal::std::tcp_server::ServerConfig; use satrs_core::hal::std::udp_server::UdpTcServer; use crate::ccsds::CcsdsReceiver; @@ -19,6 +21,7 @@ use crate::pus::scheduler::Pus11Wrapper; use crate::pus::test::Service17CustomWrapper; use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::requests::{Request, RequestWithToken}; +use crate::tcp::{SyncTcpTmSource, TcpTask}; use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask}; use crate::udp::UdpTmtcServer; use satrs_core::event_man::{ @@ -143,7 +146,7 @@ fn main() { let tm_args = TmArgs { tm_store: shared_tm_store.clone(), tm_sink_sender: tm_funnel_tx.clone(), - tm_server_rx, + tm_udp_server_rx: tm_server_rx, }; let aocs_tm_funnel = tm_funnel_tx.clone(); @@ -275,22 +278,45 @@ fn main() { }; let mut tmtc_task = TmtcTask::new(tc_args, PusReceiver::new(verif_reporter, pus_router)); - let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(ccsds_distributor)) + let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, - tm_rx: tm_args.tm_server_rx, + tm_rx: tm_args.tm_udp_server_rx, tm_store: tm_args.tm_store.clone_backing_pool(), }; info!("Starting TMTC and UDP task"); - let jh0 = thread::Builder::new() - .name("TMTC_UDP".to_string()) - .spawn(move || loop { - udp_tmtc_server.periodic_operation(); - tmtc_task.periodic_operation(); - thread::sleep(Duration::from_millis(400)); + let jh_udp_tmtc = thread::Builder::new() + .name("TMTC and UDP".to_string()) + .spawn(move || { + info!("Running UDP server on port {SERVER_PORT}"); + loop { + udp_tmtc_server.periodic_operation(); + tmtc_task.periodic_operation(); + thread::sleep(Duration::from_millis(400)); + } + }) + .unwrap(); + + let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let mut sync_tm_tcp_source = SyncTcpTmSource::new(200); + let mut tcp_server = TcpTask::new( + tcp_server_cfg, + sync_tm_tcp_source.clone(), + tcp_ccsds_distributor, + ) + .expect("tcp server creation failed"); + info!("Starting TCP task"); + let jh_tcp = thread::Builder::new() + .name("TCP".to_string()) + .spawn(move || { + info!("Running TCP server on port {SERVER_PORT}"); + loop { + tcp_server.periodic_operation(); + } }) .unwrap(); @@ -331,6 +357,7 @@ fn main() { .tm_server_tx .send(addr) .expect("Sending TM to server failed"); + sync_tm_tcp_source.add_tm(tm_raw); } } }) @@ -502,7 +529,12 @@ fn main() { thread::sleep(Duration::from_millis(200)); }) .unwrap(); - jh0.join().expect("Joining UDP TMTC server thread failed"); + jh_udp_tmtc + .join() + .expect("Joining UDP TMTC server thread failed"); + jh_tcp + .join() + .expect("Joining TCP TMTC server thread failed"); jh1.join().expect("Joining TM Funnel thread failed"); jh2.join().expect("Joining Event Manager thread failed"); jh3.join().expect("Joining AOCS thread failed"); diff --git a/satrs-example/src/tcp.rs b/satrs-example/src/tcp.rs new file mode 100644 index 0000000..a25804a --- /dev/null +++ b/satrs-example/src/tcp.rs @@ -0,0 +1,110 @@ +use std::{ + collections::VecDeque, + sync::{Arc, Mutex}, +}; + +use log::{info, warn}; +use satrs_core::{ + hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, + spacepackets::PacketId, + tmtc::{CcsdsDistributor, CcsdsError, TmPacketSourceCore}, +}; +use satrs_example::PUS_APID; + +use crate::tmtc::MpscStoreAndSendError; + +pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)]; + +#[derive(Default, Clone)] +pub struct SyncTcpTmSource { + tm_queue: Arc>>>, + max_packets_stored: usize, + pub silent_packet_overwrite: bool, +} + +impl SyncTcpTmSource { + pub fn new(max_packets_stored: usize) -> Self { + Self { + tm_queue: Arc::default(), + max_packets_stored, + silent_packet_overwrite: true, + } + } + + pub fn add_tm(&mut self, tm: &[u8]) { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); + if tm_queue.len() > self.max_packets_stored { + if !self.silent_packet_overwrite { + warn!("TPC TM source is full, deleting oldest packet"); + } + tm_queue.pop_front(); + } + tm_queue.push_back(tm.to_vec()); + } +} + +impl TmPacketSourceCore for SyncTcpTmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); + if !tm_queue.is_empty() { + let next_vec = tm_queue.front().unwrap(); + if buffer.len() < next_vec.len() { + panic!( + "provided buffer too small, must be at least {} bytes", + next_vec.len() + ); + } + let next_vec = tm_queue.pop_front().unwrap(); + buffer[0..next_vec.len()].copy_from_slice(&next_vec); + if next_vec.len() > 9 { + let service = next_vec[7]; + let subservice = next_vec[8]; + info!("Sending PUS TM[{service},{subservice}]") + } else { + info!("Sending PUS TM"); + } + return Ok(next_vec.len()); + } + Ok(0) + } +} + +pub struct TcpTask { + server: TcpSpacepacketsServer<(), CcsdsError>, +} + +impl TcpTask { + pub fn new( + cfg: ServerConfig, + tm_source: SyncTcpTmSource, + tc_receiver: CcsdsDistributor, + ) -> Result { + Ok(Self { + server: TcpSpacepacketsServer::new( + cfg, + Box::new(tm_source), + Box::new(tc_receiver), + Box::new(PACKET_ID_LOOKUP), + )?, + }) + } + + pub fn periodic_operation(&mut self) { + loop { + let result = self.server.handle_next_connection(); + match result { + Ok(conn_result) => { + info!( + "Served {} TMs and {} TCs for client {:?}", + conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr + ); + } + Err(e) => { + warn!("TCP server error: {e:?}"); + } + } + } + } +} diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 9b52367..8af701a 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,8 +1,6 @@ -use log::{info, warn}; -use satrs_core::hal::std::udp_server::{ReceiveResult, UdpTcServer}; +use log::warn; use satrs_core::pus::ReceivesEcssPusTc; use satrs_core::spacepackets::SpHeader; -use std::net::SocketAddr; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use thiserror::Error; @@ -12,12 +10,12 @@ use satrs_core::pus::TcAddrWithToken; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::tmtc::tm_helper::SharedTmStore; -use satrs_core::tmtc::{CcsdsError, ReceivesCcsdsTc}; +use satrs_core::tmtc::ReceivesCcsdsTc; pub struct TmArgs { pub tm_store: SharedTmStore, pub tm_sink_sender: Sender, - pub tm_server_rx: Receiver, + pub tm_udp_server_rx: Receiver, } pub struct TcArgs { @@ -61,7 +59,6 @@ pub struct TmFunnel { pub tm_server_tx: Sender, } - #[derive(Clone)] pub struct PusTcSource { pub tc_source: Sender, From 7cfa4f978516fa02757c8893374180166900bc5a Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 29 Sep 2023 14:13:22 +0200 Subject: [PATCH 10/11] extend README --- satrs-example/README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/satrs-example/README.md b/satrs-example/README.md index 5841a9d..da5551f 100644 --- a/satrs-example/README.md +++ b/satrs-example/README.md @@ -57,8 +57,9 @@ and corresponding op codes (`-o` flag) for each service. The example project contains components which could also be expected to be part of a production On-Board Software. -1. A UDP server to receive telecommands and poll telemetry from. This might be an optional - component for an OBSW which is only used during the development phase on ground. +1. A UDP and TCP server to receive telecommands and poll telemetry from. This might be an optional + component for an OBSW which is only used during the development phase on ground. The TCP + server parses space packets by using the CCSDS space packet ID as the packet start delimiter. 2. A PUS service stack which exposes some functionality conformant with the ECSS PUS service. This currently includes the following services: - Service 1 for telecommand verification. From 40bf53d261fa1ce7d1d2fab598b85ba393982d07 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 29 Sep 2023 14:17:25 +0200 Subject: [PATCH 11/11] extend introduction --- satrs-book/src/introduction.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/satrs-book/src/introduction.md b/satrs-book/src/introduction.md index 31a0b0c..f448441 100644 --- a/satrs-book/src/introduction.md +++ b/satrs-book/src/introduction.md @@ -21,3 +21,9 @@ A lot of the architecture and general design considerations are based on the through the 2 missions [FLP](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/flying-laptop/) and [EIVE](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/EIVE/). +# Getting started with the example + +The [`satrs-example`](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example) +provides various practical usage examples of the `sat-rs` framework. If you are more interested in +the practical application of `sat-rs` inside an application, it is recommended to have a look at +the example application.