Merge branch 'main' into TargetIdWithApid
Some checks failed
Rust/sat-rs/pipeline/pr-main There was a failure building this commit

This commit is contained in:
Robin Müller 2023-09-26 16:00:16 +02:00
commit 92ac91e194
28 changed files with 3412 additions and 94 deletions

View File

@ -1,5 +1,5 @@
[workspace] [workspace]
resolver = "2"
members = [ members = [
"satrs-core", "satrs-core",
"satrs-mib", "satrs-mib",
@ -9,3 +9,4 @@ members = [
exclude = [ exclude = [
"satrs-example-stm32f3-disco", "satrs-example-stm32f3-disco",
] ]

View File

@ -2,10 +2,10 @@
# docker build -f automation/Dockerfile -t <NAME> . # docker build -f automation/Dockerfile -t <NAME> .
# docker run -it <NAME> # docker run -it <NAME>
FROM rust:latest FROM rust:latest
RUN apt-get update RUN apt-get update && apt-get --yes upgrade
RUN apt-get --yes upgrade
# tzdata is a dependency, won't install otherwise # tzdata is a dependency, won't install otherwise
ARG DEBIAN_FRONTEND=noninteractive ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get --yes install rsync curl
# set CROSS_CONTAINER_IN_CONTAINER to inform `cross` that it is executed from within a container # set CROSS_CONTAINER_IN_CONTAINER to inform `cross` that it is executed from within a container
ENV CROSS_CONTAINER_IN_CONTAINER=true ENV CROSS_CONTAINER_IN_CONTAINER=true
@ -13,3 +13,13 @@ ENV CROSS_CONTAINER_IN_CONTAINER=true
RUN rustup install nightly && \ RUN rustup install nightly && \
rustup target add thumbv7em-none-eabihf armv7-unknown-linux-gnueabihf && \ rustup target add thumbv7em-none-eabihf armv7-unknown-linux-gnueabihf && \
rustup component add rustfmt clippy rustup component add rustfmt clippy
# RUN cargo install mdbook --no-default-features --features search --vers "^0.4" --locked
RUN curl -sSL https://github.com/rust-lang/mdBook/releases/download/v0.4.34/mdbook-v0.4.34-x86_64-unknown-linux-gnu.tar.gz | tar -xz --directory /usr/local/bin
# SSH stuff to allow deployment to doc server
RUN adduser --uid 114 jenkins
# Add documentation server to known hosts
RUN echo "|1|/LzCV4BuTmTb2wKnD146l9fTKgQ=|NJJtVjvWbtRt8OYqFgcYRnMQyVw= ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBNL8ssTonYtgiR/6RRlSIK9WU1ywOcJmxFTLcEblAwH7oifZzmYq3XRfwXrgfMpylEfMFYfCU8JRqtmi19xc21A=" >> /etc/ssh/ssh_known_hosts
RUN echo "|1|CcBvBc3EG03G+XM5rqRHs6gK/Gg=|oGeJQ+1I8NGI2THIkJsW92DpTzs= ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBNL8ssTonYtgiR/6RRlSIK9WU1ywOcJmxFTLcEblAwH7oifZzmYq3XRfwXrgfMpylEfMFYfCU8JRqtmi19xc21A=" >> /etc/ssh/ssh_known_hosts

118
automation/Jenkinsfile vendored
View File

@ -1,52 +1,76 @@
pipeline { pipeline {
agent {
agent { dockerfile {
dockerfile { dir 'automation'
dir 'automation' reuseNode true
reuseNode true args '--network host'
}
} }
}
stages { stages {
stage('Clippy') { stage('Rust Toolchain Info') {
steps { steps {
sh 'cargo clippy' sh 'rustc --version'
} }
}
stage('Docs') {
steps {
sh 'cargo +nightly doc --all-features'
}
}
stage('Rustfmt') {
steps {
sh 'cargo fmt --all --check'
}
}
stage('Test') {
steps {
sh 'cargo test --all-features'
}
}
stage('Check with all features') {
steps {
sh 'cargo check --all-features'
}
}
stage('Check with no features') {
steps {
sh 'cargo check --no-default-features'
}
}
stage('Check Cross Embedded Bare Metal') {
steps {
sh 'cargo check -p satrs-core --target thumbv7em-none-eabihf --no-default-features'
}
}
stage('Check Cross Embedded Linux') {
steps {
sh 'cargo check --target armv7-unknown-linux-gnueabihf'
}
}
} }
stage('Clippy') {
steps {
sh 'cargo clippy'
}
}
stage('Docs') {
steps {
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh 'cargo +nightly doc --all-features'
}
}
}
stage('Rustfmt') {
steps {
sh 'cargo fmt --all --check'
}
}
stage('Test') {
steps {
sh 'cargo test --all-features'
}
}
stage('Check with all features') {
steps {
sh 'cargo check --all-features'
}
}
stage('Check with no features') {
steps {
sh 'cargo check --no-default-features'
}
}
stage('Check Cross Embedded Bare Metal') {
steps {
sh 'cargo check -p satrs-core --target thumbv7em-none-eabihf --no-default-features'
}
}
stage('Check Cross Embedded Linux') {
steps {
sh 'cargo check --target armv7-unknown-linux-gnueabihf'
}
}
stage('Deploy satrs-book') {
when {
anyOf {
branch 'main';
branch pattern: 'mdbook-deployment*'
}
}
steps {
dir('satrs-book') {
sh 'mdbook build'
sshagent(credentials: ['documentation-buildfix']) {
// Deploy to Apache webserver
sh 'rsync -r --delete book/ buildfix@documentation.irs.uni-stuttgart.de:/projects/sat-rs'
}
}
}
}
}
} }

View File

@ -8,12 +8,3 @@
- [Modes and Health](./modes-and-health.md) - [Modes and Health](./modes-and-health.md)
- [Housekeeping Data](./housekeeping.md) - [Housekeeping Data](./housekeeping.md)
- [Events](./events.md) - [Events](./events.md)
- [Power Components](./power.md)
- [Thermal Components](./thermal.md)
- [Persistent TM storage](./persistent-tm-storage.md)
- [FDIR](./fdir.md)
- [Serialization of Data](./serialization.md)
- [Logging](./logging.md)
- [Modelling space systems](./modelling-space-systems.md)
- [Ground Segments](./ground-segments.md)

9
satrs-book/src/TODO.md Normal file
View File

@ -0,0 +1,9 @@
- [Power Components](./power.md)
- [Thermal Components](./thermal.md)
- [Persistent TM storage](./persistent-tm-storage.md)
- [FDIR](./fdir.md)
- [Serialization of Data](./serialization.md)
- [Logging](./logging.md)
- [Modelling space systems](./modelling-space-systems.md)
- [Ground Segments](./ground-segments.md)

View File

@ -1,6 +1,6 @@
# Communication with sat-rs based software # Communication with sat-rs based software
Communication is a huge topic for space systems. Remote systems are usually not (directly) Communication is a vital topic for remote system which are usually not (directly)
connected to the internet and only have 1-2 communication links during nominal operation. However, connected to the internet and only have 1-2 communication links during nominal operation. However,
most of these systems have internet access during development cycle. There are various standards most of these systems have internet access during development cycle. There are various standards
provided by CCSDS and ECSS which can be useful to determine how to communicate with the satellite provided by CCSDS and ECSS which can be useful to determine how to communicate with the satellite

View File

@ -17,10 +17,16 @@ delegate = ">0.7, <=0.10"
paste = "1" paste = "1"
embed-doc-image = "0.1" embed-doc-image = "0.1"
[dependencies.smallvec]
version = "1"
[dependencies.num_enum] [dependencies.num_enum]
version = ">0.5, <=0.7" version = ">0.5, <=0.7"
default-features = false default-features = false
[dependencies.crc]
version = "3"
[dependencies.dyn-clone] [dependencies.dyn-clone]
version = "1" version = "1"
optional = true optional = true
@ -60,19 +66,30 @@ version = "1"
default-features = false default-features = false
optional = true optional = true
[dependencies.socket2]
version = "0.5.4"
features = ["all"]
optional = true
[dependencies.spacepackets] [dependencies.spacepackets]
version = "0.7.0-beta.1" # version = "0.7.0-beta.1"
# path = "../../spacepackets" # path = "../../spacepackets"
# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
# rev = "" rev = "79d26e1a6"
# branch = "" # branch = ""
default-features = false default-features = false
[dependencies.cobs]
git = "https://github.com/robamu/cobs.rs.git"
branch = "all_features"
default-features = false
[dev-dependencies] [dev-dependencies]
serde = "1" serde = "1"
zerocopy = "0.7" zerocopy = "0.7"
once_cell = "1.13" once_cell = "1.13"
serde_json = "1" serde_json = "1"
rand = "0.8"
[dev-dependencies.postcard] [dev-dependencies.postcard]
version = "1" version = "1"
@ -80,22 +97,23 @@ version = "1"
[features] [features]
default = ["std"] default = ["std"]
std = [ std = [
"downcast-rs/std", "downcast-rs/std",
"alloc", "alloc",
"bus", "bus",
"postcard/use-std", "postcard/use-std",
"crossbeam-channel/std", "crossbeam-channel/std",
"serde/std", "serde/std",
"spacepackets/std", "spacepackets/std",
"num_enum/std", "num_enum/std",
"thiserror" "thiserror",
"socket2"
] ]
alloc = [ alloc = [
"serde/alloc", "serde/alloc",
"spacepackets/alloc", "spacepackets/alloc",
"hashbrown", "hashbrown",
"dyn-clone", "dyn-clone",
"downcast-rs" "downcast-rs"
] ]
serde = ["dep:serde", "spacepackets/serde"] serde = ["dep:serde", "spacepackets/serde"]
crossbeam = ["crossbeam-channel"] crossbeam = ["crossbeam-channel"]

871
satrs-core/src/cfdp/dest.rs Normal file
View File

@ -0,0 +1,871 @@
use core::str::{from_utf8, Utf8Error};
use std::{
fs::{metadata, File},
io::{BufReader, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
};
use crate::cfdp::user::TransactionFinishedParams;
use super::{
user::{CfdpUser, MetadataReceivedParams},
PacketInfo, PacketTarget, State, TransactionId, TransactionStep, CRC_32,
};
use smallvec::SmallVec;
use spacepackets::{
cfdp::{
pdu::{
eof::EofPdu,
file_data::FileDataPdu,
finished::{DeliveryCode, FileStatus, FinishedPdu},
metadata::{MetadataGenericParams, MetadataPdu},
CommonPduConfig, FileDirectiveType, PduError, PduHeader,
},
tlv::{msg_to_user::MsgToUserTlv, EntityIdTlv, TlvType},
ConditionCode, PduType, TransmissionMode,
},
util::UnsignedByteField,
};
use thiserror::Error;
pub struct DestinationHandler {
id: UnsignedByteField,
step: TransactionStep,
state: State,
tparams: TransactionParams,
packets_to_send_ctx: PacketsToSendContext,
}
#[derive(Debug, Default)]
struct PacketsToSendContext {
packet_available: bool,
directive: Option<FileDirectiveType>,
}
#[derive(Debug)]
struct FileProperties {
src_file_name: [u8; u8::MAX as usize],
src_file_name_len: usize,
dest_file_name: [u8; u8::MAX as usize],
dest_file_name_len: usize,
dest_path_buf: PathBuf,
}
#[derive(Debug)]
struct TransferState {
transaction_id: Option<TransactionId>,
progress: usize,
condition_code: ConditionCode,
delivery_code: DeliveryCode,
file_status: FileStatus,
metadata_params: MetadataGenericParams,
}
impl Default for TransferState {
fn default() -> Self {
Self {
transaction_id: None,
progress: Default::default(),
condition_code: ConditionCode::NoError,
delivery_code: DeliveryCode::Incomplete,
file_status: FileStatus::Unreported,
metadata_params: Default::default(),
}
}
}
#[derive(Debug)]
struct TransactionParams {
tstate: TransferState,
pdu_conf: CommonPduConfig,
file_properties: FileProperties,
cksum_buf: [u8; 1024],
msgs_to_user_size: usize,
msgs_to_user_buf: [u8; 1024],
}
impl Default for FileProperties {
fn default() -> Self {
Self {
src_file_name: [0; u8::MAX as usize],
src_file_name_len: Default::default(),
dest_file_name: [0; u8::MAX as usize],
dest_file_name_len: Default::default(),
dest_path_buf: Default::default(),
}
}
}
impl TransactionParams {
fn file_size(&self) -> usize {
self.tstate.metadata_params.file_size as usize
}
fn metadata_params(&self) -> &MetadataGenericParams {
&self.tstate.metadata_params
}
}
impl Default for TransactionParams {
fn default() -> Self {
Self {
pdu_conf: Default::default(),
cksum_buf: [0; 1024],
msgs_to_user_size: 0,
msgs_to_user_buf: [0; 1024],
tstate: Default::default(),
file_properties: Default::default(),
}
}
}
impl TransactionParams {
fn reset(&mut self) {
self.tstate.condition_code = ConditionCode::NoError;
self.tstate.delivery_code = DeliveryCode::Incomplete;
}
}
#[derive(Debug, Error)]
pub enum DestError {
/// File directive expected, but none specified
#[error("expected file directive")]
DirectiveExpected,
#[error("can not process packet type {0:?}")]
CantProcessPacketType(FileDirectiveType),
#[error("can not process file data PDUs in current state")]
WrongStateForFileDataAndEof,
// Received new metadata PDU while being already being busy with a file transfer.
#[error("busy with transfer")]
RecvdMetadataButIsBusy,
#[error("empty source file field")]
EmptySrcFileField,
#[error("empty dest file field")]
EmptyDestFileField,
#[error("pdu error {0}")]
Pdu(#[from] PduError),
#[error("io error {0}")]
Io(#[from] std::io::Error),
#[error("path conversion error {0}")]
PathConversion(#[from] Utf8Error),
#[error("error building dest path from source file name and dest folder")]
PathConcatError,
}
impl DestinationHandler {
pub fn new(id: impl Into<UnsignedByteField>) -> Self {
Self {
id: id.into(),
step: TransactionStep::Idle,
state: State::Idle,
tparams: Default::default(),
packets_to_send_ctx: Default::default(),
}
}
pub fn state_machine(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
match self.state {
State::Idle => todo!(),
State::BusyClass1Nacked => self.fsm_nacked(cfdp_user),
State::BusyClass2Acked => todo!("acknowledged mode not implemented yet"),
}
}
pub fn insert_packet(&mut self, packet_info: &PacketInfo) -> Result<(), DestError> {
if packet_info.target() != PacketTarget::DestEntity {
// Unwrap is okay here, a PacketInfo for a file data PDU should always have the
// destination as the target.
return Err(DestError::CantProcessPacketType(
packet_info.pdu_directive().unwrap(),
));
}
match packet_info.pdu_type {
PduType::FileDirective => {
if packet_info.pdu_directive.is_none() {
return Err(DestError::DirectiveExpected);
}
self.handle_file_directive(
packet_info.pdu_directive.unwrap(),
packet_info.raw_packet,
)
}
PduType::FileData => self.handle_file_data(packet_info.raw_packet),
}
}
pub fn packet_to_send_ready(&self) -> bool {
self.packets_to_send_ctx.packet_available
}
pub fn get_next_packet_to_send(
&self,
buf: &mut [u8],
) -> Result<Option<(FileDirectiveType, usize)>, DestError> {
if !self.packet_to_send_ready() {
return Ok(None);
}
let directive = self.packets_to_send_ctx.directive.unwrap();
let written_size = match directive {
FileDirectiveType::FinishedPdu => {
let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0);
let finished_pdu = if self.tparams.tstate.condition_code == ConditionCode::NoError
|| self.tparams.tstate.condition_code == ConditionCode::UnsupportedChecksumType
{
FinishedPdu::new_default(
pdu_header,
self.tparams.tstate.delivery_code,
self.tparams.tstate.file_status,
)
} else {
// TODO: Are there cases where this ID is actually the source entity ID?
let entity_id = EntityIdTlv::new(self.id);
FinishedPdu::new_with_error(
pdu_header,
self.tparams.tstate.condition_code,
self.tparams.tstate.delivery_code,
self.tparams.tstate.file_status,
entity_id,
)
};
finished_pdu.write_to_bytes(buf)?
}
FileDirectiveType::AckPdu => todo!(),
FileDirectiveType::NakPdu => todo!(),
FileDirectiveType::KeepAlivePdu => todo!(),
_ => {
// This should never happen and is considered an internal impl error
panic!("invalid file directive {directive:?} for dest handler send packet");
}
};
Ok(Some((directive, written_size)))
}
pub fn handle_file_directive(
&mut self,
pdu_directive: FileDirectiveType,
raw_packet: &[u8],
) -> Result<(), DestError> {
match pdu_directive {
FileDirectiveType::EofPdu => self.handle_eof_pdu(raw_packet)?,
FileDirectiveType::FinishedPdu
| FileDirectiveType::NakPdu
| FileDirectiveType::KeepAlivePdu => {
return Err(DestError::CantProcessPacketType(pdu_directive));
}
FileDirectiveType::AckPdu => {
todo!(
"check whether ACK pdu handling is applicable by checking the acked directive field"
)
}
FileDirectiveType::MetadataPdu => self.handle_metadata_pdu(raw_packet)?,
FileDirectiveType::PromptPdu => self.handle_prompt_pdu(raw_packet)?,
};
Ok(())
}
pub fn handle_metadata_pdu(&mut self, raw_packet: &[u8]) -> Result<(), DestError> {
if self.state != State::Idle {
return Err(DestError::RecvdMetadataButIsBusy);
}
let metadata_pdu = MetadataPdu::from_bytes(raw_packet)?;
self.tparams.reset();
self.tparams.tstate.metadata_params = *metadata_pdu.metadata_params();
let src_name = metadata_pdu.src_file_name();
if src_name.is_empty() {
return Err(DestError::EmptySrcFileField);
}
self.tparams.file_properties.src_file_name[..src_name.len_value()]
.copy_from_slice(src_name.value());
self.tparams.file_properties.src_file_name_len = src_name.len_value();
let dest_name = metadata_pdu.dest_file_name();
if dest_name.is_empty() {
return Err(DestError::EmptyDestFileField);
}
self.tparams.file_properties.dest_file_name[..dest_name.len_value()]
.copy_from_slice(dest_name.value());
self.tparams.file_properties.dest_file_name_len = dest_name.len_value();
self.tparams.pdu_conf = *metadata_pdu.pdu_header().common_pdu_conf();
self.tparams.msgs_to_user_size = 0;
if metadata_pdu.options().is_some() {
for option_tlv in metadata_pdu.options_iter().unwrap() {
if option_tlv.is_standard_tlv()
&& option_tlv.tlv_type().unwrap() == TlvType::MsgToUser
{
self.tparams
.msgs_to_user_buf
.copy_from_slice(option_tlv.raw_data().unwrap());
self.tparams.msgs_to_user_size += option_tlv.len_full();
}
}
}
if self.tparams.pdu_conf.trans_mode == TransmissionMode::Unacknowledged {
self.state = State::BusyClass1Nacked;
} else {
self.state = State::BusyClass2Acked;
}
self.step = TransactionStep::TransactionStart;
Ok(())
}
pub fn handle_file_data(&mut self, raw_packet: &[u8]) -> Result<(), DestError> {
if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus {
return Err(DestError::WrongStateForFileDataAndEof);
}
let fd_pdu = FileDataPdu::from_bytes(raw_packet)?;
let mut dest_file = File::options()
.write(true)
.open(&self.tparams.file_properties.dest_path_buf)?;
dest_file.seek(SeekFrom::Start(fd_pdu.offset()))?;
dest_file.write_all(fd_pdu.file_data())?;
Ok(())
}
#[allow(clippy::needless_if)]
pub fn handle_eof_pdu(&mut self, raw_packet: &[u8]) -> Result<(), DestError> {
if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus {
return Err(DestError::WrongStateForFileDataAndEof);
}
let eof_pdu = EofPdu::from_bytes(raw_packet)?;
let checksum = eof_pdu.file_checksum();
// For a standard disk based file system, which is assumed to be used for now, the file
// will always be retained. This might change in the future.
self.tparams.tstate.file_status = FileStatus::Retained;
if self.checksum_check(checksum)? {
self.tparams.tstate.condition_code = ConditionCode::NoError;
self.tparams.tstate.delivery_code = DeliveryCode::Complete;
} else {
self.tparams.tstate.condition_code = ConditionCode::FileChecksumFailure;
}
// TODO: Check progress, and implement transfer completion timer as specified in the
// standard. This timer protects against out of order arrival of packets.
if self.tparams.tstate.progress != self.tparams.file_size() {}
if self.state == State::BusyClass1Nacked {
self.step = TransactionStep::TransferCompletion;
} else {
self.step = TransactionStep::SendingAckPdu;
}
Ok(())
}
pub fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> {
todo!();
}
fn checksum_check(&mut self, expected_checksum: u32) -> Result<bool, DestError> {
let mut digest = CRC_32.digest();
let file_to_check = File::open(&self.tparams.file_properties.dest_path_buf)?;
let mut buf_reader = BufReader::new(file_to_check);
loop {
let bytes_read = buf_reader.read(&mut self.tparams.cksum_buf)?;
if bytes_read == 0 {
break;
}
digest.update(&self.tparams.cksum_buf[0..bytes_read]);
}
if digest.finalize() == expected_checksum {
return Ok(true);
}
Ok(false)
}
fn fsm_nacked(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
if self.step == TransactionStep::TransactionStart {
self.transaction_start(cfdp_user)?;
}
if self.step == TransactionStep::TransferCompletion {
self.transfer_completion(cfdp_user)?;
}
if self.step == TransactionStep::SendingAckPdu {
todo!("no support for acknowledged mode yet");
}
if self.step == TransactionStep::SendingFinishedPdu {
self.reset();
}
Ok(())
}
/// Get the step, which denotes the exact step of a pending CFDP transaction when applicable.
pub fn step(&self) -> TransactionStep {
self.step
}
/// Get the step, which denotes whether the CFDP handler is active, and which CFDP class
/// is used if it is active.
pub fn state(&self) -> State {
self.state
}
fn transaction_start(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
let dest_name = from_utf8(
&self.tparams.file_properties.dest_file_name
[..self.tparams.file_properties.dest_file_name_len],
)?;
let dest_path = Path::new(dest_name);
self.tparams.file_properties.dest_path_buf = dest_path.to_path_buf();
let source_id = self.tparams.pdu_conf.source_id();
let id = TransactionId::new(source_id, self.tparams.pdu_conf.transaction_seq_num);
let src_name = from_utf8(
&self.tparams.file_properties.src_file_name
[0..self.tparams.file_properties.src_file_name_len],
)?;
let mut msgs_to_user = SmallVec::<[MsgToUserTlv<'_>; 16]>::new();
let mut num_msgs_to_user = 0;
if self.tparams.msgs_to_user_size > 0 {
let mut index = 0;
while index < self.tparams.msgs_to_user_size {
// This should never panic as the validity of the options was checked beforehand.
let msgs_to_user_tlv =
MsgToUserTlv::from_bytes(&self.tparams.msgs_to_user_buf[index..])
.expect("message to user creation failed unexpectedly");
msgs_to_user.push(msgs_to_user_tlv);
index += msgs_to_user_tlv.len_full();
num_msgs_to_user += 1;
}
}
let metadata_recvd_params = MetadataReceivedParams {
id,
source_id,
file_size: self.tparams.tstate.metadata_params.file_size,
src_file_name: src_name,
dest_file_name: dest_name,
msgs_to_user: &msgs_to_user[..num_msgs_to_user],
};
self.tparams.tstate.transaction_id = Some(id);
cfdp_user.metadata_recvd_indication(&metadata_recvd_params);
if dest_path.exists() {
let dest_metadata = metadata(dest_path)?;
if dest_metadata.is_dir() {
// Create new destination path by concatenating the last part of the source source
// name and the destination folder. For example, for a source file of /tmp/hello.txt
// and a destination name of /home/test, the resulting file name should be
// /home/test/hello.txt
let source_path = Path::new(from_utf8(
&self.tparams.file_properties.src_file_name
[..self.tparams.file_properties.src_file_name_len],
)?);
let source_name = source_path.file_name();
if source_name.is_none() {
return Err(DestError::PathConcatError);
}
let source_name = source_name.unwrap();
self.tparams.file_properties.dest_path_buf.push(source_name);
}
}
// This function does exactly what we require: Create a new file if it does not exist yet
// and trucate an existing one.
File::create(&self.tparams.file_properties.dest_path_buf)?;
self.step = TransactionStep::ReceivingFileDataPdus;
Ok(())
}
fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
let transaction_finished_params = TransactionFinishedParams {
id: self.tparams.tstate.transaction_id.unwrap(),
condition_code: self.tparams.tstate.condition_code,
delivery_code: self.tparams.tstate.delivery_code,
file_status: self.tparams.tstate.file_status,
};
cfdp_user.transaction_finished_indication(&transaction_finished_params);
// This function should never be called with metadata parameters not set
if self.tparams.metadata_params().closure_requested {
self.prepare_finished_pdu()?;
self.step = TransactionStep::SendingFinishedPdu;
} else {
self.reset();
self.state = State::Idle;
self.step = TransactionStep::Idle;
}
Ok(())
}
fn reset(&mut self) {
self.step = TransactionStep::Idle;
self.state = State::Idle;
self.packets_to_send_ctx.packet_available = false;
self.tparams.reset();
}
fn prepare_finished_pdu(&mut self) -> Result<(), DestError> {
self.packets_to_send_ctx.packet_available = true;
self.packets_to_send_ctx.directive = Some(FileDirectiveType::FinishedPdu);
self.step = TransactionStep::SendingFinishedPdu;
Ok(())
}
}
#[cfg(test)]
mod tests {
use core::sync::atomic::{AtomicU8, Ordering};
#[allow(unused_imports)]
use std::println;
use std::{env::temp_dir, fs};
use alloc::{format, string::String};
use rand::Rng;
use spacepackets::{
cfdp::{lv::Lv, ChecksumType},
util::{UbfU16, UnsignedByteFieldU16},
};
use super::*;
const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
const SRC_NAME: &str = "__cfdp__source-file";
const DEST_NAME: &str = "__cfdp__dest-file";
static ATOMIC_COUNTER: AtomicU8 = AtomicU8::new(0);
#[derive(Default)]
struct TestCfdpUser {
next_expected_seq_num: u64,
expected_full_src_name: String,
expected_full_dest_name: String,
expected_file_size: usize,
}
impl TestCfdpUser {
fn generic_id_check(&self, id: &crate::cfdp::TransactionId) {
assert_eq!(id.source_id, LOCAL_ID.into());
assert_eq!(id.seq_num().value(), self.next_expected_seq_num);
}
}
impl CfdpUser for TestCfdpUser {
fn transaction_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
}
fn eof_sent_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
}
fn transaction_finished_indication(
&mut self,
finished_params: &crate::cfdp::user::TransactionFinishedParams,
) {
self.generic_id_check(&finished_params.id);
}
fn metadata_recvd_indication(
&mut self,
md_recvd_params: &crate::cfdp::user::MetadataReceivedParams,
) {
self.generic_id_check(&md_recvd_params.id);
assert_eq!(
String::from(md_recvd_params.src_file_name),
self.expected_full_src_name
);
assert_eq!(
String::from(md_recvd_params.dest_file_name),
self.expected_full_dest_name
);
assert_eq!(md_recvd_params.msgs_to_user.len(), 0);
assert_eq!(md_recvd_params.source_id, LOCAL_ID.into());
assert_eq!(md_recvd_params.file_size as usize, self.expected_file_size);
}
fn file_segment_recvd_indication(
&mut self,
_segment_recvd_params: &crate::cfdp::user::FileSegmentRecvdParams,
) {
}
fn report_indication(&mut self, _id: &crate::cfdp::TransactionId) {}
fn suspended_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
) {
panic!("unexpected suspended indication");
}
fn resumed_indication(&mut self, _id: &crate::cfdp::TransactionId, _progresss: u64) {}
fn fault_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected fault indication");
}
fn abandoned_indication(
&mut self,
_id: &crate::cfdp::TransactionId,
_condition_code: ConditionCode,
_progress: u64,
) {
panic!("unexpected abandoned indication");
}
fn eof_recvd_indication(&mut self, id: &crate::cfdp::TransactionId) {
self.generic_id_check(id);
}
}
fn init_check(handler: &DestinationHandler) {
assert_eq!(handler.state(), State::Idle);
assert_eq!(handler.step(), TransactionStep::Idle);
}
fn init_full_filenames() -> (PathBuf, PathBuf) {
let mut file_path = temp_dir();
let mut src_path = file_path.clone();
// Atomic counter used to allow concurrent tests.
let unique_counter = ATOMIC_COUNTER.fetch_add(1, Ordering::Relaxed);
// Create unique test filenames.
let src_name_unique = format!("{SRC_NAME}{}.txt", unique_counter);
let dest_name_unique = format!("{DEST_NAME}{}.txt", unique_counter);
src_path.push(src_name_unique);
file_path.push(dest_name_unique);
(src_path, file_path)
}
#[test]
fn test_basic() {
let dest_handler = DestinationHandler::new(REMOTE_ID);
init_check(&dest_handler);
}
fn create_pdu_header(seq_num: impl Into<UnsignedByteField>) -> PduHeader {
let mut pdu_conf =
CommonPduConfig::new_with_byte_fields(LOCAL_ID, REMOTE_ID, seq_num).unwrap();
pdu_conf.trans_mode = TransmissionMode::Unacknowledged;
PduHeader::new_no_file_data(pdu_conf, 0)
}
fn create_metadata_pdu<'filename>(
pdu_header: &PduHeader,
src_name: &'filename Path,
dest_name: &'filename Path,
file_size: u64,
) -> MetadataPdu<'filename, 'filename, 'static> {
let metadata_params = MetadataGenericParams::new(false, ChecksumType::Crc32, file_size);
MetadataPdu::new(
*pdu_header,
metadata_params,
Lv::new_from_str(src_name.as_os_str().to_str().unwrap()).unwrap(),
Lv::new_from_str(dest_name.as_os_str().to_str().unwrap()).unwrap(),
None,
)
}
fn insert_metadata_pdu(
metadata_pdu: &MetadataPdu,
buf: &mut [u8],
dest_handler: &mut DestinationHandler,
) {
let written_len = metadata_pdu
.write_to_bytes(buf)
.expect("writing metadata PDU failed");
let packet_info =
PacketInfo::new(&buf[..written_len]).expect("generating packet info failed");
let insert_result = dest_handler.insert_packet(&packet_info);
if let Err(e) = insert_result {
panic!("insert result error: {e}");
}
}
fn insert_eof_pdu(
file_data: &[u8],
pdu_header: &PduHeader,
buf: &mut [u8],
dest_handler: &mut DestinationHandler,
) {
let mut digest = CRC_32.digest();
digest.update(file_data);
let crc32 = digest.finalize();
let eof_pdu = EofPdu::new_no_error(*pdu_header, crc32, file_data.len() as u64);
let result = eof_pdu.write_to_bytes(buf);
assert!(result.is_ok());
let packet_info = PacketInfo::new(&buf).expect("generating packet info failed");
let result = dest_handler.insert_packet(&packet_info);
assert!(result.is_ok());
}
#[test]
fn test_empty_file_transfer() {
let (src_name, dest_name) = init_full_filenames();
assert!(!Path::exists(&dest_name));
let mut buf: [u8; 512] = [0; 512];
let mut test_user = TestCfdpUser {
next_expected_seq_num: 0,
expected_full_src_name: src_name.to_string_lossy().into(),
expected_full_dest_name: dest_name.to_string_lossy().into(),
expected_file_size: 0,
};
// We treat the destination handler like it is a remote entity.
let mut dest_handler = DestinationHandler::new(REMOTE_ID);
init_check(&dest_handler);
let seq_num = UbfU16::new(0);
let pdu_header = create_pdu_header(seq_num);
let metadata_pdu =
create_metadata_pdu(&pdu_header, src_name.as_path(), dest_name.as_path(), 0);
insert_metadata_pdu(&metadata_pdu, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
if let Err(e) = result {
panic!("dest handler fsm error: {e}");
}
assert_ne!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::ReceivingFileDataPdus);
insert_eof_pdu(&[], &pdu_header, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
assert!(result.is_ok());
assert_eq!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::Idle);
assert!(Path::exists(&dest_name));
let read_content = fs::read(&dest_name).expect("reading back string failed");
assert_eq!(read_content.len(), 0);
assert!(fs::remove_file(dest_name).is_ok());
}
#[test]
fn test_small_file_transfer() {
let (src_name, dest_name) = init_full_filenames();
assert!(!Path::exists(&dest_name));
let file_data_str = "Hello World!";
let file_data = file_data_str.as_bytes();
let mut buf: [u8; 512] = [0; 512];
let mut test_user = TestCfdpUser {
next_expected_seq_num: 0,
expected_full_src_name: src_name.to_string_lossy().into(),
expected_full_dest_name: dest_name.to_string_lossy().into(),
expected_file_size: file_data.len(),
};
// We treat the destination handler like it is a remote entity.
let mut dest_handler = DestinationHandler::new(REMOTE_ID);
init_check(&dest_handler);
let seq_num = UbfU16::new(0);
let pdu_header = create_pdu_header(seq_num);
let metadata_pdu = create_metadata_pdu(
&pdu_header,
src_name.as_path(),
dest_name.as_path(),
file_data.len() as u64,
);
insert_metadata_pdu(&metadata_pdu, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
if let Err(e) = result {
panic!("dest handler fsm error: {e}");
}
assert_ne!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::ReceivingFileDataPdus);
let offset = 0;
let filedata_pdu = FileDataPdu::new_no_seg_metadata(pdu_header, offset, file_data);
filedata_pdu
.write_to_bytes(&mut buf)
.expect("writing file data PDU failed");
let packet_info = PacketInfo::new(&buf).expect("creating packet info failed");
let result = dest_handler.insert_packet(&packet_info);
if let Err(e) = result {
panic!("destination handler packet insertion error: {e}");
}
let result = dest_handler.state_machine(&mut test_user);
assert!(result.is_ok());
insert_eof_pdu(file_data, &pdu_header, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
assert!(result.is_ok());
assert_eq!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::Idle);
assert!(Path::exists(&dest_name));
let read_content = fs::read_to_string(&dest_name).expect("reading back string failed");
assert_eq!(read_content, file_data_str);
assert!(fs::remove_file(dest_name).is_ok());
}
#[test]
fn test_segmented_file_transfer() {
let (src_name, dest_name) = init_full_filenames();
assert!(!Path::exists(&dest_name));
let mut rng = rand::thread_rng();
let mut random_data = [0u8; 512];
rng.fill(&mut random_data);
let mut buf: [u8; 512] = [0; 512];
let mut test_user = TestCfdpUser {
next_expected_seq_num: 0,
expected_full_src_name: src_name.to_string_lossy().into(),
expected_full_dest_name: dest_name.to_string_lossy().into(),
expected_file_size: random_data.len(),
};
// We treat the destination handler like it is a remote entity.
let mut dest_handler = DestinationHandler::new(REMOTE_ID);
init_check(&dest_handler);
let seq_num = UbfU16::new(0);
let pdu_header = create_pdu_header(seq_num);
let metadata_pdu = create_metadata_pdu(
&pdu_header,
src_name.as_path(),
dest_name.as_path(),
random_data.len() as u64,
);
insert_metadata_pdu(&metadata_pdu, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
if let Err(e) = result {
panic!("dest handler fsm error: {e}");
}
assert_ne!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::ReceivingFileDataPdus);
// First file data PDU
let mut offset: usize = 0;
let segment_len = 256;
let filedata_pdu = FileDataPdu::new_no_seg_metadata(
pdu_header,
offset as u64,
&random_data[0..segment_len],
);
filedata_pdu
.write_to_bytes(&mut buf)
.expect("writing file data PDU failed");
let packet_info = PacketInfo::new(&buf).expect("creating packet info failed");
let result = dest_handler.insert_packet(&packet_info);
if let Err(e) = result {
panic!("destination handler packet insertion error: {e}");
}
let result = dest_handler.state_machine(&mut test_user);
assert!(result.is_ok());
// Second file data PDU
offset += segment_len;
let filedata_pdu = FileDataPdu::new_no_seg_metadata(
pdu_header,
offset as u64,
&random_data[segment_len..],
);
filedata_pdu
.write_to_bytes(&mut buf)
.expect("writing file data PDU failed");
let packet_info = PacketInfo::new(&buf).expect("creating packet info failed");
let result = dest_handler.insert_packet(&packet_info);
if let Err(e) = result {
panic!("destination handler packet insertion error: {e}");
}
let result = dest_handler.state_machine(&mut test_user);
assert!(result.is_ok());
insert_eof_pdu(&random_data, &pdu_header, &mut buf, &mut dest_handler);
let result = dest_handler.state_machine(&mut test_user);
assert!(result.is_ok());
assert_eq!(dest_handler.state(), State::Idle);
assert_eq!(dest_handler.step(), TransactionStep::Idle);
// Clean up
assert!(Path::exists(&dest_name));
let read_content = fs::read(&dest_name).expect("reading back string failed");
assert_eq!(read_content, random_data);
assert!(fs::remove_file(dest_name).is_ok());
}
}

