cleaned up code and bumped sat-rs

This commit is contained in:
Robin Müller 2024-04-15 12:16:01 +02:00
parent 6dddfd5a70
commit df72676c0d
Signed by: muellerr
GPG Key ID: A649FB78196E3849
14 changed files with 273 additions and 340 deletions

101
Cargo.lock generated
View File

@ -25,9 +25,9 @@ dependencies = [
[[package]]
name = "allocator-api2"
version = "0.2.16"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"
checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f"
[[package]]
name = "android-tzdata"
@ -138,9 +138,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "cc"
version = "1.0.92"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2678b2e3449475e95b0aa6f9b506a28e61b3dc8996592b983695e8ebb58a8b41"
checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7"
[[package]]
name = "cfg-if"
@ -160,7 +160,7 @@ dependencies = [
"num-traits",
"serde",
"wasm-bindgen",
"windows-targets 0.52.4",
"windows-targets 0.52.5",
]
[[package]]
@ -250,7 +250,7 @@ checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.59",
]
[[package]]
@ -464,7 +464,7 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.59",
]
[[package]]
@ -521,9 +521,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.79"
version = "1.0.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e"
checksum = "a56dea16b0a29e94408b9aa5e2940a4eedbd128a1ba20e8f7ae60fd3d465af0e"
dependencies = [
"unicode-ident",
]
@ -590,7 +590,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]]
name = "satrs"
version = "0.2.0-rc.0"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352"
dependencies = [
"bus",
"cobs",
@ -615,7 +615,7 @@ dependencies = [
[[package]]
name = "satrs-mib"
version = "0.1.1"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352"
dependencies = [
"csv",
"satrs-mib-codegen",
@ -627,17 +627,17 @@ dependencies = [
[[package]]
name = "satrs-mib-codegen"
version = "0.1.1"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.59",
]
[[package]]
name = "satrs-shared"
version = "0.1.3"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#d43a8eb571ea3f2198f75ecdc125921272d6bc7f"
source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=rework-tmtc-modules#301a7a19a62a4f4541e8c7299cd67ab347c44352"
dependencies = [
"serde",
"spacepackets",
@ -671,7 +671,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.59",
]
[[package]]
@ -734,7 +734,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.58",
"syn 2.0.59",
]
[[package]]
@ -750,9 +750,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.58"
version = "2.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44cfb93f38070beee36b3fef7d4f5a16f27751d94b187b666a5cc5e9b0d30687"
checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a"
dependencies = [
"proc-macro2",
"quote",
@ -776,7 +776,7 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.59",
]
[[package]]
@ -841,7 +841,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.59",
"wasm-bindgen-shared",
]
@ -863,7 +863,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.59",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -880,7 +880,7 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets 0.52.4",
"windows-targets 0.52.5",
]
[[package]]
@ -898,7 +898,7 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.4",
"windows-targets 0.52.5",
]
[[package]]
@ -918,17 +918,18 @@ dependencies = [
[[package]]
name = "windows-targets"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b"
checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb"
dependencies = [
"windows_aarch64_gnullvm 0.52.4",
"windows_aarch64_msvc 0.52.4",
"windows_i686_gnu 0.52.4",
"windows_i686_msvc 0.52.4",
"windows_x86_64_gnu 0.52.4",
"windows_x86_64_gnullvm 0.52.4",
"windows_x86_64_msvc 0.52.4",
"windows_aarch64_gnullvm 0.52.5",
"windows_aarch64_msvc 0.52.5",
"windows_i686_gnu 0.52.5",
"windows_i686_gnullvm",
"windows_i686_msvc 0.52.5",
"windows_x86_64_gnu 0.52.5",
"windows_x86_64_gnullvm 0.52.5",
"windows_x86_64_msvc 0.52.5",
]
[[package]]
@ -939,9 +940,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9"
checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263"
[[package]]
name = "windows_aarch64_msvc"
@ -951,9 +952,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675"
checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6"
[[package]]
name = "windows_i686_gnu"
@ -963,9 +964,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_gnu"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3"
checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9"
[[package]]
name = "windows_i686_msvc"
@ -975,9 +982,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_i686_msvc"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02"
checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf"
[[package]]
name = "windows_x86_64_gnu"
@ -987,9 +994,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03"
checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9"
[[package]]
name = "windows_x86_64_gnullvm"
@ -999,9 +1006,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177"
checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596"
[[package]]
name = "windows_x86_64_msvc"
@ -1011,9 +1018,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"
checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0"
[[package]]
name = "winnow"
@ -1042,5 +1049,5 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.59",
]

View File

@ -19,13 +19,13 @@ mio = "0.8"
[dependencies.satrs]
version = "0.2.0-rc.0"
git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
branch = "main"
branch = "rework-tmtc-modules"
features = ["test_util"]
[dependencies.satrs-mib]
version = "0.1.1"
git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
branch = "main"
branch = "rework-tmtc-modules"
[dev-dependencies]
env_logger = "0.11"

View File

