Merge pull request 'Example: Add TCP server' (#85) from example-add-tcp-server into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good

Reviewed-on: #85
This commit is contained in:
Robin Müller 2023-09-29 14:18:25 +02:00
commit ef8417d9db
15 changed files with 361 additions and 156 deletions

View File

@ -21,3 +21,9 @@ A lot of the architecture and general design considerations are based on the
through the 2 missions [FLP](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/flying-laptop/) through the 2 missions [FLP](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/flying-laptop/)
and [EIVE](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/EIVE/). and [EIVE](https://www.irs.uni-stuttgart.de/en/research/satellitetechnology-and-instruments/smallsatelliteprogram/EIVE/).
# Getting started with the example
The [`satrs-example`](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example)
provides various practical usage examples of the `sat-rs` framework. If you are more interested in
the practical application of `sat-rs` inside an application, it is recommended to have a look at
the example application.

View File

@ -30,6 +30,12 @@ impl PacketIdLookup for [u16] {
} }
} }
impl PacketIdLookup for &[u16] {
fn validate(&self, packet_id: u16) -> bool {
self.binary_search(&packet_id).is_ok()
}
}
#[cfg(feature = "alloc")] #[cfg(feature = "alloc")]
impl PacketIdLookup for Vec<PacketId> { impl PacketIdLookup for Vec<PacketId> {
fn validate(&self, packet_id: u16) -> bool { fn validate(&self, packet_id: u16) -> bool {
@ -49,6 +55,12 @@ impl PacketIdLookup for [PacketId] {
} }
} }
impl PacketIdLookup for &[PacketId] {
fn validate(&self, packet_id: u16) -> bool {
self.binary_search(&PacketId::from(packet_id)).is_ok()
}
}
/// This function parses a given buffer for tightly packed CCSDS space packets. It uses the /// This function parses a given buffer for tightly packed CCSDS space packets. It uses the
/// [PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet and then /// [PacketId] field of the CCSDS packets to detect the start of a CCSDS space packet and then
/// uses the length field of the packet to extract CCSDS packets. /// uses the length field of the packet to extract CCSDS packets.

View File

@ -130,7 +130,7 @@ impl<TmError: 'static, TcError: 'static> TcpTmtcInCobsServer<TmError, TcError> {
cfg: ServerConfig, cfg: ServerConfig,
tm_source: Box<dyn TmPacketSource<Error = TmError>>, tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>, tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
) -> Result<Self, TcpTmtcError<TmError, TcError>> { ) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
generic_server: TcpTmtcGenericServer::new( generic_server: TcpTmtcGenericServer::new(
cfg, cfg,

View File

@ -113,7 +113,7 @@ impl<TmError: 'static, TcError: 'static> TcpSpacepacketsServer<TmError, TcError>
tm_source: Box<dyn TmPacketSource<Error = TmError>>, tm_source: Box<dyn TmPacketSource<Error = TmError>>,
tc_receiver: Box<dyn ReceivesTc<Error = TcError>>, tc_receiver: Box<dyn ReceivesTc<Error = TcError>>,
packet_id_lookup: Box<dyn PacketIdLookup + Send>, packet_id_lookup: Box<dyn PacketIdLookup + Send>,
) -> Result<Self, TcpTmtcError<TmError, TcError>> { ) -> Result<Self, std::io::Error> {
Ok(Self { Ok(Self {
generic_server: TcpTmtcGenericServer::new( generic_server: TcpTmtcGenericServer::new(
cfg, cfg,

View File

@ -19,9 +19,9 @@ num_enum = "0.7"
thiserror = "1" thiserror = "1"
[dependencies.satrs-core] [dependencies.satrs-core]
# version = "0.1.0-alpha.0" # version = "0.1.0-alpha.1"
path = "../satrs-core" path = "../satrs-core"
[dependencies.satrs-mib] [dependencies.satrs-mib]
path = "../satrs-mib" version = "0.1.0-alpha.1"
# path = "../satrs-mib"

View File

@ -17,7 +17,7 @@ cargo run --bin simpleclient
This repository also contains a more complex client using the This repository also contains a more complex client using the
[Python tmtccmd](https://github.com/robamu-org/tmtccmd) module. [Python tmtccmd](https://github.com/robamu-org/tmtccmd) module.
# Using the tmtccmd Python client # <a id="tmtccmd"></a> Using the tmtccmd Python client
The python client requires a valid installation of the The python client requires a valid installation of the
[tmtccmd package](https://github.com/robamu-org/tmtccmd). [tmtccmd package](https://github.com/robamu-org/tmtccmd).
@ -51,3 +51,26 @@ the `simpleclient`:
You can also simply call the script without any arguments to view a list of services (`-s` flag) You can also simply call the script without any arguments to view a list of services (`-s` flag)
and corresponding op codes (`-o` flag) for each service. and corresponding op codes (`-o` flag) for each service.
# Structure of the example project
The example project contains components which could also be expected to be part of a production
On-Board Software.
1. A UDP and TCP server to receive telecommands and poll telemetry from. This might be an optional
component for an OBSW which is only used during the development phase on ground. The TCP
server parses space packets by using the CCSDS space packet ID as the packet start delimiter.
2. A PUS service stack which exposes some functionality conformant with the ECSS PUS service. This
currently includes the following services:
- Service 1 for telecommand verification.
- Service 3 for housekeeping telemetry handling.
- Service 5 for management and downlink of on-board events.
- Service 8 for handling on-board actions.
- Service 11 for scheduling telecommands to be released at a specific time.
- Service 17 for test purposes (pings)
3. An event manager component which handles the event IPC mechanism.
4. A TC source component which demultiplexes and routes telecommands based on parameters like
packet APID or PUS service and subservice type.
5. A TM sink sink component which is the target of all sent telemetry and sends it to downlink
handlers like the UDP and TCP server.
6. An AOCS example task which can also process some PUS commands.

View File

@ -4,7 +4,11 @@ import dataclasses
import enum import enum
import struct import struct
from spacepackets.ecss.tc import PacketId, PacketType
EXAMPLE_PUS_APID = 0x02 EXAMPLE_PUS_APID = 0x02
EXAMPLE_PUS_PACKET_ID_TM = PacketId(PacketType.TM, True, EXAMPLE_PUS_APID)
TM_PACKET_IDS = [EXAMPLE_PUS_PACKET_ID_TM]
class EventSeverity(enum.IntEnum): class EventSeverity(enum.IntEnum):

View File

@ -45,7 +45,7 @@ from tmtccmd.util.obj_id import ObjectIdDictT
import pus_tc import pus_tc
import tc_definitions import tc_definitions
from common import EXAMPLE_PUS_APID, EventU32 from common import EXAMPLE_PUS_APID, TM_PACKET_IDS, EventU32
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger()
@ -63,7 +63,7 @@ class SatRsConfigHook(HookBase):
cfg = create_com_interface_cfg_default( cfg = create_com_interface_cfg_default(
com_if_key=com_if_key, com_if_key=com_if_key,
json_cfg_path=self.cfg_path, json_cfg_path=self.cfg_path,
space_packet_ids=None, space_packet_ids=TM_PACKET_IDS,
) )
return create_com_interface_default(cfg) return create_com_interface_default(cfg)

View File

@ -1,2 +1,2 @@
tmtccmd == 5.0.0rc0 tmtccmd == 6.0.0
# -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd # -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd

View File

@ -1,6 +1,8 @@
{ {
"com_if": "udp", "com_if": "tcp",
"tcpip_udp_ip_addr": "127.0.0.1", "tcpip_udp_ip_addr": "127.0.0.1",
"tcpip_udp_port": 7301, "tcpip_udp_port": 7301,
"tcpip_udp_recv_max_size": 1500 "tcpip_udp_recv_max_size": 1500,
"tcpip_tcp_ip_addr": "127.0.0.1",
"tcpip_tcp_port": 7301
} }

View File

@ -3,6 +3,7 @@ use satrs_core::spacepackets::{CcsdsPacket, SpHeader};
use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; use satrs_core::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc};
use satrs_example::PUS_APID; use satrs_example::PUS_APID;
#[derive(Clone)]
pub struct CcsdsReceiver { pub struct CcsdsReceiver {
pub tc_source: PusTcSource, pub tc_source: PusTcSource,
} }

View File

@ -3,10 +3,15 @@ mod hk;
mod logging; mod logging;
mod pus; mod pus;
mod requests; mod requests;
mod tcp;
mod tmtc; mod tmtc;
mod udp;
use log::{info, warn}; use log::{info, warn};
use satrs_core::hal::std::tcp_server::ServerConfig;
use satrs_core::hal::std::udp_server::UdpTcServer;
use crate::ccsds::CcsdsReceiver;
use crate::hk::AcsHkIds; use crate::hk::AcsHkIds;
use crate::logging::setup_logger; use crate::logging::setup_logger;
use crate::pus::action::{Pus8Wrapper, PusService8ActionHandler}; use crate::pus::action::{Pus8Wrapper, PusService8ActionHandler};
@ -14,9 +19,11 @@ use crate::pus::event::Pus5Wrapper;
use crate::pus::hk::{Pus3Wrapper, PusService3HkHandler}; use crate::pus::hk::{Pus3Wrapper, PusService3HkHandler};
use crate::pus::scheduler::Pus11Wrapper; use crate::pus::scheduler::Pus11Wrapper;
use crate::pus::test::Service17CustomWrapper; use crate::pus::test::Service17CustomWrapper;
use crate::pus::PusTcMpscRouter; use crate::pus::{PusReceiver, PusTcMpscRouter};
use crate::requests::{Request, RequestWithToken}; use crate::requests::{Request, RequestWithToken};
use crate::tmtc::{core_tmtc_task, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel}; use crate::tcp::{SyncTcpTmSource, TcpTask};
use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask};
use crate::udp::UdpTmtcServer;
use satrs_core::event_man::{ use satrs_core::event_man::{
EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider, EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider,
}; };
@ -43,7 +50,7 @@ use satrs_core::spacepackets::{
SpHeader, SpHeader,
}; };
use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::tmtc::{AddressableId, TargetId}; use satrs_core::tmtc::{AddressableId, CcsdsDistributor, TargetId};
use satrs_core::ChannelId; use satrs_core::ChannelId;
use satrs_example::{ use satrs_example::{
RequestTargetId, TcReceiverId, TmSenderId, OBSW_SERVER_ADDR, PUS_APID, SERVER_PORT, RequestTargetId, TcReceiverId, TmSenderId, OBSW_SERVER_ADDR, PUS_APID, SERVER_PORT,
@ -139,7 +146,7 @@ fn main() {
let tm_args = TmArgs { let tm_args = TmArgs {
tm_store: shared_tm_store.clone(), tm_store: shared_tm_store.clone(),
tm_sink_sender: tm_funnel_tx.clone(), tm_sink_sender: tm_funnel_tx.clone(),
tm_server_rx, tm_udp_server_rx: tm_server_rx,
}; };
let aocs_tm_funnel = tm_funnel_tx.clone(); let aocs_tm_funnel = tm_funnel_tx.clone();
@ -266,11 +273,50 @@ fn main() {
); );
let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler }; let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler };
info!("Starting TMTC task"); let ccsds_receiver = CcsdsReceiver {
let jh0 = thread::Builder::new() tc_source: tc_args.tc_source.clone(),
.name("TMTC".to_string()) };
let mut tmtc_task = TmtcTask::new(tc_args, PusReceiver::new(verif_reporter, pus_router));
let udp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver.clone()));
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor))
.expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server,
tm_rx: tm_args.tm_udp_server_rx,
tm_store: tm_args.tm_store.clone_backing_pool(),
};
info!("Starting TMTC and UDP task");
let jh_udp_tmtc = thread::Builder::new()
.name("TMTC and UDP".to_string())
.spawn(move || { .spawn(move || {
core_tmtc_task(sock_addr, tc_args, tm_args, verif_reporter, pus_router); info!("Running UDP server on port {SERVER_PORT}");
loop {
udp_tmtc_server.periodic_operation();
tmtc_task.periodic_operation();
thread::sleep(Duration::from_millis(400));
}
})
.unwrap();
let tcp_ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192);
let mut sync_tm_tcp_source = SyncTcpTmSource::new(200);
let mut tcp_server = TcpTask::new(
tcp_server_cfg,
sync_tm_tcp_source.clone(),
tcp_ccsds_distributor,
)
.expect("tcp server creation failed");
info!("Starting TCP task");
let jh_tcp = thread::Builder::new()
.name("TCP".to_string())
.spawn(move || {
info!("Running TCP server on port {SERVER_PORT}");
loop {
tcp_server.periodic_operation();
}
}) })
.unwrap(); .unwrap();
@ -311,6 +357,7 @@ fn main() {
.tm_server_tx .tm_server_tx
.send(addr) .send(addr)
.expect("Sending TM to server failed"); .expect("Sending TM to server failed");
sync_tm_tcp_source.add_tm(tm_raw);
} }
} }
}) })
@ -382,6 +429,7 @@ fn main() {
let mut timestamp: [u8; 7] = [0; 7]; let mut timestamp: [u8; 7] = [0; 7];
let mut time_provider = TimeProvider::new_with_u16_days(0, 0); let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
loop { loop {
// TODO: Move this into a separate function/task/module..
match acs_thread_rx.try_recv() { match acs_thread_rx.try_recv() {
Ok(request) => { Ok(request) => {
info!( info!(
@ -481,7 +529,12 @@ fn main() {
thread::sleep(Duration::from_millis(200)); thread::sleep(Duration::from_millis(200));
}) })
.unwrap(); .unwrap();
jh0.join().expect("Joining UDP TMTC server thread failed"); jh_udp_tmtc
.join()
.expect("Joining UDP TMTC server thread failed");
jh_tcp
.join()
.expect("Joining TCP TMTC server thread failed");
jh1.join().expect("Joining TM Funnel thread failed"); jh1.join().expect("Joining TM Funnel thread failed");
jh2.join().expect("Joining Event Manager thread failed"); jh2.join().expect("Joining Event Manager thread failed");
jh3.join().expect("Joining AOCS thread failed"); jh3.join().expect("Joining AOCS thread failed");

110
satrs-example/src/tcp.rs Normal file
View File

@ -0,0 +1,110 @@
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};
use log::{info, warn};
use satrs_core::{
hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer},
spacepackets::PacketId,
tmtc::{CcsdsDistributor, CcsdsError, TmPacketSourceCore},
};
use satrs_example::PUS_APID;
use crate::tmtc::MpscStoreAndSendError;
pub const PACKET_ID_LOOKUP: &[PacketId] = &[PacketId::const_tc(true, PUS_APID)];
#[derive(Default, Clone)]
pub struct SyncTcpTmSource {
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
max_packets_stored: usize,
pub silent_packet_overwrite: bool,
}
impl SyncTcpTmSource {
pub fn new(max_packets_stored: usize) -> Self {
Self {
tm_queue: Arc::default(),
max_packets_stored,
silent_packet_overwrite: true,
}
}
pub fn add_tm(&mut self, tm: &[u8]) {
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
if tm_queue.len() > self.max_packets_stored {
if !self.silent_packet_overwrite {
warn!("TPC TM source is full, deleting oldest packet");
}
tm_queue.pop_front();
}
tm_queue.push_back(tm.to_vec());
}
}
impl TmPacketSourceCore for SyncTcpTmSource {
type Error = ();
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
if !tm_queue.is_empty() {
let next_vec = tm_queue.front().unwrap();
if buffer.len() < next_vec.len() {
panic!(
"provided buffer too small, must be at least {} bytes",
next_vec.len()
);
}
let next_vec = tm_queue.pop_front().unwrap();
buffer[0..next_vec.len()].copy_from_slice(&next_vec);
if next_vec.len() > 9 {
let service = next_vec[7];
let subservice = next_vec[8];
info!("Sending PUS TM[{service},{subservice}]")
} else {
info!("Sending PUS TM");
}
return Ok(next_vec.len());
}
Ok(0)
}
}
pub struct TcpTask {
server: TcpSpacepacketsServer<(), CcsdsError<MpscStoreAndSendError>>,
}
impl TcpTask {
pub fn new(
cfg: ServerConfig,
tm_source: SyncTcpTmSource,
tc_receiver: CcsdsDistributor<MpscStoreAndSendError>,
) -> Result<Self, std::io::Error> {
Ok(Self {
server: TcpSpacepacketsServer::new(
cfg,
Box::new(tm_source),
Box::new(tc_receiver),
Box::new(PACKET_ID_LOOKUP),
)?,
})
}
pub fn periodic_operation(&mut self) {
loop {
let result = self.server.handle_next_connection();
match result {
Ok(conn_result) => {
info!(
"Served {} TMs and {} TCs for client {:?}",
conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr
);
}
Err(e) => {
warn!("TCP server error: {e:?}");
}
}
}
}
}

View File

@ -1,26 +1,21 @@
use log::{info, warn}; use log::warn;
use satrs_core::hal::std::udp_server::{ReceiveResult, UdpTcServer}; use satrs_core::pus::ReceivesEcssPusTc;
use std::net::SocketAddr; use satrs_core::spacepackets::SpHeader;
use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError}; use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError};
use std::thread;
use std::time::Duration;
use thiserror::Error; use thiserror::Error;
use crate::ccsds::CcsdsReceiver; use crate::pus::PusReceiver;
use crate::pus::{PusReceiver, PusTcMpscRouter};
use satrs_core::pool::{SharedPool, StoreAddr, StoreError}; use satrs_core::pool::{SharedPool, StoreAddr, StoreError};
use satrs_core::pus::verification::StdVerifReporterWithSender; use satrs_core::pus::TcAddrWithToken;
use satrs_core::pus::{ReceivesEcssPusTc, TcAddrWithToken};
use satrs_core::spacepackets::ecss::tc::PusTcReader; use satrs_core::spacepackets::ecss::tc::PusTcReader;
use satrs_core::spacepackets::ecss::PusPacket; use satrs_core::spacepackets::ecss::PusPacket;
use satrs_core::spacepackets::SpHeader;
use satrs_core::tmtc::tm_helper::SharedTmStore; use satrs_core::tmtc::tm_helper::SharedTmStore;
use satrs_core::tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc}; use satrs_core::tmtc::ReceivesCcsdsTc;
pub struct TmArgs { pub struct TmArgs {
pub tm_store: SharedTmStore, pub tm_store: SharedTmStore,
pub tm_sink_sender: Sender<StoreAddr>, pub tm_sink_sender: Sender<StoreAddr>,
pub tm_server_rx: Receiver<StoreAddr>, pub tm_udp_server_rx: Receiver<StoreAddr>,
} }
pub struct TcArgs { pub struct TcArgs {
@ -64,12 +59,6 @@ pub struct TmFunnel {
pub tm_server_tx: Sender<StoreAddr>, pub tm_server_tx: Sender<StoreAddr>,
} }
pub struct UdpTmtcServer {
udp_tc_server: UdpTcServer<CcsdsError<MpscStoreAndSendError>>,
tm_rx: Receiver<StoreAddr>,
tm_store: SharedPool,
}
#[derive(Clone)] #[derive(Clone)]
pub struct PusTcSource { pub struct PusTcSource {
pub tc_source: Sender<StoreAddr>, pub tc_source: Sender<StoreAddr>,
@ -98,131 +87,60 @@ impl ReceivesCcsdsTc for PusTcSource {
} }
} }
pub fn core_tmtc_task( pub struct TmtcTask {
socket_addr: SocketAddr, tc_args: TcArgs,
mut tc_args: TcArgs, tc_buf: [u8; 4096],
tm_args: TmArgs, pus_receiver: PusReceiver,
verif_reporter: StdVerifReporterWithSender,
pus_router: PusTcMpscRouter,
) {
let mut pus_receiver = PusReceiver::new(verif_reporter, pus_router);
let ccsds_receiver = CcsdsReceiver {
tc_source: tc_args.tc_source.clone(),
};
let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
let udp_tc_server = UdpTcServer::new(socket_addr, 2048, Box::new(ccsds_distributor))
.expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server,
tm_rx: tm_args.tm_server_rx,
tm_store: tm_args.tm_store.clone_backing_pool(),
};
let mut tc_buf: [u8; 4096] = [0; 4096];
loop {
core_tmtc_loop(
&mut udp_tmtc_server,
&mut tc_args,
&mut tc_buf,
&mut pus_receiver,
);
thread::sleep(Duration::from_millis(400));
}
} }
fn core_tmtc_loop( impl TmtcTask {
udp_tmtc_server: &mut UdpTmtcServer, pub fn new(tc_args: TcArgs, pus_receiver: PusReceiver) -> Self {
tc_args: &mut TcArgs, Self {
tc_buf: &mut [u8], tc_args,
pus_receiver: &mut PusReceiver, tc_buf: [0; 4096],
) { pus_receiver,
while poll_tc_server(udp_tmtc_server) {} }
match tc_args.tc_receiver.try_recv() { }
pub fn periodic_operation(&mut self) {
//while self.poll_tc() {}
self.poll_tc();
}
pub fn poll_tc(&mut self) -> bool {
match self.tc_args.tc_receiver.try_recv() {
Ok(addr) => { Ok(addr) => {
let pool = tc_args let pool = self
.tc_args
.tc_source .tc_source
.tc_store .tc_store
.pool .pool
.read() .read()
.expect("locking tc pool failed"); .expect("locking tc pool failed");
let data = pool.read(&addr).expect("reading pool failed"); let data = pool.read(&addr).expect("reading pool failed");
tc_buf[0..data.len()].copy_from_slice(data); self.tc_buf[0..data.len()].copy_from_slice(data);
drop(pool); drop(pool);
match PusTcReader::new(tc_buf) { match PusTcReader::new(&self.tc_buf) {
Ok((pus_tc, _)) => { Ok((pus_tc, _)) => {
pus_receiver self.pus_receiver
.handle_tc_packet(addr, pus_tc.service(), &pus_tc) .handle_tc_packet(addr, pus_tc.service(), &pus_tc)
.ok(); .ok();
true
} }
Err(e) => { Err(e) => {
warn!("error creating PUS TC from raw data: {e}"); warn!("error creating PUS TC from raw data: {e}");
warn!("raw data: {tc_buf:x?}"); warn!("raw data: {:x?}", self.tc_buf);
true
} }
} }
} }
Err(e) => {
if let TryRecvError::Disconnected = e {
warn!("tmtc thread: sender disconnected")
}
}
}
if let Some(recv_addr) = udp_tmtc_server.udp_tc_server.last_sender() {
core_tm_handling(udp_tmtc_server, &recv_addr);
}
}
fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool {
match udp_tmtc_server.udp_tc_server.try_recv_tc() {
Ok(_) => true,
Err(e) => match e { Err(e) => match e {
ReceiveResult::ReceiverError(e) => match e { TryRecvError::Empty => false,
CcsdsError::ByteConversionError(e) => { TryRecvError::Disconnected => {
warn!("packet error: {e:?}"); warn!("tmtc thread: sender disconnected");
true
}
CcsdsError::CustomError(e) => {
warn!("mpsc store and send error {e:?}");
true
}
},
ReceiveResult::IoError(e) => {
warn!("IO error {e}");
false false
} }
ReceiveResult::NothingReceived => false,
}, },
} }
}
fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) {
while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() {
let store_lock = udp_tmtc_server.tm_store.write();
if store_lock.is_err() {
warn!("Locking TM store failed");
continue;
}
let mut store_lock = store_lock.unwrap();
let pg = store_lock.read_with_guard(addr);
let read_res = pg.read();
if read_res.is_err() {
warn!("Error reading TM pool data");
continue;
}
let buf = read_res.unwrap();
if buf.len() > 9 {
let service = buf[7];
let subservice = buf[8];
info!("Sending PUS TM[{service},{subservice}]")
} else {
info!("Sending PUS TM");
}
let result = udp_tmtc_server.udp_tc_server.socket.send_to(buf, recv_addr);
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
} }
} }