320
satrs-core/src/cfdp/mod.rs Normal file
View File

@ -0,0 +1,320 @@
use crc::{Crc, CRC_32_CKSUM};
use spacepackets::{
cfdp::{
pdu::{FileDirectiveType, PduError, PduHeader},
ChecksumType, PduType, TransmissionMode,
},
util::UnsignedByteField,
};
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "std")]
pub mod dest;
#[cfg(feature = "std")]
pub mod source;
pub mod user;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntityType {
Sending,
Receiving,
}
/// Generic abstraction for a check timer which has different functionality depending on whether
/// the using entity is the sending entity or the receiving entity for the unacknowledged
/// transmission mode.
///
/// For the sending entity, this timer determines the expiry period for declaring a check limit
/// fault after sending an EOF PDU with requested closure. This allows a timeout of the transfer.
/// Also see 4.6.3.2 of the CFDP standard.
///
/// For the receiving entity, this timer determines the expiry period for incrementing a check
/// counter after an EOF PDU is received for an incomplete file transfer. This allows out-of-order
/// reception of file data PDUs and EOF PDUs. Also see 4.6.3.3 of the CFDP standard.
pub trait CheckTimerProvider {
fn has_expired(&self) -> bool;
}
/// A generic trait which allows CFDP entities to create check timers which are required to
/// implement special procedures in unacknowledged transmission mode, as specified in 4.6.3.2
/// and 4.6.3.3. The [CheckTimerProvider] provides more information about the purpose of the
/// check timer.
///
/// This trait also allows the creation of different check timers depending on
/// the ID of the local entity, the ID of the remote entity for a given transaction, and the
/// type of entity.
#[cfg(feature = "alloc")]
pub trait CheckTimerCreator {
fn get_check_timer_provider(
local_id: &UnsignedByteField,
remote_id: &UnsignedByteField,
entity_type: EntityType,
) -> Box<dyn CheckTimerProvider>;
}
/// Simple implementation of the [CheckTimerProvider] trait assuming a standard runtime.
/// It also assumes that a second accuracy of the check timer period is sufficient.
#[cfg(feature = "std")]
pub struct StdCheckTimer {
expiry_time_seconds: u64,
start_time: std::time::Instant,
}
#[cfg(feature = "std")]
impl StdCheckTimer {
pub fn new(expiry_time_seconds: u64) -> Self {
Self {
expiry_time_seconds,
start_time: std::time::Instant::now(),
}
}
}
#[cfg(feature = "std")]
impl CheckTimerProvider for StdCheckTimer {
fn has_expired(&self) -> bool {
let elapsed_time = self.start_time.elapsed();
if elapsed_time.as_secs() > self.expiry_time_seconds {
return true;
}
false
}
}
#[derive(Debug)]
pub struct RemoteEntityConfig {
pub entity_id: UnsignedByteField,
pub max_file_segment_len: usize,
pub closure_requeted_by_default: bool,
pub crc_on_transmission_by_default: bool,
pub default_transmission_mode: TransmissionMode,
pub default_crc_type: ChecksumType,
pub check_limit: u32,
}
pub trait RemoteEntityConfigProvider {
fn get_remote_config(&self, remote_id: &UnsignedByteField) -> Option<&RemoteEntityConfig>;
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct TransactionId {
source_id: UnsignedByteField,
seq_num: UnsignedByteField,
}
impl TransactionId {
pub fn new(source_id: UnsignedByteField, seq_num: UnsignedByteField) -> Self {
Self { source_id, seq_num }
}
pub fn source_id(&self) -> &UnsignedByteField {
&self.source_id
}
pub fn seq_num(&self) -> &UnsignedByteField {
&self.seq_num
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum TransactionStep {
Idle = 0,
TransactionStart = 1,
ReceivingFileDataPdus = 2,
SendingAckPdu = 3,
TransferCompletion = 4,
SendingFinishedPdu = 5,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum State {
Idle = 0,
BusyClass1Nacked = 2,
BusyClass2Acked = 3,
}
pub const CRC_32: Crc<u32> = Crc::<u32>::new(&CRC_32_CKSUM);
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum PacketTarget {
SourceEntity,
DestEntity,
}
/// This is a helper struct which contains base information about a particular PDU packet.
/// This is also necessary information for CFDP packet routing. For example, some packet types
/// like file data PDUs can only be used by CFDP source entities.
pub struct PacketInfo<'raw_packet> {
pdu_type: PduType,
pdu_directive: Option<FileDirectiveType>,
target: PacketTarget,
raw_packet: &'raw_packet [u8],
}
impl<'raw> PacketInfo<'raw> {
pub fn new(raw_packet: &'raw [u8]) -> Result<Self, PduError> {
let (pdu_header, header_len) = PduHeader::from_bytes(raw_packet)?;
if pdu_header.pdu_type() == PduType::FileData {
return Ok(Self {
pdu_type: pdu_header.pdu_type(),
pdu_directive: None,
target: PacketTarget::DestEntity,
raw_packet,
});
}
if pdu_header.pdu_datafield_len() < 1 {
return Err(PduError::FormatError);
}
// Route depending on PDU type and directive type if applicable. Retrieve directive type
// from the raw stream for better performance (with sanity and directive code check).
// The routing is based on section 4.5 of the CFDP standard which specifies the PDU forwarding
// procedure.
let directive = FileDirectiveType::try_from(raw_packet[header_len]).map_err(|_| {
PduError::InvalidDirectiveType {
found: raw_packet[header_len],
expected: None,
}
})?;
let packet_target = match directive {
// Section c) of 4.5.3: These PDUs should always be targeted towards the file sender a.k.a.
// the source handler
FileDirectiveType::NakPdu
| FileDirectiveType::FinishedPdu
| FileDirectiveType::KeepAlivePdu => PacketTarget::SourceEntity,
// Section b) of 4.5.3: These PDUs should always be targeted towards the file receiver a.k.a.
// the destination handler
FileDirectiveType::MetadataPdu
| FileDirectiveType::EofPdu
| FileDirectiveType::PromptPdu => PacketTarget::DestEntity,
// Section a): Recipient depends of the type of PDU that is being acknowledged. We can simply
// extract the PDU type from the raw stream. If it is an EOF PDU, this packet is passed to
// the source handler, for a Finished PDU, it is passed to the destination handler.
FileDirectiveType::AckPdu => {
let acked_directive = FileDirectiveType::try_from(raw_packet[header_len + 1])
.map_err(|_| PduError::InvalidDirectiveType {
found: raw_packet[header_len],
expected: None,
})?;
if acked_directive == FileDirectiveType::EofPdu {
PacketTarget::SourceEntity
} else if acked_directive == FileDirectiveType::FinishedPdu {
PacketTarget::DestEntity
} else {
// TODO: Maybe a better error? This might be confusing..
return Err(PduError::InvalidDirectiveType {
found: raw_packet[header_len + 1],
expected: None,
});
}
}
};
Ok(Self {
pdu_type: pdu_header.pdu_type(),
pdu_directive: Some(directive),
target: packet_target,
raw_packet,
})
}
pub fn pdu_type(&self) -> PduType {
self.pdu_type
}
pub fn pdu_directive(&self) -> Option<FileDirectiveType> {
self.pdu_directive
}
pub fn target(&self) -> PacketTarget {
self.target
}
pub fn raw_packet(&self) -> &[u8] {
self.raw_packet
}
}
#[cfg(test)]
mod tests {
use spacepackets::cfdp::{
lv::Lv,
pdu::{
eof::EofPdu,
file_data::FileDataPdu,
metadata::{MetadataGenericParams, MetadataPdu},
CommonPduConfig, FileDirectiveType, PduHeader,
},
PduType,
};
use crate::cfdp::PacketTarget;
use super::PacketInfo;
fn generic_pdu_header() -> PduHeader {
let pdu_conf = CommonPduConfig::default();
PduHeader::new_no_file_data(pdu_conf, 0)
}
#[test]
fn test_metadata_pdu_info() {
let mut buf: [u8; 128] = [0; 128];
let pdu_header = generic_pdu_header();
let metadata_params = MetadataGenericParams::default();
let src_file_name = "hello.txt";
let dest_file_name = "hello-dest.txt";
let src_lv = Lv::new_from_str(src_file_name).unwrap();
let dest_lv = Lv::new_from_str(dest_file_name).unwrap();
let metadata_pdu = MetadataPdu::new(pdu_header, metadata_params, src_lv, dest_lv, None);
metadata_pdu
.write_to_bytes(&mut buf)
.expect("writing metadata PDU failed");
let packet_info = PacketInfo::new(&buf).expect("creating packet info failed");
assert_eq!(packet_info.pdu_type(), PduType::FileDirective);
assert!(packet_info.pdu_directive().is_some());
assert_eq!(
packet_info.pdu_directive().unwrap(),
FileDirectiveType::MetadataPdu
);
assert_eq!(packet_info.target(), PacketTarget::DestEntity);
}
#[test]
fn test_filedata_pdu_info() {
let mut buf: [u8; 128] = [0; 128];
let pdu_header = generic_pdu_header();
let file_data_pdu = FileDataPdu::new_no_seg_metadata(pdu_header, 0, &[]);
file_data_pdu
.write_to_bytes(&mut buf)
.expect("writing file data PDU failed");
let packet_info = PacketInfo::new(&buf).expect("creating packet info failed");
assert_eq!(packet_info.pdu_type(), PduType::FileData);
assert!(packet_info.pdu_directive().is_none());
assert_eq!(packet_info.target(), PacketTarget::DestEntity);
}
#[test]
fn test_eof_pdu_info() {
let mut buf: [u8; 128] = [0; 128];
let pdu_header = generic_pdu_header();
let eof_pdu = EofPdu::new_no_error(pdu_header, 0, 0);
eof_pdu
.write_to_bytes(&mut buf)
.expect("writing file data PDU failed");
let packet_info = PacketInfo::new(&buf).expect("creating packet info failed");
assert_eq!(packet_info.pdu_type(), PduType::FileDirective);
assert!(packet_info.pdu_directive().is_some());
assert_eq!(
packet_info.pdu_directive().unwrap(),
FileDirectiveType::EofPdu
);
}
}

View File

@ -0,0 +1,15 @@
#![allow(dead_code)]
use spacepackets::util::UnsignedByteField;
pub struct SourceHandler {
id: UnsignedByteField,
}
impl SourceHandler {
pub fn new(id: impl Into<UnsignedByteField>) -> Self {
Self { id: id.into() }
}
}
#[cfg(test)]
mod tests {}

View File

@ -0,0 +1,65 @@
use spacepackets::{
cfdp::{
pdu::{
file_data::RecordContinuationState,
finished::{DeliveryCode, FileStatus},
},
tlv::msg_to_user::MsgToUserTlv,
ConditionCode,
},
util::UnsignedByteField,
};
use super::TransactionId;
#[derive(Debug, Copy, Clone)]
pub struct TransactionFinishedParams {
pub id: TransactionId,
pub condition_code: ConditionCode,
pub delivery_code: DeliveryCode,
pub file_status: FileStatus,
}
#[derive(Debug)]
pub struct MetadataReceivedParams<'src_file, 'dest_file, 'msgs_to_user> {
pub id: TransactionId,
pub source_id: UnsignedByteField,
pub file_size: u64,
pub src_file_name: &'src_file str,
pub dest_file_name: &'dest_file str,
pub msgs_to_user: &'msgs_to_user [MsgToUserTlv<'msgs_to_user>],
}
#[derive(Debug)]
pub struct FileSegmentRecvdParams<'seg_meta> {
pub id: TransactionId,
pub offset: u64,
pub length: usize,
pub rec_cont_state: Option<RecordContinuationState>,
pub segment_metadata: Option<&'seg_meta [u8]>,
}
pub trait CfdpUser {
fn transaction_indication(&mut self, id: &TransactionId);
fn eof_sent_indication(&mut self, id: &TransactionId);
fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams);
fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams);
fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams);
// TODO: The standard does not strictly specify how the report information looks..
fn report_indication(&mut self, id: &TransactionId);
fn suspended_indication(&mut self, id: &TransactionId, condition_code: ConditionCode);
fn resumed_indication(&mut self, id: &TransactionId, progress: u64);
fn fault_indication(
&mut self,
id: &TransactionId,
condition_code: ConditionCode,
progress: u64,
);
fn abandoned_indication(
&mut self,
id: &TransactionId,
condition_code: ConditionCode,
progress: u64,
);
fn eof_recvd_indication(&mut self, id: &TransactionId);
}

