diff --git a/README.md b/README.md index 585d3a9..172d0ce 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,9 @@ sat-rs This is the repository of the sat-rs framework. Its primary goal is to provide re-usable components to write on-board software for remote systems like rovers or satellites. It is specifically written -for the special requirements for these systems. +for the special requirements for these systems. You can find an overview of the project and the +link to the [more high-level sat-rs book](https://documentation.irs.uni-stuttgart.de/projects/sat-rs/) +at the [IRS documentation website](https://documentation.irs.uni-stuttgart.de/sat-rs.html). A lot of the architecture and general design considerations are based on the [FSFW](https://egit.irs.uni-stuttgart.de/fsfw/fsfw) C++ framework which has flight heritage @@ -16,6 +18,10 @@ and [EIVE](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and- This project currently contains following crates: +* [`satrs-book`](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-book): + Primary information resource in addition to the API documentation, hosted + [here](https://documentation.irs.uni-stuttgart.de/projects/sat-rs/). It can be useful to read + this first before delving into the example application and the API documentation. * [`satrs-core`](https://egit.irs.uni-stuttgart.de/rust/satrs-launchpad/src/branch/main/satrs-core): Core components of sat-rs. * [`satrs-example`](https://egit.irs.uni-stuttgart.de/rust/satrs-launchpad/src/branch/main/satrs-example): @@ -37,3 +43,16 @@ Each project has its own `CHANGELOG.md`. packet protocol implementations. This repository is re-exported in the [`satrs-core`](https://egit.irs.uni-stuttgart.de/rust/satrs-launchpad/src/branch/main/satrs-core) crate. + +# Coverage + +Coverage was generated using [`grcov`](https://github.com/mozilla/grcov). If you have not done so +already, install the `llvm-tools-preview`: + +```sh +rustup component add llvm-tools-preview +cargo install grcov --locked +``` + +After that, you can simply run `coverage.py` to test the `satrs-core` crate with coverage. You can +optionally supply the `--open` flag to open the coverage report in your webbrowser. diff --git a/automation/Jenkinsfile b/automation/Jenkinsfile index 937fea1..0a27b8b 100644 --- a/automation/Jenkinsfile +++ b/automation/Jenkinsfile @@ -67,7 +67,7 @@ pipeline { sh 'mdbook build' sshagent(credentials: ['documentation-buildfix']) { // Deploy to Apache webserver - sh 'rsync -r --delete book/ buildfix@documentation.irs.uni-stuttgart.de:/projects/sat-rs' + sh 'rsync -r --delete book buildfix@documentation.irs.uni-stuttgart.de:/projects/sat-rs' } } } diff --git a/coverage.py b/coverage.py new file mode 100755 index 0000000..b7efbe9 --- /dev/null +++ b/coverage.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +import os +import logging +import argparse +import webbrowser + + +_LOGGER = logging.getLogger() + + +def generate_cov_report(open_report: bool, format: str, package: str): + logging.basicConfig(level=logging.INFO) + os.environ["RUSTFLAGS"] = "-Cinstrument-coverage" + os.environ["LLVM_PROFILE_FILE"] = "target/coverage/%p-%m.profraw" + _LOGGER.info("Executing tests with coverage") + os.system(f"cargo test -p {package}") + + out_path = "./target/debug/coverage" + if format == "lcov": + out_path = "./target/debug/lcov.info" + os.system( + f"grcov . -s . --binary-path ./target/debug/ -t {format} --branch --ignore-not-existing " + f"-o {out_path}" + ) + if format == "lcov": + os.system( + "genhtml -o ./target/debug/coverage/ --show-details --highlight --ignore-errors source " + "--legend ./target/debug/lcov.info" + ) + if open_report: + coverage_report_path = os.path.abspath("./target/debug/coverage/index.html") + webbrowser.open_new_tab(coverage_report_path) + _LOGGER.info("Done") + + +def main(): + parser = argparse.ArgumentParser( + description="Generate coverage report and optionally open it in a browser" + ) + parser.add_argument( + "--open", action="store_true", help="Open the coverage report in a browser" + ) + parser.add_argument( + "-p", + "--package", + choices=["satrs-core"], + default="satrs-core", + help="Choose project to generate coverage for", + ) + parser.add_argument( + "--format", + choices=["html", "lcov"], + default="html", + help="Choose report format (html or lcov)", + ) + args = parser.parse_args() + generate_cov_report(args.open, args.format, args.package) + + +if __name__ == "__main__": + main() diff --git a/satrs-book/src/introduction.md b/satrs-book/src/introduction.md index 31a0b0c..f448441 100644 --- a/satrs-book/src/introduction.md +++ b/satrs-book/src/introduction.md @@ -21,3 +21,9 @@ A lot of the architecture and general design considerations are based on the through the 2 missions [FLP](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/flying-laptop/) and [EIVE](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/EIVE/). +# Getting started with the example + +The [`satrs-example`](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example) +provides various practical usage examples of the `sat-rs` framework. If you are more interested in +the practical application of `sat-rs` inside an application, it is recommended to have a look at +the example application. diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index 4b75576..6fa8535 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "satrs-core" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" edition = "2021" rust-version = "1.61" authors = ["Robin Mueller "] @@ -15,6 +15,7 @@ categories = ["aerospace", "aerospace::space-protocols", "no-std", "hardware-sup [dependencies] delegate = ">0.7, <=0.10" paste = "1" +# TODO: Remove this as soon as the image including the description was moved to the satrs-book. embed-doc-image = "0.1" [dependencies.smallvec] @@ -72,15 +73,15 @@ features = ["all"] optional = true [dependencies.spacepackets] -# version = "0.7.0-beta.1" -# path = "../../spacepackets" -git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" -rev = "79d26e1a6" -# branch = "" +version = "0.7.0-beta.2" default-features = false +# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" +# rev = "79d26e1a6" +# branch = "" [dependencies.cobs] git = "https://github.com/robamu/cobs.rs.git" +version = "0.2.3" branch = "all_features" default-features = false diff --git a/satrs-core/release-checklist.md b/satrs-core/release-checklist.md index e8b8ee3..d0359b0 100644 --- a/satrs-core/release-checklist.md +++ b/satrs-core/release-checklist.md @@ -4,7 +4,7 @@ Checklist for new releases # Pre-Release 1. Make sure any new modules are documented sufficiently enough and check docs with - `cargo doc --all-features --open`. + `cargo +nightly doc --all-features --config 'rustdocflags=["--cfg", "doc_cfg"]' --open`. 2. Bump version specifier in `Cargo.toml`. 3. Update `CHANGELOG.md`: Convert `unreleased` section into version section with date and add new `unreleased` section. diff --git a/satrs-core/src/encoding/ccsds.rs b/satrs-core/src/encoding/ccsds.rs index ec7da4c..ecd4ff5 100644 --- a/satrs-core/src/encoding/ccsds.rs +++ b/satrs-core/src/encoding/ccsds.rs @@ -30,6 +30,12 @@ impl PacketIdLookup for [u16] { } } +impl PacketIdLookup for &[u16] { + fn validate(&self, packet_id: u16) -> bool { + self.binary_search(&packet_id).is_ok() + } +} + #[cfg(feature = "alloc")] impl PacketIdLookup for Vec { fn validate(&self, packet_id: u16) -> bool { @@ -49,6 +55,12 @@ impl PacketIdLookup for [PacketId] { } } +impl PacketIdLookup for &[PacketId] { + fn validate(&self, packet_id: u16) -> bool { + self.binary_search(&PacketId::from(packet_id)).is_ok() + } +} + /// This function parses a given buffer for tightly packed CCSDS space packets. It uses the /// [PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet and then /// uses the length field of the packet to extract CCSDS packets. diff --git a/satrs-core/src/hal/std/tcp_cobs_server.rs b/satrs-core/src/hal/std/tcp_cobs_server.rs index 4a22a8a..2d14589 100644 --- a/satrs-core/src/hal/std/tcp_cobs_server.rs +++ b/satrs-core/src/hal/std/tcp_cobs_server.rs @@ -1,4 +1,3 @@ -use alloc::boxed::Box; use alloc::vec; use cobs::encode; use delegate::delegate; @@ -29,7 +28,6 @@ impl TcpTcParser for CobsTcParser { current_write_idx: usize, next_write_idx: &mut usize, ) -> Result<(), TcpTmtcError> { - // Reader vec full, need to parse for packets. conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets( &mut tc_buffer[..current_write_idx], tc_receiver.upcast_mut(), @@ -111,11 +109,23 @@ impl TcpTmSender for CobsTmSender { /// /// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) /// test also serves as the example application for this module. -pub struct TcpTmtcInCobsServer { - generic_server: TcpTmtcGenericServer, +pub struct TcpTmtcInCobsServer< + TmError, + TcError: 'static, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, +> { + generic_server: + TcpTmtcGenericServer, } -impl TcpTmtcInCobsServer { +impl< + TmError: 'static, + TcError: 'static, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, + > TcpTmtcInCobsServer +{ /// Create a new TCP TMTC server which exchanges TMTC packets encoded with /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). /// @@ -128,9 +138,9 @@ impl TcpTmtcInCobsServer { /// forwarded to this TC receiver. pub fn new( cfg: ServerConfig, - tm_source: Box>, - tc_receiver: Box>, - ) -> Result> { + tm_source: TmSource, + tc_receiver: TcReceiver, + ) -> Result { Ok(Self { generic_server: TcpTmtcGenericServer::new( cfg, @@ -177,7 +187,7 @@ mod tests { ServerConfig, }, }; - use alloc::{boxed::Box, sync::Arc}; + use alloc::sync::Arc; use cobs::encode; use super::TcpTmtcInCobsServer; @@ -202,11 +212,11 @@ mod tests { addr: &SocketAddr, tc_receiver: SyncTcCacher, tm_source: SyncTmSource, - ) -> TcpTmtcInCobsServer<(), ()> { + ) -> TcpTmtcInCobsServer<(), (), SyncTmSource, SyncTcCacher> { TcpTmtcInCobsServer::new( ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), - Box::new(tm_source), - Box::new(tc_receiver), + tm_source, + tc_receiver, ) .expect("TCP server generation failed") } diff --git a/satrs-core/src/hal/std/tcp_server.rs b/satrs-core/src/hal/std/tcp_server.rs index e9fe657..deeb902 100644 --- a/satrs-core/src/hal/std/tcp_server.rs +++ b/satrs-core/src/hal/std/tcp_server.rs @@ -1,6 +1,6 @@ //! Generic TCP TMTC servers with different TMTC format flavours. use alloc::vec; -use alloc::{boxed::Box, vec::Vec}; +use alloc::vec::Vec; use core::time::Duration; use socket2::{Domain, Socket, Type}; use std::io::Read; @@ -134,20 +134,29 @@ pub trait TcpTmSender { pub struct TcpTmtcGenericServer< TmError, TcError, - TmHandler: TcpTmSender, - TcHandler: TcpTcParser, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, + TmSender: TcpTmSender, + TcParser: TcpTcParser, > { - base: TcpTmtcServerBase, - tc_handler: TcHandler, - tm_handler: TmHandler, + pub(crate) listener: TcpListener, + pub(crate) inner_loop_delay: Duration, + pub(crate) tm_source: TmSource, + pub(crate) tm_buffer: Vec, + pub(crate) tc_receiver: TcReceiver, + pub(crate) tc_buffer: Vec, + tc_handler: TcParser, + tm_handler: TmSender, } impl< TmError: 'static, TcError: 'static, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, TmSender: TcpTmSender, TcParser: TcpTcParser, - > TcpTmtcGenericServer + > TcpTmtcGenericServer { /// Create a new generic TMTC server instance. /// @@ -165,25 +174,38 @@ impl< cfg: ServerConfig, tc_parser: TcParser, tm_sender: TmSender, - tm_source: Box>, - tc_receiver: Box>, - ) -> Result, std::io::Error> { + tm_source: TmSource, + tc_receiver: TcReceiver, + ) -> Result { + // Create a TCP listener bound to two addresses. + let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; + socket.set_reuse_address(cfg.reuse_addr)?; + #[cfg(unix)] + socket.set_reuse_port(cfg.reuse_port)?; + let addr = (cfg.addr).into(); + socket.bind(&addr)?; + socket.listen(128)?; Ok(Self { - base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?, tc_handler: tc_parser, tm_handler: tm_sender, + listener: socket.into(), + inner_loop_delay: cfg.inner_loop_delay, + tm_source, + tm_buffer: vec![0; cfg.tm_buffer_size], + tc_receiver, + tc_buffer: vec![0; cfg.tc_buffer_size], }) } /// Retrieve the internal [TcpListener] class. pub fn listener(&mut self) -> &mut TcpListener { - self.base.listener() + &mut self.listener } /// Can be used to retrieve the local assigned address of the TCP server. This is especially /// useful if using the port number 0 for OS auto-assignment. pub fn local_addr(&self) -> std::io::Result { - self.base.local_addr() + self.listener.local_addr() } /// This call is used to handle the next connection to a client. Right now, it performs @@ -205,20 +227,20 @@ impl< let mut connection_result = ConnectionResult::default(); let mut current_write_idx; let mut next_write_idx = 0; - let (mut stream, addr) = self.base.listener.accept()?; + let (mut stream, addr) = self.listener.accept()?; stream.set_nonblocking(true)?; connection_result.addr = Some(addr); current_write_idx = next_write_idx; loop { - let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]); + let read_result = stream.read(&mut self.tc_buffer[current_write_idx..]); match read_result { Ok(0) => { // Connection closed by client. If any TC was read, parse for complete packets. // After that, break the outer loop. if current_write_idx > 0 { self.tc_handler.handle_tc_parsing( - &mut self.base.tc_buffer, - self.base.tc_receiver.as_mut(), + &mut self.tc_buffer, + &mut self.tc_receiver, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -229,10 +251,10 @@ impl< Ok(read_len) => { current_write_idx += read_len; // TC buffer is full, we must parse for complete packets now. - if current_write_idx == self.base.tc_buffer.capacity() { + if current_write_idx == self.tc_buffer.capacity() { self.tc_handler.handle_tc_parsing( - &mut self.base.tc_buffer, - self.base.tc_receiver.as_mut(), + &mut self.tc_buffer, + &mut self.tc_receiver, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -245,8 +267,8 @@ impl< // both UNIX and Windows. std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { self.tc_handler.handle_tc_parsing( - &mut self.base.tc_buffer, - self.base.tc_receiver.as_mut(), + &mut self.tc_buffer, + &mut self.tc_receiver, &mut connection_result, current_write_idx, &mut next_write_idx, @@ -254,14 +276,14 @@ impl< current_write_idx = next_write_idx; if !self.tm_handler.handle_tm_sending( - &mut self.base.tm_buffer, - self.base.tm_source.as_mut(), + &mut self.tm_buffer, + &mut self.tm_source, &mut connection_result, &mut stream, )? { // No TC read, no TM was sent, but the client has not disconnected. // Perform an inner delay to avoid burning CPU time. - thread::sleep(self.base.inner_loop_delay); + thread::sleep(self.inner_loop_delay); } } _ => { @@ -271,8 +293,8 @@ impl< } } self.tm_handler.handle_tm_sending( - &mut self.base.tm_buffer, - self.base.tm_source.as_mut(), + &mut self.tm_buffer, + &mut self.tm_source, &mut connection_result, &mut stream, )?; @@ -280,47 +302,6 @@ impl< } } -pub(crate) struct TcpTmtcServerBase { - pub(crate) listener: TcpListener, - pub(crate) inner_loop_delay: Duration, - pub(crate) tm_source: Box>, - pub(crate) tm_buffer: Vec, - pub(crate) tc_receiver: Box>, - pub(crate) tc_buffer: Vec, -} - -impl TcpTmtcServerBase { - pub(crate) fn new( - cfg: ServerConfig, - tm_source: Box>, - tc_receiver: Box>, - ) -> Result { - // Create a TCP listener bound to two addresses. - let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; - socket.set_reuse_address(cfg.reuse_addr)?; - socket.set_reuse_port(cfg.reuse_port)?; - let addr = (cfg.addr).into(); - socket.bind(&addr)?; - socket.listen(128)?; - Ok(Self { - listener: socket.into(), - inner_loop_delay: cfg.inner_loop_delay, - tm_source, - tm_buffer: vec![0; cfg.tm_buffer_size], - tc_receiver, - tc_buffer: vec![0; cfg.tc_buffer_size], - }) - } - - pub(crate) fn listener(&mut self) -> &mut TcpListener { - &mut self.listener - } - - pub(crate) fn local_addr(&self) -> std::io::Result { - self.listener.local_addr() - } -} - #[cfg(test)] pub(crate) mod tests { use std::sync::Mutex; diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs index b9fc86b..c124a47 100644 --- a/satrs-core/src/hal/std/tcp_spacepackets_server.rs +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -88,16 +88,31 @@ impl TcpTmSender for SpacepacketsTmSender { /// [spacepackets::PacketId]s as part of the server configuration for that purpose. /// /// ## Example -/// /// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs) /// also serves as the example application for this module. -pub struct TcpSpacepacketsServer { - generic_server: - TcpTmtcGenericServer, +pub struct TcpSpacepacketsServer< + TmError, + TcError: 'static, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, +> { + generic_server: TcpTmtcGenericServer< + TmError, + TcError, + TmSource, + TcReceiver, + SpacepacketsTmSender, + SpacepacketsTcParser, + >, } -impl TcpSpacepacketsServer { - /// Create a new TCP TMTC server which exchanges CCSDS space packets. +impl< + TmError: 'static, + TcError: 'static, + TmSource: TmPacketSource, + TcReceiver: ReceivesTc, + > TcpSpacepacketsServer +{ /// /// ## Parameter /// @@ -110,10 +125,10 @@ impl TcpSpacepacketsServer /// parsing. This mechanism is used to have a start marker for finding CCSDS packets. pub fn new( cfg: ServerConfig, - tm_source: Box>, - tc_receiver: Box>, + tm_source: TmSource, + tc_receiver: TcReceiver, packet_id_lookup: Box, - ) -> Result> { + ) -> Result { Ok(Self { generic_server: TcpTmtcGenericServer::new( cfg, @@ -179,11 +194,11 @@ mod tests { tc_receiver: SyncTcCacher, tm_source: SyncTmSource, packet_id_lookup: HashSet, - ) -> TcpSpacepacketsServer<(), ()> { + ) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher> { TcpSpacepacketsServer::new( ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), - Box::new(tm_source), - Box::new(tc_receiver), + tm_source, + tc_receiver, Box::new(packet_id_lookup), ) .expect("TCP server generation failed") @@ -220,13 +235,10 @@ mod tests { }); let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); - let mut buffer: [u8; 32] = [0; 32]; - let packet_len_ping = ping_tc - .write_to_bytes(&mut buffer) - .expect("writing packet failed"); + let tc_0 = ping_tc.to_vec().expect("packet generation failed"); let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); stream - .write_all(&buffer[..packet_len_ping]) + .write_all(&tc_0) .expect("writing to TCP server failed"); drop(stream); @@ -242,12 +254,11 @@ mod tests { // Check that TC has arrived. let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); assert_eq!(tc_queue.len(), 1); - assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len_ping]); + assert_eq!(tc_queue.pop_front().unwrap(), tc_0); } #[test] fn test_multi_tc_multi_tm() { - let mut buffer: [u8; 32] = [0; 32]; let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); let tc_receiver = SyncTcCacher::default(); let mut tm_source = SyncTmSource::default(); @@ -256,19 +267,13 @@ mod tests { let mut total_tm_len = 0; let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); - let tm_packet_len = verif_tm - .write_to_bytes(&mut buffer) - .expect("writing packet failed"); - total_tm_len += tm_packet_len; - let tm_0 = buffer[..tm_packet_len].to_vec(); + let tm_0 = verif_tm.to_vec().expect("writing packet failed"); + total_tm_len += tm_0.len(); tm_source.add_tm(&tm_0); let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 3, None, true); - let tm_packet_len = verif_tm - .write_to_bytes(&mut buffer) - .expect("writing packet failed"); - total_tm_len += tm_packet_len; - let tm_1 = buffer[..tm_packet_len].to_vec(); + let tm_1 = verif_tm.to_vec().expect("writing packet failed"); + total_tm_len += tm_1.len(); tm_source.add_tm(&tm_1); // Set up server @@ -309,19 +314,13 @@ mod tests { // Send telecommands let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); - let packet_len = ping_tc - .write_to_bytes(&mut buffer) - .expect("writing packet failed"); - let tc_0 = buffer[..packet_len].to_vec(); + let tc_0 = ping_tc.to_vec().expect("ping tc creation failed"); stream .write_all(&tc_0) .expect("writing to TCP server failed"); let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true); - let packet_len = action_tc - .write_to_bytes(&mut buffer) - .expect("writing packet failed"); - let tc_1 = buffer[..packet_len].to_vec(); + let tc_1 = action_tc.to_vec().expect("action tc creation failed"); stream .write_all(&tc_1) .expect("writing to TCP server failed"); diff --git a/satrs-core/tests/tcp_servers.rs b/satrs-core/tests/tcp_servers.rs index b3e7993..251eead 100644 --- a/satrs-core/tests/tcp_servers.rs +++ b/satrs-core/tests/tcp_servers.rs @@ -94,8 +94,8 @@ fn test_cobs_server() { tm_source.add_tm(&INVERTED_PACKET); let mut tcp_server = TcpTmtcInCobsServer::new( ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), - Box::new(tm_source), - Box::new(tc_receiver.clone()), + tm_source, + tc_receiver.clone(), ) .expect("TCP server generation failed"); let dest_addr = tcp_server @@ -166,22 +166,18 @@ const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0); #[test] fn test_ccsds_server() { - let mut buffer: [u8; 32] = [0; 32]; let tc_receiver = SyncTcCacher::default(); let mut tm_source = SyncTmSource::default(); let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true); - let tm_packet_len = verif_tm - .write_to_bytes(&mut buffer) - .expect("writing packet failed"); - tm_source.add_tm(&buffer[..tm_packet_len]); - let tm_vec = buffer[..tm_packet_len].to_vec(); + let tm_0 = verif_tm.to_vec().expect("tm generation failed"); + tm_source.add_tm(&tm_0); let mut packet_id_lookup = HashSet::new(); packet_id_lookup.insert(TEST_PACKET_ID_0); let mut tcp_server = TcpSpacepacketsServer::new( ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024), - Box::new(tm_source), - Box::new(tc_receiver.clone()), + tm_source, + tc_receiver.clone(), Box::new(packet_id_lookup), ) .expect("TCP server generation failed"); @@ -202,29 +198,30 @@ fn test_ccsds_server() { set_if_done.store(true, Ordering::Relaxed); }); let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); - let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); - let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); stream .set_read_timeout(Some(Duration::from_millis(10))) .expect("setting reas timeout failed"); - let packet_len = ping_tc - .write_to_bytes(&mut buffer) - .expect("writing packet failed"); - stream - .write_all(&buffer[..packet_len]) - .expect("writing to TCP server failed"); + // Send ping telecommand. + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let tc_0 = ping_tc.to_vec().expect("packet creation failed"); + stream + .write_all(&tc_0) + .expect("writing to TCP server failed"); // Done with writing. stream .shutdown(std::net::Shutdown::Write) .expect("shutting down write failed"); + + // Now read all the telemetry from the server. let mut read_buf: [u8; 16] = [0; 16]; let mut read_len_total = 0; // Timeout ensures this does not block forever. - while read_len_total < tm_packet_len { + while read_len_total < tm_0.len() { let read_len = stream.read(&mut read_buf).expect("read failed"); read_len_total += read_len; - assert_eq!(read_buf[..read_len], tm_vec); + assert_eq!(read_buf[..read_len], tm_0); } drop(stream); @@ -240,5 +237,5 @@ fn test_ccsds_server() { // Check that TC has arrived. let mut tc_queue = tc_receiver.tc_queue.lock().unwrap(); assert_eq!(tc_queue.len(), 1); - assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]); + assert_eq!(tc_queue.pop_front().unwrap(), tc_0); } diff --git a/satrs-example/Cargo.toml b/satrs-example/Cargo.toml index e564305..cf3cade 100644 --- a/satrs-example/Cargo.toml +++ b/satrs-example/Cargo.toml @@ -24,11 +24,9 @@ num-traits = "0.2" num-derive = "0.3" [dependencies.satrs-core] -# version = "0.1.0-alpha.0" +# version = "0.1.0-alpha.1" path = "../satrs-core" - [dependencies.satrs-mib] -path = "../satrs-mib" - - +version = "0.1.0-alpha.1" +# path = "../satrs-mib" diff --git a/satrs-example/README.md b/satrs-example/README.md index 021ae67..da5551f 100644 --- a/satrs-example/README.md +++ b/satrs-example/README.md @@ -17,7 +17,7 @@ cargo run --bin simpleclient This repository also contains a more complex client using the [Python tmtccmd](https://github.com/robamu-org/tmtccmd) module. -# Using the tmtccmd Python client +# Using the tmtccmd Python client The python client requires a valid installation of the [tmtccmd package](https://github.com/robamu-org/tmtccmd). @@ -51,3 +51,26 @@ the `simpleclient`: You can also simply call the script without any arguments to view a list of services (`-s` flag) and corresponding op codes (`-o` flag) for each service. + +# Structure of the example project + +The example project contains components which could also be expected to be part of a production +On-Board Software. + +1. A UDP and TCP server to receive telecommands and poll telemetry from. This might be an optional + component for an OBSW which is only used during the development phase on ground. The TCP + server parses space packets by using the CCSDS space packet ID as the packet start delimiter. +2. A PUS service stack which exposes some functionality conformant with the ECSS PUS service. This + currently includes the following services: + - Service 1 for telecommand verification. + - Service 3 for housekeeping telemetry handling. + - Service 5 for management and downlink of on-board events. + - Service 8 for handling on-board actions. + - Service 11 for scheduling telecommands to be released at a specific time. + - Service 17 for test purposes (pings) +3. An event manager component which handles the event IPC mechanism. +4. A TC source component which demultiplexes and routes telecommands based on parameters like + packet APID or PUS service and subservice type. +5. A TM sink sink component which is the target of all sent telemetry and sends it to downlink + handlers like the UDP and TCP server. +6. An AOCS example task which can also process some PUS commands. diff --git a/satrs-example/pyclient/common.py b/satrs-example/pyclient/common.py index 81df032..c2d0777 100644 --- a/satrs-example/pyclient/common.py +++ b/satrs-example/pyclient/common.py @@ -4,7 +4,11 @@ import dataclasses import enum import struct +from spacepackets.ecss.tc import PacketId, PacketType + EXAMPLE_PUS_APID = 0x02 +EXAMPLE_PUS_PACKET_ID_TM = PacketId(PacketType.TM, True, EXAMPLE_PUS_APID) +TM_PACKET_IDS = [EXAMPLE_PUS_PACKET_ID_TM] class EventSeverity(enum.IntEnum): diff --git a/satrs-example/pyclient/main.py b/satrs-example/pyclient/main.py index 39217e7..c749e26 100755 --- a/satrs-example/pyclient/main.py +++ b/satrs-example/pyclient/main.py @@ -14,7 +14,7 @@ from spacepackets.ccsds.time import CdsShortTimestamp from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper from tmtccmd.core.base import BackendRequest from tmtccmd.pus import VerificationWrapper -from tmtccmd.tm import CcsdsTmHandler, SpecificApidHandlerBase +from tmtccmd.tmtc import CcsdsTmHandler, SpecificApidHandlerBase from tmtccmd.com import ComInterface from tmtccmd.config import ( default_json_path, @@ -30,7 +30,7 @@ from tmtccmd.logging.pus import ( RawTmtcTimedLogWrapper, TimedLogWhen, ) -from tmtccmd.tc import ( +from tmtccmd.tmtc import ( TcQueueEntryType, ProcedureWrapper, TcProcedureType, @@ -45,7 +45,7 @@ from tmtccmd.util.obj_id import ObjectIdDictT import pus_tc import tc_definitions -from common import EXAMPLE_PUS_APID, EventU32 +from common import EXAMPLE_PUS_APID, TM_PACKET_IDS, EventU32 _LOGGER = logging.getLogger() @@ -63,7 +63,7 @@ class SatRsConfigHook(HookBase): cfg = create_com_interface_cfg_default( com_if_key=com_if_key, json_cfg_path=self.cfg_path, - space_packet_ids=None, + space_packet_ids=TM_PACKET_IDS, ) return create_com_interface_default(cfg) @@ -128,6 +128,7 @@ class PusHandler(SpecificApidHandlerBase): if len(pus_tm.source_data) < 8: raise ValueError("No addressable ID in HK packet") json_str = pus_tm.source_data[8:] + _LOGGER.info(json_str) dedicated_handler = True if service == 5: tm_packet = PusTelemetry.unpack( diff --git a/satrs-example/pyclient/pus_tc.py b/satrs-example/pyclient/pus_tc.py index 4373cb4..9996e5b 100644 --- a/satrs-example/pyclient/pus_tc.py +++ b/satrs-example/pyclient/pus_tc.py @@ -3,9 +3,9 @@ import datetime from spacepackets.ccsds import CdsShortTimestamp from spacepackets.ecss import PusTelecommand from tmtccmd.config import CoreServiceList -from tmtccmd.tc import DefaultPusQueueHelper -from tmtccmd.tc.pus_11_tc_sched import create_time_tagged_cmd -from tmtccmd.tc.pus_3_fsfw_hk import create_request_one_hk_command +from tmtccmd.tmtc import DefaultPusQueueHelper +from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd +from tmtccmd.pus.tc.s3_fsfw_hk import create_request_one_hk_command from common import ( EXAMPLE_PUS_APID, diff --git a/satrs-example/pyclient/requirements.txt b/satrs-example/pyclient/requirements.txt index bcb3cb4..485c76a 100644 --- a/satrs-example/pyclient/requirements.txt +++ b/satrs-example/pyclient/requirements.txt @@ -1,2 +1,2 @@ -tmtccmd == 5.0.0rc0 +tmtccmd == 7.0.0 # -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd diff --git a/satrs-example/pyclient/tmtc_conf.json b/satrs-example/pyclient/tmtc_conf.json index ab02100..f2c8afd 100644 --- a/satrs-example/pyclient/tmtc_conf.json +++ b/satrs-example/pyclient/tmtc_conf.json @@ -1,6 +1,8 @@ { - "com_if": "udp", + "com_if": "tcp", "tcpip_udp_ip_addr": "127.0.0.1", "tcpip_udp_port": 7301, - "tcpip_udp_recv_max_size": 1500 -} \ No newline at end of file + "tcpip_udp_recv_max_size": 1500, + "tcpip_tcp_ip_addr": "127.0.0.1", + "tcpip_tcp_port": 7301 +} diff --git a/satrs-example/src/ccsds.rs b/satrs-example/src/ccsds.rs index ef361f2..d0616c9 100644 --- a/satrs-example/src/ccsds.rs +++ b/satrs-example/src/ccsds.rs @@ -3,6 +3,7 @@ use satrs_core::spacepackets::{CcsdsPacket, SpHeader}; use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; use satrs_example::PUS_APID; +#[derive(Clone)] pub struct CcsdsReceiver { pub tc_source: PusTcSource, } diff --git a/satrs-example/src/main.rs b/satrs-example/src/main.rs index fe55f79..f8d6b65 100644 --- a/satrs-example/src/main.rs +++ b/satrs-example/src/main.rs @@ -3,12 +3,16 @@ mod hk; mod logging; mod pus; mod requests; +mod tcp; mod tmtc; -//mod can; -//mod can_ids; +mod udp; use log::{info, warn}; +use satrs_core::hal::std::tcp_server::ServerConfig; +use satrs_core::hal::std::udp_server::UdpTcServer; +use crate::ccsds::CcsdsReceiver; +use crate::hk::AcsHkIds; use crate::hk::{AcsHkIds, HkUniqueId}; use crate::logging::setup_logger; use crate::pus::action::{Pus8Wrapper, PusService8ActionHandler}; @@ -16,9 +20,11 @@ use crate::pus::event::Pus5Wrapper; use crate::pus::hk::{Pus3Wrapper, PusService3HkHandler}; use crate::pus::scheduler::Pus11Wrapper; use crate::pus::test::Service17CustomWrapper; -use crate::pus::PusTcMpscRouter; +use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::requests::{Request, RequestWithToken}; -use crate::tmtc::{core_tmtc_task, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel}; +use crate::tcp::{SyncTcpTmSource, TcpTask}; +use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask}; +use crate::udp::UdpTmtcServer; use satrs_core::event_man::{ EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, }; @@ -143,7 +149,7 @@ fn main() { let tm_args = TmArgs { tm_store: shared_tm_store.clone(), tm_sink_sender: tm_funnel_tx.clone(), - tm_server_rx, + tm_udp_server_rx: tm_server_rx, }; let aocs_tm_funnel = tm_funnel_tx.clone(); @@ -270,11 +276,50 @@ fn main() { ); let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler }; - info!("Starting TMTC task"); - let jh0 = thread::Builder::new() - .name("TMTC".to_string()) + let ccsds_receiver = CcsdsReceiver { + tc_source: tc_args.tc_source.clone(), + }; + let mut tmtc_task = TmtcTask::new(tc_args, PusReceiver::new(verif_reporter, pus_router)); + + let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone())); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) + .expect("creating UDP TMTC server failed"); + let mut udp_tmtc_server = UdpTmtcServer { + udp_tc_server, + tm_rx: tm_args.tm_udp_server_rx, + tm_store: tm_args.tm_store.clone_backing_pool(), + }; + + info!("Starting TMTC and UDP task"); + let jh_udp_tmtc = thread::Builder::new() + .name("TMTC and UDP".to_string()) .spawn(move || { - core_tmtc_task(sock_addr, tc_args, tm_args, verif_reporter, pus_router); + info!("Running UDP server on port {SERVER_PORT}"); + loop { + udp_tmtc_server.periodic_operation(); + tmtc_task.periodic_operation(); + thread::sleep(Duration::from_millis(400)); + } + }) + .unwrap(); + + let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let mut sync_tm_tcp_source = SyncTcpTmSource::new(200); + let mut tcp_server = TcpTask::new( + tcp_server_cfg, + sync_tm_tcp_source.clone(), + tcp_ccsds_distributor, + ) + .expect("tcp server creation failed"); + info!("Starting TCP task"); + let jh_tcp = thread::Builder::new() + .name("TCP".to_string()) + .spawn(move || { + info!("Running TCP server on port {SERVER_PORT}"); + loop { + tcp_server.periodic_operation(); + } }) .unwrap(); @@ -315,6 +360,7 @@ fn main() { .tm_server_tx .send(addr) .expect("Sending TM to server failed"); + sync_tm_tcp_source.add_tm(tm_raw); } } }) @@ -386,6 +432,7 @@ fn main() { let mut timestamp: [u8; 7] = [0; 7]; let mut time_provider = TimeProvider::new_with_u16_days(0, 0); loop { + // TODO: Move this into a separate function/task/module.. match acs_thread_rx.try_recv() { Ok(request) => { info!( @@ -488,7 +535,12 @@ fn main() { thread::sleep(Duration::from_millis(200)); }) .unwrap(); - jh0.join().expect("Joining UDP TMTC server thread failed"); + jh_udp_tmtc + .join() + .expect("Joining UDP TMTC server thread failed"); + jh_tcp + .join() + .expect("Joining TCP TMTC server thread failed"); jh1.join().expect("Joining TM Funnel thread failed"); jh2.join().expect("Joining Event Manager thread failed"); jh3.join().expect("Joining AOCS thread failed"); diff --git a/satrs-example/src/tcp.rs b/satrs-example/src/tcp.rs new file mode 100644 index 0000000..8b7feff --- /dev/null +++ b/satrs-example/src/tcp.rs @@ -0,0 +1,115 @@ +use std::{ + collections::VecDeque, + sync::{Arc, Mutex}, +}; + +use log::{info, warn}; +use satrs_core::{ + hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, + spacepackets::PacketId, + tmtc::{CcsdsDistributor, CcsdsError, TmPacketSourceCore}, +}; +use satrs_example::PUS_APID; + +use crate::tmtc::MpscStoreAndSendError; + +pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)]; + +#[derive(Default, Clone)] +pub struct SyncTcpTmSource { + tm_queue: Arc>>>, + max_packets_stored: usize, + pub silent_packet_overwrite: bool, +} + +impl SyncTcpTmSource { + pub fn new(max_packets_stored: usize) -> Self { + Self { + tm_queue: Arc::default(), + max_packets_stored, + silent_packet_overwrite: true, + } + } + + pub fn add_tm(&mut self, tm: &[u8]) { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); + if tm_queue.len() > self.max_packets_stored { + if !self.silent_packet_overwrite { + warn!("TPC TM source is full, deleting oldest packet"); + } + tm_queue.pop_front(); + } + tm_queue.push_back(tm.to_vec()); + } +} + +impl TmPacketSourceCore for SyncTcpTmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); + if !tm_queue.is_empty() { + let next_vec = tm_queue.front().unwrap(); + if buffer.len() < next_vec.len() { + panic!( + "provided buffer too small, must be at least {} bytes", + next_vec.len() + ); + } + let next_vec = tm_queue.pop_front().unwrap(); + buffer[0..next_vec.len()].copy_from_slice(&next_vec); + if next_vec.len() > 9 { + let service = next_vec[7]; + let subservice = next_vec[8]; + info!("Sending PUS TM[{service},{subservice}]") + } else { + info!("Sending PUS TM"); + } + return Ok(next_vec.len()); + } + Ok(0) + } +} + +pub struct TcpTask { + server: TcpSpacepacketsServer< + (), + CcsdsError, + SyncTcpTmSource, + CcsdsDistributor, + >, +} + +impl TcpTask { + pub fn new( + cfg: ServerConfig, + tm_source: SyncTcpTmSource, + tc_receiver: CcsdsDistributor, + ) -> Result { + Ok(Self { + server: TcpSpacepacketsServer::new( + cfg, + tm_source, + tc_receiver, + Box::new(PACKET_ID_LOOKUP), + )?, + }) + } + + pub fn periodic_operation(&mut self) { + loop { + let result = self.server.handle_next_connection(); + match result { + Ok(conn_result) => { + info!( + "Served {} TMs and {} TCs for client {:?}", + conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr + ); + } + Err(e) => { + warn!("TCP server error: {e:?}"); + } + } + } + } +} diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 5d2ea5e..8af701a 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,26 +1,21 @@ -use log::{info, warn}; -use satrs_core::hal::std::udp_server::{ReceiveResult, UdpTcServer}; -use std::net::SocketAddr; +use log::warn; +use satrs_core::pus::ReceivesEcssPusTc; +use satrs_core::spacepackets::SpHeader; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; -use std::thread; -use std::time::Duration; use thiserror::Error; -use crate::ccsds::CcsdsReceiver; -use crate::pus::{PusReceiver, PusTcMpscRouter}; +use crate::pus::PusReceiver; use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; -use satrs_core::pus::verification::StdVerifReporterWithSender; -use satrs_core::pus::{ReceivesEcssPusTc, TcAddrWithToken}; +use satrs_core::pus::TcAddrWithToken; use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::PusPacket; -use satrs_core::spacepackets::SpHeader; use satrs_core::tmtc::tm_helper::SharedTmStore; -use satrs_core::tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc}; +use satrs_core::tmtc::ReceivesCcsdsTc; pub struct TmArgs { pub tm_store: SharedTmStore, pub tm_sink_sender: Sender, - pub tm_server_rx: Receiver, + pub tm_udp_server_rx: Receiver, } pub struct TcArgs { @@ -64,12 +59,6 @@ pub struct TmFunnel { pub tm_server_tx: Sender, } -pub struct UdpTmtcServer { - udp_tc_server: UdpTcServer>, - tm_rx: Receiver, - tm_store: SharedPool, -} - #[derive(Clone)] pub struct PusTcSource { pub tc_source: Sender, @@ -98,131 +87,60 @@ impl ReceivesCcsdsTc for PusTcSource { } } -pub fn core_tmtc_task( - socket_addr: SocketAddr, - mut tc_args: TcArgs, - tm_args: TmArgs, - verif_reporter: StdVerifReporterWithSender, - pus_router: PusTcMpscRouter, -) { - let mut pus_receiver = PusReceiver::new(verif_reporter, pus_router); - - let ccsds_receiver = CcsdsReceiver { - tc_source: tc_args.tc_source.clone(), - }; - - let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); - - let udp_tc_server = UdpTcServer::new(socket_addr, 2048, Box::new(ccsds_distributor)) - .expect("creating UDP TMTC server failed"); - - let mut udp_tmtc_server = UdpTmtcServer { - udp_tc_server, - tm_rx: tm_args.tm_server_rx, - tm_store: tm_args.tm_store.clone_backing_pool(), - }; - - let mut tc_buf: [u8; 4096] = [0; 4096]; - loop { - core_tmtc_loop( - &mut udp_tmtc_server, - &mut tc_args, - &mut tc_buf, - &mut pus_receiver, - ); - thread::sleep(Duration::from_millis(400)); - } +pub struct TmtcTask { + tc_args: TcArgs, + tc_buf: [u8; 4096], + pus_receiver: PusReceiver, } -fn core_tmtc_loop( - udp_tmtc_server: &mut UdpTmtcServer, - tc_args: &mut TcArgs, - tc_buf: &mut [u8], - pus_receiver: &mut PusReceiver, -) { - while poll_tc_server(udp_tmtc_server) {} - match tc_args.tc_receiver.try_recv() { - Ok(addr) => { - let pool = tc_args - .tc_source - .tc_store - .pool - .read() - .expect("locking tc pool failed"); - let data = pool.read(&addr).expect("reading pool failed"); - tc_buf[0..data.len()].copy_from_slice(data); - drop(pool); - match PusTcReader::new(tc_buf) { - Ok((pus_tc, _)) => { - pus_receiver - .handle_tc_packet(addr, pus_tc.service(), &pus_tc) - .ok(); - } - Err(e) => { - warn!("error creating PUS TC from raw data: {e}"); - warn!("raw data: {tc_buf:x?}"); - } - } - } - Err(e) => { - if let TryRecvError::Disconnected = e { - warn!("tmtc thread: sender disconnected") - } +impl TmtcTask { + pub fn new(tc_args: TcArgs, pus_receiver: PusReceiver) -> Self { + Self { + tc_args, + tc_buf: [0; 4096], + pus_receiver, } } - if let Some(recv_addr) = udp_tmtc_server.udp_tc_server.last_sender() { - core_tm_handling(udp_tmtc_server, &recv_addr); - } -} -fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool { - match udp_tmtc_server.udp_tc_server.try_recv_tc() { - Ok(_) => true, - Err(e) => match e { - ReceiveResult::ReceiverError(e) => match e { - CcsdsError::ByteConversionError(e) => { - warn!("packet error: {e:?}"); - true + pub fn periodic_operation(&mut self) { + //while self.poll_tc() {} + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + match self.tc_args.tc_receiver.try_recv() { + Ok(addr) => { + let pool = self + .tc_args + .tc_source + .tc_store + .pool + .read() + .expect("locking tc pool failed"); + let data = pool.read(&addr).expect("reading pool failed"); + self.tc_buf[0..data.len()].copy_from_slice(data); + drop(pool); + match PusTcReader::new(&self.tc_buf) { + Ok((pus_tc, _)) => { + self.pus_receiver + .handle_tc_packet(addr, pus_tc.service(), &pus_tc) + .ok(); + true + } + Err(e) => { + warn!("error creating PUS TC from raw data: {e}"); + warn!("raw data: {:x?}", self.tc_buf); + true + } } - CcsdsError::CustomError(e) => { - warn!("mpsc store and send error {e:?}"); - true + } + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + warn!("tmtc thread: sender disconnected"); + false } }, - ReceiveResult::IoError(e) => { - warn!("IO error {e}"); - false - } - ReceiveResult::NothingReceived => false, - }, - } -} - -fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) { - while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() { - let store_lock = udp_tmtc_server.tm_store.write(); - if store_lock.is_err() { - warn!("Locking TM store failed"); - continue; - } - let mut store_lock = store_lock.unwrap(); - let pg = store_lock.read_with_guard(addr); - let read_res = pg.read(); - if read_res.is_err() { - warn!("Error reading TM pool data"); - continue; - } - let buf = read_res.unwrap(); - if buf.len() > 9 { - let service = buf[7]; - let subservice = buf[8]; - info!("Sending PUS TM[{service},{subservice}]") - } else { - info!("Sending PUS TM"); - } - let result = udp_tmtc_server.udp_tc_server.socket.send_to(buf, recv_addr); - if let Err(e) = result { - warn!("Sending TM with UDP socket failed: {e}") } } } diff --git a/satrs-example/src/udp.rs b/satrs-example/src/udp.rs new file mode 100644 index 0000000..e3ca9f6 --- /dev/null +++ b/satrs-example/src/udp.rs @@ -0,0 +1,76 @@ +use std::{net::SocketAddr, sync::mpsc::Receiver}; + +use log::{info, warn}; +use satrs_core::{ + hal::std::udp_server::{ReceiveResult, UdpTcServer}, + pool::{SharedPool, StoreAddr}, + tmtc::CcsdsError, +}; + +use crate::tmtc::MpscStoreAndSendError; + +pub struct UdpTmtcServer { + pub udp_tc_server: UdpTcServer>, + pub tm_rx: Receiver, + pub tm_store: SharedPool, +} +impl UdpTmtcServer { + pub fn periodic_operation(&mut self) { + while self.poll_tc_server() {} + if let Some(recv_addr) = self.udp_tc_server.last_sender() { + self.send_tm_to_udp_client(&recv_addr); + } + } + + fn poll_tc_server(&mut self) -> bool { + match self.udp_tc_server.try_recv_tc() { + Ok(_) => true, + Err(e) => match e { + ReceiveResult::ReceiverError(e) => match e { + CcsdsError::ByteConversionError(e) => { + warn!("packet error: {e:?}"); + true + } + CcsdsError::CustomError(e) => { + warn!("mpsc store and send error {e:?}"); + true + } + }, + ReceiveResult::IoError(e) => { + warn!("IO error {e}"); + false + } + ReceiveResult::NothingReceived => false, + }, + } + } + + fn send_tm_to_udp_client(&mut self, recv_addr: &SocketAddr) { + while let Ok(addr) = self.tm_rx.try_recv() { + let store_lock = self.tm_store.write(); + if store_lock.is_err() { + warn!("Locking TM store failed"); + continue; + } + let mut store_lock = store_lock.unwrap(); + let pg = store_lock.read_with_guard(addr); + let read_res = pg.read(); + if read_res.is_err() { + warn!("Error reading TM pool data"); + continue; + } + let buf = read_res.unwrap(); + if buf.len() > 9 { + let service = buf[7]; + let subservice = buf[8]; + info!("Sending PUS TM[{service},{subservice}]") + } else { + info!("Sending PUS TM"); + } + let result = self.udp_tc_server.socket.send_to(buf, recv_addr); + if let Err(e) = result { + warn!("Sending TM with UDP socket failed: {e}") + } + } + } +} diff --git a/satrs-mib/Cargo.toml b/satrs-mib/Cargo.toml index cdfa4f5..e901ebe 100644 --- a/satrs-mib/Cargo.toml +++ b/satrs-mib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "satrs-mib" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" edition = "2021" rust-version = "1.61" authors = ["Robin Mueller "] @@ -23,13 +23,14 @@ version = "1" optional = true [dependencies.satrs-core] -# version = "0.1.0-alpha.0" -git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" -rev = "35e1f7a983f6535c5571186e361fe101d4306b89" +version = "0.1.0-alpha.1" +# git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" +# branch = "main" +# rev = "35e1f7a983f6535c5571186e361fe101d4306b89" [dependencies.satrs-mib-codegen] path = "codegen" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" [dependencies.serde] version = "1" diff --git a/satrs-mib/codegen/Cargo.toml b/satrs-mib/codegen/Cargo.toml index db6a671..9a3887c 100644 --- a/satrs-mib/codegen/Cargo.toml +++ b/satrs-mib/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "satrs-mib-codegen" -version = "0.1.0-alpha.0" +version = "0.1.0-alpha.1" edition = "2021" description = "satrs-mib proc macro implementation" homepage = "https://egit.irs.uni-stuttgart.de/rust/sat-rs" @@ -20,9 +20,10 @@ quote = "1" proc-macro2 = "1" [dependencies.satrs-core] -# version = "0.1.0-alpha.0" -git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" -rev = "35e1f7a983f6535c5571186e361fe101d4306b89" +version = "0.1.0-alpha.1" +# git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" +# branch = "main" +# rev = "35e1f7a983f6535c5571186e361fe101d4306b89" [dev-dependencies] trybuild = { version = "1", features = ["diff"] }