why is this so problematic..

Robin Müller 2023-09-27 00:21:03 +02:00
parent 7ca8d52368
commit 5a3b9fb46b
Signed by: muellerr
GPG Key ID: FCE0B2BD2195142F
3 changed files with 134 additions and 128 deletions

View File

@@ -1,2 +1,2 @@
-tmtccmd == 5.0.0rc0
+tmtccmd == 6.0.0
 # -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd

View File

@@ -6,7 +6,9 @@ mod requests;
 mod tmtc;

 use log::{info, warn};
+use satrs_core::hal::std::udp_server::UdpTcServer;
+use crate::ccsds::CcsdsReceiver;
 use crate::hk::AcsHkIds;
 use crate::logging::setup_logger;
 use crate::pus::action::{Pus8Wrapper, PusService8ActionHandler};
@@ -14,9 +16,9 @@ use crate::pus::event::Pus5Wrapper;
 use crate::pus::hk::{Pus3Wrapper, PusService3HkHandler};
 use crate::pus::scheduler::Pus11Wrapper;
 use crate::pus::test::Service17CustomWrapper;
-use crate::pus::PusTcMpscRouter;
+use crate::pus::{PusReceiver, PusTcMpscRouter};
 use crate::requests::{Request, RequestWithToken};
-use crate::tmtc::{core_tmtc_task, PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel};
+use crate::tmtc::{PusTcSource, TcArgs, TcStore, TmArgs, TmFunnel, TmtcTask, UdpTmtcServer};
 use satrs_core::event_man::{
     EventManagerWithMpscQueue, MpscEventReceiver, MpscEventU32SendProvider, SendEventProvider,
 };
@@ -43,7 +45,7 @@ use satrs_core::spacepackets::{
     SpHeader,
 };
 use satrs_core::tmtc::tm_helper::SharedTmStore;
-use satrs_core::tmtc::{AddressableId, TargetId};
+use satrs_core::tmtc::{AddressableId, CcsdsDistributor, TargetId};
 use satrs_core::ChannelId;
 use satrs_example::{
     RequestTargetId, TcReceiverId, TmSenderId, OBSW_SERVER_ADDR, PUS_APID, SERVER_PORT,
@@ -266,11 +268,27 @@ fn main() {
     );
     let mut pus_3_wrapper = Pus3Wrapper { pus_3_handler };

+    let ccsds_receiver = CcsdsReceiver {
+        tc_source: tc_args.tc_source.clone(),
+    };
+    let mut tmtc_task = TmtcTask::new(tc_args, PusReceiver::new(verif_reporter, pus_router));
+    let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
+    let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(ccsds_distributor))
+        .expect("creating UDP TMTC server failed");
+    let mut udp_tmtc_server = UdpTmtcServer {
+        udp_tc_server,
+        tm_rx: tm_args.tm_server_rx,
+        tm_store: tm_args.tm_store.clone_backing_pool(),
+    };
+
     info!("Starting TMTC task");
     let jh0 = thread::Builder::new()
         .name("TMTC".to_string())
         .spawn(move || {
-            core_tmtc_task(sock_addr, tc_args, tm_args, verif_reporter, pus_router);
+            udp_tmtc_server.periodic_operation();
+            tmtc_task.periodic_operation();
+            thread::sleep(Duration::from_millis(400));
         })
         .unwrap();
@@ -382,6 +400,7 @@ fn main() {
            let mut timestamp: [u8; 7] = [0; 7];
            let mut time_provider = TimeProvider::new_with_u16_days(0, 0);
            loop {
+               // TODO: Move this into a separate thread..
                match acs_thread_rx.try_recv() {
                    Ok(request) => {
                        info!(
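For orientation, a minimal sketch of how the spawned TMTC thread body reads after this change. The two periodic calls and the 400 ms sleep are taken from the hunk above; the surrounding loop is not visible in this hunk and is assumed here:

    let jh0 = thread::Builder::new()
        .name("TMTC".to_string())
        .spawn(move || loop {
            // Assumed loop wrapper around the calls shown in the diff.
            // Poll the UDP TC server and send queued TM back to the last client.
            udp_tmtc_server.periodic_operation();
            // Route TCs from the shared TC store to the PUS service handlers.
            tmtc_task.periodic_operation();
            thread::sleep(Duration::from_millis(400));
        })
        .unwrap();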

View File

@@ -1,21 +1,18 @@
 use log::{info, warn};
 use satrs_core::hal::std::udp_server::{ReceiveResult, UdpTcServer};
+use satrs_core::pus::ReceivesEcssPusTc;
+use satrs_core::spacepackets::SpHeader;
 use std::net::SocketAddr;
 use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError};
-use std::thread;
-use std::time::Duration;
 use thiserror::Error;

-use crate::ccsds::CcsdsReceiver;
-use crate::pus::{PusReceiver, PusTcMpscRouter};
+use crate::pus::PusReceiver;
 use satrs_core::pool::{SharedPool, StoreAddr, StoreError};
-use satrs_core::pus::verification::StdVerifReporterWithSender;
-use satrs_core::pus::{ReceivesEcssPusTc, TcAddrWithToken};
+use satrs_core::pus::TcAddrWithToken;
 use satrs_core::spacepackets::ecss::tc::PusTcReader;
 use satrs_core::spacepackets::ecss::PusPacket;
-use satrs_core::spacepackets::SpHeader;
 use satrs_core::tmtc::tm_helper::SharedTmStore;
-use satrs_core::tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc};
+use satrs_core::tmtc::{CcsdsError, ReceivesCcsdsTc};

 pub struct TmArgs {
     pub tm_store: SharedTmStore,
@@ -65,9 +62,9 @@ pub struct TmFunnel {
 }

 pub struct UdpTmtcServer {
-    udp_tc_server: UdpTcServer<CcsdsError<MpscStoreAndSendError>>,
-    tm_rx: Receiver<StoreAddr>,
-    tm_store: SharedPool,
+    pub udp_tc_server: UdpTcServer<CcsdsError<MpscStoreAndSendError>>,
+    pub tm_rx: Receiver<StoreAddr>,
+    pub tm_store: SharedPool,
 }

 #[derive(Clone)]
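The three fields of UdpTmtcServer become public in this hunk because the server is now assembled outside this module, in the file containing fn main(). A sketch of that construction for reference, mirroring the earlier hunk (sock_addr, tm_args and ccsds_distributor come from the existing setup code there):

    // Mirrors the construction shown in the fn main() hunk above; requires pub fields.
    let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(ccsds_distributor))
        .expect("creating UDP TMTC server failed");
    let mut udp_tmtc_server = UdpTmtcServer {
        udp_tc_server,
        tm_rx: tm_args.tm_server_rx,
        tm_store: tm_args.tm_store.clone_backing_pool(),
    };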
@@ -98,131 +95,121 @@ impl ReceivesCcsdsTc for PusTcSource {
     }
 }

-pub fn core_tmtc_task(
-    socket_addr: SocketAddr,
-    mut tc_args: TcArgs,
-    tm_args: TmArgs,
-    verif_reporter: StdVerifReporterWithSender,
-    pus_router: PusTcMpscRouter,
-) {
-    let mut pus_receiver = PusReceiver::new(verif_reporter, pus_router);
-
-    let ccsds_receiver = CcsdsReceiver {
-        tc_source: tc_args.tc_source.clone(),
-    };
-
-    let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
-    let udp_tc_server = UdpTcServer::new(socket_addr, 2048, Box::new(ccsds_distributor))
-        .expect("creating UDP TMTC server failed");
-    let mut udp_tmtc_server = UdpTmtcServer {
-        udp_tc_server,
-        tm_rx: tm_args.tm_server_rx,
-        tm_store: tm_args.tm_store.clone_backing_pool(),
-    };
-
-    let mut tc_buf: [u8; 4096] = [0; 4096];
-    loop {
-        core_tmtc_loop(
-            &mut udp_tmtc_server,
-            &mut tc_args,
-            &mut tc_buf,
-            &mut pus_receiver,
-        );
-        thread::sleep(Duration::from_millis(400));
-    }
-}
-
-fn core_tmtc_loop(
-    udp_tmtc_server: &mut UdpTmtcServer,
-    tc_args: &mut TcArgs,
-    tc_buf: &mut [u8],
-    pus_receiver: &mut PusReceiver,
-) {
-    while poll_tc_server(udp_tmtc_server) {}
-    match tc_args.tc_receiver.try_recv() {
-        Ok(addr) => {
-            let pool = tc_args
-                .tc_source
-                .tc_store
-                .pool
-                .read()
-                .expect("locking tc pool failed");
-            let data = pool.read(&addr).expect("reading pool failed");
-            tc_buf[0..data.len()].copy_from_slice(data);
-            drop(pool);
-            match PusTcReader::new(tc_buf) {
-                Ok((pus_tc, _)) => {
-                    pus_receiver
-                        .handle_tc_packet(addr, pus_tc.service(), &pus_tc)
-                        .ok();
-                }
-                Err(e) => {
-                    warn!("error creating PUS TC from raw data: {e}");
-                    warn!("raw data: {tc_buf:x?}");
-                }
-            }
-        }
-        Err(e) => {
-            if let TryRecvError::Disconnected = e {
-                warn!("tmtc thread: sender disconnected")
-            }
-        }
-    }
-    if let Some(recv_addr) = udp_tmtc_server.udp_tc_server.last_sender() {
-        core_tm_handling(udp_tmtc_server, &recv_addr);
-    }
-}
-
-fn poll_tc_server(udp_tmtc_server: &mut UdpTmtcServer) -> bool {
-    match udp_tmtc_server.udp_tc_server.try_recv_tc() {
-        Ok(_) => true,
-        Err(e) => match e {
-            ReceiveResult::ReceiverError(e) => match e {
-                CcsdsError::ByteConversionError(e) => {
-                    warn!("packet error: {e:?}");
-                    true
-                }
-                CcsdsError::CustomError(e) => {
-                    warn!("mpsc store and send error {e:?}");
-                    true
-                }
-            },
-            ReceiveResult::IoError(e) => {
-                warn!("IO error {e}");
-                false
-            }
-            ReceiveResult::NothingReceived => false,
-        },
-    }
-}
-
-fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) {
-    while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() {
-        let store_lock = udp_tmtc_server.tm_store.write();
-        if store_lock.is_err() {
-            warn!("Locking TM store failed");
-            continue;
-        }
-        let mut store_lock = store_lock.unwrap();
-        let pg = store_lock.read_with_guard(addr);
-        let read_res = pg.read();
-        if read_res.is_err() {
-            warn!("Error reading TM pool data");
-            continue;
-        }
-        let buf = read_res.unwrap();
-        if buf.len() > 9 {
-            let service = buf[7];
-            let subservice = buf[8];
-            info!("Sending PUS TM[{service},{subservice}]")
-        } else {
-            info!("Sending PUS TM");
-        }
-        let result = udp_tmtc_server.udp_tc_server.socket.send_to(buf, recv_addr);
-        if let Err(e) = result {
-            warn!("Sending TM with UDP socket failed: {e}")
-        }
-    }
-}
+pub struct TmtcTask {
+    tc_args: TcArgs,
+    tc_buf: [u8; 4096],
+    pus_receiver: PusReceiver,
+}
+
+impl TmtcTask {
+    pub fn new(tc_args: TcArgs, pus_receiver: PusReceiver) -> Self {
+        Self {
+            tc_args,
+            tc_buf: [0; 4096],
+            pus_receiver,
+        }
+    }
+
+    pub fn periodic_operation(&mut self) {
+        //while self.poll_tc() {}
+        self.poll_tc();
+    }
+
+    pub fn poll_tc(&mut self) -> bool {
+        match self.tc_args.tc_receiver.try_recv() {
+            Ok(addr) => {
+                let pool = self
+                    .tc_args
+                    .tc_source
+                    .tc_store
+                    .pool
+                    .read()
+                    .expect("locking tc pool failed");
+                let data = pool.read(&addr).expect("reading pool failed");
+                self.tc_buf[0..data.len()].copy_from_slice(data);
+                drop(pool);
+                match PusTcReader::new(&self.tc_buf) {
+                    Ok((pus_tc, _)) => {
+                        self.pus_receiver
+                            .handle_tc_packet(addr, pus_tc.service(), &pus_tc)
+                            .ok();
+                        true
+                    }
+                    Err(e) => {
+                        warn!("error creating PUS TC from raw data: {e}");
+                        warn!("raw data: {:x?}", self.tc_buf);
+                        true
+                    }
+                }
+            }
+            Err(e) => match e {
+                TryRecvError::Empty => false,
+                TryRecvError::Disconnected => {
+                    warn!("tmtc thread: sender disconnected");
+                    false
+                }
+            },
+        }
+    }
+}
+
+impl UdpTmtcServer {
+    pub fn periodic_operation(&mut self) {
+        while self.poll_tc_server() {}
+        if let Some(recv_addr) = self.udp_tc_server.last_sender() {
+            self.send_tm_to_udp_client(&recv_addr);
+        }
+    }
+
+    fn poll_tc_server(&mut self) -> bool {
+        match self.udp_tc_server.try_recv_tc() {
+            Ok(_) => true,
+            Err(e) => match e {
+                ReceiveResult::ReceiverError(e) => match e {
+                    CcsdsError::ByteConversionError(e) => {
+                        warn!("packet error: {e:?}");
+                        true
+                    }
+                    CcsdsError::CustomError(e) => {
+                        warn!("mpsc store and send error {e:?}");
+                        true
+                    }
+                },
+                ReceiveResult::IoError(e) => {
+                    warn!("IO error {e}");
+                    false
+                }
+                ReceiveResult::NothingReceived => false,
+            },
+        }
+    }
+
+    fn send_tm_to_udp_client(&mut self, recv_addr: &SocketAddr) {
+        while let Ok(addr) = self.tm_rx.try_recv() {
+            let store_lock = self.tm_store.write();
+            if store_lock.is_err() {
+                warn!("Locking TM store failed");
+                continue;
+            }
+            let mut store_lock = store_lock.unwrap();
+            let pg = store_lock.read_with_guard(addr);
+            let read_res = pg.read();
+            if read_res.is_err() {
+                warn!("Error reading TM pool data");
+                continue;
+            }
+            let buf = read_res.unwrap();
+            if buf.len() > 9 {
+                let service = buf[7];
+                let subservice = buf[8];
+                info!("Sending PUS TM[{service},{subservice}]")
+            } else {
+                info!("Sending PUS TM");
+            }
+            let result = self.udp_tc_server.socket.send_to(buf, recv_addr);
+            if let Err(e) = result {
+                warn!("Sending TM with UDP socket failed: {e}")
+            }
+        }
+    }
+}
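One note on the new TmtcTask::periodic_operation: the commented-out while self.poll_tc() {} hints at a draining variant that processes every queued TC per cycle, while the committed version handles at most one. A minimal sketch of that alternative, relying only on poll_tc returning false once the channel is empty or disconnected (as shown in the hunk above):

    // Hypothetical draining variant, not what this commit does: keep polling until
    // poll_tc() reports false (TryRecvError::Empty or Disconnected), so every TC
    // queued since the last cycle is handled in one pass.
    pub fn periodic_operation(&mut self) {
        while self.poll_tc() {}
    }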