View File

@ -0,0 +1,269 @@
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
#[cfg(feature = "alloc")]
use hashbrown::HashSet;
use spacepackets::PacketId;
use crate::tmtc::ReceivesTcCore;
pub trait PacketIdLookup {
fn validate(&self, packet_id: u16) -> bool;
}
#[cfg(feature = "alloc")]
impl PacketIdLookup for Vec<u16> {
fn validate(&self, packet_id: u16) -> bool {
self.contains(&packet_id)
}
}
#[cfg(feature = "alloc")]
impl PacketIdLookup for HashSet<u16> {
fn validate(&self, packet_id: u16) -> bool {
self.contains(&packet_id)
}
}
impl PacketIdLookup for [u16] {
fn validate(&self, packet_id: u16) -> bool {
self.binary_search(&packet_id).is_ok()
}
}
#[cfg(feature = "alloc")]
impl PacketIdLookup for Vec<PacketId> {
fn validate(&self, packet_id: u16) -> bool {
self.contains(&PacketId::from(packet_id))
}
}
#[cfg(feature = "alloc")]
impl PacketIdLookup for HashSet<PacketId> {
fn validate(&self, packet_id: u16) -> bool {
self.contains(&PacketId::from(packet_id))
}
}
impl PacketIdLookup for [PacketId] {
fn validate(&self, packet_id: u16) -> bool {
self.binary_search(&PacketId::from(packet_id)).is_ok()
}
}
/// This function parses a given buffer for tightly packed CCSDS space packets. It uses the
/// [PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet and then
/// uses the length field of the packet to extract CCSDS packets.
///
/// This function is also able to deal with broken tail packets at the end as long a the parser
/// can read the full 7 bytes which constitue a space packet header plus one byte minimal size.
/// If broken tail packets are detected, they are moved to the front of the buffer, and the write
/// index for future write operations will be written to the `next_write_idx` argument.
///
/// The parser will write all packets which were decoded successfully to the given `tc_receiver`
/// and return the number of packets found. If the [ReceivesTcCore::pass_tc] calls fails, the
/// error will be returned.
pub fn parse_buffer_for_ccsds_space_packets<E>(
buf: &mut [u8],
packet_id_lookup: &(impl PacketIdLookup + ?Sized),
tc_receiver: &mut (impl ReceivesTcCore<Error = E> + ?Sized),
next_write_idx: &mut usize,
) -> Result<u32, E> {
*next_write_idx = 0;
let mut packets_found = 0;
let mut current_idx = 0;
let buf_len = buf.len();
loop {
if current_idx + 7 >= buf.len() {
break;
}
let packet_id = u16::from_be_bytes(buf[current_idx..current_idx + 2].try_into().unwrap());
if packet_id_lookup.validate(packet_id) {
let length_field =
u16::from_be_bytes(buf[current_idx + 4..current_idx + 6].try_into().unwrap());
let packet_size = length_field + 7;
if (current_idx + packet_size as usize) <= buf_len {
tc_receiver.pass_tc(&buf[current_idx..current_idx + packet_size as usize])?;
packets_found += 1;
} else {
// Move packet to start of buffer if applicable.
if current_idx > 0 {
buf.copy_within(current_idx.., 0);
*next_write_idx = buf.len() - current_idx;
}
}
current_idx += packet_size as usize;
continue;
}
current_idx += 1;
}
Ok(packets_found)
}
#[cfg(test)]
mod tests {
use spacepackets::{
ecss::{tc::PusTcCreator, SerializablePusPacket},
PacketId, SpHeader,
};
use crate::encoding::tests::TcCacher;
use super::parse_buffer_for_ccsds_space_packets;
const TEST_APID_0: u16 = 0x02;
const TEST_APID_1: u16 = 0x10;
const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0);
const TEST_PACKET_ID_1: PacketId = PacketId::const_tc(true, TEST_APID_1);
#[test]
fn test_basic() {
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
let mut buffer: [u8; 32] = [0; 32];
let packet_len = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0];
let mut tc_cacher = TcCacher::default();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer,
valid_packet_ids.as_slice(),
&mut tc_cacher,
&mut next_write_idx,
);
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 1);
assert_eq!(tc_cacher.tc_queue.len(), 1);
assert_eq!(
tc_cacher.tc_queue.pop_front().unwrap(),
buffer[..packet_len]
);
}
#[test]
fn test_multi_packet() {
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true);
let mut buffer: [u8; 32] = [0; 32];
let packet_len_ping = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
let packet_len_action = action_tc
.write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0];
let mut tc_cacher = TcCacher::default();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer,
valid_packet_ids.as_slice(),
&mut tc_cacher,
&mut next_write_idx,
);
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 2);
assert_eq!(tc_cacher.tc_queue.len(), 2);
assert_eq!(
tc_cacher.tc_queue.pop_front().unwrap(),
buffer[..packet_len_ping]
);
assert_eq!(
tc_cacher.tc_queue.pop_front().unwrap(),
buffer[packet_len_ping..packet_len_ping + packet_len_action]
);
}
#[test]
fn test_multi_apid() {
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true);
let mut buffer: [u8; 32] = [0; 32];
let packet_len_ping = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
let packet_len_action = action_tc
.write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1];
let mut tc_cacher = TcCacher::default();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer,
valid_packet_ids.as_slice(),
&mut tc_cacher,
&mut next_write_idx,
);
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 2);
assert_eq!(tc_cacher.tc_queue.len(), 2);
assert_eq!(
tc_cacher.tc_queue.pop_front().unwrap(),
buffer[..packet_len_ping]
);
assert_eq!(
tc_cacher.tc_queue.pop_front().unwrap(),
buffer[packet_len_ping..packet_len_ping + packet_len_action]
);
}
#[test]
fn test_split_packet_multi() {
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true);
let mut buffer: [u8; 32] = [0; 32];
let packet_len_ping = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
let packet_len_action = action_tc
.write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1];
let mut tc_cacher = TcCacher::default();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer[..packet_len_ping + packet_len_action - 4],
valid_packet_ids.as_slice(),
&mut tc_cacher,
&mut next_write_idx,
);
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 1);
assert_eq!(tc_cacher.tc_queue.len(), 1);
// The broken packet was moved to the start, so the next write index should be after the
// last segment missing 4 bytes.
assert_eq!(next_write_idx, packet_len_action - 4);
}
#[test]
fn test_one_split_packet() {
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
let mut buffer: [u8; 32] = [0; 32];
let packet_len_ping = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1];
let mut tc_cacher = TcCacher::default();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer[..packet_len_ping - 4],
valid_packet_ids.as_slice(),
&mut tc_cacher,
&mut next_write_idx,
);
assert_eq!(next_write_idx, 0);
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 0);
assert_eq!(tc_cacher.tc_queue.len(), 0);
}
}