76
satrs-example/src/udp.rs Normal file
View File

@ -0,0 +1,76 @@
use std::{net::SocketAddr, sync::mpsc::Receiver};
use log::{info, warn};
use satrs_core::{
hal::std::udp_server::{ReceiveResult, UdpTcServer},
pool::{SharedPool, StoreAddr},
tmtc::CcsdsError,
};
use crate::tmtc::MpscStoreAndSendError;
pub struct UdpTmtcServer {
pub udp_tc_server: UdpTcServer<CcsdsError<MpscStoreAndSendError>>,
pub tm_rx: Receiver<StoreAddr>,
pub tm_store: SharedPool,
}
impl UdpTmtcServer {
pub fn periodic_operation(&mut self) {
while self.poll_tc_server() {}
if let Some(recv_addr) = self.udp_tc_server.last_sender() {
self.send_tm_to_udp_client(&recv_addr);
}
}
fn poll_tc_server(&mut self) -> bool {
match self.udp_tc_server.try_recv_tc() {
Ok(_) => true,
Err(e) => match e {
ReceiveResult::ReceiverError(e) => match e {
CcsdsError::ByteConversionError(e) => {
warn!("packet error: {e:?}");
true
}
CcsdsError::CustomError(e) => {
warn!("mpsc store and send error {e:?}");
true
}
},
ReceiveResult::IoError(e) => {
warn!("IO error {e}");
false
}
ReceiveResult::NothingReceived => false,
},
}
}
fn send_tm_to_udp_client(&mut self, recv_addr: &SocketAddr) {
while let Ok(addr) = self.tm_rx.try_recv() {
let store_lock = self.tm_store.write();
if store_lock.is_err() {
warn!("Locking TM store failed");
continue;
}
let mut store_lock = store_lock.unwrap();
let pg = store_lock.read_with_guard(addr);
let read_res = pg.read();
if read_res.is_err() {
warn!("Error reading TM pool data");
continue;
}
let buf = read_res.unwrap();
if buf.len() > 9 {
let service = buf[7];
let subservice = buf[8];
info!("Sending PUS TM[{service},{subservice}]")
} else {
info!("Sending PUS TM");
}
let result = self.udp_tc_server.socket.send_to(buf, recv_addr);
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
}
}
}