diff --git a/Cargo.toml b/Cargo.toml index c9d35a8..eaeb356 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] - +resolver = "2" members = [ "satrs-core", "satrs-mib", @@ -9,3 +9,4 @@ members = [ exclude = [ "satrs-example-stm32f3-disco", ] + diff --git a/automation/Jenkinsfile b/automation/Jenkinsfile index 44946f7..0770614 100644 --- a/automation/Jenkinsfile +++ b/automation/Jenkinsfile @@ -8,6 +8,11 @@ pipeline { } stages { + stage('Rust Toolchain Info') { + steps { + sh 'rustc --version' + } + } stage('Clippy') { steps { sh 'cargo clippy' @@ -15,7 +20,9 @@ pipeline { } stage('Docs') { steps { - sh 'cargo +nightly doc --all-features' + catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') { + sh 'cargo +nightly doc --all-features' + } } } stage('Rustfmt') { diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index 2d25971..5eb0421 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -60,14 +60,24 @@ version = "1" default-features = false optional = true +[dependencies.socket2] +version = "0.5.4" +features = ["all"] +optional = true + [dependencies.spacepackets] -version = "0.7.0-beta.1" +# version = "0.7.0-beta.1" # path = "../../spacepackets" -# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" -# rev = "" +git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" +rev = "79d26e1a6" # branch = "" default-features = false +[dependencies.cobs] +git = "https://github.com/robamu/cobs.rs.git" +branch = "all_features" +default-features = false + [dev-dependencies] serde = "1" zerocopy = "0.7" @@ -80,22 +90,23 @@ version = "1" [features] default = ["std"] std = [ - "downcast-rs/std", - "alloc", - "bus", - "postcard/use-std", - "crossbeam-channel/std", - "serde/std", - "spacepackets/std", - "num_enum/std", - "thiserror" + "downcast-rs/std", + "alloc", + "bus", + "postcard/use-std", + "crossbeam-channel/std", + "serde/std", + "spacepackets/std", + "num_enum/std", + "thiserror", + "socket2" ] alloc = [ - "serde/alloc", - "spacepackets/alloc", - "hashbrown", - "dyn-clone", - "downcast-rs" + "serde/alloc", + "spacepackets/alloc", + "hashbrown", + "dyn-clone", + "downcast-rs" ] serde = ["dep:serde", "spacepackets/serde"] crossbeam = ["crossbeam-channel"] diff --git a/satrs-core/src/encoding/ccsds.rs b/satrs-core/src/encoding/ccsds.rs new file mode 100644 index 0000000..f8f775f --- /dev/null +++ b/satrs-core/src/encoding/ccsds.rs @@ -0,0 +1,269 @@ +#[cfg(feature = "alloc")] +use alloc::vec::Vec; +#[cfg(feature = "alloc")] +use hashbrown::HashSet; +use spacepackets::PacketId; + +use crate::tmtc::ReceivesTcCore; + +pub trait PacketIdLookup { + fn validate(&self, packet_id: u16) -> bool; +} + +#[cfg(feature = "alloc")] +impl PacketIdLookup for Vec { + fn validate(&self, packet_id: u16) -> bool { + self.contains(&packet_id) + } +} + +#[cfg(feature = "alloc")] +impl PacketIdLookup for HashSet { + fn validate(&self, packet_id: u16) -> bool { + self.contains(&packet_id) + } +} + +impl PacketIdLookup for [u16] { + fn validate(&self, packet_id: u16) -> bool { + self.binary_search(&packet_id).is_ok() + } +} + +#[cfg(feature = "alloc")] +impl PacketIdLookup for Vec { + fn validate(&self, packet_id: u16) -> bool { + self.contains(&PacketId::from(packet_id)) + } +} +#[cfg(feature = "alloc")] +impl PacketIdLookup for HashSet { + fn validate(&self, packet_id: u16) -> bool { + self.contains(&PacketId::from(packet_id)) + } +} + +impl PacketIdLookup for [PacketId] { + fn validate(&self, packet_id: u16) -> bool { + self.binary_search(&PacketId::from(packet_id)).is_ok() + } +} + +/// This function parses a given buffer for tightly packed CCSDS space packets. It uses the +/// [PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet and then +/// uses the length field of the packet to extract CCSDS packets. +/// +/// This function is also able to deal with broken tail packets at the end as long a the parser +/// can read the full 7 bytes which constitue a space packet header plus one byte minimal size. +/// If broken tail packets are detected, they are moved to the front of the buffer, and the write +/// index for future write operations will be written to the `next_write_idx` argument. +/// +/// The parser will write all packets which were decoded successfully to the given `tc_receiver` +/// and return the number of packets found. If the [ReceivesTcCore::pass_tc] calls fails, the +/// error will be returned. +pub fn parse_buffer_for_ccsds_space_packets( + buf: &mut [u8], + packet_id_lookup: &(impl PacketIdLookup + ?Sized), + tc_receiver: &mut impl ReceivesTcCore, + next_write_idx: &mut usize, +) -> Result { + *next_write_idx = 0; + let mut packets_found = 0; + let mut current_idx = 0; + let buf_len = buf.len(); + loop { + if current_idx + 7 >= buf.len() { + break; + } + let packet_id = u16::from_be_bytes(buf[current_idx..current_idx + 2].try_into().unwrap()); + if packet_id_lookup.validate(packet_id) { + let length_field = + u16::from_be_bytes(buf[current_idx + 4..current_idx + 6].try_into().unwrap()); + let packet_size = length_field + 7; + if (current_idx + packet_size as usize) < buf_len { + tc_receiver.pass_tc(&buf[current_idx..current_idx + packet_size as usize])?; + packets_found += 1; + } else { + // Move packet to start of buffer if applicable. + if current_idx > 0 { + buf.copy_within(current_idx.., 0); + *next_write_idx = buf.len() - current_idx; + } + } + current_idx += packet_size as usize; + continue; + } + current_idx += 1; + } + Ok(packets_found) +} + +#[cfg(test)] +mod tests { + use spacepackets::{ + ecss::{tc::PusTcCreator, SerializablePusPacket}, + PacketId, SpHeader, + }; + + use crate::encoding::tests::TcCacher; + + use super::parse_buffer_for_ccsds_space_packets; + + const TEST_APID_0: u16 = 0x02; + const TEST_APID_1: u16 = 0x10; + const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0); + const TEST_PACKET_ID_1: PacketId = PacketId::const_tc(true, TEST_APID_1); + + #[test] + fn test_basic() { + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let valid_packet_ids = [TEST_PACKET_ID_0]; + let mut tc_cacher = TcCacher::default(); + let mut next_write_idx = 0; + let parse_result = parse_buffer_for_ccsds_space_packets( + &mut buffer, + valid_packet_ids.as_slice(), + &mut tc_cacher, + &mut next_write_idx, + ); + assert!(parse_result.is_ok()); + let parsed_packets = parse_result.unwrap(); + assert_eq!(parsed_packets, 1); + assert_eq!(tc_cacher.tc_queue.len(), 1); + assert_eq!( + tc_cacher.tc_queue.pop_front().unwrap(), + buffer[..packet_len] + ); + } + + #[test] + fn test_multi_packet() { + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len_ping = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let packet_len_action = action_tc + .write_to_bytes(&mut buffer[packet_len_ping..]) + .expect("writing packet failed"); + let valid_packet_ids = [TEST_PACKET_ID_0]; + let mut tc_cacher = TcCacher::default(); + let mut next_write_idx = 0; + let parse_result = parse_buffer_for_ccsds_space_packets( + &mut buffer, + valid_packet_ids.as_slice(), + &mut tc_cacher, + &mut next_write_idx, + ); + assert!(parse_result.is_ok()); + let parsed_packets = parse_result.unwrap(); + assert_eq!(parsed_packets, 2); + assert_eq!(tc_cacher.tc_queue.len(), 2); + assert_eq!( + tc_cacher.tc_queue.pop_front().unwrap(), + buffer[..packet_len_ping] + ); + assert_eq!( + tc_cacher.tc_queue.pop_front().unwrap(), + buffer[packet_len_ping..packet_len_ping + packet_len_action] + ); + } + + #[test] + fn test_multi_apid() { + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); + let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len_ping = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let packet_len_action = action_tc + .write_to_bytes(&mut buffer[packet_len_ping..]) + .expect("writing packet failed"); + let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; + let mut tc_cacher = TcCacher::default(); + let mut next_write_idx = 0; + let parse_result = parse_buffer_for_ccsds_space_packets( + &mut buffer, + valid_packet_ids.as_slice(), + &mut tc_cacher, + &mut next_write_idx, + ); + assert!(parse_result.is_ok()); + let parsed_packets = parse_result.unwrap(); + assert_eq!(parsed_packets, 2); + assert_eq!(tc_cacher.tc_queue.len(), 2); + assert_eq!( + tc_cacher.tc_queue.pop_front().unwrap(), + buffer[..packet_len_ping] + ); + assert_eq!( + tc_cacher.tc_queue.pop_front().unwrap(), + buffer[packet_len_ping..packet_len_ping + packet_len_action] + ); + } + + #[test] + fn test_split_packet_multi() { + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); + let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len_ping = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let packet_len_action = action_tc + .write_to_bytes(&mut buffer[packet_len_ping..]) + .expect("writing packet failed"); + let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; + let mut tc_cacher = TcCacher::default(); + let mut next_write_idx = 0; + let parse_result = parse_buffer_for_ccsds_space_packets( + &mut buffer[..packet_len_ping + packet_len_action - 4], + valid_packet_ids.as_slice(), + &mut tc_cacher, + &mut next_write_idx, + ); + assert!(parse_result.is_ok()); + let parsed_packets = parse_result.unwrap(); + assert_eq!(parsed_packets, 1); + assert_eq!(tc_cacher.tc_queue.len(), 1); + // The broken packet was moved to the start, so the next write index should be after the + // last segment missing 4 bytes. + assert_eq!(next_write_idx, packet_len_action - 4); + } + + #[test] + fn test_one_split_packet() { + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len_ping = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; + let mut tc_cacher = TcCacher::default(); + let mut next_write_idx = 0; + let parse_result = parse_buffer_for_ccsds_space_packets( + &mut buffer[..packet_len_ping - 4], + valid_packet_ids.as_slice(), + &mut tc_cacher, + &mut next_write_idx, + ); + assert_eq!(next_write_idx, 0); + assert!(parse_result.is_ok()); + let parsed_packets = parse_result.unwrap(); + assert_eq!(parsed_packets, 0); + assert_eq!(tc_cacher.tc_queue.len(), 0); + } +} diff --git a/satrs-core/src/encoding/cobs.rs b/satrs-core/src/encoding/cobs.rs new file mode 100644 index 0000000..2645745 --- /dev/null +++ b/satrs-core/src/encoding/cobs.rs @@ -0,0 +1,263 @@ +use crate::tmtc::ReceivesTcCore; +use cobs::{decode_in_place, encode, max_encoding_length}; + +/// This function encodes the given packet with COBS and also wraps the encoded packet with +/// the sentinel value 0. It can be used repeatedly on the same encoded buffer by expecting +/// and incrementing the mutable reference of the current packet index. This is also used +/// to retrieve the total encoded size. +/// +/// This function will return [false] if the given encoding buffer is not large enough to hold +/// the encoded buffer and the two sentinel bytes and [true] if the encoding was successfull. +/// +/// ## Example +/// +/// ``` +/// use cobs::decode_in_place_report; +/// use satrs_core::encoding::{encode_packet_with_cobs}; +// +/// const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; +/// const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 2, 1]; +/// +/// let mut encoding_buf: [u8; 32] = [0; 32]; +/// let mut current_idx = 0; +/// assert!(encode_packet_with_cobs(&SIMPLE_PACKET, &mut encoding_buf, &mut current_idx)); +/// assert!(encode_packet_with_cobs(&INVERTED_PACKET, &mut encoding_buf, &mut current_idx)); +/// assert_eq!(encoding_buf[0], 0); +/// let dec_report = decode_in_place_report(&mut encoding_buf[1..]).expect("decoding failed"); +/// assert_eq!(encoding_buf[1 + dec_report.src_used], 0); +/// assert_eq!(dec_report.dst_used, 5); +/// assert_eq!(current_idx, 16); +/// ``` +pub fn encode_packet_with_cobs( + packet: &[u8], + encoded_buf: &mut [u8], + current_idx: &mut usize, +) -> bool { + let max_encoding_len = max_encoding_length(packet.len()); + if *current_idx + max_encoding_len + 2 > encoded_buf.len() { + return false; + } + encoded_buf[*current_idx] = 0; + *current_idx += 1; + *current_idx += encode(packet, &mut encoded_buf[*current_idx..]); + encoded_buf[*current_idx] = 0; + *current_idx += 1; + true +} + +/// This function parses a given buffer for COBS encoded packets. The packet structure is +/// expected to be like this, assuming a sentinel value of 0 as the packet delimiter: +/// +/// 0 | ... Encoded Packet Data ... | 0 | 0 | ... Encoded Packet Data ... | 0 +/// +/// This function is also able to deal with broken tail packets at the end. If broken tail +/// packets are detected, they are moved to the front of the buffer, and the write index for +/// future write operations will be written to the `next_write_idx` argument. +/// +/// The parser will write all packets which were decoded successfully to the given `tc_receiver`. +pub fn parse_buffer_for_cobs_encoded_packets( + buf: &mut [u8], + tc_receiver: &mut dyn ReceivesTcCore, + next_write_idx: &mut usize, +) -> Result { + let mut start_index_packet = 0; + let mut start_found = false; + let mut last_byte = false; + let mut packets_found = 0; + for i in 0..buf.len() { + if i == buf.len() - 1 { + last_byte = true; + } + if buf[i] == 0 { + if !start_found && !last_byte && buf[i + 1] == 0 { + // Special case: Consecutive sentinel values or all zeroes. + // Skip. + continue; + } + if start_found { + let decode_result = decode_in_place(&mut buf[start_index_packet..i]); + if let Ok(packet_len) = decode_result { + packets_found += 1; + tc_receiver + .pass_tc(&buf[start_index_packet..start_index_packet + packet_len])?; + } + start_found = false; + } else { + start_index_packet = i + 1; + start_found = true; + } + } + } + // Move split frame at the end to the front of the buffer. + if start_index_packet > 0 && start_found && packets_found > 0 { + buf.copy_within(start_index_packet - 1.., 0); + *next_write_idx = buf.len() - start_index_packet + 1; + } + Ok(packets_found) +} + +#[cfg(test)] +pub(crate) mod tests { + use cobs::encode; + + use crate::encoding::tests::{encode_simple_packet, TcCacher, INVERTED_PACKET, SIMPLE_PACKET}; + + use super::parse_buffer_for_cobs_encoded_packets; + + #[test] + fn test_parsing_simple_packet() { + let mut test_sender = TcCacher::default(); + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + let mut next_read_idx = 0; + let packets = parse_buffer_for_cobs_encoded_packets( + &mut encoded_buf[0..current_idx], + &mut test_sender, + &mut next_read_idx, + ) + .unwrap(); + assert_eq!(packets, 1); + assert_eq!(test_sender.tc_queue.len(), 1); + let packet = &test_sender.tc_queue[0]; + assert_eq!(packet, &SIMPLE_PACKET); + } + + #[test] + fn test_parsing_consecutive_packets() { + let mut test_sender = TcCacher::default(); + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + + // Second packet + encoded_buf[current_idx] = 0; + current_idx += 1; + current_idx += encode(&INVERTED_PACKET, &mut encoded_buf[current_idx..]); + encoded_buf[current_idx] = 0; + current_idx += 1; + let mut next_read_idx = 0; + let packets = parse_buffer_for_cobs_encoded_packets( + &mut encoded_buf[0..current_idx], + &mut test_sender, + &mut next_read_idx, + ) + .unwrap(); + assert_eq!(packets, 2); + assert_eq!(test_sender.tc_queue.len(), 2); + let packet0 = &test_sender.tc_queue[0]; + assert_eq!(packet0, &SIMPLE_PACKET); + let packet1 = &test_sender.tc_queue[1]; + assert_eq!(packet1, &INVERTED_PACKET); + } + + #[test] + fn test_split_tail_packet_only() { + let mut test_sender = TcCacher::default(); + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + let mut next_read_idx = 0; + let packets = parse_buffer_for_cobs_encoded_packets( + // Cut off the sentinel byte at the end. + &mut encoded_buf[0..current_idx - 1], + &mut test_sender, + &mut next_read_idx, + ) + .unwrap(); + assert_eq!(packets, 0); + assert_eq!(test_sender.tc_queue.len(), 0); + assert_eq!(next_read_idx, 0); + } + + fn generic_test_split_packet(cut_off: usize) { + let mut test_sender = TcCacher::default(); + let mut encoded_buf: [u8; 16] = [0; 16]; + assert!(cut_off < INVERTED_PACKET.len() + 1); + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + // Second packet + encoded_buf[current_idx] = 0; + let packet_start = current_idx; + current_idx += 1; + let encoded_len = encode(&INVERTED_PACKET, &mut encoded_buf[current_idx..]); + assert_eq!(encoded_len, 6); + current_idx += encoded_len; + // We cut off the sentinel byte, so we expecte the write index to be the length of the + // packet minus the sentinel byte plus the first sentinel byte. + let next_expected_write_idx = 1 + encoded_len - cut_off + 1; + encoded_buf[current_idx] = 0; + current_idx += 1; + let mut next_write_idx = 0; + let expected_at_start = encoded_buf[packet_start..current_idx - cut_off].to_vec(); + let packets = parse_buffer_for_cobs_encoded_packets( + // Cut off the sentinel byte at the end. + &mut encoded_buf[0..current_idx - cut_off], + &mut test_sender, + &mut next_write_idx, + ) + .unwrap(); + assert_eq!(packets, 1); + assert_eq!(test_sender.tc_queue.len(), 1); + assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET); + assert_eq!(next_write_idx, next_expected_write_idx); + assert_eq!(encoded_buf[..next_expected_write_idx], expected_at_start); + } + + #[test] + fn test_one_packet_and_split_tail_packet_0() { + generic_test_split_packet(1); + } + + #[test] + fn test_one_packet_and_split_tail_packet_1() { + generic_test_split_packet(2); + } + + #[test] + fn test_one_packet_and_split_tail_packet_2() { + generic_test_split_packet(3); + } + + #[test] + fn test_zero_at_end() { + let mut test_sender = TcCacher::default(); + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut next_write_idx = 0; + let mut current_idx = 0; + encoded_buf[current_idx] = 5; + current_idx += 1; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + encoded_buf[current_idx] = 0; + current_idx += 1; + let packets = parse_buffer_for_cobs_encoded_packets( + // Cut off the sentinel byte at the end. + &mut encoded_buf[0..current_idx], + &mut test_sender, + &mut next_write_idx, + ) + .unwrap(); + assert_eq!(packets, 1); + assert_eq!(test_sender.tc_queue.len(), 1); + assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET); + assert_eq!(next_write_idx, 1); + assert_eq!(encoded_buf[0], 0); + } + + #[test] + fn test_all_zeroes() { + let mut test_sender = TcCacher::default(); + let mut all_zeroes: [u8; 5] = [0; 5]; + let mut next_write_idx = 0; + let packets = parse_buffer_for_cobs_encoded_packets( + // Cut off the sentinel byte at the end. + &mut all_zeroes, + &mut test_sender, + &mut next_write_idx, + ) + .unwrap(); + assert_eq!(packets, 0); + assert!(test_sender.tc_queue.is_empty()); + assert_eq!(next_write_idx, 0); + } +} diff --git a/satrs-core/src/encoding/mod.rs b/satrs-core/src/encoding/mod.rs new file mode 100644 index 0000000..94e3dee --- /dev/null +++ b/satrs-core/src/encoding/mod.rs @@ -0,0 +1,40 @@ +pub mod ccsds; +pub mod cobs; + +pub use crate::encoding::ccsds::parse_buffer_for_ccsds_space_packets; +pub use crate::encoding::cobs::{encode_packet_with_cobs, parse_buffer_for_cobs_encoded_packets}; + +#[cfg(test)] +pub(crate) mod tests { + use alloc::{collections::VecDeque, vec::Vec}; + + use crate::tmtc::ReceivesTcCore; + + use super::cobs::encode_packet_with_cobs; + + pub(crate) const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; + pub(crate) const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 2, 1]; + + #[derive(Default)] + pub(crate) struct TcCacher { + pub(crate) tc_queue: VecDeque>, + } + + impl ReceivesTcCore for TcCacher { + type Error = (); + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.tc_queue.push_back(tc_raw.to_vec()); + Ok(()) + } + } + + pub(crate) fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { + encode_packet_with_cobs(&SIMPLE_PACKET, encoded_buf, current_idx); + } + + #[allow(dead_code)] + pub(crate) fn encode_inverted_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { + encode_packet_with_cobs(&INVERTED_PACKET, encoded_buf, current_idx); + } +} diff --git a/satrs-core/src/hal/host/mod.rs b/satrs-core/src/hal/host/mod.rs deleted file mode 100644 index 8057db1..0000000 --- a/satrs-core/src/hal/host/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -//! Helper modules intended to be used on hosts with a full [std] runtime -pub mod udp_server; diff --git a/satrs-core/src/hal/mod.rs b/satrs-core/src/hal/mod.rs index c422a72..b6ab984 100644 --- a/satrs-core/src/hal/mod.rs +++ b/satrs-core/src/hal/mod.rs @@ -1,4 +1,4 @@ //! # Hardware Abstraction Layer module #[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] -pub mod host; +pub mod std; diff --git a/satrs-core/src/hal/std/mod.rs b/satrs-core/src/hal/std/mod.rs new file mode 100644 index 0000000..17ec012 --- /dev/null +++ b/satrs-core/src/hal/std/mod.rs @@ -0,0 +1,6 @@ +//! Helper modules intended to be used on systems with a full [std] runtime. +pub mod tcp_server; +pub mod udp_server; + +mod tcp_spacepackets_server; +mod tcp_with_cobs_server; diff --git a/satrs-core/src/hal/std/tcp_server.rs b/satrs-core/src/hal/std/tcp_server.rs new file mode 100644 index 0000000..dcccf66 --- /dev/null +++ b/satrs-core/src/hal/std/tcp_server.rs @@ -0,0 +1,319 @@ +//! Generic TCP TMTC servers with different TMTC format flavours. +use alloc::vec; +use alloc::{boxed::Box, vec::Vec}; +use core::time::Duration; +use socket2::{Domain, Socket, Type}; +use std::io::Read; +use std::net::TcpListener; +use std::net::{SocketAddr, TcpStream}; +use std::thread; + +use crate::tmtc::{ReceivesTc, TmPacketSource}; +use thiserror::Error; + +// Re-export the TMTC in COBS server. +pub use crate::hal::std::tcp_with_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer}; + +/// Configuration struct for the generic TCP TMTC server +/// +/// ## Parameters +/// +/// * `addr` - Address of the TCP server. +/// * `inner_loop_delay` - If a client connects for a longer period, but no TC is received or +/// no TM needs to be sent, the TCP server will delay for the specified amount of time +/// to reduce CPU load. +/// * `tm_buffer_size` - Size of the TM buffer used to read TM from the [TmPacketSource] and +/// encoding of that data. This buffer should at large enough to hold the maximum expected +/// TM size read from the packet source. +/// * `tc_buffer_size` - Size of the TC buffer used to read encoded telecommands sent from +/// the client. It is recommended to make this buffer larger to allow reading multiple +/// consecutive packets as well, for example by using common buffer sizes like 4096 or 8192 +/// byte. The buffer should at the very least be large enough to hold the maximum expected +/// telecommand size. +/// * `reuse_addr` - Can be used to set the `SO_REUSEADDR` option on the raw socket. This is +/// especially useful if the address and port are static for the server. Set to false by +/// default. +/// * `reuse_port` - Can be used to set the `SO_REUSEPORT` option on the raw socket. This is +/// especially useful if the address and port are static for the server. Set to false by +/// default. +#[derive(Debug, Copy, Clone)] +pub struct ServerConfig { + pub addr: SocketAddr, + pub inner_loop_delay: Duration, + pub tm_buffer_size: usize, + pub tc_buffer_size: usize, + pub reuse_addr: bool, + pub reuse_port: bool, +} + +impl ServerConfig { + pub fn new( + addr: SocketAddr, + inner_loop_delay: Duration, + tm_buffer_size: usize, + tc_buffer_size: usize, + ) -> Self { + Self { + addr, + inner_loop_delay, + tm_buffer_size, + tc_buffer_size, + reuse_addr: false, + reuse_port: false, + } + } +} + +#[derive(Error, Debug)] +pub enum TcpTmtcError { + #[error("TM retrieval error: {0}")] + TmError(TmError), + #[error("TC retrieval error: {0}")] + TcError(TcError), + #[error("io error: {0}")] + Io(#[from] std::io::Error), +} + +/// Result of one connection attempt. Contains the client address if a connection was established, +/// in addition to the number of telecommands and telemetry packets exchanged. +#[derive(Debug, Default)] +pub struct ConnectionResult { + pub addr: Option, + pub num_received_tcs: u32, + pub num_sent_tms: u32, +} + +/// Generic parser abstraction for an object which can parse for telecommands given a raw +/// bytestream received from a TCP socket and send them to a generic [ReceivesTc] telecommand +/// receiver. This allows different encoding schemes for telecommands. +pub trait TcpTcParser { + fn handle_tc_parsing( + &mut self, + tc_buffer: &mut [u8], + tc_receiver: &mut dyn ReceivesTc, + conn_result: &mut ConnectionResult, + current_write_idx: usize, + next_write_idx: &mut usize, + ) -> Result<(), TcpTmtcError>; +} + +/// Generic sender abstraction for an object which can pull telemetry from a given TM source +/// using a [TmPacketSource] and then send them back to a client using a given [TcpStream]. +/// The concrete implementation can also perform any encoding steps which are necessary before +/// sending back the data to a client. +pub trait TcpTmSender { + fn handle_tm_sending( + &mut self, + tm_buffer: &mut [u8], + tm_source: &mut dyn TmPacketSource, + conn_result: &mut ConnectionResult, + stream: &mut TcpStream, + ) -> Result>; +} + +/// TCP TMTC server implementation for exchange of generic TMTC packets in a generic way which +/// stays agnostic to the encoding scheme and format used for both telecommands and telemetry. +/// +/// This server implements a generic TMTC handling logic and allows modifying its behaviour +/// through the following 4 core abstractions: +/// +/// 1. [TcpTcParser] to parse for telecommands from the raw bytestream received from a client. +/// 2. Parsed telecommands will be sent to the [ReceivesTc] telecommand receiver. +/// 3. [TcpTmSender] to send telemetry pulled from a TM source back to the client. +/// 4. [TmPacketSource] as a generic TM source used by the [TcpTmSender]. +/// +/// It is possible to specify custom abstractions to build a dedicated TCP TMTC server without +/// having to re-implement common logic. +/// +/// Currently, this framework offers the following concrete implementations: +/// +/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol. +pub struct TcpTmtcGenericServer< + TmError, + TcError, + TmHandler: TcpTmSender, + TcHandler: TcpTcParser, +> { + base: TcpTmtcServerBase, + tc_handler: TcHandler, + tm_handler: TmHandler, +} + +impl< + TmError: 'static, + TcError: 'static, + TmSender: TcpTmSender, + TcParser: TcpTcParser, + > TcpTmtcGenericServer +{ + /// Create a new generic TMTC server instance. + /// + /// ## Parameter + /// + /// * `cfg` - Configuration of the server. + /// * `tc_parser` - Parser which extracts telecommands from the raw bytestream received from + /// the client. + /// * `tm_sender` - Sends back telemetry to the client using the specified TM source. + /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are + /// then sent back to the client. + /// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded + /// to this TC receiver. + pub fn new( + cfg: ServerConfig, + tc_parser: TcParser, + tm_sender: TmSender, + tm_source: Box>, + tc_receiver: Box>, + ) -> Result, std::io::Error> { + Ok(Self { + base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?, + tc_handler: tc_parser, + tm_handler: tm_sender, + }) + } + + /// Retrieve the internal [TcpListener] class. + pub fn listener(&mut self) -> &mut TcpListener { + self.base.listener() + } + + /// Can be used to retrieve the local assigned address of the TCP server. This is especially + /// useful if using the port number 0 for OS auto-assignment. + pub fn local_addr(&self) -> std::io::Result { + self.base.local_addr() + } + + /// This call is used to handle the next connection to a client. Right now, it performs + /// the following steps: + /// + /// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API + /// until a client connects. + /// 2. It reads all the telecommands from the client and parses all received data using the + /// user specified [TcpTcParser]. + /// 3. After reading and parsing all telecommands, it sends back all telemetry using the + /// user specified [TcpTmSender]. + /// + /// The server will delay for a user-specified period if the client connects to the server + /// for prolonged periods and there is no traffic for the server. This is the case if the + /// client does not send any telecommands and no telemetry needs to be sent back to the client. + pub fn handle_next_connection( + &mut self, + ) -> Result> { + let mut connection_result = ConnectionResult::default(); + let mut current_write_idx; + let mut next_write_idx = 0; + let (mut stream, addr) = self.base.listener.accept()?; + stream.set_nonblocking(true)?; + connection_result.addr = Some(addr); + current_write_idx = next_write_idx; + loop { + let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]); + match read_result { + Ok(0) => { + // Connection closed by client. If any TC was read, parse for complete packets. + // After that, break the outer loop. + if current_write_idx > 0 { + self.tc_handler.handle_tc_parsing( + &mut self.base.tc_buffer, + self.base.tc_receiver.as_mut(), + &mut connection_result, + current_write_idx, + &mut next_write_idx, + )?; + } + break; + } + Ok(read_len) => { + current_write_idx += read_len; + // TC buffer is full, we must parse for complete packets now. + if current_write_idx == self.base.tc_buffer.capacity() { + self.tc_handler.handle_tc_parsing( + &mut self.base.tc_buffer, + self.base.tc_receiver.as_mut(), + &mut connection_result, + current_write_idx, + &mut next_write_idx, + )?; + current_write_idx = next_write_idx; + } + } + Err(e) => match e.kind() { + // As per [TcpStream::set_read_timeout] documentation, this should work for + // both UNIX and Windows. + std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { + self.tc_handler.handle_tc_parsing( + &mut self.base.tc_buffer, + self.base.tc_receiver.as_mut(), + &mut connection_result, + current_write_idx, + &mut next_write_idx, + )?; + current_write_idx = next_write_idx; + + if !self.tm_handler.handle_tm_sending( + &mut self.base.tm_buffer, + self.base.tm_source.as_mut(), + &mut connection_result, + &mut stream, + )? { + // No TC read, no TM was sent, but the client has not disconnected. + // Perform an inner delay to avoid burning CPU time. + thread::sleep(self.base.inner_loop_delay); + } + } + _ => { + return Err(TcpTmtcError::Io(e)); + } + }, + } + } + self.tm_handler.handle_tm_sending( + &mut self.base.tm_buffer, + self.base.tm_source.as_mut(), + &mut connection_result, + &mut stream, + )?; + Ok(connection_result) + } +} + +pub(crate) struct TcpTmtcServerBase { + pub(crate) listener: TcpListener, + pub(crate) inner_loop_delay: Duration, + pub(crate) tm_source: Box>, + pub(crate) tm_buffer: Vec, + pub(crate) tc_receiver: Box>, + pub(crate) tc_buffer: Vec, +} + +impl TcpTmtcServerBase { + pub(crate) fn new( + cfg: ServerConfig, + tm_source: Box>, + tc_receiver: Box>, + ) -> Result { + // Create a TCP listener bound to two addresses. + let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; + socket.set_reuse_address(cfg.reuse_addr)?; + socket.set_reuse_port(cfg.reuse_port)?; + let addr = (cfg.addr).into(); + socket.bind(&addr)?; + socket.listen(128)?; + Ok(Self { + listener: socket.into(), + inner_loop_delay: cfg.inner_loop_delay, + tm_source, + tm_buffer: vec![0; cfg.tm_buffer_size], + tc_receiver, + tc_buffer: vec![0; cfg.tc_buffer_size], + }) + } + + pub(crate) fn listener(&mut self) -> &mut TcpListener { + &mut self.listener + } + + pub(crate) fn local_addr(&self) -> std::io::Result { + self.listener.local_addr() + } +} diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -0,0 +1 @@ + diff --git a/satrs-core/src/hal/std/tcp_with_cobs_server.rs b/satrs-core/src/hal/std/tcp_with_cobs_server.rs new file mode 100644 index 0000000..fc44ebf --- /dev/null +++ b/satrs-core/src/hal/std/tcp_with_cobs_server.rs @@ -0,0 +1,415 @@ +use alloc::boxed::Box; +use alloc::vec; +use cobs::encode; +use delegate::delegate; +use std::io::Write; +use std::net::SocketAddr; +use std::net::TcpListener; +use std::net::TcpStream; +use std::vec::Vec; + +use crate::encoding::parse_buffer_for_cobs_encoded_packets; +use crate::tmtc::ReceivesTc; +use crate::tmtc::TmPacketSource; + +use crate::hal::std::tcp_server::{ + ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer, +}; + +/// Concrete [TcpTcParser] implementation for the [TcpTmtcInCobsServer]. +#[derive(Default)] +pub struct CobsTcParser {} + +impl TcpTcParser for CobsTcParser { + fn handle_tc_parsing( + &mut self, + tc_buffer: &mut [u8], + tc_receiver: &mut dyn ReceivesTc, + conn_result: &mut ConnectionResult, + current_write_idx: usize, + next_write_idx: &mut usize, + ) -> Result<(), TcpTmtcError> { + // Reader vec full, need to parse for packets. + conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets( + &mut tc_buffer[..current_write_idx], + tc_receiver.upcast_mut(), + next_write_idx, + ) + .map_err(|e| TcpTmtcError::TcError(e))?; + Ok(()) + } +} + +/// Concrete [TcpTmSender] implementation for the [TcpTmtcInCobsServer]. +pub struct CobsTmSender { + tm_encoding_buffer: Vec, +} + +impl CobsTmSender { + fn new(tm_buffer_size: usize) -> Self { + Self { + // The buffer should be large enough to hold the maximum expected TM size encoded with + // COBS. + tm_encoding_buffer: vec![0; cobs::max_encoding_length(tm_buffer_size)], + } + } +} + +impl TcpTmSender for CobsTmSender { + fn handle_tm_sending( + &mut self, + tm_buffer: &mut [u8], + tm_source: &mut dyn TmPacketSource, + conn_result: &mut ConnectionResult, + stream: &mut TcpStream, + ) -> Result> { + let mut tm_was_sent = false; + loop { + // Write TM until TM source is exhausted. For now, there is no limit for the amount + // of TM written this way. + let read_tm_len = tm_source + .retrieve_packet(tm_buffer) + .map_err(|e| TcpTmtcError::TmError(e))?; + + if read_tm_len == 0 { + return Ok(tm_was_sent); + } + tm_was_sent = true; + conn_result.num_sent_tms += 1; + + // Encode into COBS and sent to client. + let mut current_idx = 0; + self.tm_encoding_buffer[current_idx] = 0; + current_idx += 1; + current_idx += encode( + &tm_buffer[..read_tm_len], + &mut self.tm_encoding_buffer[current_idx..], + ); + self.tm_encoding_buffer[current_idx] = 0; + current_idx += 1; + stream.write_all(&self.tm_encoding_buffer[..current_idx])?; + } + } +} + +/// TCP TMTC server implementation for exchange of generic TMTC packets which are framed with the +/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). +/// +/// Telemetry will be encoded with the COBS protocol using [cobs::encode] in addition to being +/// wrapped with the sentinel value 0 as the packet delimiter as well before being sent back to +/// the client. Please note that the server will send as much data as it can retrieve from the +/// [TmPacketSource] in its current implementation. +/// +/// Using a framing protocol like COBS imposes minimal restrictions on the type of TMTC data +/// exchanged while also allowing packets with flexible size and a reliable way to reconstruct full +/// packets even from a data stream which is split up. The server wil use the +/// [parse_buffer_for_cobs_encoded_packets] function to parse for packets and pass them to a +/// generic TC receiver. The user can use [crate::encoding::encode_packet_with_cobs] to encode +/// telecommands sent to the server. +/// +/// ## Example +/// +/// The [TCP COBS integration](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs) +/// test also serves as the example application for this module. +pub struct TcpTmtcInCobsServer { + generic_server: TcpTmtcGenericServer, +} + +impl TcpTmtcInCobsServer { + /// Create a new TCP TMTC server which exchanges TMTC packets encoded with + /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). + /// + /// ## Parameter + /// + /// * `cfg` - Configuration of the server. + /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are + /// then sent back to the client. + /// * `tc_receiver` - Any received telecommands which were decoded successfully will be + /// forwarded to this TC receiver. + pub fn new( + cfg: ServerConfig, + tm_source: Box>, + tc_receiver: Box>, + ) -> Result> { + Ok(Self { + generic_server: TcpTmtcGenericServer::new( + cfg, + CobsTcParser::default(), + CobsTmSender::new(cfg.tm_buffer_size), + tm_source, + tc_receiver, + )?, + }) + } + + delegate! { + to self.generic_server { + pub fn listener(&mut self) -> &mut TcpListener; + + /// Can be used to retrieve the local assigned address of the TCP server. This is especially + /// useful if using the port number 0 for OS auto-assignment. + pub fn local_addr(&self) -> std::io::Result; + + /// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call. + pub fn handle_next_connection( + &mut self, + ) -> Result>; + } + } +} + +#[cfg(test)] +mod tests { + use core::{ + sync::atomic::{AtomicBool, Ordering}, + time::Duration, + }; + use std::{ + io::{Read, Write}, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, + sync::Mutex, + thread, + }; + + use crate::{ + encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET}, + hal::std::tcp_server::ServerConfig, + tmtc::{ReceivesTcCore, TmPacketSourceCore}, + }; + use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec}; + use cobs::encode; + + use super::TcpTmtcInCobsServer; + + #[derive(Default, Clone)] + struct SyncTcCacher { + tc_queue: Arc>>>, + } + impl ReceivesTcCore for SyncTcCacher { + type Error = (); + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); + tc_queue.push_back(tc_raw.to_vec()); + Ok(()) + } + } + + #[derive(Default, Clone)] + struct SyncTmSource { + tm_queue: Arc>>>, + } + + impl SyncTmSource { + pub(crate) fn add_tm(&mut self, tm: &[u8]) { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); + tm_queue.push_back(tm.to_vec()); + } + } + + impl TmPacketSourceCore for SyncTmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); + if !tm_queue.is_empty() { + let next_vec = tm_queue.front().unwrap(); + if buffer.len() < next_vec.len() { + panic!( + "provided buffer too small, must be at least {} bytes", + next_vec.len() + ); + } + let next_vec = tm_queue.pop_front().unwrap(); + buffer[0..next_vec.len()].copy_from_slice(&next_vec); + return Ok(next_vec.len()); + } + Ok(0) + } + } + + fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { + encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx) + } + + fn encode_inverted_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { + encode_packet(&INVERTED_PACKET, encoded_buf, current_idx) + } + + fn encode_packet(packet: &[u8], encoded_buf: &mut [u8], current_idx: &mut usize) { + encoded_buf[*current_idx] = 0; + *current_idx += 1; + *current_idx += encode(packet, &mut encoded_buf[*current_idx..]); + encoded_buf[*current_idx] = 0; + *current_idx += 1; + } + + fn generic_tmtc_server( + addr: &SocketAddr, + tc_receiver: SyncTcCacher, + tm_source: SyncTmSource, + ) -> TcpTmtcInCobsServer<(), ()> { + TcpTmtcInCobsServer::new( + ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), + Box::new(tm_source), + Box::new(tc_receiver), + ) + .expect("TCP server generation failed") + } + + #[test] + fn test_server_basic_no_tm() { + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let tm_source = SyncTmSource::default(); + let mut tcp_server = generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 1); + assert_eq!(conn_result.num_sent_tms, 0); + set_if_done.store(true, Ordering::Relaxed); + }); + // Send TC to server now. + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .write_all(&encoded_buf[..current_idx]) + .expect("writing to TCP server failed"); + drop(stream); + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that the packet was received and decoded successfully. + let mut tc_queue = tc_receiver + .tc_queue + .lock() + .expect("locking tc queue failed"); + assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); + drop(tc_queue); + } + + #[test] + fn test_server_basic_multi_tm_multi_tc() { + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + tm_source.add_tm(&INVERTED_PACKET); + tm_source.add_tm(&SIMPLE_PACKET); + let mut tcp_server = + generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source.clone()); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 2, "Not enough TCs received"); + assert_eq!(conn_result.num_sent_tms, 2, "Not enough TMs received"); + set_if_done.store(true, Ordering::Relaxed); + }); + // Send TC to server now. + let mut encoded_buf: [u8; 32] = [0; 32]; + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + encode_inverted_packet(&mut encoded_buf, &mut current_idx); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting reas timeout failed"); + stream + .write_all(&encoded_buf[..current_idx]) + .expect("writing to TCP server failed"); + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 16] = [0; 16]; + let mut read_len_total = 0; + // Timeout ensures this does not block forever. + while read_len_total < 16 { + let read_len = stream.read(&mut read_buf).expect("read failed"); + read_len_total += read_len; + // Read until full expected size is available. + if read_len == 16 { + // Read first TM packet. + current_idx = 0; + assert_eq!(read_len, 16); + assert_eq!(read_buf[0], 0); + current_idx += 1; + let mut dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..]) + .expect("COBS decoding failed"); + assert_eq!(dec_report.dst_used, 5); + // Skip first sentinel byte. + assert_eq!( + &read_buf[current_idx..current_idx + INVERTED_PACKET.len()], + &INVERTED_PACKET + ); + current_idx += dec_report.src_used; + // End sentinel. + assert_eq!(read_buf[current_idx], 0, "invalid sentinel end byte"); + current_idx += 1; + + // Read second TM packet. + assert_eq!(read_buf[current_idx], 0); + current_idx += 1; + dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..]) + .expect("COBS decoding failed"); + assert_eq!(dec_report.dst_used, 5); + // Skip first sentinel byte. + assert_eq!( + &read_buf[current_idx..current_idx + SIMPLE_PACKET.len()], + &SIMPLE_PACKET + ); + current_idx += dec_report.src_used; + // End sentinel. + assert_eq!(read_buf[current_idx], 0); + break; + } + } + drop(stream); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that the packet was received and decoded successfully. + let mut tc_queue = tc_receiver + .tc_queue + .lock() + .expect("locking tc queue failed"); + assert_eq!(tc_queue.len(), 2); + assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); + assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET); + drop(tc_queue); + } +} diff --git a/satrs-core/src/hal/host/udp_server.rs b/satrs-core/src/hal/std/udp_server.rs similarity index 95% rename from satrs-core/src/hal/host/udp_server.rs rename to satrs-core/src/hal/std/udp_server.rs index b91a239..28e0328 100644 --- a/satrs-core/src/hal/host/udp_server.rs +++ b/satrs-core/src/hal/std/udp_server.rs @@ -1,4 +1,4 @@ -//! UDP server helper components +//! Generic UDP TC server. use crate::tmtc::{ReceivesTc, ReceivesTcCore}; use std::boxed::Box; use std::io::{Error, ErrorKind}; @@ -6,7 +6,8 @@ use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; use std::vec; use std::vec::Vec; -/// This TC server helper can be used to receive raw PUS telecommands thorough a UDP interface. +/// This UDP server can be used to receive CCSDS space packet telecommands or any other telecommand +/// format. /// /// It caches all received telecomands into a vector. The maximum expected telecommand size should /// be declared upfront. This avoids dynamic allocation during run-time. The user can specify a TC @@ -19,7 +20,7 @@ use std::vec::Vec; /// ``` /// use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; /// use spacepackets::ecss::SerializablePusPacket; -/// use satrs_core::hal::host::udp_server::UdpTcServer; +/// use satrs_core::hal::std::udp_server::UdpTcServer; /// use satrs_core::tmtc::{ReceivesTc, ReceivesTcCore}; /// use spacepackets::SpHeader; /// use spacepackets::ecss::tc::PusTcCreator; @@ -51,7 +52,7 @@ use std::vec::Vec; /// .expect("Error sending PUS TC via UDP"); /// ``` /// -/// The [satrs-example crate](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/-example) +/// The [satrs-example crate](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/satrs-example) /// server code also includes /// [example code](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/tmtc.rs#L67) /// on how to use this TC server. It uses the server to receive PUS telecommands on a specific port @@ -140,7 +141,7 @@ impl UdpTcServer { #[cfg(test)] mod tests { - use crate::hal::host::udp_server::{ReceiveResult, UdpTcServer}; + use crate::hal::std::udp_server::{ReceiveResult, UdpTcServer}; use crate::tmtc::ReceivesTcCore; use spacepackets::ecss::tc::PusTcCreator; use spacepackets::ecss::SerializablePusPacket; diff --git a/satrs-core/src/lib.rs b/satrs-core/src/lib.rs index 43c8cfb..8ae56e5 100644 --- a/satrs-core/src/lib.rs +++ b/satrs-core/src/lib.rs @@ -20,6 +20,7 @@ extern crate downcast_rs; #[cfg(any(feature = "std", test))] extern crate std; +pub mod encoding; pub mod error; #[cfg(feature = "alloc")] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] diff --git a/satrs-core/src/tmtc/mod.rs b/satrs-core/src/tmtc/mod.rs index 7237196..04f4299 100644 --- a/satrs-core/src/tmtc/mod.rs +++ b/satrs-core/src/tmtc/mod.rs @@ -72,12 +72,33 @@ pub trait ReceivesTcCore { /// Extension trait of [ReceivesTcCore] which allows downcasting by implementing [Downcast] and /// is also sendable. #[cfg(feature = "alloc")] -pub trait ReceivesTc: ReceivesTcCore + Downcast + Send {} +pub trait ReceivesTc: ReceivesTcCore + Downcast + Send { + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn ReceivesTcCore; + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn ReceivesTcCore; +} /// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature /// is enabled. #[cfg(feature = "alloc")] -impl ReceivesTc for T where T: ReceivesTcCore + Send + 'static {} +impl ReceivesTc for T +where + T: ReceivesTcCore + Send + 'static, +{ + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn ReceivesTcCore { + self + } + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn ReceivesTcCore { + self + } +} #[cfg(feature = "alloc")] impl_downcast!(ReceivesTc assoc Error); @@ -92,3 +113,41 @@ pub trait ReceivesCcsdsTc { type Error; fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error>; } + +/// Generic trait for a TM packet source, with no restrictions on the type of TM. +/// Implementors write the telemetry into the provided buffer and return the size of the telemetry. +pub trait TmPacketSourceCore { + type Error; + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result; +} + +/// Extension trait of [TmPacketSourceCore] which allows downcasting by implementing [Downcast] and +/// is also sendable. +#[cfg(feature = "alloc")] +pub trait TmPacketSource: TmPacketSourceCore + Downcast + Send { + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn TmPacketSourceCore; + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn TmPacketSourceCore; +} + +/// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature +/// is enabled. +#[cfg(feature = "alloc")] +impl TmPacketSource for T +where + T: TmPacketSourceCore + Send + 'static, +{ + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn TmPacketSourceCore { + self + } + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn TmPacketSourceCore { + self + } +} diff --git a/satrs-core/tests/tcp_server_cobs.rs b/satrs-core/tests/tcp_server_cobs.rs new file mode 100644 index 0000000..5956fb3 --- /dev/null +++ b/satrs-core/tests/tcp_server_cobs.rs @@ -0,0 +1,156 @@ +//! This serves as both an integration test and an example application showcasing all major +//! features of the TCP COBS server by performing following steps: +//! +//! 1. It defines both a TC receiver and a TM source which are [Sync]. +//! 2. A telemetry packet is inserted into the TM source. The packet will be handled by the +//! TCP server after handling all TCs. +//! 3. It instantiates the TCP server on localhost with automatic port assignment and assigns +//! the TC receiver and TM source created previously. +//! 4. It moves the TCP server to a different thread and calls the +//! [TcpTmtcInCobsServer::handle_next_connection] call inside that thread +//! 5. The main threads connects to the server, sends a test telecommand and then reads back +//! the test telemetry insertd in to the TM source previously. +use core::{ + sync::atomic::{AtomicBool, Ordering}, + time::Duration, +}; +use std::{ + io::{Read, Write}, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, + sync::Mutex, + thread, +}; + +use satrs_core::{ + encoding::cobs::encode_packet_with_cobs, + hal::std::tcp_server::{ServerConfig, TcpTmtcInCobsServer}, + tmtc::{ReceivesTcCore, TmPacketSourceCore}, +}; +use std::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec}; + +#[derive(Default, Clone)] +struct SyncTcCacher { + tc_queue: Arc>>>, +} +impl ReceivesTcCore for SyncTcCacher { + type Error = (); + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); + println!("Received TC: {:x?}", tc_raw); + tc_queue.push_back(tc_raw.to_vec()); + Ok(()) + } +} + +#[derive(Default, Clone)] +struct SyncTmSource { + tm_queue: Arc>>>, +} + +impl SyncTmSource { + pub(crate) fn add_tm(&mut self, tm: &[u8]) { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); + tm_queue.push_back(tm.to_vec()); + } +} + +impl TmPacketSourceCore for SyncTmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); + if !tm_queue.is_empty() { + let next_vec = tm_queue.front().unwrap(); + if buffer.len() < next_vec.len() { + panic!( + "provided buffer too small, must be at least {} bytes", + next_vec.len() + ); + } + println!("Sending and encoding TM: {:x?}", next_vec); + let next_vec = tm_queue.pop_front().unwrap(); + buffer[0..next_vec.len()].copy_from_slice(&next_vec); + return Ok(next_vec.len()); + } + Ok(0) + } +} + +const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; +const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 4, 1]; + +fn main() { + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + // Insert a telemetry packet which will be read back by the client at a later stage. + tm_source.add_tm(&INVERTED_PACKET); + let mut tcp_server = TcpTmtcInCobsServer::new( + ServerConfig::new(auto_port_addr, Duration::from_millis(2), 1024, 1024), + Box::new(tm_source), + Box::new(tc_receiver.clone()), + ) + .expect("TCP server generation failed"); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 1, "No TC received"); + assert_eq!(conn_result.num_sent_tms, 1, "No TM received"); + // Signal the main thread we are done. + set_if_done.store(true, Ordering::Relaxed); + }); + + // Send TC to server now. + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut current_idx = 0; + encode_packet_with_cobs(&SIMPLE_PACKET, &mut encoded_buf, &mut current_idx); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .write_all(&encoded_buf[..current_idx]) + .expect("writing to TCP server failed"); + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 16] = [0; 16]; + let read_len = stream.read(&mut read_buf).expect("read failed"); + drop(stream); + + // 1 byte encoding overhead, 2 sentinel bytes. + assert_eq!(read_len, 8); + assert_eq!(read_buf[0], 0); + assert_eq!(read_buf[read_len - 1], 0); + let decoded_len = + cobs::decode_in_place(&mut read_buf[1..read_len]).expect("COBS decoding failed"); + assert_eq!(decoded_len, 5); + // Skip first sentinel byte. + assert_eq!(&read_buf[1..1 + INVERTED_PACKET.len()], &INVERTED_PACKET); + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that the packet was received and decoded successfully. + let mut tc_queue = tc_receiver + .tc_queue + .lock() + .expect("locking tc queue failed"); + assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); + drop(tc_queue); +} diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 188a59d..5d2ea5e 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,5 +1,5 @@ use log::{info, warn}; -use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; +use satrs_core::hal::std::udp_server::{ReceiveResult, UdpTcServer}; use std::net::SocketAddr; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use std::thread; diff --git a/satrs-mib/Cargo.toml b/satrs-mib/Cargo.toml index f6dd23a..cdfa4f5 100644 --- a/satrs-mib/Cargo.toml +++ b/satrs-mib/Cargo.toml @@ -23,8 +23,9 @@ version = "1" optional = true [dependencies.satrs-core] -version = "0.1.0-alpha.0" -# path = "../satrs-core" +# version = "0.1.0-alpha.0" +git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" +rev = "35e1f7a983f6535c5571186e361fe101d4306b89" [dependencies.satrs-mib-codegen] path = "codegen" diff --git a/satrs-mib/codegen/Cargo.toml b/satrs-mib/codegen/Cargo.toml index ccc4d1a..db6a671 100644 --- a/satrs-mib/codegen/Cargo.toml +++ b/satrs-mib/codegen/Cargo.toml @@ -20,8 +20,9 @@ quote = "1" proc-macro2 = "1" [dependencies.satrs-core] -version = "0.1.0-alpha.0" -# path = "../../satrs-core" +# version = "0.1.0-alpha.0" +git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" +rev = "35e1f7a983f6535c5571186e361fe101d4306b89" [dev-dependencies] trybuild = { version = "1", features = ["diff"] }