View File

@ -0,0 +1,263 @@
use crate::tmtc::ReceivesTcCore;
use cobs::{decode_in_place, encode, max_encoding_length};
/// This function encodes the given packet with COBS and also wraps the encoded packet with
/// the sentinel value 0. It can be used repeatedly on the same encoded buffer by expecting
/// and incrementing the mutable reference of the current packet index. This is also used
/// to retrieve the total encoded size.
///
/// This function will return [false] if the given encoding buffer is not large enough to hold
/// the encoded buffer and the two sentinel bytes and [true] if the encoding was successfull.
///
/// ## Example
///
/// ```
/// use cobs::decode_in_place_report;
/// use satrs_core::encoding::{encode_packet_with_cobs};
//
/// const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5];
/// const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 2, 1];
///
/// let mut encoding_buf: [u8; 32] = [0; 32];
/// let mut current_idx = 0;
/// assert!(encode_packet_with_cobs(&SIMPLE_PACKET, &mut encoding_buf, &mut current_idx));
/// assert!(encode_packet_with_cobs(&INVERTED_PACKET, &mut encoding_buf, &mut current_idx));
/// assert_eq!(encoding_buf[0], 0);
/// let dec_report = decode_in_place_report(&mut encoding_buf[1..]).expect("decoding failed");
/// assert_eq!(encoding_buf[1 + dec_report.src_used], 0);
/// assert_eq!(dec_report.dst_used, 5);
/// assert_eq!(current_idx, 16);
/// ```
pub fn encode_packet_with_cobs(
packet: &[u8],
encoded_buf: &mut [u8],
current_idx: &mut usize,
) -> bool {
let max_encoding_len = max_encoding_length(packet.len());
if *current_idx + max_encoding_len + 2 > encoded_buf.len() {
return false;
}
encoded_buf[*current_idx] = 0;
*current_idx += 1;
*current_idx += encode(packet, &mut encoded_buf[*current_idx..]);
encoded_buf[*current_idx] = 0;
*current_idx += 1;
true
}
/// This function parses a given buffer for COBS encoded packets. The packet structure is
/// expected to be like this, assuming a sentinel value of 0 as the packet delimiter:
///
/// 0 | ... Encoded Packet Data ... | 0 | 0 | ... Encoded Packet Data ... | 0
///
/// This function is also able to deal with broken tail packets at the end. If broken tail
/// packets are detected, they are moved to the front of the buffer, and the write index for
/// future write operations will be written to the `next_write_idx` argument.
///
/// The parser will write all packets which were decoded successfully to the given `tc_receiver`.
pub fn parse_buffer_for_cobs_encoded_packets<E>(
buf: &mut [u8],
tc_receiver: &mut dyn ReceivesTcCore<Error = E>,
next_write_idx: &mut usize,
) -> Result<u32, E> {
let mut start_index_packet = 0;
let mut start_found = false;
let mut last_byte = false;
let mut packets_found = 0;
for i in 0..buf.len() {
if i == buf.len() - 1 {
last_byte = true;
}
if buf[i] == 0 {
if !start_found && !last_byte && buf[i + 1] == 0 {
// Special case: Consecutive sentinel values or all zeroes.
// Skip.
continue;
}
if start_found {
let decode_result = decode_in_place(&mut buf[start_index_packet..i]);
if let Ok(packet_len) = decode_result {
packets_found += 1;
tc_receiver
.pass_tc(&buf[start_index_packet..start_index_packet + packet_len])?;
}
start_found = false;
} else {
start_index_packet = i + 1;
start_found = true;
}
}
}
// Move split frame at the end to the front of the buffer.
if start_index_packet > 0 && start_found && packets_found > 0 {
buf.copy_within(start_index_packet - 1.., 0);
*next_write_idx = buf.len() - start_index_packet + 1;
}
Ok(packets_found)
}
#[cfg(test)]
pub(crate) mod tests {
use cobs::encode;
use crate::encoding::tests::{encode_simple_packet, TcCacher, INVERTED_PACKET, SIMPLE_PACKET};
use super::parse_buffer_for_cobs_encoded_packets;
#[test]
fn test_parsing_simple_packet() {
let mut test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
let mut next_read_idx = 0;
let packets = parse_buffer_for_cobs_encoded_packets(
&mut encoded_buf[0..current_idx],
&mut test_sender,
&mut next_read_idx,
)
.unwrap();
assert_eq!(packets, 1);
assert_eq!(test_sender.tc_queue.len(), 1);
let packet = &test_sender.tc_queue[0];
assert_eq!(packet, &SIMPLE_PACKET);
}
#[test]
fn test_parsing_consecutive_packets() {
let mut test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
// Second packet
encoded_buf[current_idx] = 0;
current_idx += 1;
current_idx += encode(&INVERTED_PACKET, &mut encoded_buf[current_idx..]);
encoded_buf[current_idx] = 0;
current_idx += 1;
let mut next_read_idx = 0;
let packets = parse_buffer_for_cobs_encoded_packets(
&mut encoded_buf[0..current_idx],
&mut test_sender,
&mut next_read_idx,
)
.unwrap();
assert_eq!(packets, 2);
assert_eq!(test_sender.tc_queue.len(), 2);
let packet0 = &test_sender.tc_queue[0];
assert_eq!(packet0, &SIMPLE_PACKET);
let packet1 = &test_sender.tc_queue[1];
assert_eq!(packet1, &INVERTED_PACKET);
}
#[test]
fn test_split_tail_packet_only() {
let mut test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
let mut next_read_idx = 0;
let packets = parse_buffer_for_cobs_encoded_packets(
// Cut off the sentinel byte at the end.
&mut encoded_buf[0..current_idx - 1],
&mut test_sender,
&mut next_read_idx,
)
.unwrap();
assert_eq!(packets, 0);
assert_eq!(test_sender.tc_queue.len(), 0);
assert_eq!(next_read_idx, 0);
}
fn generic_test_split_packet(cut_off: usize) {
let mut test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16];
assert!(cut_off < INVERTED_PACKET.len() + 1);
let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
// Second packet
encoded_buf[current_idx] = 0;
let packet_start = current_idx;
current_idx += 1;
let encoded_len = encode(&INVERTED_PACKET, &mut encoded_buf[current_idx..]);
assert_eq!(encoded_len, 6);
current_idx += encoded_len;
// We cut off the sentinel byte, so we expecte the write index to be the length of the
// packet minus the sentinel byte plus the first sentinel byte.
let next_expected_write_idx = 1 + encoded_len - cut_off + 1;
encoded_buf[current_idx] = 0;
current_idx += 1;
let mut next_write_idx = 0;
let expected_at_start = encoded_buf[packet_start..current_idx - cut_off].to_vec();
let packets = parse_buffer_for_cobs_encoded_packets(
// Cut off the sentinel byte at the end.
&mut encoded_buf[0..current_idx - cut_off],
&mut test_sender,
&mut next_write_idx,
)
.unwrap();
assert_eq!(packets, 1);
assert_eq!(test_sender.tc_queue.len(), 1);
assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET);
assert_eq!(next_write_idx, next_expected_write_idx);
assert_eq!(encoded_buf[..next_expected_write_idx], expected_at_start);
}
#[test]
fn test_one_packet_and_split_tail_packet_0() {
generic_test_split_packet(1);
}
#[test]
fn test_one_packet_and_split_tail_packet_1() {
generic_test_split_packet(2);
}
#[test]
fn test_one_packet_and_split_tail_packet_2() {
generic_test_split_packet(3);
}
#[test]
fn test_zero_at_end() {
let mut test_sender = TcCacher::default();
let mut encoded_buf: [u8; 16] = [0; 16];
let mut next_write_idx = 0;
let mut current_idx = 0;
encoded_buf[current_idx] = 5;
current_idx += 1;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
encoded_buf[current_idx] = 0;
current_idx += 1;
let packets = parse_buffer_for_cobs_encoded_packets(
// Cut off the sentinel byte at the end.
&mut encoded_buf[0..current_idx],
&mut test_sender,
&mut next_write_idx,
)
.unwrap();
assert_eq!(packets, 1);
assert_eq!(test_sender.tc_queue.len(), 1);
assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET);
assert_eq!(next_write_idx, 1);
assert_eq!(encoded_buf[0], 0);
}
#[test]
fn test_all_zeroes() {
let mut test_sender = TcCacher::default();
let mut all_zeroes: [u8; 5] = [0; 5];
let mut next_write_idx = 0;
let packets = parse_buffer_for_cobs_encoded_packets(
// Cut off the sentinel byte at the end.
&mut all_zeroes,
&mut test_sender,
&mut next_write_idx,
)
.unwrap();
assert_eq!(packets, 0);
assert!(test_sender.tc_queue.is_empty());
assert_eq!(next_write_idx, 0);
}
}

