diff --git a/Cargo.toml b/Cargo.toml index c9d35a8..eaeb356 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] - +resolver = "2" members = [ "satrs-core", "satrs-mib", @@ -9,3 +9,4 @@ members = [ exclude = [ "satrs-example-stm32f3-disco", ] + diff --git a/README.md b/README.md index 548c038..585d3a9 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ This project currently contains following crates: on a host computer or on any system with a standard runtime like a Raspberry Pi. * [`satrs-mib`](https://egit.irs.uni-stuttgart.de/rust/satrs-launchpad/src/branch/main/satrs-mib): Components to build a mission information base from the on-board software directly. -* [`satrs-example-stm32f3-disco`](https://egit.irs.uni-stuttgart.de/rust/satrs-example-stm32f3-disco): +* [`satrs-example-stm32f3-disco`](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example-stm32f3-disco): Example of a simple example on-board software using sat-rs components on a bare-metal system with constrained resources. diff --git a/automation/Jenkinsfile b/automation/Jenkinsfile index 44946f7..0770614 100644 --- a/automation/Jenkinsfile +++ b/automation/Jenkinsfile @@ -8,6 +8,11 @@ pipeline { } stages { + stage('Rust Toolchain Info') { + steps { + sh 'rustc --version' + } + } stage('Clippy') { steps { sh 'cargo clippy' @@ -15,7 +20,9 @@ pipeline { } stage('Docs') { steps { - sh 'cargo +nightly doc --all-features' + catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') { + sh 'cargo +nightly doc --all-features' + } } } stage('Rustfmt') { diff --git a/satrs-book/.gitignore b/satrs-book/.gitignore new file mode 100644 index 0000000..7585238 --- /dev/null +++ b/satrs-book/.gitignore @@ -0,0 +1 @@ +book diff --git a/satrs-book/book.toml b/satrs-book/book.toml new file mode 100644 index 0000000..8d01bb6 --- /dev/null +++ b/satrs-book/book.toml @@ -0,0 +1,6 @@ +[book] +authors = ["Robin Mueller"] +language = "en" +multilingual = false +src = "src" +title = "The sat-rs book" diff --git a/satrs-book/src/SUMMARY.md b/satrs-book/src/SUMMARY.md new file mode 100644 index 0000000..995e8a9 --- /dev/null +++ b/satrs-book/src/SUMMARY.md @@ -0,0 +1,19 @@ +# Summary + +- [Introduction](./introduction.md) +- [Design](./design.md) +- [Communication with Space Systems](./communication.md) +- [Working with Constrained Systems](./constrained-systems.md) +- [Actions](./actions.md) +- [Modes and Health](./modes-and-health.md) +- [Housekeeping Data](./housekeeping.md) +- [Events](./events.md) +- [Power Components](./power.md) +- [Thermal Components](./thermal.md) +- [Persistent TM storage](./persistent-tm-storage.md) +- [FDIR](./fdir.md) +- [Serialization of Data](./serialization.md) +- [Logging](./logging.md) +- [Modelling space systems](./modelling-space-systems.md) +- [Ground Segments](./ground-segments.md) + diff --git a/satrs-book/src/actions.md b/satrs-book/src/actions.md new file mode 100644 index 0000000..0e092d9 --- /dev/null +++ b/satrs-book/src/actions.md @@ -0,0 +1,42 @@ +# Working with Actions + +Space systems generally need to be commanded regularly. This can include commands periodically +required to ensure a healthy system, or commands to reach the mission goals. + +These commands can be modelled using the concept of Actions. the ECSS PUS standard also provides +the PUS service 8 for actions, but provides few concrete subservices and specification on how +action commanding could look like. + +`sat-rs` proposes two recommended ways to perform action commanding: + +1. Target ID and Action ID based. The target ID is a 32-bit unsigned ID for an OBSW object entity + which can also accept Actions. The action ID is a 32-bit unsigned ID for each action that a + target is able to perform. +2. Target ID and Action String based. The target ID is the same as in the first proposal, but + the unique action is identified by a string. + +The framework provides an `ActionRequest` abstraction to model both of these cases. + +## Commanding with ECSS PUS 8 + +`sat-rs` provides a generic ECSS PUS 8 action command handler. This handler can convert PUS 8 +telecommands which use the commanding scheme 1 explained above to an `ActionRequest` which is +then forwarded to the target specified by the Target ID. + +There are 3 requirements for the PUS 8 telecommand: + +1. The subservice 128 must be used +2. Bytes 0 to 4 of application data must contain the target ID in `u32` big endian format. +3. Bytes 4 to 8 of application data must contain the action ID in `u32` big endian format. +4. The rest of the application data are assumed to be command specific additional parameters. They + will be added to an IPC store and the corresponding store address will be sent as part of the + `ActionRequest`. + +## Sending back telemetry + +There are some cases where the regular verification provided by PUS in response to PUS action +commands is not sufficient and some additional telemetry needs to be sent to ground. In that +case, it is recommended to chose some custom subservice for action TM data and then send the +telemetry using the same scheme as shown above, where the first 8 bytes of the application +data is reserved for the target ID and action ID. + diff --git a/satrs-book/src/communication.md b/satrs-book/src/communication.md new file mode 100644 index 0000000..447ebfd --- /dev/null +++ b/satrs-book/src/communication.md @@ -0,0 +1,46 @@ +# Communication with sat-rs based software + +Communication is a huge topic for space systems. Remote systems are usually not (directly) +connected to the internet and only have 1-2 communication links during nominal operation. However, +most of these systems have internet access during development cycle. There are various standards +provided by CCSDS and ECSS which can be useful to determine how to communicate with the satellite +and the primary On-Board Software. + +# Application layer + +Most communication with space systems is usually packet based. For example, the CCSDS space +packet standard only specifies a 6 byte header with at least 1 byte payload. The PUS packet +standard is a subset of the space packet standard, which adds some fields and a 16 bit CRC, but +it is still centered around small packets. `sat-rs` provides support for these ECSS and CCSDS +standards and also attempts to fill the gap to the internet protocol by providing the following +components. + +1. [UDP TMTC Server](https://docs.rs/satrs-core/0.1.0-alpha.0/satrs_core/hal/host/udp_server/index.html#). + UDP is already packet based which makes it an excellent fit for exchanging space packets. +2. TCP TMTC Server. This is a stream based protocol, so the server uses the COBS framing protocol + to always deliver complete packets. + +# Working with telemetry and telecommands (TMTC) + +The commands sent to a space system are commonly called telecommands (TC) while the data received +from it are called telemetry (TM). Keeping in mind the previous section, the concept of a TC source +and a TM sink can be applied to most satellites. The TM sink is the one entity where all generated +telemetry arrives in real-time. The most important task of the TM sink usually is to send all +arriving telemetry to the ground segment of a satellite mission immediately. Another important +task might be to store all arriving telemetry persistently. This is especially important for +space systems which do not have permanent contact like low-earth-orbit (LEO) satellites. + +The most important task of a TC source is to deliver the telecommands to the correct recipients. +For modern component oriented software using message passing, this usually includes staged +demultiplexing components to determine where a command needs to be sent. + +# Low-level protocols and the bridge to the communcation subsystem + +Many satellite systems usually use the lower levels of the OSI layer in addition to the application +layer covered by the PUS standard or the CCSDS space packets standard. This oftentimes requires +special hardware like dedicated FPGAs to handle forward error correction fast enough. `sat-rs` +might provide components to handle standard like the Unified Space Data Link Standard (USLP) in +software but most of the time the handling of communication is performed through custom +software and hardware. Still, connecting this custom software and hardware to `sat-rs` can mostly +be done by using the concept of TC sources and TM sinks mentioned previously. + diff --git a/satrs-book/src/constrained-systems.md b/satrs-book/src/constrained-systems.md new file mode 100644 index 0000000..4dbdb7a --- /dev/null +++ b/satrs-book/src/constrained-systems.md @@ -0,0 +1,57 @@ +# Working with Constrained Systems + +Software for space systems oftentimes has different requirements than the software for host +systems or servers. Currently, most space systems are considered embedded systems. + +For these systems, the computation power and the available heap are the most important resources +which are constrained. This might make completeley heap based memory management schemes which +are oftentimes used on host and server based systems unfeasable. Still, completely forbidding +heap allocations might make software development unnecessarilly difficult, especially in a +time where the OBSW might be running on Linux based systems with hundreds of MBs of RAM. + +A useful pattern used commonly in space systems is to limit heap allocations to program +initialization time and avoid frequent run-time allocations. This prevents issues like +running out of memory (something even Rust can not protect from) or heap fragmentation. + +# Using pre-allocated pool structures + +A huge candidate for heap allocations is the TMTC and handling. TC, TMs and IPC data are all +candidates where the data size might vary greatly. The regular solution for host systems +might be to send around this data as a `Vec` until it is dropped. `sat-rs` provides +another solution to avoid run-time allocations by offering and recommendng pre-allocated static +pools. + +These pools are split into subpools where each subpool can have different page sizes. +For example, a very small TC pool might look like this: + +TODO: Add image + +A TC entry inside this pool has a store address which can then be sent around without having +to dynamically allocate memory. The same principle can also be applied to the TM and IPC data. + +# Using special crates to prevent smaller allocations + +Another common way to use the heap on host systems is using containers like `String` and `Vec` +to work with data where the size is not known beforehand. The most common solution for embedded +systems is to determine the maximum expected size and then use a pre-allocated `u8` buffer and a +size variable. Alternatively, you can use the following crates for more convenience or a smart +behaviour which at the very least reduce heap allocations: + +1. [`smallvec`](https://docs.rs/smallvec/latest/smallvec/). +2. [`arrayvec`](https://docs.rs/arrayvec/latest/arrayvec/index.html) which also contains an + [`ArrayString`](https://docs.rs/arrayvec/latest/arrayvec/struct.ArrayString.html) helper type. +3. [`tinyvec`](https://docs.rs/tinyvec/latest/tinyvec/). + +# Using a fixed amount of threads + +On host systems, it is a common practice to dynamically spawn new threads to handle workloads. +On space systems this is generally considered an anti-pattern as this is considered undeterministic +and might lead to similar issues like when dynamically using the heap. For example, spawning a new +thread might use up the remaining heap of a system, leading to undeterministic errors. + +The most common way to avoid this is to simply spawn all required threads at program initialization +time. If a thread is done with its task, it can go back to sleeping regularly, only occasionally +checking for new jobs. If a system still needs to handle bursty concurrent loads, another possible +way commonly used for host systems as well would be to use a threadpool, for example by using the +[`threadpool`](https://crates.io/crates/threadpool) crate. + diff --git a/satrs-book/src/design.md b/satrs-book/src/design.md new file mode 100644 index 0000000..9ec7317 --- /dev/null +++ b/satrs-book/src/design.md @@ -0,0 +1,57 @@ +# Framework Design + +Satellites and space systems in general are complex systems with a wide range of requirements for +both the hardware and the software. Consequently, the general design of the framework is centered +around many light-weight components which try to impose as few restrictions as possible on how to +solve certain problems. + +There are still a lot of common patterns and architectures across these systems where guidance +of how to solve a problem and a common structure would still be extremely useful to avoid pitfalls +which were already solved and to avoid boilerplate code. This framework tries to provide this +structure and guidance the following way: + +1. Providing this book which explains the architecture and design patterns in respect to common + issues and requirements of space systems. +2. Providing an example application. Space systems still commonly have large monolithic + primary On-Board Softwares, so the choice was made to provide one example software which + contains the various features provided by sat-rs. +3. Providing a good test suite. This includes both unittests and integration tests. The integration + tests can also serve as smaller usage examples than the large `satrs-example` application. + +This framework has special support for standards used in the space industry. This especially +includes standards provided by Consultative Committee for Space Data Systems (CCSDS) and European +Cooperation for Space Standardization (ECSS). It does not enforce using any of those standards, +but it is always recommended to use some sort of standard for interoperability. + +A lot of the modules and design considerations are based on the Flight Software Framework (FSFW). +The FSFW has its own [documentation](https://documentation.irs.uni-stuttgart.de/fsfw/), which +will be referred to when applicable. The FSFW was developed over a period of 10 years for the +Flying Laptop Project by the University of Stuttgart with Airbus Defence and Space GmbH. +It has flight heritage through the 2 mssions [FLP](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/flying-laptop/) +and [EIVE](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/EIVE/). +Therefore, a lot of the design concepts were ported more or less unchanged to the `sat-rs` +framework. +FLP is a medium-size small satellite with a higher budget and longer development time than EIVE, +which allowed to build a highly reliable system while EIVE is a smaller 6U+ cubesat which had a +shorter development cycle and was built using cheaper COTS components. This framework also tries +to accumulate the knowledge of developing the OBSW and operating the satellite for both these +different systems and provide a solution for a wider range of small satellite systems. + +`sat-rs` can be seen as a modern port of the FSFW which uses common principles of software +engineering to provide a reliable and robust basis for space On-Board Software. The choice +of using the Rust programming language was made for the following reasons: + +1. Rust has safety guarantees which are a perfect fit for space systems which generally have high + robustness and reliablity guarantees. +2. Rust is suitable for embedded systems. It can also be run on smaller embedded systems like the + STM32 which have also become common in the space sector. All space systems are embedded systems, + which makes using large languages like Python challenging even for OBCs with more performance. +3. Rust has support for linking C APIs through its excellent FFI support. This is especially + important because many vendor provided libaries are still C based. +4. Modern tooling like a package managers and various development helper, which can further reduce + development cycles for space systems. `cargo` provides tools like auto-formatters and linters + which can immediately ensure a high software quality throughout each development cycle. +5. A large ecosystem with excellent libraries which also leverages the excellent tooling provided + previously. Integrating these libraries is a lot easier compared to languages like C/C++ where + there is still no standardized way to use packages. + diff --git a/satrs-book/src/events.md b/satrs-book/src/events.md new file mode 100644 index 0000000..083e76a --- /dev/null +++ b/satrs-book/src/events.md @@ -0,0 +1,16 @@ +# Events + +Events can be an extremely important mechanism used for remote systems to monitor unexpected +or expected anomalies and events occuring on these systems. They are oftentimes tied to +Fault Detection, Isolation and Recovery (FDIR) operations, which need to happen autonomously. + +Events can also be used as a convenient Inter-Process Communication (IPC) mechansism, which is +also observable for the Ground segment. The PUS Service 5 standardizes how the ground interface +for events might look like, but does not specify how other software components might react +to those events. There is the PUS Service 19, which might be used for that purpose, but the +event components recommended by this framework do not really need this service. + +The following images shows how the flow of events could look like in a system where components +can generate events, and where other system components might be interested in those events: + +![Event flow](images/event_man_arch.png) diff --git a/satrs-book/src/fdir.md b/satrs-book/src/fdir.md new file mode 100644 index 0000000..074b2ad --- /dev/null +++ b/satrs-book/src/fdir.md @@ -0,0 +1 @@ +# Fault Detecion, Isolation And Recovery (FDIR) diff --git a/satrs-book/src/ground-segments.md b/satrs-book/src/ground-segments.md new file mode 100644 index 0000000..58672b2 --- /dev/null +++ b/satrs-book/src/ground-segments.md @@ -0,0 +1 @@ +# Ground Segments diff --git a/satrs-book/src/housekeeping.md b/satrs-book/src/housekeeping.md new file mode 100644 index 0000000..5a7d73b --- /dev/null +++ b/satrs-book/src/housekeeping.md @@ -0,0 +1,24 @@ +# Housekeeping Data + +Remote systems like satellites and rovers oftentimes generate data autonomously and periodically. +The most common example for this is temperature or attitude data. Data like this is commonly +referred to as housekeeping data, and is usually one of the most important and most resource heavy +data sources received from a satellite. Standards like the PUS Service 3 make recommendation how to +expose housekeeping data, but the applicability of the interface offered by PUS 3 has proven to be +partially difficult and clunky for modular systems. + +First, we are going to list some assumption and requirements about Housekeeping (HK) data: + +1. HK data is generated periodically by various system components throughout the + systems. +2. An autonomous and periodic sampling of that HK data to be stored and sent to Ground is generally + required. A minimum interface consists of requesting a one-shot sample of HK, enabling and + disabling the periodic autonomous generation of samples and modifying the collection interval + of the periodic autonomous generation. +3. HK data often needs to be shared to other software components. For example, a thermal controller + wants to read the data samples of all sensor components. + +A commonly required way to model HK data in a clean way is also to group related HK data into sets, +which can then dumped via a similar interface. + +TODO: Write down `sat-rs` recommendations how to expose and work with HK data. diff --git a/satrs-book/src/images/event_man_arch.graphml b/satrs-book/src/images/event_man_arch.graphml new file mode 100644 index 0000000..1336793 --- /dev/null +++ b/satrs-book/src/images/event_man_arch.graphml @@ -0,0 +1,259 @@ + + + + + + + + + + + + + + + + + + + + + + + Example Event Flow + + + + + + + + + + + Event Manager + + + + + + + + + + + Event +Creator 0 + + + + + + + + + + + Event +Creator 2 + + + + + + + + + + + Event +Creator 1 + + + + + + + + + + + Event +Creator 3 + + + + + + + + + + + PUS Service 5 +Event Reporting + + + + + + + + + + + + PUS Service 19 +Event Action + + + + + + + + + + + Telemetry +Sink + + + + + + + + + + + Subscriptions + +1. Event Creator 0 subscribes + for event 0 +2. Event Creator 1 subscribes + for event group 2 +3. PUS Service 5 handler + subscribes for all events +4. PUS Service 19 handler + subscribes for all events + + + + + + + + + + + event 1 +(group 1) + + + + + + + + + + + + + event 0 +(group 0) + + + + + + + + + + + event 2 +(group 3) + + + + + + + + + + + + + event 3 (group 2) +event 4 (group 2) + + + + + + + + + + + <<all events>> + + + + + + + + + + + <<all events>> + + + + + + + + + + + + + event 1 +event 2 + + + + + + + + + + + group 2 + + + + + + + + + + + enabled Events +as PUS 5 TM + + + + + + + + + diff --git a/satrs-book/src/images/event_man_arch.png b/satrs-book/src/images/event_man_arch.png new file mode 100644 index 0000000..61c8d72 Binary files /dev/null and b/satrs-book/src/images/event_man_arch.png differ diff --git a/satrs-book/src/introduction.md b/satrs-book/src/introduction.md new file mode 100644 index 0000000..31a0b0c --- /dev/null +++ b/satrs-book/src/introduction.md @@ -0,0 +1,23 @@ +The sat-rs book +====== + +This book is the primary information resource for the [sat-rs framework](https://egit.irs.uni-stuttgart.de/rust/sat-rs) +in addition to the regular API documentation. It contains the following resources: + +1. Architecture informations and consideration which would exceeds the scope of the regular API. +2. General information on how to build On-Board Software and how `sat-rs` can help to fulfill + the unique requirements of writing software for remote systems. +2. A Getting-Started workshop where a small On-Board Software is built from scratch using + sat-rs components. + +# Introduction + +The primary goal of the sat-rs framework is to provide re-usable components +to write on-board software for remote systems like rovers or satellites. It is specifically written +for the special requirements for these systems. + +A lot of the architecture and general design considerations are based on the +[FSFW](https://egit.irs.uni-stuttgart.de/fsfw/fsfw) C++ framework which has flight heritage +through the 2 missions [FLP](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/flying-laptop/) +and [EIVE](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/EIVE/). + diff --git a/satrs-book/src/logging.md b/satrs-book/src/logging.md new file mode 100644 index 0000000..b921bbe --- /dev/null +++ b/satrs-book/src/logging.md @@ -0,0 +1 @@ +# Logging diff --git a/satrs-book/src/modelling-space-systems.md b/satrs-book/src/modelling-space-systems.md new file mode 100644 index 0000000..c42d084 --- /dev/null +++ b/satrs-book/src/modelling-space-systems.md @@ -0,0 +1 @@ +# Modelling Space Systems diff --git a/satrs-book/src/modes-and-health.md b/satrs-book/src/modes-and-health.md new file mode 100644 index 0000000..4cb6878 --- /dev/null +++ b/satrs-book/src/modes-and-health.md @@ -0,0 +1,102 @@ +# Modes + +Modes are an extremely useful concept for complex system in general. They also allow simplified +system reasoning for both system operators and OBSW developers. They model the behaviour of a +component and also provide observability of a system. A few examples of how to model +different components of a space system with modes will be given. + +## Modelling a pyhsical devices with modes + +The following simple mode scheme with the following three mode + +- `OFF` +- `ON` +- `NORMAL` + +can be applied to a large number of simpler devices of a remote system, for example sensors. + +1. `OFF` means that a device is physically switched off, and the corresponding software component +does not poll the device regularly. +2. `ON` means that a device is pyhsically switched on, but the device is not polled perically. +3. `NORMAL` means that a device is powered on and polled periodically. + +If a devices is `OFF`, the device handler will deny commands which include physical communication +with the connected devices. In `NORMAL` mode, it will autonomously perform periodic polling +of a connected physical device in addition to handling remote commands by the operator. +Using these three basic modes, there are two important transitions which need to be taken care of +for the majority of devices: + +1. `OFF` to `ON` or `NORMAL`: The device first needs to be powered on. After that, the + device initial startup configuration must be performed. +2. `NORMAL` or `ON` to `OFF`: Any important shutdown configuration or handling must be performed + before powering off the device. + +## Modelling a controller with modes + +Controller components are not modelling physical devices, but a mode scheme is still the best +way to model most of these components. + +For example, a hypothetical attitude controller might have the following modes: + +- `SAFE` +- `TARGET IDLE` +- `TARGET POINTING GROUND` +- `TARGET POINTING NADIR` + +We can also introduce the concept of submodes: The `SAFE` mode can for example have a +`DEFAULT` submode and a `DETUMBLE` submode. + +## Achieving system observability with modes + +If a system component has a mode in some shape or form, this mode should be observable. This means +that the operator can also retrieve the mode for a particular component. This is especially +important if these components can change their mode autonomously. + +If a component is able to change its mode autonomously, this is also something which is relevant +information for the operator or for other software components. This means that a component +should also be able to announce its mode. + +This concept becomes especially important when applying the mode concept on the whole +system level. This will also be explained in detail in a dedicated chapter, but the basic idea +is to model the whole system as a tree where each node has a mode. A new capability is added now: +A component can announce its mode recursively. This means that the component will announce its +own mode first before announcing the mode of all its children. Using a scheme like this, the mode +of the whole system can be retrieved using only one command. The same concept can also be used +for commanding the whole system, which will be explained in more detail in the dedicated systems +modelling chapter. + +In summary, a component which has modes has to expose the following 4 capabilities: + +1. Set a mode +2. Read the mode +3. Announce the mode +4. Announce the mode recursively + +## Using ECSS PUS to perform mode commanding + +# Health + +Health is an important concept for systems and components which might fail. +Oftentimes, the health is tied to the mode of a system component in some shape or form, and +determines whether a system component is usable. Health is also an extremely useful concept +to simplify the Fault Detection, Isolation and Recovery (FDIR) concept of a system. + +The following health states are based on the ones used inside the FSFW and are enough to model most +use-cases: + +- `HEALTHY` +- `FAULTY` +- `NEEDS RECOVERY` +- `EXTERNAL CONTROL` + +1. `HEALTHY` means that a component is working nominally, and can perform its task without any issues. +2. `FAULTY` means that a component does not work properly. This might also impact other system +components, so the passivation and isolation of that component is desirable for FDIR purposes. +3. `NEEDS RECOVERY` is used to attempt a recovery of a component. For example, a simple sensor +could be power-cycled if there were multiple communication issues in the last time. +4. `EXTERNAL CONTROL` is used to isolate an individual component from the rest of the system. For + example, on operator might be interested in testing a component in isolation, and the interference + of the system is not desired. In that case, the `EXTERNAL CONTROL` health state might be used + to prevent mode commands from the system while allowing external mode commands. + + diff --git a/satrs-book/src/persistent-tm-storage.md b/satrs-book/src/persistent-tm-storage.md new file mode 100644 index 0000000..77d4f50 --- /dev/null +++ b/satrs-book/src/persistent-tm-storage.md @@ -0,0 +1 @@ +# Persistent Telemetry (TM) Storage diff --git a/satrs-book/src/power.md b/satrs-book/src/power.md new file mode 100644 index 0000000..b1ae539 --- /dev/null +++ b/satrs-book/src/power.md @@ -0,0 +1 @@ +# Power Components diff --git a/satrs-book/src/serialization.md b/satrs-book/src/serialization.md new file mode 100644 index 0000000..0dfc62d --- /dev/null +++ b/satrs-book/src/serialization.md @@ -0,0 +1 @@ +# Serialization diff --git a/satrs-book/src/thermal.md b/satrs-book/src/thermal.md new file mode 100644 index 0000000..abcbb57 --- /dev/null +++ b/satrs-book/src/thermal.md @@ -0,0 +1 @@ +# Thermal Components diff --git a/satrs-core/Cargo.toml b/satrs-core/Cargo.toml index 284b84a..4b75576 100644 --- a/satrs-core/Cargo.toml +++ b/satrs-core/Cargo.toml @@ -66,14 +66,24 @@ version = "1" default-features = false optional = true +[dependencies.socket2] +version = "0.5.4" +features = ["all"] +optional = true + [dependencies.spacepackets] -version = "0.7.0-beta.1" +# version = "0.7.0-beta.1" # path = "../../spacepackets" -# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" -# rev = "" +git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git" +rev = "79d26e1a6" # branch = "" default-features = false +[dependencies.cobs] +git = "https://github.com/robamu/cobs.rs.git" +branch = "all_features" +default-features = false + [dev-dependencies] serde = "1" zerocopy = "0.7" @@ -87,22 +97,23 @@ version = "1" [features] default = ["std"] std = [ - "downcast-rs/std", - "alloc", - "bus", - "postcard/use-std", - "crossbeam-channel/std", - "serde/std", - "spacepackets/std", - "num_enum/std", - "thiserror", + "downcast-rs/std", + "alloc", + "bus", + "postcard/use-std", + "crossbeam-channel/std", + "serde/std", + "spacepackets/std", + "num_enum/std", + "thiserror", + "socket2" ] alloc = [ - "serde/alloc", - "spacepackets/alloc", - "hashbrown", - "dyn-clone", - "downcast-rs" + "serde/alloc", + "spacepackets/alloc", + "hashbrown", + "dyn-clone", + "downcast-rs" ] serde = ["dep:serde", "spacepackets/serde"] crossbeam = ["crossbeam-channel"] diff --git a/satrs-core/src/encoding/ccsds.rs b/satrs-core/src/encoding/ccsds.rs new file mode 100644 index 0000000..f8f775f --- /dev/null +++ b/satrs-core/src/encoding/ccsds.rs @@ -0,0 +1,269 @@ +#[cfg(feature = "alloc")] +use alloc::vec::Vec; +#[cfg(feature = "alloc")] +use hashbrown::HashSet; +use spacepackets::PacketId; + +use crate::tmtc::ReceivesTcCore; + +pub trait PacketIdLookup { + fn validate(&self, packet_id: u16) -> bool; +} + +#[cfg(feature = "alloc")] +impl PacketIdLookup for Vec { + fn validate(&self, packet_id: u16) -> bool { + self.contains(&packet_id) + } +} + +#[cfg(feature = "alloc")] +impl PacketIdLookup for HashSet { + fn validate(&self, packet_id: u16) -> bool { + self.contains(&packet_id) + } +} + +impl PacketIdLookup for [u16] { + fn validate(&self, packet_id: u16) -> bool { + self.binary_search(&packet_id).is_ok() + } +} + +#[cfg(feature = "alloc")] +impl PacketIdLookup for Vec { + fn validate(&self, packet_id: u16) -> bool { + self.contains(&PacketId::from(packet_id)) + } +} +#[cfg(feature = "alloc")] +impl PacketIdLookup for HashSet { + fn validate(&self, packet_id: u16) -> bool { + self.contains(&PacketId::from(packet_id)) + } +} + +impl PacketIdLookup for [PacketId] { + fn validate(&self, packet_id: u16) -> bool { + self.binary_search(&PacketId::from(packet_id)).is_ok() + } +} + +/// This function parses a given buffer for tightly packed CCSDS space packets. It uses the +/// [PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet and then +/// uses the length field of the packet to extract CCSDS packets. +/// +/// This function is also able to deal with broken tail packets at the end as long a the parser +/// can read the full 7 bytes which constitue a space packet header plus one byte minimal size. +/// If broken tail packets are detected, they are moved to the front of the buffer, and the write +/// index for future write operations will be written to the `next_write_idx` argument. +/// +/// The parser will write all packets which were decoded successfully to the given `tc_receiver` +/// and return the number of packets found. If the [ReceivesTcCore::pass_tc] calls fails, the +/// error will be returned. +pub fn parse_buffer_for_ccsds_space_packets( + buf: &mut [u8], + packet_id_lookup: &(impl PacketIdLookup + ?Sized), + tc_receiver: &mut impl ReceivesTcCore, + next_write_idx: &mut usize, +) -> Result { + *next_write_idx = 0; + let mut packets_found = 0; + let mut current_idx = 0; + let buf_len = buf.len(); + loop { + if current_idx + 7 >= buf.len() { + break; + } + let packet_id = u16::from_be_bytes(buf[current_idx..current_idx + 2].try_into().unwrap()); + if packet_id_lookup.validate(packet_id) { + let length_field = + u16::from_be_bytes(buf[current_idx + 4..current_idx + 6].try_into().unwrap()); + let packet_size = length_field + 7; + if (current_idx + packet_size as usize) < buf_len { + tc_receiver.pass_tc(&buf[current_idx..current_idx + packet_size as usize])?; + packets_found += 1; + } else { + // Move packet to start of buffer if applicable. + if current_idx > 0 { + buf.copy_within(current_idx.., 0); + *next_write_idx = buf.len() - current_idx; + } + } + current_idx += packet_size as usize; + continue; + } + current_idx += 1; + } + Ok(packets_found) +} + +#[cfg(test)] +mod tests { + use spacepackets::{ + ecss::{tc::PusTcCreator, SerializablePusPacket}, + PacketId, SpHeader, + }; + + use crate::encoding::tests::TcCacher; + + use super::parse_buffer_for_ccsds_space_packets; + + const TEST_APID_0: u16 = 0x02; + const TEST_APID_1: u16 = 0x10; + const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0); + const TEST_PACKET_ID_1: PacketId = PacketId::const_tc(true, TEST_APID_1); + + #[test] + fn test_basic() { + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let valid_packet_ids = [TEST_PACKET_ID_0]; + let mut tc_cacher = TcCacher::default(); + let mut next_write_idx = 0; + let parse_result = parse_buffer_for_ccsds_space_packets( + &mut buffer, + valid_packet_ids.as_slice(), + &mut tc_cacher, + &mut next_write_idx, + ); + assert!(parse_result.is_ok()); + let parsed_packets = parse_result.unwrap(); + assert_eq!(parsed_packets, 1); + assert_eq!(tc_cacher.tc_queue.len(), 1); + assert_eq!( + tc_cacher.tc_queue.pop_front().unwrap(), + buffer[..packet_len] + ); + } + + #[test] + fn test_multi_packet() { + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len_ping = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let packet_len_action = action_tc + .write_to_bytes(&mut buffer[packet_len_ping..]) + .expect("writing packet failed"); + let valid_packet_ids = [TEST_PACKET_ID_0]; + let mut tc_cacher = TcCacher::default(); + let mut next_write_idx = 0; + let parse_result = parse_buffer_for_ccsds_space_packets( + &mut buffer, + valid_packet_ids.as_slice(), + &mut tc_cacher, + &mut next_write_idx, + ); + assert!(parse_result.is_ok()); + let parsed_packets = parse_result.unwrap(); + assert_eq!(parsed_packets, 2); + assert_eq!(tc_cacher.tc_queue.len(), 2); + assert_eq!( + tc_cacher.tc_queue.pop_front().unwrap(), + buffer[..packet_len_ping] + ); + assert_eq!( + tc_cacher.tc_queue.pop_front().unwrap(), + buffer[packet_len_ping..packet_len_ping + packet_len_action] + ); + } + + #[test] + fn test_multi_apid() { + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); + let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len_ping = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let packet_len_action = action_tc + .write_to_bytes(&mut buffer[packet_len_ping..]) + .expect("writing packet failed"); + let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; + let mut tc_cacher = TcCacher::default(); + let mut next_write_idx = 0; + let parse_result = parse_buffer_for_ccsds_space_packets( + &mut buffer, + valid_packet_ids.as_slice(), + &mut tc_cacher, + &mut next_write_idx, + ); + assert!(parse_result.is_ok()); + let parsed_packets = parse_result.unwrap(); + assert_eq!(parsed_packets, 2); + assert_eq!(tc_cacher.tc_queue.len(), 2); + assert_eq!( + tc_cacher.tc_queue.pop_front().unwrap(), + buffer[..packet_len_ping] + ); + assert_eq!( + tc_cacher.tc_queue.pop_front().unwrap(), + buffer[packet_len_ping..packet_len_ping + packet_len_action] + ); + } + + #[test] + fn test_split_packet_multi() { + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap(); + let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len_ping = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let packet_len_action = action_tc + .write_to_bytes(&mut buffer[packet_len_ping..]) + .expect("writing packet failed"); + let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; + let mut tc_cacher = TcCacher::default(); + let mut next_write_idx = 0; + let parse_result = parse_buffer_for_ccsds_space_packets( + &mut buffer[..packet_len_ping + packet_len_action - 4], + valid_packet_ids.as_slice(), + &mut tc_cacher, + &mut next_write_idx, + ); + assert!(parse_result.is_ok()); + let parsed_packets = parse_result.unwrap(); + assert_eq!(parsed_packets, 1); + assert_eq!(tc_cacher.tc_queue.len(), 1); + // The broken packet was moved to the start, so the next write index should be after the + // last segment missing 4 bytes. + assert_eq!(next_write_idx, packet_len_action - 4); + } + + #[test] + fn test_one_split_packet() { + let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap(); + let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true); + let mut buffer: [u8; 32] = [0; 32]; + let packet_len_ping = ping_tc + .write_to_bytes(&mut buffer) + .expect("writing packet failed"); + let valid_packet_ids = [TEST_PACKET_ID_0, TEST_PACKET_ID_1]; + let mut tc_cacher = TcCacher::default(); + let mut next_write_idx = 0; + let parse_result = parse_buffer_for_ccsds_space_packets( + &mut buffer[..packet_len_ping - 4], + valid_packet_ids.as_slice(), + &mut tc_cacher, + &mut next_write_idx, + ); + assert_eq!(next_write_idx, 0); + assert!(parse_result.is_ok()); + let parsed_packets = parse_result.unwrap(); + assert_eq!(parsed_packets, 0); + assert_eq!(tc_cacher.tc_queue.len(), 0); + } +} diff --git a/satrs-core/src/encoding/cobs.rs b/satrs-core/src/encoding/cobs.rs new file mode 100644 index 0000000..2645745 --- /dev/null +++ b/satrs-core/src/encoding/cobs.rs @@ -0,0 +1,263 @@ +use crate::tmtc::ReceivesTcCore; +use cobs::{decode_in_place, encode, max_encoding_length}; + +/// This function encodes the given packet with COBS and also wraps the encoded packet with +/// the sentinel value 0. It can be used repeatedly on the same encoded buffer by expecting +/// and incrementing the mutable reference of the current packet index. This is also used +/// to retrieve the total encoded size. +/// +/// This function will return [false] if the given encoding buffer is not large enough to hold +/// the encoded buffer and the two sentinel bytes and [true] if the encoding was successfull. +/// +/// ## Example +/// +/// ``` +/// use cobs::decode_in_place_report; +/// use satrs_core::encoding::{encode_packet_with_cobs}; +// +/// const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; +/// const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 2, 1]; +/// +/// let mut encoding_buf: [u8; 32] = [0; 32]; +/// let mut current_idx = 0; +/// assert!(encode_packet_with_cobs(&SIMPLE_PACKET, &mut encoding_buf, &mut current_idx)); +/// assert!(encode_packet_with_cobs(&INVERTED_PACKET, &mut encoding_buf, &mut current_idx)); +/// assert_eq!(encoding_buf[0], 0); +/// let dec_report = decode_in_place_report(&mut encoding_buf[1..]).expect("decoding failed"); +/// assert_eq!(encoding_buf[1 + dec_report.src_used], 0); +/// assert_eq!(dec_report.dst_used, 5); +/// assert_eq!(current_idx, 16); +/// ``` +pub fn encode_packet_with_cobs( + packet: &[u8], + encoded_buf: &mut [u8], + current_idx: &mut usize, +) -> bool { + let max_encoding_len = max_encoding_length(packet.len()); + if *current_idx + max_encoding_len + 2 > encoded_buf.len() { + return false; + } + encoded_buf[*current_idx] = 0; + *current_idx += 1; + *current_idx += encode(packet, &mut encoded_buf[*current_idx..]); + encoded_buf[*current_idx] = 0; + *current_idx += 1; + true +} + +/// This function parses a given buffer for COBS encoded packets. The packet structure is +/// expected to be like this, assuming a sentinel value of 0 as the packet delimiter: +/// +/// 0 | ... Encoded Packet Data ... | 0 | 0 | ... Encoded Packet Data ... | 0 +/// +/// This function is also able to deal with broken tail packets at the end. If broken tail +/// packets are detected, they are moved to the front of the buffer, and the write index for +/// future write operations will be written to the `next_write_idx` argument. +/// +/// The parser will write all packets which were decoded successfully to the given `tc_receiver`. +pub fn parse_buffer_for_cobs_encoded_packets( + buf: &mut [u8], + tc_receiver: &mut dyn ReceivesTcCore, + next_write_idx: &mut usize, +) -> Result { + let mut start_index_packet = 0; + let mut start_found = false; + let mut last_byte = false; + let mut packets_found = 0; + for i in 0..buf.len() { + if i == buf.len() - 1 { + last_byte = true; + } + if buf[i] == 0 { + if !start_found && !last_byte && buf[i + 1] == 0 { + // Special case: Consecutive sentinel values or all zeroes. + // Skip. + continue; + } + if start_found { + let decode_result = decode_in_place(&mut buf[start_index_packet..i]); + if let Ok(packet_len) = decode_result { + packets_found += 1; + tc_receiver + .pass_tc(&buf[start_index_packet..start_index_packet + packet_len])?; + } + start_found = false; + } else { + start_index_packet = i + 1; + start_found = true; + } + } + } + // Move split frame at the end to the front of the buffer. + if start_index_packet > 0 && start_found && packets_found > 0 { + buf.copy_within(start_index_packet - 1.., 0); + *next_write_idx = buf.len() - start_index_packet + 1; + } + Ok(packets_found) +} + +#[cfg(test)] +pub(crate) mod tests { + use cobs::encode; + + use crate::encoding::tests::{encode_simple_packet, TcCacher, INVERTED_PACKET, SIMPLE_PACKET}; + + use super::parse_buffer_for_cobs_encoded_packets; + + #[test] + fn test_parsing_simple_packet() { + let mut test_sender = TcCacher::default(); + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + let mut next_read_idx = 0; + let packets = parse_buffer_for_cobs_encoded_packets( + &mut encoded_buf[0..current_idx], + &mut test_sender, + &mut next_read_idx, + ) + .unwrap(); + assert_eq!(packets, 1); + assert_eq!(test_sender.tc_queue.len(), 1); + let packet = &test_sender.tc_queue[0]; + assert_eq!(packet, &SIMPLE_PACKET); + } + + #[test] + fn test_parsing_consecutive_packets() { + let mut test_sender = TcCacher::default(); + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + + // Second packet + encoded_buf[current_idx] = 0; + current_idx += 1; + current_idx += encode(&INVERTED_PACKET, &mut encoded_buf[current_idx..]); + encoded_buf[current_idx] = 0; + current_idx += 1; + let mut next_read_idx = 0; + let packets = parse_buffer_for_cobs_encoded_packets( + &mut encoded_buf[0..current_idx], + &mut test_sender, + &mut next_read_idx, + ) + .unwrap(); + assert_eq!(packets, 2); + assert_eq!(test_sender.tc_queue.len(), 2); + let packet0 = &test_sender.tc_queue[0]; + assert_eq!(packet0, &SIMPLE_PACKET); + let packet1 = &test_sender.tc_queue[1]; + assert_eq!(packet1, &INVERTED_PACKET); + } + + #[test] + fn test_split_tail_packet_only() { + let mut test_sender = TcCacher::default(); + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + let mut next_read_idx = 0; + let packets = parse_buffer_for_cobs_encoded_packets( + // Cut off the sentinel byte at the end. + &mut encoded_buf[0..current_idx - 1], + &mut test_sender, + &mut next_read_idx, + ) + .unwrap(); + assert_eq!(packets, 0); + assert_eq!(test_sender.tc_queue.len(), 0); + assert_eq!(next_read_idx, 0); + } + + fn generic_test_split_packet(cut_off: usize) { + let mut test_sender = TcCacher::default(); + let mut encoded_buf: [u8; 16] = [0; 16]; + assert!(cut_off < INVERTED_PACKET.len() + 1); + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + // Second packet + encoded_buf[current_idx] = 0; + let packet_start = current_idx; + current_idx += 1; + let encoded_len = encode(&INVERTED_PACKET, &mut encoded_buf[current_idx..]); + assert_eq!(encoded_len, 6); + current_idx += encoded_len; + // We cut off the sentinel byte, so we expecte the write index to be the length of the + // packet minus the sentinel byte plus the first sentinel byte. + let next_expected_write_idx = 1 + encoded_len - cut_off + 1; + encoded_buf[current_idx] = 0; + current_idx += 1; + let mut next_write_idx = 0; + let expected_at_start = encoded_buf[packet_start..current_idx - cut_off].to_vec(); + let packets = parse_buffer_for_cobs_encoded_packets( + // Cut off the sentinel byte at the end. + &mut encoded_buf[0..current_idx - cut_off], + &mut test_sender, + &mut next_write_idx, + ) + .unwrap(); + assert_eq!(packets, 1); + assert_eq!(test_sender.tc_queue.len(), 1); + assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET); + assert_eq!(next_write_idx, next_expected_write_idx); + assert_eq!(encoded_buf[..next_expected_write_idx], expected_at_start); + } + + #[test] + fn test_one_packet_and_split_tail_packet_0() { + generic_test_split_packet(1); + } + + #[test] + fn test_one_packet_and_split_tail_packet_1() { + generic_test_split_packet(2); + } + + #[test] + fn test_one_packet_and_split_tail_packet_2() { + generic_test_split_packet(3); + } + + #[test] + fn test_zero_at_end() { + let mut test_sender = TcCacher::default(); + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut next_write_idx = 0; + let mut current_idx = 0; + encoded_buf[current_idx] = 5; + current_idx += 1; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + encoded_buf[current_idx] = 0; + current_idx += 1; + let packets = parse_buffer_for_cobs_encoded_packets( + // Cut off the sentinel byte at the end. + &mut encoded_buf[0..current_idx], + &mut test_sender, + &mut next_write_idx, + ) + .unwrap(); + assert_eq!(packets, 1); + assert_eq!(test_sender.tc_queue.len(), 1); + assert_eq!(&test_sender.tc_queue[0], &SIMPLE_PACKET); + assert_eq!(next_write_idx, 1); + assert_eq!(encoded_buf[0], 0); + } + + #[test] + fn test_all_zeroes() { + let mut test_sender = TcCacher::default(); + let mut all_zeroes: [u8; 5] = [0; 5]; + let mut next_write_idx = 0; + let packets = parse_buffer_for_cobs_encoded_packets( + // Cut off the sentinel byte at the end. + &mut all_zeroes, + &mut test_sender, + &mut next_write_idx, + ) + .unwrap(); + assert_eq!(packets, 0); + assert!(test_sender.tc_queue.is_empty()); + assert_eq!(next_write_idx, 0); + } +} diff --git a/satrs-core/src/encoding/mod.rs b/satrs-core/src/encoding/mod.rs new file mode 100644 index 0000000..94e3dee --- /dev/null +++ b/satrs-core/src/encoding/mod.rs @@ -0,0 +1,40 @@ +pub mod ccsds; +pub mod cobs; + +pub use crate::encoding::ccsds::parse_buffer_for_ccsds_space_packets; +pub use crate::encoding::cobs::{encode_packet_with_cobs, parse_buffer_for_cobs_encoded_packets}; + +#[cfg(test)] +pub(crate) mod tests { + use alloc::{collections::VecDeque, vec::Vec}; + + use crate::tmtc::ReceivesTcCore; + + use super::cobs::encode_packet_with_cobs; + + pub(crate) const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; + pub(crate) const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 2, 1]; + + #[derive(Default)] + pub(crate) struct TcCacher { + pub(crate) tc_queue: VecDeque>, + } + + impl ReceivesTcCore for TcCacher { + type Error = (); + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.tc_queue.push_back(tc_raw.to_vec()); + Ok(()) + } + } + + pub(crate) fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { + encode_packet_with_cobs(&SIMPLE_PACKET, encoded_buf, current_idx); + } + + #[allow(dead_code)] + pub(crate) fn encode_inverted_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { + encode_packet_with_cobs(&INVERTED_PACKET, encoded_buf, current_idx); + } +} diff --git a/satrs-core/src/hal/host/mod.rs b/satrs-core/src/hal/host/mod.rs deleted file mode 100644 index 8057db1..0000000 --- a/satrs-core/src/hal/host/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -//! Helper modules intended to be used on hosts with a full [std] runtime -pub mod udp_server; diff --git a/satrs-core/src/hal/mod.rs b/satrs-core/src/hal/mod.rs index c422a72..b6ab984 100644 --- a/satrs-core/src/hal/mod.rs +++ b/satrs-core/src/hal/mod.rs @@ -1,4 +1,4 @@ //! # Hardware Abstraction Layer module #[cfg(feature = "std")] #[cfg_attr(doc_cfg, doc(cfg(feature = "std")))] -pub mod host; +pub mod std; diff --git a/satrs-core/src/hal/std/mod.rs b/satrs-core/src/hal/std/mod.rs new file mode 100644 index 0000000..17ec012 --- /dev/null +++ b/satrs-core/src/hal/std/mod.rs @@ -0,0 +1,6 @@ +//! Helper modules intended to be used on systems with a full [std] runtime. +pub mod tcp_server; +pub mod udp_server; + +mod tcp_spacepackets_server; +mod tcp_with_cobs_server; diff --git a/satrs-core/src/hal/std/tcp_server.rs b/satrs-core/src/hal/std/tcp_server.rs new file mode 100644 index 0000000..dcccf66 --- /dev/null +++ b/satrs-core/src/hal/std/tcp_server.rs @@ -0,0 +1,319 @@ +//! Generic TCP TMTC servers with different TMTC format flavours. +use alloc::vec; +use alloc::{boxed::Box, vec::Vec}; +use core::time::Duration; +use socket2::{Domain, Socket, Type}; +use std::io::Read; +use std::net::TcpListener; +use std::net::{SocketAddr, TcpStream}; +use std::thread; + +use crate::tmtc::{ReceivesTc, TmPacketSource}; +use thiserror::Error; + +// Re-export the TMTC in COBS server. +pub use crate::hal::std::tcp_with_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer}; + +/// Configuration struct for the generic TCP TMTC server +/// +/// ## Parameters +/// +/// * `addr` - Address of the TCP server. +/// * `inner_loop_delay` - If a client connects for a longer period, but no TC is received or +/// no TM needs to be sent, the TCP server will delay for the specified amount of time +/// to reduce CPU load. +/// * `tm_buffer_size` - Size of the TM buffer used to read TM from the [TmPacketSource] and +/// encoding of that data. This buffer should at large enough to hold the maximum expected +/// TM size read from the packet source. +/// * `tc_buffer_size` - Size of the TC buffer used to read encoded telecommands sent from +/// the client. It is recommended to make this buffer larger to allow reading multiple +/// consecutive packets as well, for example by using common buffer sizes like 4096 or 8192 +/// byte. The buffer should at the very least be large enough to hold the maximum expected +/// telecommand size. +/// * `reuse_addr` - Can be used to set the `SO_REUSEADDR` option on the raw socket. This is +/// especially useful if the address and port are static for the server. Set to false by +/// default. +/// * `reuse_port` - Can be used to set the `SO_REUSEPORT` option on the raw socket. This is +/// especially useful if the address and port are static for the server. Set to false by +/// default. +#[derive(Debug, Copy, Clone)] +pub struct ServerConfig { + pub addr: SocketAddr, + pub inner_loop_delay: Duration, + pub tm_buffer_size: usize, + pub tc_buffer_size: usize, + pub reuse_addr: bool, + pub reuse_port: bool, +} + +impl ServerConfig { + pub fn new( + addr: SocketAddr, + inner_loop_delay: Duration, + tm_buffer_size: usize, + tc_buffer_size: usize, + ) -> Self { + Self { + addr, + inner_loop_delay, + tm_buffer_size, + tc_buffer_size, + reuse_addr: false, + reuse_port: false, + } + } +} + +#[derive(Error, Debug)] +pub enum TcpTmtcError { + #[error("TM retrieval error: {0}")] + TmError(TmError), + #[error("TC retrieval error: {0}")] + TcError(TcError), + #[error("io error: {0}")] + Io(#[from] std::io::Error), +} + +/// Result of one connection attempt. Contains the client address if a connection was established, +/// in addition to the number of telecommands and telemetry packets exchanged. +#[derive(Debug, Default)] +pub struct ConnectionResult { + pub addr: Option, + pub num_received_tcs: u32, + pub num_sent_tms: u32, +} + +/// Generic parser abstraction for an object which can parse for telecommands given a raw +/// bytestream received from a TCP socket and send them to a generic [ReceivesTc] telecommand +/// receiver. This allows different encoding schemes for telecommands. +pub trait TcpTcParser { + fn handle_tc_parsing( + &mut self, + tc_buffer: &mut [u8], + tc_receiver: &mut dyn ReceivesTc, + conn_result: &mut ConnectionResult, + current_write_idx: usize, + next_write_idx: &mut usize, + ) -> Result<(), TcpTmtcError>; +} + +/// Generic sender abstraction for an object which can pull telemetry from a given TM source +/// using a [TmPacketSource] and then send them back to a client using a given [TcpStream]. +/// The concrete implementation can also perform any encoding steps which are necessary before +/// sending back the data to a client. +pub trait TcpTmSender { + fn handle_tm_sending( + &mut self, + tm_buffer: &mut [u8], + tm_source: &mut dyn TmPacketSource, + conn_result: &mut ConnectionResult, + stream: &mut TcpStream, + ) -> Result>; +} + +/// TCP TMTC server implementation for exchange of generic TMTC packets in a generic way which +/// stays agnostic to the encoding scheme and format used for both telecommands and telemetry. +/// +/// This server implements a generic TMTC handling logic and allows modifying its behaviour +/// through the following 4 core abstractions: +/// +/// 1. [TcpTcParser] to parse for telecommands from the raw bytestream received from a client. +/// 2. Parsed telecommands will be sent to the [ReceivesTc] telecommand receiver. +/// 3. [TcpTmSender] to send telemetry pulled from a TM source back to the client. +/// 4. [TmPacketSource] as a generic TM source used by the [TcpTmSender]. +/// +/// It is possible to specify custom abstractions to build a dedicated TCP TMTC server without +/// having to re-implement common logic. +/// +/// Currently, this framework offers the following concrete implementations: +/// +/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol. +pub struct TcpTmtcGenericServer< + TmError, + TcError, + TmHandler: TcpTmSender, + TcHandler: TcpTcParser, +> { + base: TcpTmtcServerBase, + tc_handler: TcHandler, + tm_handler: TmHandler, +} + +impl< + TmError: 'static, + TcError: 'static, + TmSender: TcpTmSender, + TcParser: TcpTcParser, + > TcpTmtcGenericServer +{ + /// Create a new generic TMTC server instance. + /// + /// ## Parameter + /// + /// * `cfg` - Configuration of the server. + /// * `tc_parser` - Parser which extracts telecommands from the raw bytestream received from + /// the client. + /// * `tm_sender` - Sends back telemetry to the client using the specified TM source. + /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are + /// then sent back to the client. + /// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded + /// to this TC receiver. + pub fn new( + cfg: ServerConfig, + tc_parser: TcParser, + tm_sender: TmSender, + tm_source: Box>, + tc_receiver: Box>, + ) -> Result, std::io::Error> { + Ok(Self { + base: TcpTmtcServerBase::new(cfg, tm_source, tc_receiver)?, + tc_handler: tc_parser, + tm_handler: tm_sender, + }) + } + + /// Retrieve the internal [TcpListener] class. + pub fn listener(&mut self) -> &mut TcpListener { + self.base.listener() + } + + /// Can be used to retrieve the local assigned address of the TCP server. This is especially + /// useful if using the port number 0 for OS auto-assignment. + pub fn local_addr(&self) -> std::io::Result { + self.base.local_addr() + } + + /// This call is used to handle the next connection to a client. Right now, it performs + /// the following steps: + /// + /// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API + /// until a client connects. + /// 2. It reads all the telecommands from the client and parses all received data using the + /// user specified [TcpTcParser]. + /// 3. After reading and parsing all telecommands, it sends back all telemetry using the + /// user specified [TcpTmSender]. + /// + /// The server will delay for a user-specified period if the client connects to the server + /// for prolonged periods and there is no traffic for the server. This is the case if the + /// client does not send any telecommands and no telemetry needs to be sent back to the client. + pub fn handle_next_connection( + &mut self, + ) -> Result> { + let mut connection_result = ConnectionResult::default(); + let mut current_write_idx; + let mut next_write_idx = 0; + let (mut stream, addr) = self.base.listener.accept()?; + stream.set_nonblocking(true)?; + connection_result.addr = Some(addr); + current_write_idx = next_write_idx; + loop { + let read_result = stream.read(&mut self.base.tc_buffer[current_write_idx..]); + match read_result { + Ok(0) => { + // Connection closed by client. If any TC was read, parse for complete packets. + // After that, break the outer loop. + if current_write_idx > 0 { + self.tc_handler.handle_tc_parsing( + &mut self.base.tc_buffer, + self.base.tc_receiver.as_mut(), + &mut connection_result, + current_write_idx, + &mut next_write_idx, + )?; + } + break; + } + Ok(read_len) => { + current_write_idx += read_len; + // TC buffer is full, we must parse for complete packets now. + if current_write_idx == self.base.tc_buffer.capacity() { + self.tc_handler.handle_tc_parsing( + &mut self.base.tc_buffer, + self.base.tc_receiver.as_mut(), + &mut connection_result, + current_write_idx, + &mut next_write_idx, + )?; + current_write_idx = next_write_idx; + } + } + Err(e) => match e.kind() { + // As per [TcpStream::set_read_timeout] documentation, this should work for + // both UNIX and Windows. + std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => { + self.tc_handler.handle_tc_parsing( + &mut self.base.tc_buffer, + self.base.tc_receiver.as_mut(), + &mut connection_result, + current_write_idx, + &mut next_write_idx, + )?; + current_write_idx = next_write_idx; + + if !self.tm_handler.handle_tm_sending( + &mut self.base.tm_buffer, + self.base.tm_source.as_mut(), + &mut connection_result, + &mut stream, + )? { + // No TC read, no TM was sent, but the client has not disconnected. + // Perform an inner delay to avoid burning CPU time. + thread::sleep(self.base.inner_loop_delay); + } + } + _ => { + return Err(TcpTmtcError::Io(e)); + } + }, + } + } + self.tm_handler.handle_tm_sending( + &mut self.base.tm_buffer, + self.base.tm_source.as_mut(), + &mut connection_result, + &mut stream, + )?; + Ok(connection_result) + } +} + +pub(crate) struct TcpTmtcServerBase { + pub(crate) listener: TcpListener, + pub(crate) inner_loop_delay: Duration, + pub(crate) tm_source: Box>, + pub(crate) tm_buffer: Vec, + pub(crate) tc_receiver: Box>, + pub(crate) tc_buffer: Vec, +} + +impl TcpTmtcServerBase { + pub(crate) fn new( + cfg: ServerConfig, + tm_source: Box>, + tc_receiver: Box>, + ) -> Result { + // Create a TCP listener bound to two addresses. + let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; + socket.set_reuse_address(cfg.reuse_addr)?; + socket.set_reuse_port(cfg.reuse_port)?; + let addr = (cfg.addr).into(); + socket.bind(&addr)?; + socket.listen(128)?; + Ok(Self { + listener: socket.into(), + inner_loop_delay: cfg.inner_loop_delay, + tm_source, + tm_buffer: vec![0; cfg.tm_buffer_size], + tc_receiver, + tc_buffer: vec![0; cfg.tc_buffer_size], + }) + } + + pub(crate) fn listener(&mut self) -> &mut TcpListener { + &mut self.listener + } + + pub(crate) fn local_addr(&self) -> std::io::Result { + self.listener.local_addr() + } +} diff --git a/satrs-core/src/hal/std/tcp_spacepackets_server.rs b/satrs-core/src/hal/std/tcp_spacepackets_server.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/satrs-core/src/hal/std/tcp_spacepackets_server.rs @@ -0,0 +1 @@ + diff --git a/satrs-core/src/hal/std/tcp_with_cobs_server.rs b/satrs-core/src/hal/std/tcp_with_cobs_server.rs new file mode 100644 index 0000000..fc44ebf --- /dev/null +++ b/satrs-core/src/hal/std/tcp_with_cobs_server.rs @@ -0,0 +1,415 @@ +use alloc::boxed::Box; +use alloc::vec; +use cobs::encode; +use delegate::delegate; +use std::io::Write; +use std::net::SocketAddr; +use std::net::TcpListener; +use std::net::TcpStream; +use std::vec::Vec; + +use crate::encoding::parse_buffer_for_cobs_encoded_packets; +use crate::tmtc::ReceivesTc; +use crate::tmtc::TmPacketSource; + +use crate::hal::std::tcp_server::{ + ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer, +}; + +/// Concrete [TcpTcParser] implementation for the [TcpTmtcInCobsServer]. +#[derive(Default)] +pub struct CobsTcParser {} + +impl TcpTcParser for CobsTcParser { + fn handle_tc_parsing( + &mut self, + tc_buffer: &mut [u8], + tc_receiver: &mut dyn ReceivesTc, + conn_result: &mut ConnectionResult, + current_write_idx: usize, + next_write_idx: &mut usize, + ) -> Result<(), TcpTmtcError> { + // Reader vec full, need to parse for packets. + conn_result.num_received_tcs += parse_buffer_for_cobs_encoded_packets( + &mut tc_buffer[..current_write_idx], + tc_receiver.upcast_mut(), + next_write_idx, + ) + .map_err(|e| TcpTmtcError::TcError(e))?; + Ok(()) + } +} + +/// Concrete [TcpTmSender] implementation for the [TcpTmtcInCobsServer]. +pub struct CobsTmSender { + tm_encoding_buffer: Vec, +} + +impl CobsTmSender { + fn new(tm_buffer_size: usize) -> Self { + Self { + // The buffer should be large enough to hold the maximum expected TM size encoded with + // COBS. + tm_encoding_buffer: vec![0; cobs::max_encoding_length(tm_buffer_size)], + } + } +} + +impl TcpTmSender for CobsTmSender { + fn handle_tm_sending( + &mut self, + tm_buffer: &mut [u8], + tm_source: &mut dyn TmPacketSource, + conn_result: &mut ConnectionResult, + stream: &mut TcpStream, + ) -> Result> { + let mut tm_was_sent = false; + loop { + // Write TM until TM source is exhausted. For now, there is no limit for the amount + // of TM written this way. + let read_tm_len = tm_source + .retrieve_packet(tm_buffer) + .map_err(|e| TcpTmtcError::TmError(e))?; + + if read_tm_len == 0 { + return Ok(tm_was_sent); + } + tm_was_sent = true; + conn_result.num_sent_tms += 1; + + // Encode into COBS and sent to client. + let mut current_idx = 0; + self.tm_encoding_buffer[current_idx] = 0; + current_idx += 1; + current_idx += encode( + &tm_buffer[..read_tm_len], + &mut self.tm_encoding_buffer[current_idx..], + ); + self.tm_encoding_buffer[current_idx] = 0; + current_idx += 1; + stream.write_all(&self.tm_encoding_buffer[..current_idx])?; + } + } +} + +/// TCP TMTC server implementation for exchange of generic TMTC packets which are framed with the +/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). +/// +/// Telemetry will be encoded with the COBS protocol using [cobs::encode] in addition to being +/// wrapped with the sentinel value 0 as the packet delimiter as well before being sent back to +/// the client. Please note that the server will send as much data as it can retrieve from the +/// [TmPacketSource] in its current implementation. +/// +/// Using a framing protocol like COBS imposes minimal restrictions on the type of TMTC data +/// exchanged while also allowing packets with flexible size and a reliable way to reconstruct full +/// packets even from a data stream which is split up. The server wil use the +/// [parse_buffer_for_cobs_encoded_packets] function to parse for packets and pass them to a +/// generic TC receiver. The user can use [crate::encoding::encode_packet_with_cobs] to encode +/// telecommands sent to the server. +/// +/// ## Example +/// +/// The [TCP COBS integration](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-core/tests/tcp_server_cobs.rs) +/// test also serves as the example application for this module. +pub struct TcpTmtcInCobsServer { + generic_server: TcpTmtcGenericServer, +} + +impl TcpTmtcInCobsServer { + /// Create a new TCP TMTC server which exchanges TMTC packets encoded with + /// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing). + /// + /// ## Parameter + /// + /// * `cfg` - Configuration of the server. + /// * `tm_source` - Generic TM source used by the server to pull telemetry packets which are + /// then sent back to the client. + /// * `tc_receiver` - Any received telecommands which were decoded successfully will be + /// forwarded to this TC receiver. + pub fn new( + cfg: ServerConfig, + tm_source: Box>, + tc_receiver: Box>, + ) -> Result> { + Ok(Self { + generic_server: TcpTmtcGenericServer::new( + cfg, + CobsTcParser::default(), + CobsTmSender::new(cfg.tm_buffer_size), + tm_source, + tc_receiver, + )?, + }) + } + + delegate! { + to self.generic_server { + pub fn listener(&mut self) -> &mut TcpListener; + + /// Can be used to retrieve the local assigned address of the TCP server. This is especially + /// useful if using the port number 0 for OS auto-assignment. + pub fn local_addr(&self) -> std::io::Result; + + /// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call. + pub fn handle_next_connection( + &mut self, + ) -> Result>; + } + } +} + +#[cfg(test)] +mod tests { + use core::{ + sync::atomic::{AtomicBool, Ordering}, + time::Duration, + }; + use std::{ + io::{Read, Write}, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, + sync::Mutex, + thread, + }; + + use crate::{ + encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET}, + hal::std::tcp_server::ServerConfig, + tmtc::{ReceivesTcCore, TmPacketSourceCore}, + }; + use alloc::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec}; + use cobs::encode; + + use super::TcpTmtcInCobsServer; + + #[derive(Default, Clone)] + struct SyncTcCacher { + tc_queue: Arc>>>, + } + impl ReceivesTcCore for SyncTcCacher { + type Error = (); + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); + tc_queue.push_back(tc_raw.to_vec()); + Ok(()) + } + } + + #[derive(Default, Clone)] + struct SyncTmSource { + tm_queue: Arc>>>, + } + + impl SyncTmSource { + pub(crate) fn add_tm(&mut self, tm: &[u8]) { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); + tm_queue.push_back(tm.to_vec()); + } + } + + impl TmPacketSourceCore for SyncTmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); + if !tm_queue.is_empty() { + let next_vec = tm_queue.front().unwrap(); + if buffer.len() < next_vec.len() { + panic!( + "provided buffer too small, must be at least {} bytes", + next_vec.len() + ); + } + let next_vec = tm_queue.pop_front().unwrap(); + buffer[0..next_vec.len()].copy_from_slice(&next_vec); + return Ok(next_vec.len()); + } + Ok(0) + } + } + + fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { + encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx) + } + + fn encode_inverted_packet(encoded_buf: &mut [u8], current_idx: &mut usize) { + encode_packet(&INVERTED_PACKET, encoded_buf, current_idx) + } + + fn encode_packet(packet: &[u8], encoded_buf: &mut [u8], current_idx: &mut usize) { + encoded_buf[*current_idx] = 0; + *current_idx += 1; + *current_idx += encode(packet, &mut encoded_buf[*current_idx..]); + encoded_buf[*current_idx] = 0; + *current_idx += 1; + } + + fn generic_tmtc_server( + addr: &SocketAddr, + tc_receiver: SyncTcCacher, + tm_source: SyncTmSource, + ) -> TcpTmtcInCobsServer<(), ()> { + TcpTmtcInCobsServer::new( + ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024), + Box::new(tm_source), + Box::new(tc_receiver), + ) + .expect("TCP server generation failed") + } + + #[test] + fn test_server_basic_no_tm() { + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let tm_source = SyncTmSource::default(); + let mut tcp_server = generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 1); + assert_eq!(conn_result.num_sent_tms, 0); + set_if_done.store(true, Ordering::Relaxed); + }); + // Send TC to server now. + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .write_all(&encoded_buf[..current_idx]) + .expect("writing to TCP server failed"); + drop(stream); + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that the packet was received and decoded successfully. + let mut tc_queue = tc_receiver + .tc_queue + .lock() + .expect("locking tc queue failed"); + assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); + drop(tc_queue); + } + + #[test] + fn test_server_basic_multi_tm_multi_tc() { + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + tm_source.add_tm(&INVERTED_PACKET); + tm_source.add_tm(&SIMPLE_PACKET); + let mut tcp_server = + generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source.clone()); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 2, "Not enough TCs received"); + assert_eq!(conn_result.num_sent_tms, 2, "Not enough TMs received"); + set_if_done.store(true, Ordering::Relaxed); + }); + // Send TC to server now. + let mut encoded_buf: [u8; 32] = [0; 32]; + let mut current_idx = 0; + encode_simple_packet(&mut encoded_buf, &mut current_idx); + encode_inverted_packet(&mut encoded_buf, &mut current_idx); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .set_read_timeout(Some(Duration::from_millis(10))) + .expect("setting reas timeout failed"); + stream + .write_all(&encoded_buf[..current_idx]) + .expect("writing to TCP server failed"); + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 16] = [0; 16]; + let mut read_len_total = 0; + // Timeout ensures this does not block forever. + while read_len_total < 16 { + let read_len = stream.read(&mut read_buf).expect("read failed"); + read_len_total += read_len; + // Read until full expected size is available. + if read_len == 16 { + // Read first TM packet. + current_idx = 0; + assert_eq!(read_len, 16); + assert_eq!(read_buf[0], 0); + current_idx += 1; + let mut dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..]) + .expect("COBS decoding failed"); + assert_eq!(dec_report.dst_used, 5); + // Skip first sentinel byte. + assert_eq!( + &read_buf[current_idx..current_idx + INVERTED_PACKET.len()], + &INVERTED_PACKET + ); + current_idx += dec_report.src_used; + // End sentinel. + assert_eq!(read_buf[current_idx], 0, "invalid sentinel end byte"); + current_idx += 1; + + // Read second TM packet. + assert_eq!(read_buf[current_idx], 0); + current_idx += 1; + dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..]) + .expect("COBS decoding failed"); + assert_eq!(dec_report.dst_used, 5); + // Skip first sentinel byte. + assert_eq!( + &read_buf[current_idx..current_idx + SIMPLE_PACKET.len()], + &SIMPLE_PACKET + ); + current_idx += dec_report.src_used; + // End sentinel. + assert_eq!(read_buf[current_idx], 0); + break; + } + } + drop(stream); + + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that the packet was received and decoded successfully. + let mut tc_queue = tc_receiver + .tc_queue + .lock() + .expect("locking tc queue failed"); + assert_eq!(tc_queue.len(), 2); + assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); + assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET); + drop(tc_queue); + } +} diff --git a/satrs-core/src/hal/host/udp_server.rs b/satrs-core/src/hal/std/udp_server.rs similarity index 93% rename from satrs-core/src/hal/host/udp_server.rs rename to satrs-core/src/hal/std/udp_server.rs index de5a3f0..28e0328 100644 --- a/satrs-core/src/hal/host/udp_server.rs +++ b/satrs-core/src/hal/std/udp_server.rs @@ -1,4 +1,4 @@ -//! UDP server helper components +//! Generic UDP TC server. use crate::tmtc::{ReceivesTc, ReceivesTcCore}; use std::boxed::Box; use std::io::{Error, ErrorKind}; @@ -6,7 +6,8 @@ use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; use std::vec; use std::vec::Vec; -/// This TC server helper can be used to receive raw PUS telecommands thorough a UDP interface. +/// This UDP server can be used to receive CCSDS space packet telecommands or any other telecommand +/// format. /// /// It caches all received telecomands into a vector. The maximum expected telecommand size should /// be declared upfront. This avoids dynamic allocation during run-time. The user can specify a TC @@ -19,7 +20,7 @@ use std::vec::Vec; /// ``` /// use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; /// use spacepackets::ecss::SerializablePusPacket; -/// use satrs_core::hal::host::udp_server::UdpTcServer; +/// use satrs_core::hal::std::udp_server::UdpTcServer; /// use satrs_core::tmtc::{ReceivesTc, ReceivesTcCore}; /// use spacepackets::SpHeader; /// use spacepackets::ecss::tc::PusTcCreator; @@ -51,9 +52,9 @@ use std::vec::Vec; /// .expect("Error sending PUS TC via UDP"); /// ``` /// -/// The [fsrc-example crate](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/fsrc-example) +/// The [satrs-example crate](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/satrs-example) /// server code also includes -/// [example code](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/fsrc-example/src/bin/obsw/tmtc.rs) +/// [example code](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/tmtc.rs#L67) /// on how to use this TC server. It uses the server to receive PUS telecommands on a specific port /// and then forwards them to a generic CCSDS packet receiver. pub struct UdpTcServer { @@ -140,7 +141,7 @@ impl UdpTcServer { #[cfg(test)] mod tests { - use crate::hal::host::udp_server::{ReceiveResult, UdpTcServer}; + use crate::hal::std::udp_server::{ReceiveResult, UdpTcServer}; use crate::tmtc::ReceivesTcCore; use spacepackets::ecss::tc::PusTcCreator; use spacepackets::ecss::SerializablePusPacket; diff --git a/satrs-core/src/lib.rs b/satrs-core/src/lib.rs index 949ffa5..22699e0 100644 --- a/satrs-core/src/lib.rs +++ b/satrs-core/src/lib.rs @@ -21,6 +21,7 @@ extern crate downcast_rs; extern crate std; pub mod cfdp; +pub mod encoding; pub mod error; #[cfg(feature = "alloc")] #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))] diff --git a/satrs-core/src/tmtc/mod.rs b/satrs-core/src/tmtc/mod.rs index 7237196..04f4299 100644 --- a/satrs-core/src/tmtc/mod.rs +++ b/satrs-core/src/tmtc/mod.rs @@ -72,12 +72,33 @@ pub trait ReceivesTcCore { /// Extension trait of [ReceivesTcCore] which allows downcasting by implementing [Downcast] and /// is also sendable. #[cfg(feature = "alloc")] -pub trait ReceivesTc: ReceivesTcCore + Downcast + Send {} +pub trait ReceivesTc: ReceivesTcCore + Downcast + Send { + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn ReceivesTcCore; + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn ReceivesTcCore; +} /// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature /// is enabled. #[cfg(feature = "alloc")] -impl ReceivesTc for T where T: ReceivesTcCore + Send + 'static {} +impl ReceivesTc for T +where + T: ReceivesTcCore + Send + 'static, +{ + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn ReceivesTcCore { + self + } + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn ReceivesTcCore { + self + } +} #[cfg(feature = "alloc")] impl_downcast!(ReceivesTc assoc Error); @@ -92,3 +113,41 @@ pub trait ReceivesCcsdsTc { type Error; fn pass_ccsds(&mut self, header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error>; } + +/// Generic trait for a TM packet source, with no restrictions on the type of TM. +/// Implementors write the telemetry into the provided buffer and return the size of the telemetry. +pub trait TmPacketSourceCore { + type Error; + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result; +} + +/// Extension trait of [TmPacketSourceCore] which allows downcasting by implementing [Downcast] and +/// is also sendable. +#[cfg(feature = "alloc")] +pub trait TmPacketSource: TmPacketSourceCore + Downcast + Send { + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn TmPacketSourceCore; + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn TmPacketSourceCore; +} + +/// Blanket implementation to automatically implement [ReceivesTc] when the [alloc] feature +/// is enabled. +#[cfg(feature = "alloc")] +impl TmPacketSource for T +where + T: TmPacketSourceCore + Send + 'static, +{ + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast(&self) -> &dyn TmPacketSourceCore { + self + } + // Remove this once trait upcasting coercion has been implemented. + // Tracking issue: https://github.com/rust-lang/rust/issues/65991 + fn upcast_mut(&mut self) -> &mut dyn TmPacketSourceCore { + self + } +} diff --git a/satrs-core/tests/tcp_server_cobs.rs b/satrs-core/tests/tcp_server_cobs.rs new file mode 100644 index 0000000..5956fb3 --- /dev/null +++ b/satrs-core/tests/tcp_server_cobs.rs @@ -0,0 +1,156 @@ +//! This serves as both an integration test and an example application showcasing all major +//! features of the TCP COBS server by performing following steps: +//! +//! 1. It defines both a TC receiver and a TM source which are [Sync]. +//! 2. A telemetry packet is inserted into the TM source. The packet will be handled by the +//! TCP server after handling all TCs. +//! 3. It instantiates the TCP server on localhost with automatic port assignment and assigns +//! the TC receiver and TM source created previously. +//! 4. It moves the TCP server to a different thread and calls the +//! [TcpTmtcInCobsServer::handle_next_connection] call inside that thread +//! 5. The main threads connects to the server, sends a test telecommand and then reads back +//! the test telemetry insertd in to the TM source previously. +use core::{ + sync::atomic::{AtomicBool, Ordering}, + time::Duration, +}; +use std::{ + io::{Read, Write}, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, + sync::Mutex, + thread, +}; + +use satrs_core::{ + encoding::cobs::encode_packet_with_cobs, + hal::std::tcp_server::{ServerConfig, TcpTmtcInCobsServer}, + tmtc::{ReceivesTcCore, TmPacketSourceCore}, +}; +use std::{boxed::Box, collections::VecDeque, sync::Arc, vec::Vec}; + +#[derive(Default, Clone)] +struct SyncTcCacher { + tc_queue: Arc>>>, +} +impl ReceivesTcCore for SyncTcCacher { + type Error = (); + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed"); + println!("Received TC: {:x?}", tc_raw); + tc_queue.push_back(tc_raw.to_vec()); + Ok(()) + } +} + +#[derive(Default, Clone)] +struct SyncTmSource { + tm_queue: Arc>>>, +} + +impl SyncTmSource { + pub(crate) fn add_tm(&mut self, tm: &[u8]) { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); + tm_queue.push_back(tm.to_vec()); + } +} + +impl TmPacketSourceCore for SyncTmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); + if !tm_queue.is_empty() { + let next_vec = tm_queue.front().unwrap(); + if buffer.len() < next_vec.len() { + panic!( + "provided buffer too small, must be at least {} bytes", + next_vec.len() + ); + } + println!("Sending and encoding TM: {:x?}", next_vec); + let next_vec = tm_queue.pop_front().unwrap(); + buffer[0..next_vec.len()].copy_from_slice(&next_vec); + return Ok(next_vec.len()); + } + Ok(0) + } +} + +const SIMPLE_PACKET: [u8; 5] = [1, 2, 3, 4, 5]; +const INVERTED_PACKET: [u8; 5] = [5, 4, 3, 4, 1]; + +fn main() { + let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let tc_receiver = SyncTcCacher::default(); + let mut tm_source = SyncTmSource::default(); + // Insert a telemetry packet which will be read back by the client at a later stage. + tm_source.add_tm(&INVERTED_PACKET); + let mut tcp_server = TcpTmtcInCobsServer::new( + ServerConfig::new(auto_port_addr, Duration::from_millis(2), 1024, 1024), + Box::new(tm_source), + Box::new(tc_receiver.clone()), + ) + .expect("TCP server generation failed"); + let dest_addr = tcp_server + .local_addr() + .expect("retrieving dest addr failed"); + let conn_handled: Arc = Default::default(); + let set_if_done = conn_handled.clone(); + + // Call the connection handler in separate thread, does block. + thread::spawn(move || { + let result = tcp_server.handle_next_connection(); + if result.is_err() { + panic!("handling connection failed: {:?}", result.unwrap_err()); + } + let conn_result = result.unwrap(); + assert_eq!(conn_result.num_received_tcs, 1, "No TC received"); + assert_eq!(conn_result.num_sent_tms, 1, "No TM received"); + // Signal the main thread we are done. + set_if_done.store(true, Ordering::Relaxed); + }); + + // Send TC to server now. + let mut encoded_buf: [u8; 16] = [0; 16]; + let mut current_idx = 0; + encode_packet_with_cobs(&SIMPLE_PACKET, &mut encoded_buf, &mut current_idx); + let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed"); + stream + .write_all(&encoded_buf[..current_idx]) + .expect("writing to TCP server failed"); + // Done with writing. + stream + .shutdown(std::net::Shutdown::Write) + .expect("shutting down write failed"); + let mut read_buf: [u8; 16] = [0; 16]; + let read_len = stream.read(&mut read_buf).expect("read failed"); + drop(stream); + + // 1 byte encoding overhead, 2 sentinel bytes. + assert_eq!(read_len, 8); + assert_eq!(read_buf[0], 0); + assert_eq!(read_buf[read_len - 1], 0); + let decoded_len = + cobs::decode_in_place(&mut read_buf[1..read_len]).expect("COBS decoding failed"); + assert_eq!(decoded_len, 5); + // Skip first sentinel byte. + assert_eq!(&read_buf[1..1 + INVERTED_PACKET.len()], &INVERTED_PACKET); + // A certain amount of time is allowed for the transaction to complete. + for _ in 0..3 { + if !conn_handled.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(5)); + } + } + if !conn_handled.load(Ordering::Relaxed) { + panic!("connection was not handled properly"); + } + // Check that the packet was received and decoded successfully. + let mut tc_queue = tc_receiver + .tc_queue + .lock() + .expect("locking tc queue failed"); + assert_eq!(tc_queue.len(), 1); + assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET); + drop(tc_queue); +} diff --git a/satrs-example/src/tmtc.rs b/satrs-example/src/tmtc.rs index 188a59d..5d2ea5e 100644 --- a/satrs-example/src/tmtc.rs +++ b/satrs-example/src/tmtc.rs @@ -1,5 +1,5 @@ use log::{info, warn}; -use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; +use satrs_core::hal::std::udp_server::{ReceiveResult, UdpTcServer}; use std::net::SocketAddr; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use std::thread; diff --git a/satrs-mib/Cargo.toml b/satrs-mib/Cargo.toml index f6dd23a..cdfa4f5 100644 --- a/satrs-mib/Cargo.toml +++ b/satrs-mib/Cargo.toml @@ -23,8 +23,9 @@ version = "1" optional = true [dependencies.satrs-core] -version = "0.1.0-alpha.0" -# path = "../satrs-core" +# version = "0.1.0-alpha.0" +git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" +rev = "35e1f7a983f6535c5571186e361fe101d4306b89" [dependencies.satrs-mib-codegen] path = "codegen" diff --git a/satrs-mib/codegen/Cargo.toml b/satrs-mib/codegen/Cargo.toml index ccc4d1a..db6a671 100644 --- a/satrs-mib/codegen/Cargo.toml +++ b/satrs-mib/codegen/Cargo.toml @@ -20,8 +20,9 @@ quote = "1" proc-macro2 = "1" [dependencies.satrs-core] -version = "0.1.0-alpha.0" -# path = "../../satrs-core" +# version = "0.1.0-alpha.0" +git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" +rev = "35e1f7a983f6535c5571186e361fe101d4306b89" [dev-dependencies] trybuild = { version = "1", features = ["diff"] }