diff --git a/satrs-book/src/introduction.md b/satrs-book/src/introduction.md index 31a0b0c..f448441 100644 --- a/satrs-book/src/introduction.md +++ b/satrs-book/src/introduction.md @@ -21,3 +21,9 @@ A lot of the architecture and general design considerations are based on the through the 2 missions [FLP](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/flying-laptop/) and [EIVE](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/EIVE/). +# Getting started with the example + +The [`satrs-example`](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example) +provides various practical usage examples of the `sat-rs` framework. If you are more interested in +the practical application of `sat-rs` inside an application, it is recommended to have a look at +the example application. diff --git a/satrs-core/src/encoding/ccsds.rs b/satrs-core/src/encoding/ccsds.rs index ec7da4c..ecd4ff5 100644 --- a/satrs-core/src/encoding/ccsds.rs +++ b/satrs-core/src/encoding/ccsds.rs @@ -30,6 +30,12 @@ impl PacketIdLookup for [u16] { } } +impl PacketIdLookup for &[u16] { + fn validate(&self, packet_id: u16) -> bool { + self.binary_search(&packet_id).is_ok() + } +} + #[cfg(feature = "alloc")] impl PacketIdLookup for Vec { fn validate(&self, packet_id: u16) -> bool { @@ -49,6 +55,12 @@ impl PacketIdLookup for [PacketId] { } } +impl PacketIdLookup for &[PacketId] { + fn validate(&self, packet_id: u16) -> bool { + self.binary_search(&PacketId::from(packet_id)).is_ok() + } +} + /// This function parses a given buffer for tightly packed CCSDS space packets. It uses the /// [PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet and then /// uses the length field of the packet to extract CCSDS packets. diff --git a/satrs-core/src/hal/std/tcp_cobs_server.rs b/satrs-core/src/hal/std/tcp_cobs_server.rs index 0f43710..2d14589 100644 --- a/satrs-core/src/hal/std/tcp_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_cobs_server.rs @@ -140,7 +140,7 @@ impl< cfg: ServerConfig, tm_source: TmSource, tc_receiver: TcReceiver, - ) -> Result> { + ) -> Result { Ok(Self { generic_server: TcpTmtcGenericServer::new( cfg, diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index 23c8466..c124a47 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -128,7 +128,7 @@ impl< tm_source: TmSource, tc_receiver: TcReceiver, packet_id_lookup: Box, - ) -> Result> { + ) -> Result { Ok(Self { generic_server: TcpTmtcGenericServer::new( cfg, diff --git a/satrs-example/Cargo.toml b/satrs-example/Cargo.toml index 6e2bc16..3d61254 100644 --- a/satrs-example/Cargo.toml +++ b/satrs-example/Cargo.toml @@ -19,9 +19,9 @@ num_enum = "0.7" thiserror = "1" [dependencies.satrs-core] -# version = "0.1.0-alpha.0" +# version = "0.1.0-alpha.1" path = "../satrs-core" - [dependencies.satrs-mib] -path = "../satrs-mib" +version = "0.1.0-alpha.1" +# path = "../satrs-mib" diff --git a/satrs-example/README.md b/satrs-example/README.md index 021ae67..da5551f 100644 --- a/satrs-example/README.md +++ b/satrs-example/README.md @@ -17,7 +17,7 @@ cargo run --bin simpleclient This repository also contains a more complex client using the [Python tmtccmd](https://github.com/robamu-org/tmtccmd) module. -# Using the tmtccmd Python client +# Using the tmtccmd Python client The python client requires a valid installation of the [tmtccmd package](https://github.com/robamu-org/tmtccmd). @@ -51,3 +51,26 @@ the `simpleclient`: You can also simply call the script without any arguments to view a list of services (`-s` flag) and corresponding op codes (`-o` flag) for each service. + +# Structure of the example project + +The example project contains components which could also be expected to be part of a production +On-Board Software. + +1. A UDP and TCP server to receive telecommands and poll telemetry from. This might be an optional + component for an OBSW which is only used during the development phase on ground. The TCP + server parses space packets by using the CCSDS space packet ID as the packet start delimiter. +2. A PUS service stack which exposes some functionality conformant with the ECSS PUS service. This + currently includes the following services: + - Service 1 for telecommand verification. + - Service 3 for housekeeping telemetry handling. + - Service 5 for management and downlink of on-board events. + - Service 8 for handling on-board actions. + - Service 11 for scheduling telecommands to be released at a specific time. + - Service 17 for test purposes (pings) +3. An event manager component which handles the event IPC mechanism. +4. A TC source component which demultiplexes and routes telecommands based on parameters like + packet APID or PUS service and subservice type. +5. A TM sink sink component which is the target of all sent telemetry and sends it to downlink + handlers like the UDP and TCP server. +6. An AOCS example task which can also process some PUS commands. diff --git a/satrs-example/pyclient/common.py b/satrs-example/pyclient/common.py index 81df032..c2d0777 100644 --- a/satrs-example/pyclient/common.py +++ b/satrs-example/pyclient/common.py @@ -4,7 +4,11 @@ import dataclasses import enum import struct +from spacepackets.ecss.tc import PacketId, PacketType + EXAMPLE_PUS_APID = 0x02 +EXAMPLE_PUS_PACKET_ID_TM = PacketId(PacketType.TM, True, EXAMPLE_PUS_APID) +TM_PACKET_IDS = [EXAMPLE_PUS_PACKET_ID_TM] class EventSeverity(enum.IntEnum): diff --git a/satrs-example/pyclient/main.py b/satrs-example/pyclient/main.py index 39217e7..4f3c6c4 100755 --- a/satrs-example/pyclient/main.py +++ b/satrs-example/pyclient/main.py @@ -45,7 +45,7 @@ from tmtccmd.util.obj_id import ObjectIdDictT import pus_tc import tc_definitions -from common import EXAMPLE_PUS_APID, EventU32 +from common import EXAMPLE_PUS_APID, TM_PACKET_IDS, EventU32 _LOGGER = logging.getLogger() @@ -63,7 +63,7 @@ class SatRsConfigHook(HookBase): cfg = create_com_interface_cfg_default( com_if_key=com_if_key, json_cfg_path=self.cfg_path, - space_packet_ids=None, + space_packet_ids=TM_PACKET_IDS, ) return create_com_interface_default(cfg) diff --git a/satrs-example/pyclient/requirements.txt b/satrs-example/pyclient/requirements.txt index bcb3cb4..4f61463 100644 --- a/satrs-example/pyclient/requirements.txt +++ b/satrs-example/pyclient/requirements.txt @@ -1,2 +1,2 @@ -tmtccmd == 5.0.0rc0 +tmtccmd == 6.0.0 # -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd diff --git a/satrs-example/pyclient/tmtc_conf.json b/satrs-example/pyclient/tmtc_conf.json index ab02100..f2c8afd 100644 --- a/satrs-example/pyclient/tmtc_conf.json +++ b/satrs-example/pyclient/tmtc_conf.json @@ -1,6 +1,8 @@ { - "com_if": "udp", + "com_if": "tcp", "tcpip_udp_ip_addr": "127.0.0.1", "tcpip_udp_port": 7301, - "tcpip_udp_recv_max_size": 1500 -} \ No newline at end of file + "tcpip_udp_recv_max_size": 1500, + "tcpip_tcp_ip_addr": "127.0.0.1", + "tcpip_tcp_port": 7301 +} diff --git a/satrs-example/src/ccsds.rs b/satrs-example/src/ccsds.rs index ef361f2..d0616c9 100644 --- a/satrs-example/src/ccsds.rs +++ b/satrs-example/src/ccsds.rs @@ -3,6 +3,7 @@ use satrs_core::spacepackets::{CcsdsPacket, SpHeader}; use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; use satrs_example::PUS_APID; +#[derive(Clone)] pub struct CcsdsReceiver { pub tc_source: PusTcSource, } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index 4053e50..c537ab2 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -3,10 +3,15 @@ mod hk; mod logging; mod pus; mod requests; +mod tcp; mod tmtc; +mod udp; use log::{info, warn}; +use satrs_core::hal::std::tcp_server::ServerConfig; +use satrs_core::hal::std::udp_server::UdpTcServer; +use crate::ccsds::CcsdsReceiver; use crate::hk::AcsHkIds; use crate::logging::setup_logger; use crate::pus::action::{Pus8Wrapper, PusService8ActionHandler}; @@ -14,9 +19,11 @@ use crate::pus::event::Pus5Wrapper; use crate::pus::hk::{Pus3Wrapper, PusService3HkHandler}; use crate::pus::scheduler::Pus11Wrapper; use crate::pus::test::Service17CustomWrapper; -use crate::pus::PusTcMpscRouter; +use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::requests::{Request, RequestWithToken}; -use crate::tmtc::{core_tmtc_task, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel}; +use crate::tcp::{SyncTcpTmSource, TcpTask}; +use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask}; +use crate::udp::UdpTmtcServer; use satrs_core::event_man::{ EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, }; @@ -43,7 +50,7 @@ use satrs_core::spacepackets::{ SpHeader, }; use satrs_core::tmtc::tm_helper::SharedTmStore; -use satrs_core::tmtc::{AddressableId, TargetId}; +use satrs_core::tmtc::{AddressableId, CcsdsDistributor, TargetId}; use satrs_core::ChannelId; use satrs_example::{ RequestTargetId, TcReceiverId, TmSenderId, OBSW_SERVER_ADDR, PUS_APID, SERVER_PORT, @@ -139,7 +146,7 @@ fn main() { let tm_args = TmArgs { tm_store: shared_tm_store.clone(), tm_sink_sender: tm_funnel_tx.clone(), - tm_server_rx, + tm_udp_server_rx: tm_server_rx, }; let aocs_tm_funnel = tm_funnel_tx.clone(); @@ -266,11 +273,50 @@ fn main() { ); let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler }; - info!("Starting TMTC task"); - let jh0 = thread::Builder::new() - .name("TMTC".to_string()) + let ccsds_receiver = CcsdsReceiver { + tc_source: tc_args.tc_source.clone(), + }; + let mut tmtc_task = TmtcTask::new(tc_args, PusReceiver::new(verif_reporter, pus_router)); + + let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) + .expect("creating UDP TMTC server failed"); + let mut udp_tmtc_server = UdpTmtcServer { + udp_tc_server, + tm_rx: tm_args.tm_udp_server_rx, + tm_store: tm_args.tm_store.clone_backing_pool(), + }; + + info!("Starting TMTC and UDP task"); + let jh_udp_tmtc = thread::Builder::new() + .name("TMTC and UDP".to_string()) .spawn(move || { - core_tmtc_task(sock_addr, tc_args, tm_args, verif_reporter, pus_router); + info!("Running UDP server on port {SERVER_PORT}"); + loop { + udp_tmtc_server.periodic_operation(); + tmtc_task.periodic_operation(); + thread::sleep(Duration::from_millis(400)); + } + }) + .unwrap(); + + let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let mut sync_tm_tcp_source = SyncTcpTmSource::new(200); + let mut tcp_server = TcpTask::new( + tcp_server_cfg, + sync_tm_tcp_source.clone(), + tcp_ccsds_distributor, + ) + .expect("tcp server creation failed"); + info!("Starting TCP task"); + let jh_tcp = thread::Builder::new() + .name("TCP".to_string()) + .spawn(move || { + info!("Running TCP server on port {SERVER_PORT}"); + loop { + tcp_server.periodic_operation(); + } }) .unwrap(); @@ -311,6 +357,7 @@ fn main() { .tm_server_tx .send(addr) .expect("Sending TM to server failed"); + sync_tm_tcp_source.add_tm(tm_raw); } } }) @@ -382,6 +429,7 @@ fn main() { let mut timestamp: [u8; 7] = [0; 7]; let mut time_provider = TimeProvider::new_with_u16_days(0, 0); loop { + // TODO: Move this into a separate function/task/module.. match acs_thread_rx.try_recv() { Ok(request) => { info!( @@ -481,7 +529,12 @@ fn main() { thread::sleep(Duration::from_millis(200)); }) .unwrap(); - jh0.join().expect("Joining UDP TMTC server thread failed"); + jh_udp_tmtc + .join() + .expect("Joining UDP TMTC server thread failed"); + jh_tcp + .join() + .expect("Joining TCP TMTC server thread failed"); jh1.join().expect("Joining TM Funnel thread failed"); jh2.join().expect("Joining Event Manager thread failed"); jh3.join().expect("Joining AOCS thread failed"); diff --git a/satrs-example/src/tcp.rs b/satrs-example/src/tcp.rs new file mode 100644 index 0000000..8b7feff --- /dev/null +++ b/satrs-example/src/tcp.rs @@ -0,0 +1,115 @@ +use std::{ + collections::VecDeque, + sync::{Arc, Mutex}, +}; + +use log::{info, warn}; +use satrs_core::{ + hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, + spacepackets::PacketId, + tmtc::{CcsdsDistributor, CcsdsError, TmPacketSourceCore}, +}; +use satrs_example::PUS_APID; + +use crate::tmtc::MpscStoreAndSendError; + +pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)]; + +#[derive(Default, Clone)] +pub struct SyncTcpTmSource { + tm_queue: Arc>>>, + max_packets_stored: usize, + pub silent_packet_overwrite: bool, +} + +impl SyncTcpTmSource { + pub fn new(max_packets_stored: usize) -> Self { + Self { + tm_queue: Arc::default(), + max_packets_stored, + silent_packet_overwrite: true, + } + } + + pub fn add_tm(&mut self, tm: &[u8]) { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); + if tm_queue.len() > self.max_packets_stored { + if !self.silent_packet_overwrite { + warn!("TPC TM source is full, deleting oldest packet"); + } + tm_queue.pop_front(); + } + tm_queue.push_back(tm.to_vec()); + } +} + +impl TmPacketSourceCore for SyncTcpTmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); + if !tm_queue.is_empty() { + let next_vec = tm_queue.front().unwrap(); + if buffer.len() < next_vec.len() { + panic!( + "provided buffer too small, must be at least {} bytes", + next_vec.len() + ); + } + let next_vec = tm_queue.pop_front().unwrap(); + buffer[0..next_vec.len()].copy_from_slice(&next_vec); + if next_vec.len() > 9 { + let service = next_vec[7]; + let subservice = next_vec[8]; + info!("Sending PUS TM[{service},{subservice}]") + } else { + info!("Sending PUS TM"); + } + return Ok(next_vec.len()); + } + Ok(0) + } +} + +pub struct TcpTask { + server: TcpSpacepacketsServer< + (), + CcsdsError, + SyncTcpTmSource, + CcsdsDistributor, + >, +} + +impl TcpTask { + pub fn new( + cfg: ServerConfig, + tm_source: SyncTcpTmSource, + tc_receiver: CcsdsDistributor, + ) -> Result { + Ok(Self { + server: TcpSpacepacketsServer::new( + cfg, + tm_source, + tc_receiver, + Box::new(PACKET_ID_LOOKUP), + )?, + }) + } + + pub fn periodic_operation(&mut self) { + loop { + let result = self.server.handle_next_connection(); + match result { + Ok(conn_result) => { + info!( + "Served {} TMs and {} TCs for client {:?}", + conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr + ); + } + Err(e) => { + warn!("TCP server error: {e:?}"); + } + } + } + } +} diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 5d2ea5e..8af701a 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,26 +1,21 @@ -use log::{info, warn}; -use satrs_core::hal::std::udp_server::{ReceiveResult, UdpTcServer}; -use std::net::SocketAddr; +use log::warn; +use satrs_core::pus::ReceivesEcssPusTc; +use satrs_core::spacepackets::SpHeader; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; -use std::thread; -use std::time::Duration; use thiserror::Error; -use crate::ccsds::CcsdsReceiver; -use crate::pus::{PusReceiver, PusTcMpscRouter}; +use crate::pus::PusReceiver; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; -use satrs_core::pus::verification::StdVerifReporterWithSender; -use satrs_core::pus::{ReceivesEcssPusTc, TcAddrWithToken}; +use satrs_core::pus::TcAddrWithToken; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; -use satrs_core::spacepackets::SpHeader; use satrs_core::tmtc::tm_helper::SharedTmStore; -use satrs_core::tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc}; +use satrs_core::tmtc::ReceivesCcsdsTc; pub struct TmArgs { pub tm_store: SharedTmStore, pub tm_sink_sender: Sender, - pub tm_server_rx: Receiver, + pub tm_udp_server_rx: Receiver, } pub struct TcArgs { @@ -64,12 +59,6 @@ pub struct TmFunnel { pub tm_server_tx: Sender, } -pub struct UdpTmtcServer { - udp_tc_server: UdpTcServer>, - tm_rx: Receiver, - tm_store: SharedPool, -} - #[derive(Clone)] pub struct PusTcSource { pub tc_source: Sender, @@ -98,131 +87,60 @@ impl ReceivesCcsdsTc for PusTcSource { } } -pub fn core_tmtc_task( - socket_addr: SocketAddr, - mut tc_args: TcArgs, - tm_args: TmArgs, - verif_reporter: StdVerifReporterWithSender, - pus_router: PusTcMpscRouter, -) { - let mut pus_receiver = PusReceiver::new(verif_reporter, pus_router); - - let ccsds_receiver = CcsdsReceiver { - tc_source: tc_args.tc_source.clone(), - }; - - let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); - - let udp_tc_server = UdpTcServer::new(socket_addr, 2048, Box::new(ccsds_distributor)) - .expect("creating UDP TMTC server failed"); - - let mut udp_tmtc_server = UdpTmtcServer { - udp_tc_server, - tm_rx: tm_args.tm_server_rx, - tm_store: tm_args.tm_store.clone_backing_pool(), - }; - - let mut tc_buf: [u8; 4096] = [0; 4096]; - loop { - core_tmtc_loop( - &mut udp_tmtc_server, - &mut tc_args, - &mut tc_buf, - &mut pus_receiver, - ); - thread::sleep(Duration::from_millis(400)); - } +pub struct TmtcTask { + tc_args: TcArgs, + tc_buf: [u8; 4096], + pus_receiver: PusReceiver, } -fn core_tmtc_loop( - udp_tmtc_server: &mut UdpTmtcServer, - tc_args: &mut TcArgs, - tc_buf: &mut [u8], - pus_receiver: &mut PusReceiver, -) { - while poll_tc_server(udp_tmtc_server) {} - match tc_args.tc_receiver.try_recv() { - Ok(addr) => { - let pool = tc_args - .tc_source - .tc_store - .pool - .read() - .expect("locking tc pool failed"); - let data = pool.read(&addr).expect("reading pool failed"); - tc_buf[0..data.len()].copy_from_slice(data); - drop(pool); - match PusTcReader::new(tc_buf) { - Ok((pus_tc, _)) => { - pus_receiver - .handle_tc_packet(addr, pus_tc.service(), &pus_tc) - .ok(); - } - Err(e) => { - warn!("error creating PUS TC from raw data: {e}"); - warn!("raw data: {tc_buf:x?}"); - } - } - } - Err(e) => { - if let TryRecvError::Disconnected = e { - warn!("tmtc thread: sender disconnected") - } +impl TmtcTask { + pub fn new(tc_args: TcArgs, pus_receiver: PusReceiver) -> Self { + Self { + tc_args, + tc_buf: [0; 4096], + pus_receiver, } } - if let Some(recv_addr) = udp_tmtc_server.udp_tc_server.last_sender() { - core_tm_handling(udp_tmtc_server, &recv_addr); - } -} -fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { - match udp_tmtc_server.udp_tc_server.try_recv_tc() { - Ok(_) => true, - Err(e) => match e { - ReceiveResult::ReceiverError(e) => match e { - CcsdsError::ByteConversionError(e) => { - warn!("packet error: {e:?}"); - true + pub fn periodic_operation(&mut self) { + //while self.poll_tc() {} + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + match self.tc_args.tc_receiver.try_recv() { + Ok(addr) => { + let pool = self + .tc_args + .tc_source + .tc_store + .pool + .read() + .expect("locking tc pool failed"); + let data = pool.read(&addr).expect("reading pool failed"); + self.tc_buf[0..data.len()].copy_from_slice(data); + drop(pool); + match PusTcReader::new(&self.tc_buf) { + Ok((pus_tc, _)) => { + self.pus_receiver + .handle_tc_packet(addr, pus_tc.service(), &pus_tc) + .ok(); + true + } + Err(e) => { + warn!("error creating PUS TC from raw data: {e}"); + warn!("raw data: {:x?}", self.tc_buf); + true + } } - CcsdsError::CustomError(e) => { - warn!("mpsc store and send error {e:?}"); - true + } + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + warn!("tmtc thread: sender disconnected"); + false } }, - ReceiveResult::IoError(e) => { - warn!("IO error {e}"); - false - } - ReceiveResult::NothingReceived => false, - }, - } -} - -fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) { - while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() { - let store_lock = udp_tmtc_server.tm_store.write(); - if store_lock.is_err() { - warn!("Locking TM store failed"); - continue; - } - let mut store_lock = store_lock.unwrap(); - let pg = store_lock.read_with_guard(addr); - let read_res = pg.read(); - if read_res.is_err() { - warn!("Error reading TM pool data"); - continue; - } - let buf = read_res.unwrap(); - if buf.len() > 9 { - let service = buf[7]; - let subservice = buf[8]; - info!("Sending PUS TM[{service},{subservice}]") - } else { - info!("Sending PUS TM"); - } - let result = udp_tmtc_server.udp_tc_server.socket.send_to(buf, recv_addr); - if let Err(e) = result { - warn!("Sending TM with UDP socket failed: {e}") } } } diff --git a/satrs-example/src/udp.rs b/satrs-example/src/udp.rs new file mode 100644 index 0000000..e3ca9f6 --- /dev/null +++ b/satrs-example/src/udp.rs @@ -0,0 +1,76 @@ +use std::{net::SocketAddr, sync::mpsc::Receiver}; + +use log::{info, warn}; +use satrs_core::{ + hal::std::udp_server::{ReceiveResult, UdpTcServer}, + pool::{SharedPool, StoreAddr}, + tmtc::CcsdsError, +}; + +use crate::tmtc::MpscStoreAndSendError; + +pub struct UdpTmtcServer { + pub udp_tc_server: UdpTcServer>, + pub tm_rx: Receiver, + pub tm_store: SharedPool, +} +impl UdpTmtcServer { + pub fn periodic_operation(&mut self) { + while self.poll_tc_server() {} + if let Some(recv_addr) = self.udp_tc_server.last_sender() { + self.send_tm_to_udp_client(&recv_addr); + } + } + + fn poll_tc_server(&mut self) -> bool { + match self.udp_tc_server.try_recv_tc() { + Ok(_) => true, + Err(e) => match e { + ReceiveResult::ReceiverError(e) => match e { + CcsdsError::ByteConversionError(e) => { + warn!("packet error: {e:?}"); + true + } + CcsdsError::CustomError(e) => { + warn!("mpsc store and send error {e:?}"); + true + } + }, + ReceiveResult::IoError(e) => { + warn!("IO error {e}"); + false + } + ReceiveResult::NothingReceived => false, + }, + } + } + + fn send_tm_to_udp_client(&mut self, recv_addr: &SocketAddr) { + while let Ok(addr) = self.tm_rx.try_recv() { + let store_lock = self.tm_store.write(); + if store_lock.is_err() { + warn!("Locking TM store failed"); + continue; + } + let mut store_lock = store_lock.unwrap(); + let pg = store_lock.read_with_guard(addr); + let read_res = pg.read(); + if read_res.is_err() { + warn!("Error reading TM pool data"); + continue; + } + let buf = read_res.unwrap(); + if buf.len() > 9 { + let service = buf[7]; + let subservice = buf[8]; + info!("Sending PUS TM[{service},{subservice}]") + } else { + info!("Sending PUS TM"); + } + let result = self.udp_tc_server.socket.send_to(buf, recv_addr); + if let Err(e) = result { + warn!("Sending TM with UDP socket failed: {e}") + } + } + } +} diff --git a/satrs-mib/Cargo.toml b/satrs-mib/Cargo.toml index a34d886..e901ebe 100644 --- a/satrs-mib/Cargo.toml +++ b/satrs-mib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "satrs-mib" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" edition = "2021" rust-version = "1.61" authors = ["Robin Mueller "] @@ -30,7 +30,7 @@ version = "0.1.0-alpha.1" [dependencies.satrs-mib-codegen] path = "codegen" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" [dependencies.serde] version = "1" diff --git a/satrs-mib/codegen/Cargo.toml b/satrs-mib/codegen/Cargo.toml index cba83eb..9a3887c 100644 --- a/satrs-mib/codegen/Cargo.toml +++ b/satrs-mib/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "satrs-mib-codegen" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" edition = "2021" description = "satrs-mib proc macro implementation" homepage = "https://egit.irs.uni-stuttgart.de/rust/sat-rs"