View File

@ -0,0 +1,40 @@
pub mod ccsds;
pub mod cobs;
pub use crate::encoding::ccsds::parse_buffer_for_ccsds_space_packets;
pub use crate::encoding::cobs::{encode_packet_with_cobs, parse_buffer_for_cobs_encoded_packets};
#[cfg(test)]
pub(crate) mod tests {
use alloc::{collections::VecDeque, vec::Vec};
use crate::tmtc::ReceivesTcCore;
use super::cobs::encode_packet_with_cobs;
pub(crate) const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5];
pub(crate) const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 2, 1];
#[derive(Default)]
pub(crate) struct TcCacher {
pub(crate) tc_queue: VecDeque<Vec<u8>>,
}
impl ReceivesTcCore for TcCacher {
type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.tc_queue.push_back(tc_raw.to_vec());
Ok(())
}
}
pub(crate) fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
encode_packet_with_cobs(&SIMPLE_PACKET, encoded_buf, current_idx);
}
#[allow(dead_code)]
pub(crate) fn encode_inverted_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
encode_packet_with_cobs(&INVERTED_PACKET, encoded_buf, current_idx);
}
}

View File

@ -29,7 +29,7 @@ pub trait Executable: Send {
fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, Self::Error>; fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, Self::Error>;
} }
/// This function allows executing one task which implements the [Executable][Executable] trait /// This function allows executing one task which implements the [Executable] trait
/// ///
/// # Arguments /// # Arguments
/// ///
@ -78,7 +78,7 @@ pub fn exec_sched_single<
} }
/// This function allows executing multiple tasks as long as the tasks implement the /// This function allows executing multiple tasks as long as the tasks implement the
/// [Executable][Executable] trait /// [Executable] trait
/// ///
/// # Arguments /// # Arguments
/// ///

View File

@ -1,2 +0,0 @@
//! Helper modules intended to be used on hosts with a full [std] runtime
pub mod udp_server;

View File

@ -1,4 +1,4 @@
//! # Hardware Abstraction Layer module //! # Hardware Abstraction Layer module
#[cfg(feature = "std")] #[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub mod host; pub mod std;

View File

@ -0,0 +1,6 @@
//! Helper modules intended to be used on systems with a full [std] runtime.
pub mod tcp_server;
pub mod udp_server;
mod tcp_cobs_server;
mod tcp_spacepackets_server;

View File

