From 56c8fa3dfcb114a6c2796e319f05b8c5fb3155ce Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 29 Aug 2022 01:33:32 +0200 Subject: [PATCH] obsw is multi-file bin now --- fsrc-core/src/tmtc/mod.rs | 1 + fsrc-core/src/tmtc/tm_helper.rs | 51 ++++++++ fsrc-example/src/bin/obsw.rs | 208 ------------------------------ fsrc-example/src/bin/obsw/main.rs | 54 ++++++++ fsrc-example/src/bin/obsw/pus.rs | 1 + fsrc-example/src/bin/obsw/tmtc.rs | 193 +++++++++++++++++++++++++++ 6 files changed, 300 insertions(+), 208 deletions(-) create mode 100644 fsrc-core/src/tmtc/tm_helper.rs delete mode 100644 fsrc-example/src/bin/obsw.rs create mode 100644 fsrc-example/src/bin/obsw/main.rs create mode 100644 fsrc-example/src/bin/obsw/pus.rs create mode 100644 fsrc-example/src/bin/obsw/tmtc.rs diff --git a/fsrc-core/src/tmtc/mod.rs b/fsrc-core/src/tmtc/mod.rs index d1d39b5..d9e954a 100644 --- a/fsrc-core/src/tmtc/mod.rs +++ b/fsrc-core/src/tmtc/mod.rs @@ -13,6 +13,7 @@ use spacepackets::SpHeader; pub mod ccsds_distrib; #[cfg(feature = "alloc")] pub mod pus_distrib; +pub mod tm_helper; pub use ccsds_distrib::{CcsdsDistributor, CcsdsError, CcsdsPacketHandler}; pub use pus_distrib::{PusDistributor, PusServiceProvider}; diff --git a/fsrc-core/src/tmtc/tm_helper.rs b/fsrc-core/src/tmtc/tm_helper.rs new file mode 100644 index 0000000..c8581bf --- /dev/null +++ b/fsrc-core/src/tmtc/tm_helper.rs @@ -0,0 +1,51 @@ +use spacepackets::time::{CdsShortTimeProvider, TimeWriter}; +use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; +use spacepackets::SpHeader; + +pub struct PusTmWithCdsShortHelper { + apid: u16, + cds_short_buf: [u8; 7], +} + +impl PusTmWithCdsShortHelper { + pub fn new(apid: u16) -> Self { + Self { + apid, + cds_short_buf: [0; 7], + } + } + + #[cfg(feature = "std")] + pub fn create_pus_tm_timestamp_now<'a>( + &'a mut self, + service: u8, + subservice: u8, + source_data: Option<&'a [u8]>, + ) -> PusTm { + let time_stamp = CdsShortTimeProvider::from_now().unwrap(); + time_stamp.write_to_bytes(&mut self.cds_short_buf).unwrap(); + self.create_pus_tm_common(service, subservice, source_data) + } + + pub fn create_pus_tm_with_stamp<'a>( + &'a mut self, + service: u8, + subservice: u8, + source_data: Option<&'a [u8]>, + stamper: &CdsShortTimeProvider, + ) -> PusTm { + stamper.write_to_bytes(&mut self.cds_short_buf).unwrap(); + self.create_pus_tm_common(service, subservice, source_data) + } + + fn create_pus_tm_common<'a>( + &'a self, + service: u8, + subservice: u8, + source_data: Option<&'a [u8]>, + ) -> PusTm { + let mut reply_header = SpHeader::tm(self.apid, 0, 0).unwrap(); + let tc_header = PusTmSecondaryHeader::new_simple(service, subservice, &self.cds_short_buf); + PusTm::new(&mut reply_header, tc_header, source_data, true) + } +} diff --git a/fsrc-example/src/bin/obsw.rs b/fsrc-example/src/bin/obsw.rs deleted file mode 100644 index 4d607fd..0000000 --- a/fsrc-example/src/bin/obsw.rs +++ /dev/null @@ -1,208 +0,0 @@ -use fsrc_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; -use fsrc_core::pool::{LocalPool, PoolCfg, StoreAddr}; -use fsrc_core::tmtc::{ - CcsdsDistributor, CcsdsError, CcsdsPacketHandler, PusDistributor, PusServiceProvider, - ReceivesCcsdsTc, -}; -use fsrc_example::{OBSW_SERVER_ADDR, SERVER_PORT}; -use spacepackets::tc::PusTc; -use spacepackets::time::{CdsShortTimeProvider, TimeWriter}; -use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; -use spacepackets::{CcsdsPacket, SpHeader}; -use std::net::{IpAddr, SocketAddr}; -use std::sync::{mpsc, Arc, Mutex}; -use std::thread; -use std::time::Duration; - -const PUS_APID: u16 = 0x02; - -struct CcsdsReceiver { - pus_handler: PusDistributor<()>, -} - -impl CcsdsPacketHandler for CcsdsReceiver { - type Error = (); - - fn valid_apids(&self) -> &'static [u16] { - &[PUS_APID] - } - - fn handle_known_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error> { - if sp_header.apid() == PUS_APID { - self.pus_handler - .pass_ccsds(sp_header, tc_raw) - .expect("Handling PUS packet failed"); - } - Ok(()) - } - - fn handle_unknown_apid( - &mut self, - _sp_header: &SpHeader, - _tc_raw: &[u8], - ) -> Result<(), Self::Error> { - println!("Unknown APID detected"); - Ok(()) - } -} - -unsafe impl Send for CcsdsReceiver {} - -struct PusReceiver { - tm_apid: u16, - tm_tx: mpsc::Sender, - tm_store: Arc>, -} - -impl PusServiceProvider for PusReceiver { - type Error = (); - - fn handle_pus_tc_packet( - &mut self, - service: u8, - _header: &SpHeader, - pus_tc: &PusTc, - ) -> Result<(), Self::Error> { - if service == 17 { - println!("Received PUS ping command"); - let raw_data = pus_tc.raw().expect("Could not retrieve raw data"); - println!("Raw data: 0x{raw_data:x?}"); - let mut reply_header = SpHeader::tm(self.tm_apid, 0, 0).unwrap(); - let time_stamp = CdsShortTimeProvider::from_now().unwrap(); - let mut timestamp_buf = [0; 10]; - time_stamp.write_to_bytes(&mut timestamp_buf).unwrap(); - let tc_header = PusTmSecondaryHeader::new_simple(17, 2, ×tamp_buf); - let ping_reply = PusTm::new(&mut reply_header, tc_header, None, true); - let addr = self - .tm_store - .lock() - .expect("Locking TM store failed") - .add_pus_tm(&ping_reply); - self.tm_tx - .send(addr) - .expect("Sending TM to TM funnel failed"); - } - Ok(()) - } -} - -struct TmStore { - pool: LocalPool, -} - -impl TmStore { - fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr { - let (addr, buf) = self - .pool - .free_element(pus_tm.len_packed()) - .expect("Store error"); - pus_tm - .write_to(buf) - .expect("Writing PUS TM to store failed"); - addr - } -} - -struct TmFunnel { - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, -} - -struct UdpTmtcServer { - udp_tc_server: UdpTcServer>, - tm_rx: mpsc::Receiver, - tm_store: Arc>, -} - -unsafe impl Send for UdpTmtcServer {} - -fn main() { - let pool_cfg = PoolCfg::new(vec![(8, 32), (4, 64), (2, 128)]); - let tm_pool = LocalPool::new(pool_cfg); - let tm_store = Arc::new(Mutex::new(TmStore { pool: tm_pool })); - let addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let (tm_creator_tx, tm_funnel_rx) = mpsc::channel(); - let (tm_server_tx, tm_server_rx) = mpsc::channel(); - - let jh0 = thread::spawn(move || { - let pus_receiver = PusReceiver { - tm_apid: PUS_APID, - tm_tx: tm_creator_tx.clone(), - tm_store: tm_store.clone(), - }; - let pus_distributor = PusDistributor::new(Box::new(pus_receiver)); - let ccsds_receiver = CcsdsReceiver { - pus_handler: pus_distributor, - }; - let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); - let udp_tc_server = UdpTcServer::new(addr, 2048, Box::new(ccsds_distributor)) - .expect("Creating UDP TMTC server failed"); - let mut udp_tmtc_server = UdpTmtcServer { - udp_tc_server, - tm_rx: tm_server_rx, - tm_store - }; - loop { - loop { - match udp_tmtc_server.udp_tc_server.try_recv_tc() { - Ok(_) => (), - Err(e) => match e { - ReceiveResult::ReceiverError(e) => match e { - CcsdsError::PacketError(e) => { - println!("Got packet error: {e:?}"); - } - CcsdsError::CustomError(_) => { - println!("Unknown receiver error") - } - }, - ReceiveResult::OtherIoError(e) => { - println!("IO error {e}"); - break; - } - ReceiveResult::WouldBlock => { - if let Some(recv_addr) = udp_tmtc_server.udp_tc_server.last_sender() { - // TODO: Send TM Here - while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() { - let mut store_lock = udp_tmtc_server - .tm_store - .lock() - .expect("Locking TM store failed"); - let pg = store_lock.pool.read_with_guard(addr); - let buf = pg.read().expect("Error reading TM pool data"); - println!("Sending TM"); - udp_tmtc_server - .udp_tc_server - .socket - .send_to(buf, recv_addr) - .expect("Sending TM failed"); - } - } - break; - } - }, - } - } - thread::sleep(Duration::from_millis(400)); - } - }); - let jh1 = thread::spawn(move || { - let tm_funnel = TmFunnel { - tm_server_tx, - tm_funnel_rx, - }; - loop { - if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() { - tm_funnel - .tm_server_tx - .send(addr) - .expect("Sending TM to server failed"); - } - } - }); - jh0.join().expect("Joining UDP TMTC server thread failed"); - jh1.join().expect("Joining TM Funnel thread failed"); -} diff --git a/fsrc-example/src/bin/obsw/main.rs b/fsrc-example/src/bin/obsw/main.rs new file mode 100644 index 0000000..89e9f2e --- /dev/null +++ b/fsrc-example/src/bin/obsw/main.rs @@ -0,0 +1,54 @@ +mod pus; +mod tmtc; + +use crate::tmtc::{core_tmtc_task, TmStore}; +use fsrc_core::hal::host::udp_server::UdpTcServer; +use fsrc_core::pool::{LocalPool, PoolCfg, StoreAddr}; +use fsrc_core::tmtc::CcsdsError; +use fsrc_example::{OBSW_SERVER_ADDR, SERVER_PORT}; +use std::net::{IpAddr, SocketAddr}; +use std::sync::{mpsc, Arc, Mutex}; +use std::thread; + +struct TmFunnel { + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::Sender, +} + +struct UdpTmtcServer { + udp_tc_server: UdpTcServer>, + tm_rx: mpsc::Receiver, + tm_store: Arc>, +} + +unsafe impl Send for UdpTmtcServer {} + +fn main() { + let pool_cfg = PoolCfg::new(vec![(8, 32), (4, 64), (2, 128)]); + let tm_pool = LocalPool::new(pool_cfg); + let tm_store = Arc::new(Mutex::new(TmStore { pool: tm_pool })); + let addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let (tm_creator_tx, tm_funnel_rx) = mpsc::channel(); + let (tm_server_tx, tm_server_rx) = mpsc::channel(); + + let jh0 = thread::spawn(move || { + core_tmtc_task(tm_creator_tx.clone(), tm_server_rx, tm_store.clone(), addr); + }); + + let jh1 = thread::spawn(move || { + let tm_funnel = TmFunnel { + tm_server_tx, + tm_funnel_rx, + }; + loop { + if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() { + tm_funnel + .tm_server_tx + .send(addr) + .expect("Sending TM to server failed"); + } + } + }); + jh0.join().expect("Joining UDP TMTC server thread failed"); + jh1.join().expect("Joining TM Funnel thread failed"); +} diff --git a/fsrc-example/src/bin/obsw/pus.rs b/fsrc-example/src/bin/obsw/pus.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/fsrc-example/src/bin/obsw/pus.rs @@ -0,0 +1 @@ + diff --git a/fsrc-example/src/bin/obsw/tmtc.rs b/fsrc-example/src/bin/obsw/tmtc.rs new file mode 100644 index 0000000..b28cdb4 --- /dev/null +++ b/fsrc-example/src/bin/obsw/tmtc.rs @@ -0,0 +1,193 @@ +use fsrc_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; +use std::net::SocketAddr; +use std::sync::{mpsc, Arc, Mutex}; +use std::thread; +use std::time::Duration; + +use crate::UdpTmtcServer; +use fsrc_core::pool::{LocalPool, StoreAddr}; +use fsrc_core::tmtc::tm_helper::PusTmWithCdsShortHelper; +use fsrc_core::tmtc::{ + CcsdsDistributor, CcsdsError, CcsdsPacketHandler, PusDistributor, PusServiceProvider, + ReceivesCcsdsTc, +}; +use spacepackets::tc::PusTc; +use spacepackets::tm::PusTm; +use spacepackets::{CcsdsPacket, SpHeader}; + +pub const PUS_APID: u16 = 0x02; + +pub struct TmStore { + pub pool: LocalPool, +} + +impl TmStore { + fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr { + let (addr, buf) = self + .pool + .free_element(pus_tm.len_packed()) + .expect("Store error"); + pus_tm + .write_to(buf) + .expect("Writing PUS TM to store failed"); + addr + } +} + +pub struct CcsdsReceiver { + pub pus_handler: PusDistributor<()>, +} + +impl CcsdsPacketHandler for CcsdsReceiver { + type Error = (); + + fn valid_apids(&self) -> &'static [u16] { + &[PUS_APID] + } + + fn handle_known_apid( + &mut self, + sp_header: &SpHeader, + tc_raw: &[u8], + ) -> Result<(), Self::Error> { + if sp_header.apid() == PUS_APID { + self.pus_handler + .pass_ccsds(sp_header, tc_raw) + .expect("Handling PUS packet failed"); + } + Ok(()) + } + + fn handle_unknown_apid( + &mut self, + _sp_header: &SpHeader, + _tc_raw: &[u8], + ) -> Result<(), Self::Error> { + println!("Unknown APID detected"); + Ok(()) + } +} + +unsafe impl Send for CcsdsReceiver {} + +pub struct PusReceiver { + pub tm_helper: PusTmWithCdsShortHelper, + pub tm_tx: mpsc::Sender, + pub tm_store: Arc>, +} + +impl PusReceiver { + pub fn new(apid: u16, tm_tx: mpsc::Sender, tm_store: Arc>) -> Self { + Self { + tm_helper: PusTmWithCdsShortHelper::new(apid), + tm_tx, + tm_store, + } + } +} + +impl PusServiceProvider for PusReceiver { + type Error = (); + + fn handle_pus_tc_packet( + &mut self, + service: u8, + _header: &SpHeader, + pus_tc: &PusTc, + ) -> Result<(), Self::Error> { + if service == 17 { + self.handle_test_service(pus_tc); + } + Ok(()) + } +} + +impl PusReceiver { + fn handle_test_service(&mut self, pus_tc: &PusTc) { + println!("Received PUS ping command"); + let raw_data = pus_tc.raw().expect("Could not retrieve raw data"); + println!("Raw data: 0x{raw_data:x?}"); + println!("Sending ping reply"); + let ping_reply = self.tm_helper.create_pus_tm_timestamp_now(17, 2, None); + let addr = self + .tm_store + .lock() + .expect("Locking TM store failed") + .add_pus_tm(&ping_reply); + self.tm_tx + .send(addr) + .expect("Sending TM to TM funnel failed"); + } +} + +pub fn core_tmtc_task( + tm_creator_tx: mpsc::Sender, + tm_server_rx: mpsc::Receiver, + tm_store: Arc>, + addr: SocketAddr, +) { + let pus_receiver = PusReceiver::new(PUS_APID, tm_creator_tx, tm_store.clone()); + let pus_distributor = PusDistributor::new(Box::new(pus_receiver)); + let ccsds_receiver = CcsdsReceiver { + pus_handler: pus_distributor, + }; + let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let udp_tc_server = UdpTcServer::new(addr, 2048, Box::new(ccsds_distributor)) + .expect("Creating UDP TMTC server failed"); + let mut udp_tmtc_server = UdpTmtcServer { + udp_tc_server, + tm_rx: tm_server_rx, + tm_store, + }; + loop { + core_tmtc_loop(&mut udp_tmtc_server); + thread::sleep(Duration::from_millis(400)); + } +} + +fn core_tmtc_loop(udp_tmtc_server: &mut UdpTmtcServer) { + while core_tc_handling(udp_tmtc_server) {} + if let Some(recv_addr) = udp_tmtc_server.udp_tc_server.last_sender() { + core_tm_handling(udp_tmtc_server, &recv_addr); + } +} + +fn core_tc_handling(udp_tmtc_server: &mut UdpTmtcServer) -> bool { + match udp_tmtc_server.udp_tc_server.try_recv_tc() { + Ok(_) => true, + Err(e) => match e { + ReceiveResult::ReceiverError(e) => match e { + CcsdsError::PacketError(e) => { + println!("Got packet error: {e:?}"); + true + } + CcsdsError::CustomError(_) => { + println!("Unknown receiver error"); + true + } + }, + ReceiveResult::OtherIoError(e) => { + println!("IO error {e}"); + false + } + ReceiveResult::WouldBlock => false, + }, + } +} + +fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) { + while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() { + let mut store_lock = udp_tmtc_server + .tm_store + .lock() + .expect("Locking TM store failed"); + let pg = store_lock.pool.read_with_guard(addr); + let buf = pg.read().expect("Error reading TM pool data"); + println!("Sending TM"); + udp_tmtc_server + .udp_tc_server + .socket + .send_to(buf, recv_addr) + .expect("Sending TM failed"); + } +}