@ -1,5 +1,6 @@
use lazy_static::lazy_static;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use satrs::spacepackets::PacketId;
use satrs_mib::res_code::ResultU16Info;
use satrs_mib::resultcode;
use std::env;
@ -14,6 +15,7 @@ pub const SERVER_PORT: u16 = 7301;
pub const TCP_SPP_SERVER_PORT: u16 = 4096;
pub const EXPERIMENT_ID: u32 = 278;
pub const EXPERIMENT_APID: u16 = 1024 + EXPERIMENT_ID as u16;
pub const EXPERIMENT_PACKET_ID: PacketId = PacketId::new_for_tc(true, EXPERIMENT_APID);
#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)]
@ -104,6 +106,9 @@ pub mod components {
PusAction = 4,
PusMode = 5,
PusHk = 6,
UdpServer = 7,
TcpServer = 8,
TcpSppClient = 9,
}
pub const CONTROLLER_ID: UniqueApidTargetId =
@ -120,6 +125,10 @@ pub mod components {
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusMode as u32);
pub const PUS_HK_SERVICE: UniqueApidTargetId =
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusHk as u32);
pub const UDP_SERVER: UniqueApidTargetId =
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::UdpServer as u32);
pub const TCP_SERVER: UniqueApidTargetId =
UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::TcpServer as u32);
}
pub mod tasks {

View File

@ -1,7 +1,7 @@
use num_enum::TryFromPrimitive;
use satrs::{
action::ActionRequest,
pus::action::{ActionReplyVariant, PusActionReply},
pus::action::{ActionReplyPus, ActionReplyVariant},
request::{GenericMessage, MessageMetadata},
};
use std::{
@ -22,7 +22,7 @@ pub enum ActionId {
pub struct ExperimentController {
pub composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
pub action_reply_tx: mpsc::Sender<GenericMessage<PusActionReply>>,
pub action_reply_tx: mpsc::Sender<GenericMessage<ActionReplyPus>>,
pub stop_signal: Arc<AtomicBool>,
home_path_stop_file: PathBuf,
tmp_path_stop_file: PathBuf,
@ -31,7 +31,7 @@ pub struct ExperimentController {
impl ExperimentController {
pub fn new(
composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
action_reply_tx: mpsc::Sender<GenericMessage<PusActionReply>>,
action_reply_tx: mpsc::Sender<GenericMessage<ActionReplyPus>>,
stop_signal: Arc<AtomicBool>,
) -> Self {
let mut home_path_stop_file = PathBuf::new();

View File

@ -1,6 +1,6 @@
use std::{
collections::VecDeque,
sync::{atomic::AtomicBool, Arc, Mutex},
sync::{atomic::AtomicBool, mpsc, Arc, Mutex},
time::Duration,
};
@ -8,13 +8,12 @@ use log::{info, warn};
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
use satrs::{
hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer},
pus::ReceivesEcssPusTc,
pus::PacketAsVec,
queue::GenericSendError,
spacepackets::PacketId,
tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore},
tmtc::PacketSource,
};
use crate::tmtc::ccsds::CcsdsReceiver;
#[derive(Default, Clone)]
pub struct SyncTcpTmSource {
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
@ -43,7 +42,7 @@ impl SyncTcpTmSource {
}
}
impl TmPacketSourceCore for SyncTcpTmSource {
impl PacketSource for SyncTcpTmSource {
type Error = ();
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
@ -83,57 +82,38 @@ impl HandledConnectionHandler for ConnectionFinishedHandler {
}
}
pub type TcpServerType<TcSource, MpscErrorType> = TcpSpacepacketsServer<
pub type TcpServer = TcpSpacepacketsServer<
SyncTcpTmSource,
CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
mpsc::Sender<PacketAsVec>,
Vec<PacketId>,
ConnectionFinishedHandler,
(),
CcsdsError<MpscErrorType>,
GenericSendError,
>;
pub struct TcpTask<
TcSource: ReceivesCcsdsTc<Error = MpscErrorType>
+ ReceivesEcssPusTc<Error = MpscErrorType>
+ Clone
+ Send
+ 'static,
MpscErrorType: 'static,
> {
server: TcpServerType<TcSource, MpscErrorType>,
}
pub struct TcpTask(pub TcpServer);
impl<
TcSource: ReceivesCcsdsTc<Error = MpscErrorType>
+ ReceivesEcssPusTc<Error = MpscErrorType>
+ Clone
+ Send
+ 'static,
MpscErrorType: 'static + core::fmt::Debug,
> TcpTask<TcSource, MpscErrorType>
{
impl TcpTask {
pub fn new(
cfg: ServerConfig,
tm_source: SyncTcpTmSource,
tc_receiver: CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
tc_sender: mpsc::Sender<PacketAsVec>,
packet_id_lookup: Vec<PacketId>,
stop_signal: Arc<AtomicBool>,
) -> Result<Self, std::io::Error> {
Ok(Self {
server: TcpSpacepacketsServer::new(
Ok(Self(TcpSpacepacketsServer::new(
cfg,
tm_source,
tc_receiver,
tc_sender,
packet_id_lookup,
ConnectionFinishedHandler::default(),
Some(stop_signal),
)?,
})
)?))
}
pub fn periodic_operation(&mut self) {
let result = self
.server
.0
.handle_next_connection(Some(Duration::from_millis(STOP_CHECK_FREQUENCY)));
match result {
Ok(_conn_result) => (),

View File

@ -1,23 +1,37 @@
use std::io::{self, Read};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::mpsc;
use std::time::Duration;
use mio::net::TcpStream;
use mio::{Events, Interest, Poll, Token};
use ops_sat_rs::config::TCP_SPP_SERVER_PORT;
use satrs::spacepackets::ecss::CCSDS_HEADER_LEN;
use satrs::spacepackets::{CcsdsPacket, SpHeader};
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
use ops_sat_rs::config::{EXPERIMENT_PACKET_ID, TCP_SPP_SERVER_PORT};
use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets;
use satrs::pus::PacketAsVec;
use satrs::queue::GenericSendError;
use satrs::ComponentId;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum PacketForwardingError {
#[error("send error: {0}")]
Send(#[from] GenericSendError),
#[error("io error: {0}")]
Io(#[from] io::Error),
}
pub struct TcpSppClient {
id: ComponentId,
poll: Poll,
events: Events,
client: TcpStream,
read_buf: [u8; 4096],
tc_source_tx: mpsc::Sender<Vec<u8>>,
tc_source_tx: mpsc::Sender<PacketAsVec>,
}
impl TcpSppClient {
pub fn new() -> io::Result<Self> {
pub fn new(id: ComponentId, tc_source_tx: mpsc::Sender<PacketAsVec>) -> io::Result<Self> {
let poll = Poll::new()?;
let events = Events::with_capacity(128);
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT);
@ -28,15 +42,20 @@ impl TcpSppClient {
Interest::READABLE | Interest::WRITABLE,
)?;
Ok(Self {
id,
poll,
events,
client,
read_buf: [0; 4096],
tc_source_tx,
})
}
pub fn periodic_operation(&mut self) -> io::Result<()> {
self.poll.poll(&mut self.events, None)?;
pub fn periodic_operation(&mut self) -> Result<(), PacketForwardingError> {
self.poll.poll(
&mut self.events,
Some(Duration::from_millis(STOP_CHECK_FREQUENCY)),
)?;
let events: Vec<mio::event::Event> = self.events.iter().cloned().collect();
for event in events {
if event.token() == Token(0) {
@ -52,32 +71,28 @@ impl TcpSppClient {
Ok(())
}
pub fn read_from_server(&mut self) -> io::Result<()> {
pub fn read_from_server(&mut self) -> Result<(), PacketForwardingError> {
match self.client.read(&mut self.read_buf) {
Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe)),
Ok(n) => {
if n < CCSDS_HEADER_LEN + 1 {
log::warn!("received packet smaller than minimum expected packet size.");
log::debug!("{:?}", &self.read_buf[..n]);
return Ok(());
}
// We already checked that the received size has the minimum expected size.
let (sp_header, data) =
SpHeader::from_be_bytes(&self.read_buf[..n]).expect("parsing SP header failed");
let full_expected_packet_len = sp_header.total_len();
// We received an incomplete frame?
if n < full_expected_packet_len {
log::warn!(
"received incomplete SPP, with detected packet length {} but read buffer length {}",
full_expected_packet_len, n
);
return Ok(());
}
self.tc_source_tx
.send(self.read_buf[0..full_expected_packet_len].to_vec());
}
Err(e) => return Err(e),
Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()),
Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?,
Err(e) => return Err(e.into()),
}
Ok(())
}
pub fn handle_read_bytstream(
&mut self,
read_bytes: usize,
) -> Result<(), PacketForwardingError> {
let mut dummy = 0;
// This parser is able to deal with broken tail packets, but we ignore those for now..
parse_buffer_for_ccsds_space_packets(
&mut self.read_buf[..read_bytes],
&[EXPERIMENT_PACKET_ID].as_slice(),
self.id,
&mut self.tc_source_tx,
&mut dummy,
)?;
Ok(())
}
}

View File

@ -2,18 +2,18 @@ use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc;
use log::{info, warn};
use satrs::pus::PusTmAsVec;
use satrs::{
hal::std::udp_server::{ReceiveResult, UdpTcServer},
tmtc::CcsdsError,
};
use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer};
use satrs::pus::PacketAsVec;
use satrs::queue::GenericSendError;
use crate::pus::HandlingStatus;
pub trait UdpTmHandler {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr);
}
pub struct DynamicUdpTmHandler {
pub tm_rx: mpsc::Receiver<PusTmAsVec>,
pub tm_rx: mpsc::Receiver<PacketAsVec>,
}
impl UdpTmHandler for DynamicUdpTmHandler {
@ -34,42 +34,39 @@ impl UdpTmHandler for DynamicUdpTmHandler {
}
}
pub struct UdpTmtcServer<TmHandler: UdpTmHandler, SendError> {
pub udp_tc_server: UdpTcServer<CcsdsError<SendError>>,
pub struct UdpTmtcServer<TmHandler: UdpTmHandler> {
pub udp_tc_server: UdpTcServer<mpsc::Sender<PacketAsVec>, GenericSendError>,
pub tm_handler: TmHandler,
}
impl<TmHandler: UdpTmHandler, SendError: core::fmt::Debug + 'static>
UdpTmtcServer<TmHandler, SendError>
{
impl<TmHandler: UdpTmHandler> UdpTmtcServer<TmHandler> {
pub fn periodic_operation(&mut self) {
while self.poll_tc_server() {}
loop {
if self.poll_tc_server() == HandlingStatus::Empty {
break;
}
}
if let Some(recv_addr) = self.udp_tc_server.last_sender() {
self.tm_handler
.send_tm_to_udp_client(&self.udp_tc_server.socket, &recv_addr);
}
}
fn poll_tc_server(&mut self) -> bool {
fn poll_tc_server(&mut self) -> HandlingStatus {
match self.udp_tc_server.try_recv_tc() {
Ok(_) => true,
Err(e) => match e {
ReceiveResult::ReceiverError(e) => match e {
CcsdsError::ByteConversionError(e) => {
warn!("packet error: {e:?}");
true
Ok(_) => HandlingStatus::HandledOne,
Err(e) => {
match e {
ReceiveResult::NothingReceived => (),
ReceiveResult::Io(io_error) => {
warn!("Error receiving TC from UDP server: {io_error}");
}
CcsdsError::CustomError(e) => {
warn!("mpsc custom error {e:?}");
true
ReceiveResult::Send(send_error) => {
warn!("error sending TM to UDP client: {send_error}");
}
},
ReceiveResult::IoError(e) => {
warn!("IO error {e}");
false
}
ReceiveResult::NothingReceived => false,
},
HandlingStatus::Empty
}
}
}
}
@ -79,29 +76,35 @@ mod tests {
use std::{
collections::VecDeque,
net::IpAddr,
sync::{Arc, Mutex},
sync::{mpsc::TryRecvError, Arc, Mutex},
};
use ops_sat_rs::config::{components, OBSW_SERVER_ADDR};
use ops_sat_rs::config::{EXPERIMENT_APID, OBSW_SERVER_ADDR};
use satrs::{
spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket},
SpHeader,
},
tmtc::ReceivesTcCore,
tmtc::PacketSenderRaw,
ComponentId,
};
use super::*;
const UDP_SERVER_ID: ComponentId = 0x05;
#[derive(Default, Debug, Clone)]
pub struct TestReceiver {
tc_vec: Arc<Mutex<VecDeque<Vec<u8>>>>,
tc_vec: Arc<Mutex<VecDeque<PacketAsVec>>>,
}
impl ReceivesTcCore for TestReceiver {
type Error = CcsdsError<()>;
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
self.tc_vec.lock().unwrap().push_back(tc_raw.to_vec());
impl PacketSenderRaw for TestReceiver {
type Error = ();
fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> {
self.tc_vec
.lock()
.unwrap()
.push_back(PacketAsVec::new(sender_id, packet.to_vec()));
Ok(())
}
}
@ -120,9 +123,8 @@ mod tests {
#[test]
fn test_basic() {
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
let test_receiver = TestReceiver::default();
let tc_queue = test_receiver.tc_vec.clone();
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap();
let (tx, rx) = mpsc::channel();
let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap();
let tm_handler = TestTmHandler::default();
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let mut udp_dyn_server = UdpTmtcServer {
@ -130,16 +132,14 @@ mod tests {
tm_handler,
};
udp_dyn_server.periodic_operation();
assert!(tc_queue.lock().unwrap().is_empty());
assert!(tm_handler_calls.lock().unwrap().is_empty());
matches!(rx.try_recv(), Err(TryRecvError::Empty));
}
#[test]
fn test_transactions() {
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
let test_receiver = TestReceiver::default();
let tc_queue = test_receiver.tc_vec.clone();
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap();
let (tx, rx) = mpsc::channel();
let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap();
let server_addr = udp_tc_server.socket.local_addr().unwrap();
let tm_handler = TestTmHandler::default();
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
@ -147,7 +147,7 @@ mod tests {
udp_tc_server,
tm_handler,
};
let sph = SpHeader::new_for_unseg_tc(components::Apid::GenericPus as u16, 0, 0);
let sph = SpHeader::new_for_unseg_tc(EXPERIMENT_APID, 0, 0);
let ping_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true)
.to_vec()
.unwrap();
@ -157,10 +157,9 @@ mod tests {
client.send(&ping_tc).unwrap();
udp_dyn_server.periodic_operation();
{
let mut tc_queue = tc_queue.lock().unwrap();
assert!(!tc_queue.is_empty());
let received_tc = tc_queue.pop_front().unwrap();
assert_eq!(received_tc, ping_tc);
let packet_with_sender = rx.try_recv().unwrap();
assert_eq!(packet_with_sender.packet, ping_tc);
matches!(rx.try_recv(), Err(TryRecvError::Empty));
}
{
@ -171,7 +170,7 @@ mod tests {
assert_eq!(received_addr, client_addr);
}
udp_dyn_server.periodic_operation();
assert!(tc_queue.lock().unwrap().is_empty());
matches!(rx.try_recv(), Err(TryRecvError::Empty));
// Still tries to send to the same client.
{
let mut tm_handler_calls = tm_handler_calls.lock().unwrap();

View File

@ -7,7 +7,7 @@ use std::{
use log::info;
use ops_sat_rs::config::{
components::CONTROLLER_ID,
components::{CONTROLLER_ID, TCP_SERVER, UDP_SERVER},
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK},
EXPERIMENT_APID,
};
@ -15,19 +15,16 @@ use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}
use satrs::{
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
spacepackets::PacketId,
tmtc::CcsdsDistributor,
};
use crate::pus::{PusReceiver, PusTcMpscRouter};
use crate::tmtc::tm_funnel::TmFunnelDynamic;
use crate::tmtc::TcSourceTaskDynamic;
use crate::pus::{PusTcDistributor, PusTcMpscRouter};
use crate::tmtc::tc_source::TcSourceTaskDynamic;
use crate::tmtc::tm_sink::TmFunnelDynamic;
use crate::{controller::ExperimentController, pus::test::create_test_service};
use crate::{
interface::tcp_server::{SyncTcpTmSource, TcpTask},
interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer},
logger::setup_logger,
tmtc::ccsds::CcsdsReceiver,
tmtc::PusTcSourceProviderDynamic,
};
use crate::{
pus::{action::create_action_service, stack::PusStack},
@ -51,8 +48,6 @@ fn main() {
let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel();
let (tm_server_tx, tm_server_rx) = mpsc::channel();
let tc_source = PusTcSourceProviderDynamic(tc_source_tx);
let (pus_test_tx, pus_test_rx) = mpsc::channel();
// let (pus_event_tx, pus_event_rx) = mpsc::channel();
// let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
@ -122,16 +117,13 @@ fn main() {
// pus_mode_service,
);
let ccsds_receiver = CcsdsReceiver { tc_source };
let mut tmtc_task = TcSourceTaskDynamic::new(
tc_source_rx,
PusReceiver::new(tm_funnel_tx.clone(), pus_router),
PusTcDistributor::new(tm_funnel_tx.clone(), pus_router),
);
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone());
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor))
let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source_tx.clone())
.expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server,
@ -140,13 +132,18 @@ fn main() {
},
};
let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver);
let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
let tcp_server_cfg = ServerConfig::new(
TCP_SERVER.id(),
sock_addr,
Duration::from_millis(400),
4096,
8192,
);
let sync_tm_tcp_source = SyncTcpTmSource::new(200);
let mut tcp_server = TcpTask::new(
tcp_server_cfg,
sync_tm_tcp_source.clone(),
tcp_ccsds_distributor,
tc_source_tx.clone(),
vec![PacketId::new_for_tc(true, EXPERIMENT_APID)],
stop_signal.clone(),
)

View File

@ -4,16 +4,16 @@ use ops_sat_rs::config::tmtc_err;
use satrs::action::{ActionRequest, ActionRequestVariant};
use satrs::params::WritableToBeBytes;
use satrs::pus::action::{
ActionReplyVariant, ActivePusActionRequestStd, DefaultActiveActionRequestMap, PusActionReply,
ActionReplyPus, ActionReplyVariant, ActivePusActionRequestStd, DefaultActiveActionRequestMap,
};
use satrs::pus::verification::{
FailParams, FailParamsWithStep, TcStateAccepted, TcStateStarted, VerificationReporter,
VerificationReportingProvider, VerificationToken,
};
use satrs::pus::{
ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSenderCore, EcssTmtcError,
GenericConversionError, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper,
PusTcToRequestConverter, PusTmAsVec,
ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError,
GenericConversionError, PacketAsVec, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper,
PusTcToRequestConverter,
};
use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader;
@ -40,13 +40,13 @@ impl Default for ActionReplyHandler {
}
}
impl PusReplyHandler<ActivePusActionRequestStd, PusActionReply> for ActionReplyHandler {
impl PusReplyHandler<ActivePusActionRequestStd, ActionReplyPus> for ActionReplyHandler {
type Error = EcssTmtcError;
fn handle_unrequested_reply(
&mut self,
reply: &GenericMessage<PusActionReply>,
_tm_sender: &impl EcssTmSenderCore,
reply: &GenericMessage<ActionReplyPus>,
_tm_sender: &impl EcssTmSender,
) -> Result<(), Self::Error> {
warn!("received unexpected reply for service 8: {reply:?}");
Ok(())
@ -54,9 +54,9 @@ impl PusReplyHandler<ActivePusActionRequestStd, PusActionReply> for ActionReplyH
fn handle_reply(
&mut self,
reply: &GenericMessage<PusActionReply>,
reply: &GenericMessage<ActionReplyPus>,
active_request: &ActivePusActionRequestStd,
tm_sender: &(impl EcssTmSenderCore + ?Sized),
tm_sender: &(impl EcssTmSender + ?Sized),
verification_handler: &impl VerificationReportingProvider,
time_stamp: &[u8],
) -> Result<bool, Self::Error> {
@ -119,7 +119,7 @@ impl PusReplyHandler<ActivePusActionRequestStd, PusActionReply> for ActionReplyH
fn handle_request_timeout(
&mut self,
active_request: &ActivePusActionRequestStd,
tm_sender: &impl EcssTmSenderCore,
tm_sender: &impl EcssTmSender,
verification_handler: &impl VerificationReportingProvider,
time_stamp: &[u8],
) -> Result<(), Self::Error> {
@ -143,7 +143,7 @@ impl PusTcToRequestConverter<ActivePusActionRequestStd, ActionRequest> for Actio
&mut self,
token: VerificationToken<TcStateAccepted>,
tc: &PusTcReader,
tm_sender: &(impl EcssTmSenderCore + ?Sized),
tm_sender: &(impl EcssTmSender + ?Sized),
verif_reporter: &impl VerificationReportingProvider,
time_stamp: &[u8],
) -> Result<(ActivePusActionRequestStd, ActionRequest), Self::Error> {
@ -193,10 +193,10 @@ impl PusTcToRequestConverter<ActivePusActionRequestStd, ActionRequest> for Actio
}
pub fn create_action_service(
tm_funnel_tx: mpsc::Sender<PusTmAsVec>,
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
action_router: GenericRequestRouter,
reply_receiver: mpsc::Receiver<GenericMessage<PusActionReply>>,
reply_receiver: mpsc::Receiver<GenericMessage<ActionReplyPus>>,
) -> ActionServiceWrapper {
let action_request_handler = PusTargetedRequestService::new(
PusServiceHelper::new(
@ -225,7 +225,7 @@ pub struct ActionServiceWrapper {
DefaultActiveActionRequestMap,
ActivePusActionRequestStd,
ActionRequest,
PusActionReply,
ActionReplyPus,
>,
}
@ -304,7 +304,7 @@ mod tests {
DefaultActiveActionRequestMap,
ActivePusActionRequestStd,
ActionRequest,
PusActionReply,
ActionReplyPus,
>
{
pub fn new_for_action(owner_id: ComponentId, target_id: ComponentId) -> Self {
@ -460,7 +460,7 @@ mod tests {
if let CompositeRequest::Action(action_req) = req.message {
assert_eq!(action_req.action_id, action_id);
assert_eq!(action_req.variant, ActionRequestVariant::NoData);
let action_reply = PusActionReply::new(action_id, ActionReplyVariant::Completed);
let action_reply = ActionReplyPus::new(action_id, ActionReplyVariant::Completed);
testbench
.reply_tx
.send(GenericMessage::new(req.requestor_info, action_reply))
@ -580,7 +580,7 @@ mod tests {
let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]);
let active_action_req =
ActivePusActionRequestStd::new_from_common_req(action_id, active_req);
let reply = PusActionReply::new(action_id, ActionReplyVariant::Completed);
let reply = ActionReplyPus::new(action_id, ActionReplyVariant::Completed);
let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply);
let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]);
assert!(result.is_ok());
@ -601,7 +601,7 @@ mod tests {
let active_action_req =
ActivePusActionRequestStd::new_from_common_req(action_id, active_req);
let error_code = ResultU16::new(2, 3);
let reply = PusActionReply::new(
let reply = ActionReplyPus::new(
action_id,
ActionReplyVariant::CompletionFailed {
error_code,
@ -628,7 +628,7 @@ mod tests {
let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]);
let active_action_req =
ActivePusActionRequestStd::new_from_common_req(action_id, active_req);
let reply = PusActionReply::new(action_id, ActionReplyVariant::StepSuccess { step: 1 });
let reply = ActionReplyPus::new(action_id, ActionReplyVariant::StepSuccess { step: 1 });
let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply);
let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]);
assert!(result.is_ok());
@ -655,7 +655,7 @@ mod tests {
let active_action_req =
ActivePusActionRequestStd::new_from_common_req(action_id, active_req);
let error_code = ResultU16::new(2, 3);
let reply = PusActionReply::new(
let reply = ActionReplyPus::new(
action_id,
ActionReplyVariant::StepFailed {
error_code,
@ -685,7 +685,7 @@ mod tests {
fn reply_handling_unrequested_reply() {
let mut testbench =
ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default());
let action_reply = PusActionReply::new(5_u32, ActionReplyVariant::Completed);
let action_reply = ActionReplyPus::new(5_u32, ActionReplyVariant::Completed);
let unrequested_reply =
GenericMessage::new(MessageMetadata::new(10_u32, 15_u64), action_reply);
// Right now this function does not do a lot. We simply check that it does not panic or do

View File

@ -3,7 +3,6 @@ pub mod stack;
pub mod test;
use crate::requests::GenericRequestRouter;
use crate::tmtc::MpscStoreAndSendError;
use log::warn;
use ops_sat_rs::config::components::PUS_ROUTING_SERVICE;
use ops_sat_rs::config::{tmtc_err, CustomPusServiceId};
@ -14,15 +13,14 @@ use satrs::pus::verification::{
};
use satrs::pus::{
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
EcssTcInVecConverter, EcssTmSenderCore, EcssTmtcError, GenericConversionError,
GenericRoutingError, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult,
PusPacketHandlingError, PusReplyHandler, PusRequestRouter, PusServiceHelper,
PusTcToRequestConverter, TcInMemory,
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError,
MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError,
PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter,
};
use satrs::queue::GenericReceiveError;
use satrs::queue::{GenericReceiveError, GenericSendError};
use satrs::request::{Apid, GenericMessage, MessageMetadata};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusServiceId;
use satrs::spacepackets::ecss::{PusPacket, PusServiceId};
use satrs::ComponentId;
use std::fmt::Debug;
use std::sync::mpsc::{self, Sender};
@ -59,7 +57,7 @@ pub struct PusTcMpscRouter {
// pub mode_tc_sender: Sender<EcssTcAndToken>,
}
pub struct PusTcDistributor<TmSender: EcssTmSenderCore> {
pub struct PusTcDistributor<TmSender: EcssTmSender> {
pub id: ComponentId,
pub tm_sender: TmSender,
pub verif_reporter: VerificationReporter,
@ -67,7 +65,7 @@ pub struct PusTcDistributor<TmSender: EcssTmSenderCore> {
stamp_helper: TimeStampHelper,
}
impl<TmSender: EcssTmSenderCore> PusTcDistributor<TmSender> {
impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
pub fn new(tm_sender: TmSender, pus_router: PusTcMpscRouter) -> Self {
Self {
id: PUS_ROUTING_SERVICE.raw(),
@ -83,25 +81,35 @@ impl<TmSender: EcssTmSenderCore> PusTcDistributor<TmSender> {
pub fn handle_tc_packet(
&mut self,
tc_in_memory: TcInMemory,
service: u8,
pus_tc: &PusTcReader,
) -> Result<PusPacketHandlerResult, MpscStoreAndSendError> {
let init_token = self.verif_reporter.add_tc(pus_tc);
sender_id: ComponentId,
tc: Vec<u8>,
) -> Result<PusPacketHandlerResult, GenericSendError> {
let pus_tc_result = PusTcReader::new(&tc);
if pus_tc_result.is_err() {
log::warn!(
"error creating PUS TC received from {}: {}",
sender_id,
pus_tc_result.unwrap_err()
);
log::warn!("raw data: {:x?}", tc);
return Ok(PusPacketHandlerResult::RequestHandled);
}
let pus_tc = pus_tc_result.unwrap().0;
let init_token = self.verif_reporter.add_tc(&pus_tc);
self.stamp_helper.update_from_now();
let accepted_token = self
.verif_reporter
.acceptance_success(&self.tm_sender, init_token, self.stamp_helper.stamp())
.expect("Acceptance success failure");
let service = PusServiceId::try_from(service);
let service = PusServiceId::try_from(pus_tc.service());
match service {
Ok(standard_service) => match standard_service {
PusServiceId::Test => self.pus_router.test_tc_sender.send(EcssTcAndToken {
tc_in_memory,
tc_in_memory: tc.into(),
token: Some(accepted_token.into()),
})?,
PusServiceId::Action => self.pus_router.action_tc_sender.send(EcssTcAndToken {
tc_in_memory,
tc_in_memory: tc.into(),
token: Some(accepted_token.into()),
})?,
// PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken {
@ -436,7 +444,7 @@ where
/// and also log the error.
#[allow(dead_code)]
pub fn generic_pus_request_timeout_handler(
sender: &(impl EcssTmSenderCore + ?Sized),
sender: &(impl EcssTmSender + ?Sized),
active_request: &(impl ActiveRequestProvider + Debug),
verification_handler: &impl VerificationReportingProvider,
time_stamp: &[u8],
@ -460,7 +468,7 @@ pub(crate) mod tests {
use std::time::Duration;
use satrs::pus::test_util::TEST_COMPONENT_ID_0;
use satrs::pus::{MpscTmAsVecSender, PusTmAsVec, PusTmVariant};
use satrs::pus::{MpscTmAsVecSender, PacketAsVec, PusTmVariant};
use satrs::request::RequestId;
use satrs::{
pus::{
@ -490,7 +498,7 @@ pub(crate) mod tests {
pub id: ComponentId,
pub verif_reporter: TestVerificationReporter,
pub reply_handler: ReplyHandler,
pub tm_receiver: mpsc::Receiver<PusTmAsVec>,
pub tm_receiver: mpsc::Receiver<PacketAsVec>,
pub default_timeout: Duration,
tm_sender: MpscTmAsVecSender,
phantom: std::marker::PhantomData<(ActiveRequestInfo, Reply)>,
@ -590,7 +598,7 @@ pub(crate) mod tests {
/// Dummy sender component which does nothing on the [Self::send_tm] call.
///
/// Useful for unit tests.
impl EcssTmSenderCore for DummySender {
impl EcssTmSender for DummySender {
fn send_tm(&self, _source_id: ComponentId, _tm: PusTmVariant) -> Result<(), EcssTmtcError> {
Ok(())
}
@ -694,7 +702,7 @@ pub(crate) mod tests {
ReplyType,
>,
pub request_id: Option<RequestId>,
pub tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
pub tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
pub pus_packet_tx: mpsc::Sender<EcssTcAndToken>,
pub reply_tx: mpsc::Sender<GenericMessage<ReplyType>>,
pub request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,

View File

@ -7,7 +7,7 @@ use satrs::pus::test::PusService17TestHandler;
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender,
PusPacketHandlerResult, PusServiceHelper, PusTmAsVec,
PacketAsVec, PusPacketHandlerResult, PusServiceHelper,
};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusPacket;
@ -18,8 +18,7 @@ use std::sync::mpsc;
use super::HandlingStatus;
pub fn create_test_service(
tm_funnel_tx: mpsc::Sender<PusTmAsVec>,
// event_sender: mpsc::Sender<EventMessageU32>,
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> TestCustomServiceWrapper {
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(

View File

@ -10,7 +10,7 @@ use satrs::mode::ModeRequest;
use satrs::pus::verification::{
FailParams, TcStateAccepted, VerificationReportingProvider, VerificationToken,
};
use satrs::pus::{ActiveRequestProvider, EcssTmSenderCore, GenericRoutingError, PusRequestRouter};
use satrs::pus::{ActiveRequestProvider, EcssTmSender, GenericRoutingError, PusRequestRouter};
use satrs::queue::GenericSendError;
use satrs::request::{GenericMessage, MessageMetadata, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader;
@ -49,7 +49,7 @@ impl GenericRequestRouter {
active_request: &impl ActiveRequestProvider,
tc: &PusTcReader,
error: GenericRoutingError,
tm_sender: &(impl EcssTmSenderCore + ?Sized),
tm_sender: &(impl EcssTmSender + ?Sized),
verif_reporter: &impl VerificationReportingProvider,
time_stamp: &[u8],
) {

View File

@ -1,94 +1,23 @@
use satrs::{pool::PoolProvider, tmtc::tc_helper::SharedTcPool};
use std::sync::mpsc::{self, TryRecvError};
use satrs::{
pool::StoreAddr,
pus::{MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded},
spacepackets::ecss::{tc::PusTcReader, PusPacket},
};
use satrs::pus::{MpscTmAsVecSender, PacketAsVec};
use crate::pus::PusTcDistributor;
// TC source components where static pools are the backing memory of the received telecommands.
pub struct TcSourceTaskStatic {
shared_tc_pool: SharedTcPool,
tc_receiver: mpsc::Receiver<StoreAddr>,
tc_buf: [u8; 4096],
pus_receiver: PusTcDistributor<MpscTmInSharedPoolSenderBounded>,
}
impl TcSourceTaskStatic {
pub fn new(
shared_tc_pool: SharedTcPool,
tc_receiver: mpsc::Receiver<StoreAddr>,
pus_receiver: PusTcDistributor<MpscTmInSharedPoolSenderBounded>,
) -> Self {
Self {
shared_tc_pool,
tc_receiver,
tc_buf: [0; 4096],
pus_receiver,
}
}
pub fn periodic_operation(&mut self) {
self.poll_tc();
}
pub fn poll_tc(&mut self) -> bool {
match self.tc_receiver.try_recv() {
Ok(addr) => {
let pool = self
.shared_tc_pool
.0
.read()
.expect("locking tc pool failed");
pool.read(&addr, &mut self.tc_buf)
.expect("reading pool failed");
drop(pool);
match PusTcReader::new(&self.tc_buf) {
Ok((pus_tc, _)) => {
self.pus_receiver
.handle_tc_packet(
satrs::pus::TcInMemory::StoreAddr(addr),
pus_tc.service(),
&pus_tc,
)
.ok();
true
}
Err(e) => {
log::warn!("error creating PUS TC from raw data: {e}");
log::warn!("raw data: {:x?}", self.tc_buf);
true
}
}
}
Err(e) => match e {
TryRecvError::Empty => false,
TryRecvError::Disconnected => {
log::warn!("tmtc thread: sender disconnected");
false
}
},
}
}
}
// TC source components where the heap is the backing memory of the received telecommands.
pub struct TcSourceTaskDynamic {
pub tc_receiver: mpsc::Receiver<Vec<u8>>,
pus_receiver: PusTcDistributor<MpscTmAsVecSender>,
pub tc_receiver: mpsc::Receiver<PacketAsVec>,
pus_distrib: PusTcDistributor<MpscTmAsVecSender>,
}
impl TcSourceTaskDynamic {
pub fn new(
tc_receiver: mpsc::Receiver<Vec<u8>>,
tc_receiver: mpsc::Receiver<PacketAsVec>,
pus_receiver: PusTcDistributor<MpscTmAsVecSender>,
) -> Self {
Self {
tc_receiver,
pus_receiver,
pus_distrib: pus_receiver,
}
}
@ -97,25 +26,15 @@ impl TcSourceTaskDynamic {
}
pub fn poll_tc(&mut self) -> bool {
// Right now, we only expect PUS packets.
// Right now, we only expect PUS packets. If any other protocols like CFDP are added at
// a later stage, we probably need to check for the APID before routing the packet.
match self.tc_receiver.try_recv() {
Ok(tc) => match PusTcReader::new(&tc) {
Ok((pus_tc, _)) => {
self.pus_receiver
.handle_tc_packet(
satrs::pus::TcInMemory::Vec(tc.clone()),
pus_tc.service(),
&pus_tc,
)
Ok(packet_with_sender) => {
self.pus_distrib
.handle_tc_packet(packet_with_sender.sender_id, packet_with_sender.packet)
.ok();
true
}
Err(e) => {
log::warn!("error creating PUS TC from raw data: {e}");
log::warn!("raw data: {:x?}", tc);
true
}
},
Err(e) => match e {
TryRecvError::Empty => false,
TryRecvError::Disconnected => {

View File

@ -4,7 +4,7 @@ use std::{collections::HashMap, sync::mpsc, time::Duration};
use log::info;
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
use satrs::pus::PusTmAsVec;
use satrs::pus::PacketAsVec;
use satrs::{
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
spacepackets::{
@ -76,16 +76,16 @@ impl TmFunnelCommon {
pub struct TmFunnelDynamic {
common: TmFunnelCommon,
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
tm_server_tx: mpsc::Sender<PusTmAsVec>,
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
tm_server_tx: mpsc::Sender<PacketAsVec>,
stop_signal: Arc<AtomicBool>,
}
impl TmFunnelDynamic {
pub fn new(
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
tm_server_tx: mpsc::Sender<PusTmAsVec>,
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
tm_server_tx: mpsc::Sender<PacketAsVec>,
stop_signal: Arc<AtomicBool>,
) -> Self {
Self {