@ -0,0 +1,369 @@
use alloc::boxed::Box;
use alloc::vec;
use cobs::encode;
use delegate::delegate;
use std::io::Write;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::net::TcpStream;
use std::vec::Vec;
use crate::encoding::parse_buffer_for_cobs_encoded_packets;
use crate::tmtc::ReceivesTc;
use crate::tmtc::TmPacketSource;
use crate::hal::std::tcp_server::{
ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
};
/// Concrete [TcpTcParser] implementation for the [TcpTmtcInCobsServer].
#[derive(Default)]
pub struct CobsTcParser {}
impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CobsTcParser {
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
conn_result: &mut ConnectionResult,
current_write_idx: usize,
next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>> {
// Reader vec full, need to parse for packets.
conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets(
&mut tc_buffer[..current_write_idx],
tc_receiver.upcast_mut(),
next_write_idx,
)
.map_err(|e| TcpTmtcError::TcError(e))?;
Ok(())
}
}
/// Concrete [TcpTmSender] implementation for the [TcpTmtcInCobsServer].
pub struct CobsTmSender {
tm_encoding_buffer: Vec<u8>,
}
impl CobsTmSender {
fn new(tm_buffer_size: usize) -> Self {
Self {
// The buffer should be large enough to hold the maximum expected TM size encoded with
// COBS.
tm_encoding_buffer: vec![0; cobs::max_encoding_length(tm_buffer_size)],
}
}
}
impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
fn handle_tm_sending(
&mut self,
tm_buffer: &mut [u8],
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
conn_result: &mut ConnectionResult,
stream: &mut TcpStream,
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
let mut tm_was_sent = false;
loop {
// Write TM until TM source is exhausted. For now, there is no limit for the amount
// of TM written this way.
let read_tm_len = tm_source
.retrieve_packet(tm_buffer)
.map_err(|e| TcpTmtcError::TmError(e))?;
if read_tm_len == 0 {
return Ok(tm_was_sent);
}
tm_was_sent = true;
conn_result.num_sent_tms += 1;
// Encode into COBS and sent to client.
let mut current_idx = 0;
self.tm_encoding_buffer[current_idx] = 0;
current_idx += 1;
current_idx += encode(
&tm_buffer[..read_tm_len],
&mut self.tm_encoding_buffer[current_idx..],
);
self.tm_encoding_buffer[current_idx] = 0;
current_idx += 1;
stream.write_all(&self.tm_encoding_buffer[..current_idx])?;
}
}
}
/// TCP TMTC server implementation for exchange of generic TMTC packets which are framed with the
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
///
/// Telemetry will be encoded with the COBS protocol using [cobs::encode] in addition to being
/// wrapped with the sentinel value 0 as the packet delimiter as well before being sent back to
/// the client. Please note that the server will send as much data as it can retrieve from the
/// [TmPacketSource] in its current implementation.
///
/// Using a framing protocol like COBS imposes minimal restrictions on the type of TMTC data
/// exchanged while also allowing packets with flexible size and a reliable way to reconstruct full
/// packets even from a data stream which is split up. The server wil use the
/// [parse_buffer_for_cobs_encoded_packets] function to parse for packets and pass them to a
/// generic TC receiver. The user can use [crate::encoding::encode_packet_with_cobs] to encode
/// telecommands sent to the server.
///
/// ## Example
///
/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
/// test also serves as the example application for this module.
pub struct TcpTmtcInCobsServer<TmError, TcError: 'static> {
generic_server: TcpTmtcGenericServer<TmError, TcError, CobsTmSender, CobsTcParser>,
}
impl<TmError: 'static, TcError: 'static> TcpTmtcInCobsServer<TmError, TcError> {
/// Create a new TCP TMTC server which exchanges TMTC packets encoded with
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
///
/// ## Parameter
///
/// * `cfg` - Configuration of the server.
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
/// then sent back to the client.
/// * `tc_receiver` - Any received telecommands which were decoded successfully will be
/// forwarded to this TC receiver.
pub fn new(
cfg: ServerConfig,
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
) -> Result<Self, TcpTmtcError<TmError, TcError>> {
Ok(Self {
generic_server: TcpTmtcGenericServer::new(
cfg,
CobsTcParser::default(),
CobsTmSender::new(cfg.tm_buffer_size),
tm_source,
tc_receiver,
)?,
})
}
delegate! {
to self.generic_server {
pub fn listener(&mut self) -> &mut TcpListener;
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
/// useful if using the port number 0 for OS auto-assignment.
pub fn local_addr(&self) -> std::io::Result<SocketAddr>;
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
pub fn handle_next_connection(
&mut self,
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
}
}
}
#[cfg(test)]
mod tests {
use core::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use std::{
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
thread,
};
use crate::{
encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET},
hal::std::tcp_server::{
tests::{SyncTcCacher, SyncTmSource},
ServerConfig,
},
};
use alloc::{boxed::Box, sync::Arc};
use cobs::encode;
use super::TcpTmtcInCobsServer;
fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx)
}
fn encode_inverted_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
encode_packet(&INVERTED_PACKET, encoded_buf, current_idx)
}
fn encode_packet(packet: &[u8], encoded_buf: &mut [u8], current_idx: &mut usize) {
encoded_buf[*current_idx] = 0;
*current_idx += 1;
*current_idx += encode(packet, &mut encoded_buf[*current_idx..]);
encoded_buf[*current_idx] = 0;
*current_idx += 1;
}
fn generic_tmtc_server(
addr: &SocketAddr,
tc_receiver: SyncTcCacher,
tm_source: SyncTmSource,
) -> TcpTmtcInCobsServer<(), ()> {
TcpTmtcInCobsServer::new(
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
Box::new(tm_source),
Box::new(tc_receiver),
)
.expect("TCP server generation failed")
}
#[test]
fn test_server_basic_no_tm() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let tm_source = SyncTmSource::default();
let mut tcp_server = generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source);
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
let conn_result = result.unwrap();
assert_eq!(conn_result.num_received_tcs, 1);
assert_eq!(conn_result.num_sent_tms, 0);
set_if_done.store(true, Ordering::Relaxed);
});
// Send TC to server now.
let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
stream
.write_all(&encoded_buf[..current_idx])
.expect("writing to TCP server failed");
drop(stream);
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(5));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that the packet was received and decoded successfully.
let mut tc_queue = tc_receiver
.tc_queue
.lock()
.expect("locking tc queue failed");
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
drop(tc_queue);
}
#[test]
fn test_server_basic_multi_tm_multi_tc() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let mut tm_source = SyncTmSource::default();
tm_source.add_tm(&INVERTED_PACKET);
tm_source.add_tm(&SIMPLE_PACKET);
let mut tcp_server =
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source.clone());
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
let conn_result = result.unwrap();
assert_eq!(conn_result.num_received_tcs, 2, "Not enough TCs received");
assert_eq!(conn_result.num_sent_tms, 2, "Not enough TMs received");
set_if_done.store(true, Ordering::Relaxed);
});
// Send TC to server now.
let mut encoded_buf: [u8; 32] = [0; 32];
let mut current_idx = 0;
encode_simple_packet(&mut encoded_buf, &mut current_idx);
encode_inverted_packet(&mut encoded_buf, &mut current_idx);
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting reas timeout failed");
stream
.write_all(&encoded_buf[..current_idx])
.expect("writing to TCP server failed");
// Done with writing.
stream
.shutdown(std::net::Shutdown::Write)
.expect("shutting down write failed");
let mut read_buf: [u8; 16] = [0; 16];
let mut read_len_total = 0;
// Timeout ensures this does not block forever.
while read_len_total < 16 {
let read_len = stream.read(&mut read_buf).expect("read failed");
read_len_total += read_len;
// Read until full expected size is available.
if read_len == 16 {
// Read first TM packet.
current_idx = 0;
assert_eq!(read_len, 16);
assert_eq!(read_buf[0], 0);
current_idx += 1;
let mut dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..])
.expect("COBS decoding failed");
assert_eq!(dec_report.dst_used, 5);
// Skip first sentinel byte.
assert_eq!(
&read_buf[current_idx..current_idx + INVERTED_PACKET.len()],
&INVERTED_PACKET
);
current_idx += dec_report.src_used;
// End sentinel.
assert_eq!(read_buf[current_idx], 0, "invalid sentinel end byte");
current_idx += 1;
// Read second TM packet.
assert_eq!(read_buf[current_idx], 0);
current_idx += 1;
dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..])
.expect("COBS decoding failed");
assert_eq!(dec_report.dst_used, 5);
// Skip first sentinel byte.
assert_eq!(
&read_buf[current_idx..current_idx + SIMPLE_PACKET.len()],
&SIMPLE_PACKET
);
current_idx += dec_report.src_used;
// End sentinel.
assert_eq!(read_buf[current_idx], 0);
break;
}
}
drop(stream);
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(5));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that the packet was received and decoded successfully.
let mut tc_queue = tc_receiver
.tc_queue
.lock()
.expect("locking tc queue failed");
assert_eq!(tc_queue.len(), 2);
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET);
drop(tc_queue);
}
}

View File

@ -0,0 +1,378 @@
//! Generic TCP TMTC servers with different TMTC format flavours.
use alloc::vec;
use alloc::{boxed::Box, vec::Vec};
use core::time::Duration;
use socket2::{Domain, Socket, Type};
use std::io::Read;
use std::net::TcpListener;
use std::net::{SocketAddr, TcpStream};
use std::thread;
use crate::tmtc::{ReceivesTc, TmPacketSource};
use thiserror::Error;
// Re-export the TMTC in COBS server.
pub use crate::hal::std::tcp_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
pub use crate::hal::std::tcp_spacepackets_server::{
SpacepacketsTcParser, SpacepacketsTmSender, TcpSpacepacketsServer,
};
/// Configuration struct for the generic TCP TMTC server
///
/// ## Parameters
///
/// * `addr` - Address of the TCP server.
/// * `inner_loop_delay` - If a client connects for a longer period, but no TC is received or
/// no TM needs to be sent, the TCP server will delay for the specified amount of time
/// to reduce CPU load.
/// * `tm_buffer_size` - Size of the TM buffer used to read TM from the [TmPacketSource] and
/// encoding of that data. This buffer should at large enough to hold the maximum expected
/// TM size read from the packet source.
/// * `tc_buffer_size` - Size of the TC buffer used to read encoded telecommands sent from
/// the client. It is recommended to make this buffer larger to allow reading multiple
/// consecutive packets as well, for example by using common buffer sizes like 4096 or 8192
/// byte. The buffer should at the very least be large enough to hold the maximum expected
/// telecommand size.
/// * `reuse_addr` - Can be used to set the `SO_REUSEADDR` option on the raw socket. This is
/// especially useful if the address and port are static for the server. Set to false by
/// default.
/// * `reuse_port` - Can be used to set the `SO_REUSEPORT` option on the raw socket. This is
/// especially useful if the address and port are static for the server. Set to false by
/// default.
#[derive(Debug, Copy, Clone)]
pub struct ServerConfig {
pub addr: SocketAddr,
pub inner_loop_delay: Duration,
pub tm_buffer_size: usize,
pub tc_buffer_size: usize,
pub reuse_addr: bool,
pub reuse_port: bool,
}
impl ServerConfig {
pub fn new(
addr: SocketAddr,
inner_loop_delay: Duration,
tm_buffer_size: usize,
tc_buffer_size: usize,
) -> Self {
Self {
addr,
inner_loop_delay,
tm_buffer_size,
tc_buffer_size,
reuse_addr: false,
reuse_port: false,
}
}
}
#[derive(Error, Debug)]
pub enum TcpTmtcError<TmError, TcError> {
#[error("TM retrieval error: {0}")]
TmError(TmError),
#[error("TC retrieval error: {0}")]
TcError(TcError),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
}
/// Result of one connection attempt. Contains the client address if a connection was established,
/// in addition to the number of telecommands and telemetry packets exchanged.
#[derive(Debug, Default)]
pub struct ConnectionResult {
pub addr: Option<SocketAddr>,
pub num_received_tcs: u32,
pub num_sent_tms: u32,
}
/// Generic parser abstraction for an object which can parse for telecommands given a raw
/// bytestream received from a TCP socket and send them to a generic [ReceivesTc] telecommand
/// receiver. This allows different encoding schemes for telecommands.
pub trait TcpTcParser<TmError, TcError> {
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
conn_result: &mut ConnectionResult,
current_write_idx: usize,
next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>>;
}
/// Generic sender abstraction for an object which can pull telemetry from a given TM source
/// using a [TmPacketSource] and then send them back to a client using a given [TcpStream].
/// The concrete implementation can also perform any encoding steps which are necessary before
/// sending back the data to a client.
pub trait TcpTmSender<TmError, TcError> {
fn handle_tm_sending(
&mut self,
tm_buffer: &mut [u8],
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
conn_result: &mut ConnectionResult,
stream: &mut TcpStream,
) -> Result<bool, TcpTmtcError<TmError, TcError>>;
}
/// TCP TMTC server implementation for exchange of generic TMTC packets in a generic way which
/// stays agnostic to the encoding scheme and format used for both telecommands and telemetry.
///
/// This server implements a generic TMTC handling logic and allows modifying its behaviour
/// through the following 4 core abstractions:
///
/// 1. [TcpTcParser] to parse for telecommands from the raw bytestream received from a client.
/// 2. Parsed telecommands will be sent to the [ReceivesTc] telecommand receiver.
/// 3. [TcpTmSender] to send telemetry pulled from a TM source back to the client.
/// 4. [TmPacketSource] as a generic TM source used by the [TcpTmSender].
///
/// It is possible to specify custom abstractions to build a dedicated TCP TMTC server without
/// having to re-implement common logic.
///
/// Currently, this framework offers the following concrete implementations:
///
/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol.
pub struct TcpTmtcGenericServer<
TmError,
TcError,
TmHandler: TcpTmSender<TmError, TcError>,
TcHandler: TcpTcParser<TmError, TcError>,
> {
base: TcpTmtcServerBase<TmError, TcError>,
tc_handler: TcHandler,
tm_handler: TmHandler,
}
impl<
TmError: 'static,
TcError: 'static,
TmSender: TcpTmSender<TmError, TcError>,
TcParser: TcpTcParser<TmError, TcError>,
> TcpTmtcGenericServer<TmError, TcError, TmSender, TcParser>
{
/// Create a new generic TMTC server instance.
///
/// ## Parameter
///
/// * `cfg` - Configuration of the server.
/// * `tc_parser` - Parser which extracts telecommands from the raw bytestream received from
/// the client.
/// * `tm_sender` - Sends back telemetry to the client using the specified TM source.
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
/// then sent back to the client.
/// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded
/// to this TC receiver.
pub fn new(
cfg: ServerConfig,
tc_parser: TcParser,
tm_sender: TmSender,
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
) -> Result<TcpTmtcGenericServer<TmError, TcError, TmSender, TcParser>, std::io::Error> {
Ok(Self {
base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?,
tc_handler: tc_parser,
tm_handler: tm_sender,
})
}
/// Retrieve the internal [TcpListener] class.
pub fn listener(&mut self) -> &mut TcpListener {
self.base.listener()
}
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
/// useful if using the port number 0 for OS auto-assignment.
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.base.local_addr()
}
/// This call is used to handle the next connection to a client. Right now, it performs
/// the following steps:
///
/// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API
/// until a client connects.
/// 2. It reads all the telecommands from the client and parses all received data using the
/// user specified [TcpTcParser].
/// 3. After reading and parsing all telecommands, it sends back all telemetry using the
/// user specified [TcpTmSender].
///
/// The server will delay for a user-specified period if the client connects to the server
/// for prolonged periods and there is no traffic for the server. This is the case if the
/// client does not send any telecommands and no telemetry needs to be sent back to the client.
pub fn handle_next_connection(
&mut self,
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> {
let mut connection_result = ConnectionResult::default();
let mut current_write_idx;
let mut next_write_idx = 0;
let (mut stream, addr) = self.base.listener.accept()?;
stream.set_nonblocking(true)?;
connection_result.addr = Some(addr);
current_write_idx = next_write_idx;
loop {
let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]);
match read_result {
Ok(0) => {
// Connection closed by client. If any TC was read, parse for complete packets.
// After that, break the outer loop.
if current_write_idx > 0 {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
}
break;
}
Ok(read_len) => {
current_write_idx += read_len;
// TC buffer is full, we must parse for complete packets now.
if current_write_idx == self.base.tc_buffer.capacity() {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
}
}
Err(e) => match e.kind() {
// As per [TcpStream::set_read_timeout] documentation, this should work for
// both UNIX and Windows.
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
self.tc_handler.handle_tc_parsing(
&mut self.base.tc_buffer,
self.base.tc_receiver.as_mut(),
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
if !self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer,
self.base.tm_source.as_mut(),
&mut connection_result,
&mut stream,
)? {
// No TC read, no TM was sent, but the client has not disconnected.
// Perform an inner delay to avoid burning CPU time.
thread::sleep(self.base.inner_loop_delay);
}
}
_ => {
return Err(TcpTmtcError::Io(e));
}
},
}
}
self.tm_handler.handle_tm_sending(
&mut self.base.tm_buffer,
self.base.tm_source.as_mut(),
&mut connection_result,
&mut stream,
)?;
Ok(connection_result)
}
}
pub(crate) struct TcpTmtcServerBase<TmError, TcError> {
pub(crate) listener: TcpListener,
pub(crate) inner_loop_delay: Duration,
pub(crate) tm_source: Box<dyn TmPacketSource<Error = TmError>>,
pub(crate) tm_buffer: Vec<u8>,
pub(crate) tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
pub(crate) tc_buffer: Vec<u8>,
}
impl<TmError, TcError> TcpTmtcServerBase<TmError, TcError> {
pub(crate) fn new(
cfg: ServerConfig,
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
) -> Result<Self, std::io::Error> {
// Create a TCP listener bound to two addresses.
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
socket.set_reuse_address(cfg.reuse_addr)?;
socket.set_reuse_port(cfg.reuse_port)?;
let addr = (cfg.addr).into();
socket.bind(&addr)?;
socket.listen(128)?;
Ok(Self {
listener: socket.into(),
inner_loop_delay: cfg.inner_loop_delay,
tm_source,
tm_buffer: vec![0; cfg.tm_buffer_size],
tc_receiver,
tc_buffer: vec![0; cfg.tc_buffer_size],
})
}
pub(crate) fn listener(&mut self) -> &mut TcpListener {
&mut self.listener
}
pub(crate) fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.listener.local_addr()
}
}
#[cfg(test)]
pub(crate) mod tests {
use std::sync::Mutex;
use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore};
#[derive(Default, Clone)]
pub(crate) struct SyncTcCacher {
pub(crate) tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl ReceivesTcCore for SyncTcCacher {
type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
tc_queue.push_back(tc_raw.to_vec());
Ok(())
}
}
#[derive(Default, Clone)]
pub(crate) struct SyncTmSource {
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl SyncTmSource {
pub(crate) fn add_tm(&mut self, tm: &[u8]) {
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
tm_queue.push_back(tm.to_vec());
}
}
impl TmPacketSourceCore for SyncTmSource {
type Error = ();
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
if !tm_queue.is_empty() {
let next_vec = tm_queue.front().unwrap();
if buffer.len() < next_vec.len() {
panic!(
"provided buffer too small, must be at least {} bytes",
next_vec.len()
);
}
let next_vec = tm_queue.pop_front().unwrap();
buffer[0..next_vec.len()].copy_from_slice(&next_vec);
return Ok(next_vec.len());
}
Ok(0)
}
}
}

View File

@ -0,0 +1,363 @@
use delegate::delegate;
use std::{
io::Write,
net::{SocketAddr, TcpListener, TcpStream},
};
use alloc::boxed::Box;
use crate::{
encoding::{ccsds::PacketIdLookup, parse_buffer_for_ccsds_space_packets},
tmtc::{ReceivesTc, TmPacketSource},
};
use super::tcp_server::{
ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
};
/// Concrete [TcpTcParser] implementation for the [TcpSpacepacketsServer].
pub struct SpacepacketsTcParser {
packet_id_lookup: Box<dyn PacketIdLookup + Send>,
}
impl SpacepacketsTcParser {
pub fn new(packet_id_lookup: Box<dyn PacketIdLookup + Send>) -> Self {
Self { packet_id_lookup }
}
}
impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for SpacepacketsTcParser {
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
conn_result: &mut ConnectionResult,
current_write_idx: usize,
next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>> {
// Reader vec full, need to parse for packets.
conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets(
&mut tc_buffer[..current_write_idx],
self.packet_id_lookup.as_ref(),
tc_receiver.upcast_mut(),
next_write_idx,
)
.map_err(|e| TcpTmtcError::TcError(e))?;
Ok(())
}
}
/// Concrete [TcpTmSender] implementation for the [TcpSpacepacketsServer].
#[derive(Default)]
pub struct SpacepacketsTmSender {}
impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
fn handle_tm_sending(
&mut self,
tm_buffer: &mut [u8],
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
conn_result: &mut ConnectionResult,
stream: &mut TcpStream,
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
let mut tm_was_sent = false;
loop {
// Write TM until TM source is exhausted. For now, there is no limit for the amount
// of TM written this way.
let read_tm_len = tm_source
.retrieve_packet(tm_buffer)
.map_err(|e| TcpTmtcError::TmError(e))?;
if read_tm_len == 0 {
return Ok(tm_was_sent);
}
tm_was_sent = true;
conn_result.num_sent_tms += 1;
stream.write_all(&tm_buffer[..read_tm_len])?;
}
}
}
/// TCP TMTC server implementation for exchange of tightly stuffed
/// [CCSDS space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf).
///
/// This serves only works if
/// [CCSDS 133.0-B-2 space packets](https://public.ccsds.org/Pubs/133x0b2e1.pdf) are the only
/// packet type being exchanged. It uses the CCSDS [spacepackets::PacketId] as the packet delimiter
/// and start marker when parsing for packets. The user specifies a set of expected
/// [spacepackets::PacketId]s as part of the server configuration for that purpose.
///
/// ## Example
///
/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_servers.rs)
/// also serves as the example application for this module.
pub struct TcpSpacepacketsServer<TmError, TcError: 'static> {
generic_server:
TcpTmtcGenericServer<TmError, TcError, SpacepacketsTmSender, SpacepacketsTcParser>,
}
impl<TmError: 'static, TcError: 'static> TcpSpacepacketsServer<TmError, TcError> {
/// Create a new TCP TMTC server which exchanges CCSDS space packets.
///
/// ## Parameter
///
/// * `cfg` - Configuration of the server.
/// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are
/// then sent back to the client.
/// * `tc_receiver` - Any received telecommands which were decoded successfully will be
/// forwarded to this TC receiver.
/// * `packet_id_lookup` - This lookup table contains the relevant packets IDs for packet
/// parsing. This mechanism is used to have a start marker for finding CCSDS packets.
pub fn new(
cfg: ServerConfig,
tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
packet_id_lookup: Box<dyn PacketIdLookup + Send>,
) -> Result<Self, TcpTmtcError<TmError, TcError>> {
Ok(Self {
generic_server: TcpTmtcGenericServer::new(
cfg,
SpacepacketsTcParser::new(packet_id_lookup),
SpacepacketsTmSender::default(),
tm_source,
tc_receiver,
)?,
})
}
delegate! {
to self.generic_server {
pub fn listener(&mut self) -> &mut TcpListener;
/// Can be used to retrieve the local assigned address of the TCP server. This is especially
/// useful if using the port number 0 for OS auto-assignment.
pub fn local_addr(&self) -> std::io::Result<SocketAddr>;
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
pub fn handle_next_connection(
&mut self,
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
}
}
}
#[cfg(test)]
mod tests {
use core::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
#[allow(unused_imports)]
use std::println;
use std::{
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
thread,
};
use alloc::{boxed::Box, sync::Arc};
use hashbrown::HashSet;
use spacepackets::{
ecss::{tc::PusTcCreator, SerializablePusPacket},
PacketId, SpHeader,
};
use crate::hal::std::tcp_server::{
tests::{SyncTcCacher, SyncTmSource},
ServerConfig,
};
use super::TcpSpacepacketsServer;
const TEST_APID_0: u16 = 0x02;
const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0);
const TEST_APID_1: u16 = 0x10;
const TEST_PACKET_ID_1: PacketId = PacketId::const_tc(true, TEST_APID_1);
fn generic_tmtc_server(
addr: &SocketAddr,
tc_receiver: SyncTcCacher,
tm_source: SyncTmSource,
packet_id_lookup: HashSet<PacketId>,
) -> TcpSpacepacketsServer<(), ()> {
TcpSpacepacketsServer::new(
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
Box::new(tm_source),
Box::new(tc_receiver),
Box::new(packet_id_lookup),
)
.expect("TCP server generation failed")
}
#[test]
fn test_basic_tc_only() {
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let tm_source = SyncTmSource::default();
let mut packet_id_lookup = HashSet::new();
packet_id_lookup.insert(TEST_PACKET_ID_0);
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_receiver.clone(),
tm_source,
packet_id_lookup,
);
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
let conn_result = result.unwrap();
assert_eq!(conn_result.num_received_tcs, 1);
assert_eq!(conn_result.num_sent_tms, 0);
set_if_done.store(true, Ordering::Relaxed);
});
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
let mut buffer: [u8; 32] = [0; 32];
let packet_len_ping = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
stream
.write_all(&buffer[..packet_len_ping])
.expect("writing to TCP server failed");
drop(stream);
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(5));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that TC has arrived.
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len_ping]);
}
#[test]
fn test_multi_tc_multi_tm() {
let mut buffer: [u8; 32] = [0; 32];
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let tc_receiver = SyncTcCacher::default();
let mut tm_source = SyncTmSource::default();
// Add telemetry
let mut total_tm_len = 0;
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true);
let tm_packet_len = verif_tm
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
total_tm_len += tm_packet_len;
let tm_0 = buffer[..tm_packet_len].to_vec();
tm_source.add_tm(&tm_0);
let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 3, None, true);
let tm_packet_len = verif_tm
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
total_tm_len += tm_packet_len;
let tm_1 = buffer[..tm_packet_len].to_vec();
tm_source.add_tm(&tm_1);
// Set up server
let mut packet_id_lookup = HashSet::new();
packet_id_lookup.insert(TEST_PACKET_ID_0);
packet_id_lookup.insert(TEST_PACKET_ID_1);
let mut tcp_server = generic_tmtc_server(
&auto_port_addr,
tc_receiver.clone(),
tm_source,
packet_id_lookup,
);
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
let conn_result = result.unwrap();
assert_eq!(
conn_result.num_received_tcs, 2,
"wrong number of received TCs"
);
assert_eq!(conn_result.num_sent_tms, 2, "wrong number of sent TMs");
set_if_done.store(true, Ordering::Relaxed);
});
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting reas timeout failed");
// Send telecommands
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
let packet_len = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
let tc_0 = buffer[..packet_len].to_vec();
stream
.write_all(&tc_0)
.expect("writing to TCP server failed");
let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true);
let packet_len = action_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
let tc_1 = buffer[..packet_len].to_vec();
stream
.write_all(&tc_1)
.expect("writing to TCP server failed");
// Done with writing.
stream
.shutdown(std::net::Shutdown::Write)
.expect("shutting down write failed");
let mut read_buf: [u8; 32] = [0; 32];
let mut current_idx = 0;
let mut read_len_total = 0;
// Timeout ensures this does not block forever.
while read_len_total < total_tm_len {
let read_len = stream
.read(&mut read_buf[current_idx..])
.expect("read failed");
current_idx += read_len;
read_len_total += read_len;
}
drop(stream);
assert_eq!(read_buf[..tm_0.len()], tm_0);
assert_eq!(read_buf[tm_0.len()..tm_0.len() + tm_1.len()], tm_1);
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(5));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that TC has arrived.
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
assert_eq!(tc_queue.len(), 2);
assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
assert_eq!(tc_queue.pop_front().unwrap(), tc_1);
}
}

View File

@ -1,4 +1,4 @@
//! UDP server helper components //! Generic UDP TC server.
use crate::tmtc::{ReceivesTc, ReceivesTcCore}; use crate::tmtc::{ReceivesTc, ReceivesTcCore};
use std::boxed::Box; use std::boxed::Box;
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
@ -6,7 +6,8 @@ use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::vec; use std::vec;
use std::vec::Vec; use std::vec::Vec;
/// This TC server helper can be used to receive raw PUS telecommands thorough a UDP interface. /// This UDP server can be used to receive CCSDS space packet telecommands or any other telecommand
/// format.
/// ///
/// It caches all received telecomands into a vector. The maximum expected telecommand size should /// It caches all received telecomands into a vector. The maximum expected telecommand size should
/// be declared upfront. This avoids dynamic allocation during run-time. The user can specify a TC /// be declared upfront. This avoids dynamic allocation during run-time. The user can specify a TC
@ -19,7 +20,7 @@ use std::vec::Vec;
/// ``` /// ```
/// use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; /// use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
/// use spacepackets::ecss::SerializablePusPacket; /// use spacepackets::ecss::SerializablePusPacket;
/// use satrs_core::hal::host::udp_server::UdpTcServer; /// use satrs_core::hal::std::udp_server::UdpTcServer;
/// use satrs_core::tmtc::{ReceivesTc, ReceivesTcCore}; /// use satrs_core::tmtc::{ReceivesTc, ReceivesTcCore};
/// use spacepackets::SpHeader; /// use spacepackets::SpHeader;
/// use spacepackets::ecss::tc::PusTcCreator; /// use spacepackets::ecss::tc::PusTcCreator;
@ -51,7 +52,7 @@ use std::vec::Vec;
/// .expect("Error sending PUS TC via UDP"); /// .expect("Error sending PUS TC via UDP");
/// ``` /// ```
/// ///
/// The [satrs-example crate](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/-example) /// The [satrs-example crate](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/satrs-example)
/// server code also includes /// server code also includes
/// [example code](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/tmtc.rs#L67) /// [example code](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/tmtc.rs#L67)
/// on how to use this TC server. It uses the server to receive PUS telecommands on a specific port /// on how to use this TC server. It uses the server to receive PUS telecommands on a specific port
@ -140,7 +141,7 @@ impl<E: 'static> UdpTcServer<E> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::hal::host::udp_server::{ReceiveResult, UdpTcServer}; use crate::hal::std::udp_server::{ReceiveResult, UdpTcServer};
use crate::tmtc::ReceivesTcCore; use crate::tmtc::ReceivesTcCore;
use spacepackets::ecss::tc::PusTcCreator; use spacepackets::ecss::tc::PusTcCreator;
use spacepackets::ecss::SerializablePusPacket; use spacepackets::ecss::SerializablePusPacket;

View File

@ -20,6 +20,8 @@ extern crate downcast_rs;
#[cfg(any(feature = "std", test))] #[cfg(any(feature = "std", test))]
extern crate std; extern crate std;
pub mod cfdp;
pub mod encoding;
pub mod error; pub mod error;
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]

View File

@ -36,12 +36,33 @@ pub trait ReceivesTcCore {
/// Extension trait of [ReceivesTcCore] which allows downcasting by implementing [Downcast] and /// Extension trait of [ReceivesTcCore] which allows downcasting by implementing [Downcast] and
/// is also sendable. /// is also sendable.
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
pub trait ReceivesTc: ReceivesTcCore + Downcast + Send {} pub trait ReceivesTc: ReceivesTcCore + Downcast + Send {
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast(&self) -> &dyn ReceivesTcCore<Error = Self::Error>;
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast_mut(&mut self) -> &mut dyn ReceivesTcCore<Error = Self::Error>;
}
/// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature /// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature
/// is enabled. /// is enabled.
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
impl<T> ReceivesTc for T where T: ReceivesTcCore + Send + 'static {} impl<T> ReceivesTc for T
where
T: ReceivesTcCore + Send + 'static,
{
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast(&self) -> &dyn ReceivesTcCore<Error = Self::Error> {
self
}
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast_mut(&mut self) -> &mut dyn ReceivesTcCore<Error = Self::Error> {
self
}
}
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
impl_downcast!(ReceivesTc assoc Error); impl_downcast!(ReceivesTc assoc Error);
@ -56,3 +77,41 @@ pub trait ReceivesCcsdsTc {
type Error; type Error;
fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error>; fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error>;
} }
/// Generic trait for a TM packet source, with no restrictions on the type of TM.
/// Implementors write the telemetry into the provided buffer and return the size of the telemetry.
pub trait TmPacketSourceCore {
type Error;
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error>;
}
/// Extension trait of [TmPacketSourceCore] which allows downcasting by implementing [Downcast] and
/// is also sendable.
#[cfg(feature = "alloc")]
pub trait TmPacketSource: TmPacketSourceCore + Downcast + Send {
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast(&self) -> &dyn TmPacketSourceCore<Error = Self::Error>;
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast_mut(&mut self) -> &mut dyn TmPacketSourceCore<Error = Self::Error>;
}
/// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature
/// is enabled.
#[cfg(feature = "alloc")]
impl<T> TmPacketSource for T
where
T: TmPacketSourceCore + Send + 'static,
{
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast(&self) -> &dyn TmPacketSourceCore<Error = Self::Error> {
self
}
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
fn upcast_mut(&mut self) -> &mut dyn TmPacketSourceCore<Error = Self::Error> {
self
}
}

View File

@ -0,0 +1,244 @@
//! This serves as both an integration test and an example application showcasing all major
//! features of the TCP COBS server by performing following steps:
//!
//! 1. It defines both a TC receiver and a TM source which are [Sync].
//! 2. A telemetry packet is inserted into the TM source. The packet will be handled by the
//! TCP server after handling all TCs.
//! 3. It instantiates the TCP server on localhost with automatic port assignment and assigns
//! the TC receiver and TM source created previously.
//! 4. It moves the TCP server to a different thread and calls the
//! [TcpTmtcInCobsServer::handle_next_connection] call inside that thread
//! 5. The main threads connects to the server, sends a test telecommand and then reads back
//! the test telemetry insertd in to the TM source previously.
use core::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use std::{
io::{Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
sync::Mutex,
thread,
};
use hashbrown::HashSet;
use satrs_core::{
encoding::cobs::encode_packet_with_cobs,
hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer},
tmtc::{ReceivesTcCore, TmPacketSourceCore},
};
use spacepackets::{
ecss::{tc::PusTcCreator, SerializablePusPacket},
PacketId, SpHeader,
};
use std::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec};
#[derive(Default, Clone)]
struct SyncTcCacher {
tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl ReceivesTcCore for SyncTcCacher {
type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
println!("Received TC: {:x?}", tc_raw);
tc_queue.push_back(tc_raw.to_vec());
Ok(())
}
}
#[derive(Default, Clone)]
struct SyncTmSource {
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl SyncTmSource {
pub(crate) fn add_tm(&mut self, tm: &[u8]) {
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
tm_queue.push_back(tm.to_vec());
}
}
impl TmPacketSourceCore for SyncTmSource {
type Error = ();
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
if !tm_queue.is_empty() {
let next_vec = tm_queue.front().unwrap();
if buffer.len() < next_vec.len() {
panic!(
"provided buffer too small, must be at least {} bytes",
next_vec.len()
);
}
println!("Sending and encoding TM: {:x?}", next_vec);
let next_vec = tm_queue.pop_front().unwrap();
buffer[0..next_vec.len()].copy_from_slice(&next_vec);
return Ok(next_vec.len());
}
Ok(0)
}
}
const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5];
const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 4, 1];
const AUTO_PORT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
#[test]
fn test_cobs_server() {
let tc_receiver = SyncTcCacher::default();
let mut tm_source = SyncTmSource::default();
// Insert a telemetry packet which will be read back by the client at a later stage.
tm_source.add_tm(&INVERTED_PACKET);
let mut tcp_server = TcpTmtcInCobsServer::new(
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
Box::new(tm_source),
Box::new(tc_receiver.clone()),
)
.expect("TCP server generation failed");
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
let conn_result = result.unwrap();
assert_eq!(conn_result.num_received_tcs, 1, "No TC received");
assert_eq!(conn_result.num_sent_tms, 1, "No TM received");
// Signal the main thread we are done.
set_if_done.store(true, Ordering::Relaxed);
});
// Send TC to server now.
let mut encoded_buf: [u8; 16] = [0; 16];
let mut current_idx = 0;
encode_packet_with_cobs(&SIMPLE_PACKET, &mut encoded_buf, &mut current_idx);
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
stream
.write_all(&encoded_buf[..current_idx])
.expect("writing to TCP server failed");
// Done with writing.
stream
.shutdown(std::net::Shutdown::Write)
.expect("shutting down write failed");
let mut read_buf: [u8; 16] = [0; 16];
let read_len = stream.read(&mut read_buf).expect("read failed");
drop(stream);
// 1 byte encoding overhead, 2 sentinel bytes.
assert_eq!(read_len, 8);
assert_eq!(read_buf[0], 0);
assert_eq!(read_buf[read_len - 1], 0);
let decoded_len =
cobs::decode_in_place(&mut read_buf[1..read_len]).expect("COBS decoding failed");
assert_eq!(decoded_len, 5);
// Skip first sentinel byte.
assert_eq!(&read_buf[1..1 + INVERTED_PACKET.len()], &INVERTED_PACKET);
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(5));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that the packet was received and decoded successfully.
let mut tc_queue = tc_receiver
.tc_queue
.lock()
.expect("locking tc queue failed");
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
drop(tc_queue);
}
const TEST_APID_0: u16 = 0x02;
const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0);
#[test]
fn test_ccsds_server() {
let mut buffer: [u8; 32] = [0; 32];
let tc_receiver = SyncTcCacher::default();
let mut tm_source = SyncTmSource::default();
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true);
let tm_packet_len = verif_tm
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
tm_source.add_tm(&buffer[..tm_packet_len]);
let tm_vec = buffer[..tm_packet_len].to_vec();
let mut packet_id_lookup = HashSet::new();
packet_id_lookup.insert(TEST_PACKET_ID_0);
let mut tcp_server = TcpSpacepacketsServer::new(
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
Box::new(tm_source),
Box::new(tc_receiver.clone()),
Box::new(packet_id_lookup),
)
.expect("TCP server generation failed");
let dest_addr = tcp_server
.local_addr()
.expect("retrieving dest addr failed");
let conn_handled: Arc<AtomicBool> = Default::default();
let set_if_done = conn_handled.clone();
// Call the connection handler in separate thread, does block.
thread::spawn(move || {
let result = tcp_server.handle_next_connection();
if result.is_err() {
panic!("handling connection failed: {:?}", result.unwrap_err());
}
let conn_result = result.unwrap();
assert_eq!(conn_result.num_received_tcs, 1);
assert_eq!(conn_result.num_sent_tms, 1);
set_if_done.store(true, Ordering::Relaxed);
});
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting reas timeout failed");
let packet_len = ping_tc
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
stream
.write_all(&buffer[..packet_len])
.expect("writing to TCP server failed");
// Done with writing.
stream
.shutdown(std::net::Shutdown::Write)
.expect("shutting down write failed");
let mut read_buf: [u8; 16] = [0; 16];
let mut read_len_total = 0;
// Timeout ensures this does not block forever.
while read_len_total < tm_packet_len {
let read_len = stream.read(&mut read_buf).expect("read failed");
read_len_total += read_len;
assert_eq!(read_buf[..read_len], tm_vec);
}
drop(stream);
// A certain amount of time is allowed for the transaction to complete.
for _ in 0..3 {
if !conn_handled.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(5));
}
}
if !conn_handled.load(Ordering::Relaxed) {
panic!("connection was not handled properly");
}
// Check that TC has arrived.
let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
assert_eq!(tc_queue.len(), 1);
assert_eq!(tc_queue.pop_front().unwrap(), buffer[..packet_len]);
}

View File

@ -1,5 +1,5 @@
use log::{info, warn}; use log::{info, warn};
use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; use satrs_core::hal::std::udp_server::{ReceiveResult, UdpTcServer};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError};
use std::thread; use std::thread;

View File

@ -23,8 +23,9 @@ version = "1"
optional = true optional = true
[dependencies.satrs-core] [dependencies.satrs-core]
version = "0.1.0-alpha.0" # version = "0.1.0-alpha.0"
# path = "../satrs-core" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
rev = "35e1f7a983f6535c5571186e361fe101d4306b89"
[dependencies.satrs-mib-codegen] [dependencies.satrs-mib-codegen]
path = "codegen" path = "codegen"

View File

@ -20,8 +20,9 @@ quote = "1"
proc-macro2 = "1" proc-macro2 = "1"
[dependencies.satrs-core] [dependencies.satrs-core]
version = "0.1.0-alpha.0" # version = "0.1.0-alpha.0"
# path = "../../satrs-core" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
rev = "35e1f7a983f6535c5571186e361fe101d4306b89"
[dev-dependencies] [dev-dependencies]
trybuild = { version = "1", features = ["diff"] } trybuild = { version = "1", features = ["diff"] }