try to make it compile again

This commit is contained in:
Robin Mueller
2025-11-27 18:11:14 +01:00
parent b3a7f3f9d2
commit 3f4c0e7d51
8 changed files with 164 additions and 288 deletions

View File

@@ -32,12 +32,7 @@ use satrs_minisim::{
};
use serde::{Deserialize, Serialize};
use crate::{
hk::PusHkHelper,
//pus::hk::{HkReply, HkReplyVariant},
requests::CompositeRequest,
tmtc::sender::TmTcSender,
};
use crate::{hk::PusHkHelper, requests::CompositeRequest};
pub trait SerialInterface {
type Error: core::fmt::Debug;
@@ -213,7 +208,7 @@ pub struct PcduHandler<ComInterface: SerialInterface> {
//hk_reply_tx: mpsc::SyncSender<GenericMessage<HkReply>>,
hk_tx: std::sync::mpsc::SyncSender<CcsdsTmPacketOwned>,
switch_request_rx: mpsc::Receiver<GenericMessage<SwitchRequest>>,
tm_sender: TmTcSender,
tm_tx: mpsc::SyncSender<CcsdsTmPacketOwned>,
pub com_interface: ComInterface,
shared_switch_map: Arc<Mutex<SwitchSet>>,
#[new(value = "PusHkHelper::new(id)")]
@@ -274,7 +269,7 @@ impl<ComInterface: SerialInterface> PcduHandler<ComInterface> {
}
}
pub fn handle_hk_request(&mut self, requestor_info: &MessageMetadata, hk_request: &HkRequest) {
pub fn handle_hk_request(&mut self, _requestor_info: &MessageMetadata, hk_request: &HkRequest) {
match hk_request.variant {
HkRequestVariant::OneShot => {
if hk_request.unique_id == SetId::SwitcherSet as u32 {
@@ -301,11 +296,11 @@ impl<ComInterface: SerialInterface> PcduHandler<ComInterface> {
},
&mut self.tm_buf,
) {
self.tm_sender
.send_tm(self.id.id(), PusTmVariant::Direct(hk_tm))
.expect("failed to send HK TM");
// TODO: Fix
/*
self.tm_sender
.send(self.id.id(), PusTmVariant::Direct(hk_tm))
.expect("failed to send HK TM");
self.hk_reply_tx
.send(GenericMessage::new(
*requestor_info,
@@ -511,9 +506,7 @@ mod tests {
use std::sync::mpsc;
use arbitrary_int::u21;
use satrs::{
mode::ModeRequest, power::SwitchStateBinary, request::GenericMessage, tmtc::PacketAsVec,
};
use satrs::{mode::ModeRequest, power::SwitchStateBinary, request::GenericMessage};
use satrs_example::ids::{self, Apid};
use satrs_minisim::eps::SwitchMapBinary;
@@ -561,7 +554,7 @@ mod tests {
pub composite_request_tx: mpsc::Sender<GenericMessage<CompositeRequest>>,
//pub hk_reply_rx: mpsc::Receiver<GenericMessage<HkReply>>,
pub hk_rx: std::sync::mpsc::Receiver<CcsdsTmPacketOwned>,
pub tm_rx: mpsc::Receiver<PacketAsVec>,
pub tm_rx: mpsc::Receiver<CcsdsTmPacketOwned>,
pub switch_request_tx: mpsc::Sender<GenericMessage<SwitchRequest>>,
pub handler: PcduHandler<SerialInterfaceTest>,
}
@@ -574,7 +567,7 @@ mod tests {
let mode_node = ModeRequestHandlerMpscBounded::new(PCDU.into(), mode_request_rx);
let (composite_request_tx, composite_request_rx) = mpsc::channel();
let (hk_tx, hk_rx) = mpsc::sync_channel(10);
let (tm_tx, tm_rx) = mpsc::sync_channel::<PacketAsVec>(5);
let (tm_tx, tm_rx) = mpsc::sync_channel(5);
let (switch_request_tx, switch_reqest_rx) = mpsc::channel();
let shared_switch_map = Arc::new(Mutex::new(SwitchSet::default()));
let mut handler = PcduHandler::new(
@@ -584,7 +577,8 @@ mod tests {
composite_request_rx,
hk_tx,
switch_reqest_rx,
TmTcSender::Heap(tm_tx.clone()),
tm_tx.clone(),
//TmTcSender::Normal(tm_tx.clone()),
SerialInterfaceTest::default(),
shared_switch_map,
);

View File

@@ -1,6 +1,6 @@
use std::sync::mpsc::{self};
use crate::pus::create_verification_reporter;
//use crate::pus::create_verification_reporter;
use arbitrary_int::traits::Integer as _;
use arbitrary_int::u11;
use satrs::event_man_legacy::{EventMessageU32, EventRoutingError};
@@ -34,6 +34,7 @@ impl EventTmHook for EventApidSetter {
}
}
/*
/// The PUS event handler subscribes for all events and converts them into ECSS PUS 5 event
/// packets. It also handles the verification completion of PUS event service requests.
pub struct PusEventHandler<TmSender: EcssTmSender> {
@@ -292,3 +293,4 @@ mod tests {
// TODO: Add test.
}
}
*/

View File

@@ -1,22 +1,22 @@
#![allow(dead_code)]
use std::collections::VecDeque;
use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc;
use std::sync::{mpsc, Arc, Mutex};
use log::{info, warn};
use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer};
use satrs::pus::HandlingStatus;
use satrs::queue::GenericSendError;
use satrs::tmtc::PacketAsVec;
use satrs::pool::{PoolProviderWithGuards, SharedStaticMemoryPool};
use satrs::tmtc::PacketInPool;
use satrs_example::CcsdsTmPacketOwned;
use crate::tmtc::sender::TmTcSender;
pub trait UdpTmHandler {
pub trait UdpTmHandlerProvider {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr);
}
/*
pub struct StaticUdpTmHandler {
pub tm_rx: mpsc::Receiver<PacketInPool>,
pub tm_store: SharedStaticMemoryPool,
@@ -45,22 +45,17 @@ impl UdpTmHandler for StaticUdpTmHandler {
}
}
}
*/
pub struct DynamicUdpTmHandler {
pub tm_rx: mpsc::Receiver<PacketAsVec>,
/// TM handler backed by an mpsc channel: drains [CcsdsTmPacketOwned] telemetry
/// from `tm_rx` and forwards it to the connected UDP client.
pub struct UdpTmHandlerWithChannel {
    // Receiving end of the TM channel; drained via `try_recv` in
    // `send_tm_to_udp_client`.
    pub tm_rx: mpsc::Receiver<CcsdsTmPacketOwned>,
}
impl UdpTmHandler for DynamicUdpTmHandler {
impl UdpTmHandlerProvider for UdpTmHandlerWithChannel {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) {
while let Ok(tm) = self.tm_rx.try_recv() {
if tm.packet.len() > 9 {
let service = tm.packet[7];
let subservice = tm.packet[8];
info!("Sending PUS TM[{service},{subservice}]")
} else {
info!("Sending PUS TM");
}
let result = socket.send_to(&tm.packet, recv_addr);
info!("Sending PUS TM with header {:?}", tm.tm_header);
let result = socket.send_to(&tm.to_vec(), recv_addr);
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
@@ -68,12 +63,49 @@ impl UdpTmHandler for DynamicUdpTmHandler {
}
}
pub struct UdpTmtcServer<TmHandler: UdpTmHandler> {
pub udp_tc_server: UdpTcServer<TmTcSender, GenericSendError>,
pub tm_handler: TmHandler,
/// Test double for TM handling: performs no socket I/O and instead records
/// every destination address it was asked to send TM to.
#[derive(Default, Debug, Clone)]
pub struct TestTmHandler {
    // Shared behind Arc/Mutex so tests can keep a clone of this handle and
    // inspect the recorded addresses after the handler was moved into the
    // server (see the `addrs_to_send_to.clone()` usage in the tests).
    addrs_to_send_to: Arc<Mutex<VecDeque<SocketAddr>>>,
}
impl<TmHandler: UdpTmHandler> UdpTmtcServer<TmHandler> {
impl UdpTmHandlerProvider for TestTmHandler {
    /// Records the target address instead of sending anything; the socket is
    /// intentionally unused.
    fn send_tm_to_udp_client(&mut self, _socket: &UdpSocket, recv_addr: &SocketAddr) {
        self.addrs_to_send_to.lock().unwrap().push_back(*recv_addr);
    }
}
/// Sum type over the available TM handler implementations. Used as the
/// concrete `tm_handler` field of [UdpTmtcServer], which avoids a generic
/// type parameter on the server struct.
pub enum UdpTmHandler {
    // Regular handler which reads TM packets from an mpsc channel.
    Normal(UdpTmHandlerWithChannel),
    // Test variant which only records the target addresses.
    Test(TestTmHandler),
}
impl From<UdpTmHandlerWithChannel> for UdpTmHandler {
    /// Allows passing the concrete channel-backed handler where the enum is
    /// expected, e.g. `tm_handler: udp_tm_handler.into()`.
    fn from(handler: UdpTmHandlerWithChannel) -> Self {
        UdpTmHandler::Normal(handler)
    }
}
impl From<TestTmHandler> for UdpTmHandler {
    /// Allows passing the test handler where the enum is expected, e.g.
    /// `tm_handler: tm_handler.into()` in the unit tests.
    fn from(handler: TestTmHandler) -> Self {
        UdpTmHandler::Test(handler)
    }
}
impl UdpTmHandlerProvider for UdpTmHandler {
    /// Dispatches the call to whichever handler variant is wrapped.
    fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) {
        match self {
            UdpTmHandler::Normal(handler) => handler.send_tm_to_udp_client(socket, recv_addr),
            UdpTmHandler::Test(handler) => handler.send_tm_to_udp_client(socket, recv_addr),
        }
    }
}
/// UDP TMTC server combining telecommand reception with telemetry forwarding.
pub struct UdpTmtcServer {
    // Server component receiving TCs over UDP and forwarding them through a
    // `TmTcSender`.
    pub udp_tc_server: UdpTcServer<TmTcSender, GenericSendError>,
    // Handler responsible for sending TM back to a UDP client address.
    pub tm_handler: UdpTmHandler,
}
impl UdpTmtcServer {
pub fn periodic_operation(&mut self) {
loop {
if self.poll_tc_server() == HandlingStatus::Empty {
@@ -107,12 +139,8 @@ impl<TmHandler: UdpTmHandler> UdpTmtcServer<TmHandler> {
#[cfg(test)]
mod tests {
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::{
collections::VecDeque,
net::IpAddr,
sync::{Arc, Mutex},
};
use arbitrary_int::traits::Integer as _;
use arbitrary_int::u14;
@@ -127,23 +155,12 @@ mod tests {
use satrs_example::config::OBSW_SERVER_ADDR;
use satrs_example::ids;
use crate::tmtc::sender::{MockSender, TmTcSender};
use crate::tmtc::sender::MockSender;
use super::*;
const UDP_SERVER_ID: ComponentId = 0x05;
#[derive(Default, Debug, Clone)]
pub struct TestTmHandler {
addrs_to_send_to: Arc<Mutex<VecDeque<SocketAddr>>>,
}
impl UdpTmHandler for TestTmHandler {
fn send_tm_to_udp_client(&mut self, _socket: &UdpSocket, recv_addr: &SocketAddr) {
self.addrs_to_send_to.lock().unwrap().push_back(*recv_addr);
}
}
#[test]
fn test_basic() {
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
@@ -154,7 +171,7 @@ mod tests {
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let mut udp_dyn_server = UdpTmtcServer {
udp_tc_server,
tm_handler,
tm_handler: tm_handler.into(),
};
udp_dyn_server.periodic_operation();
let queue = udp_dyn_server
@@ -179,7 +196,7 @@ mod tests {
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let mut udp_dyn_server = UdpTmtcServer {
udp_tc_server,
tm_handler,
tm_handler: tm_handler.into(),
};
let sph = SpHeader::new_for_unseg_tc(ids::Apid::GenericPus.raw_value(), u14::ZERO, 0);
let ping_tc = PusTcCreator::new_simple(

View File

@@ -42,6 +42,20 @@ pub struct CcsdsTmPacketOwned {
pub payload: alloc::vec::Vec<u8>,
}
/// Simple type modelling a packet stored on the heap. This structure is intended to
/// be used when sending a packet via a message queue, so it also contains the sender ID.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PacketAsVec {
    // ID of the component which sent or produced the packet.
    pub sender_id: ComponentId,
    // Raw packet bytes.
    pub packet: Vec<u8>,
}
impl PacketAsVec {
    /// Creates a new packet from the given sender ID and raw packet bytes.
    pub fn new(sender_id: ComponentId, packet: Vec<u8>) -> Self {
        Self { sender_id, packet }
    }
}
#[derive(Debug, thiserror::Error)]
pub enum CcsdsCreationError {
#[error("CCSDS packet creation error: {0}")]
@@ -76,11 +90,11 @@ impl CcsdsTmPacketOwned {
.unwrap()
}
pub fn to_vec(&self) -> Result<alloc::vec::Vec<u8>, CcsdsCreationError> {
pub fn to_vec(&self) -> alloc::vec::Vec<u8> {
let mut buf = alloc::vec![0u8; self.len_written()];
let len = self.write_to_bytes(&mut buf)?;
let len = self.write_to_bytes(&mut buf).unwrap();
buf.truncate(len);
Ok(buf)
buf
}
}

View File

@@ -5,12 +5,10 @@ use std::{
time::Duration,
};
use acs::mgm::{MgmHandlerLis3Mdl, SpiDummyInterface, SpiSimInterface, SpiSimInterfaceWrapper};
use eps::{
pcdu::{PcduHandler, SerialInterfaceDummy, SerialInterfaceToSim, SerialSimInterfaceWrapper},
PowerSwitchHelper,
};
use events::EventHandler;
use interface::{
sim_client_udp::create_sim_client,
tcp::{SyncTcpTmSource, TcpTask},
@@ -18,31 +16,17 @@ use interface::{
};
use log::info;
use logger::setup_logger;
/*
use pus::{
action::create_action_service,
event::create_event_service,
hk::create_hk_service,
mode::create_mode_service,
scheduler::{create_scheduler_service, TcReleaser},
stack::PusStack,
test::create_test_service,
PusTcDistributor, PusTcMpscRouter,
};
*/
use requests::GenericRequestRouter;
use satrs::{
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
mode::{Mode, ModeAndSubmode, ModeRequest, ModeRequestHandlerMpscBounded},
mode_tree::connect_mode_nodes,
pus::{event_man::EventRequestWithToken, EcssTcCacher, HandlingStatus},
pus::{event_man::EventRequestWithToken, HandlingStatus},
request::{GenericMessage, MessageMetadata},
spacepackets::time::cds::CdsTime,
};
use satrs_example::{
config::{
components::NO_SENDER,
pool::create_sched_tc_pool,
tasks::{FREQ_MS_AOCS, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, SIM_CLIENT_IDLE_DELAY_MS},
OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT,
},
@@ -56,23 +40,7 @@ use satrs_example::{
use tmtc::sender::TmTcSender;
use tmtc::{tc_source::TcSourceTask, tm_sink::TmSink};
cfg_if::cfg_if! {
if #[cfg(feature = "heap_tmtc")] {
use interface::udp::DynamicUdpTmHandler;
use satrs::pus::EcssTcVecCacher;
use tmtc::{tc_source::TcSourceTaskDynamic, tm_sink::TmSinkDynamic};
} else {
use std::sync::RwLock;
use interface::udp::StaticUdpTmHandler;
use satrs::pus::EcssTcInSharedPoolCacher;
use satrs::tmtc::{PacketSenderWithSharedPool, SharedPacketPool};
use satrs_example::config::pool::create_static_pools;
use tmtc::{
tc_source::TcSourceTaskStatic,
tm_sink::TmSinkStatic,
};
}
}
use crate::{interface::udp::UdpTmHandlerWithChannel, tmtc::tc_source::CcsdsDistributor};
mod acs;
mod eps;
@@ -89,33 +57,13 @@ fn main() {
setup_logger().expect("setting up logging with fern failed");
println!("Running OBSW example");
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let (tm_pool, tc_pool) = create_static_pools();
let shared_tm_pool = Arc::new(RwLock::new(tm_pool));
let shared_tc_pool = Arc::new(RwLock::new(tc_pool));
let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool);
let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool);
}
}
let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50);
let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50);
let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50);
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let tm_sender = TmTcSender::Static(
PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone())
);
} else if #[cfg(feature = "heap_tmtc")] {
let tm_sender = TmTcSender::Heap(tm_sink_tx.clone());
}
}
let (sim_request_tx, sim_request_rx) = mpsc::channel();
let (mgm_0_sim_reply_tx, mgm_0_sim_reply_rx) = mpsc::channel();
let (mgm_1_sim_reply_tx, mgm_1_sim_reply_rx) = mpsc::channel();
//let (mgm_0_sim_reply_tx, mgm_0_sim_reply_rx) = mpsc::channel();
//let (mgm_1_sim_reply_tx, mgm_1_sim_reply_rx) = mpsc::channel();
let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel();
let mut opt_sim_client = create_sim_client(sim_request_rx);
@@ -138,141 +86,35 @@ fn main() {
.composite_router_map
.insert(PCDU.id(), pcdu_handler_composite_tx);
// This helper structure is used by all telecommand providers which need to send telecommands
// to the TC source.
/*
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let tc_sender_with_shared_pool =
PacketSenderWithSharedPool::new(tc_source_tx, shared_tc_pool_wrapper.clone());
let tc_in_mem_converter =
EcssTcCacher::Static(EcssTcInSharedPoolCacher::new(shared_tc_pool, 4096));
} else if #[cfg(feature = "heap_tmtc")] {
let tc_in_mem_converter = EcssTcCacher::Heap(EcssTcVecCacher::default());
}
}
*/
// Create event handling components
// These sender handles are used to send event requests, for example to enable or disable
// certain events.
let (event_tx, event_rx) = mpsc::sync_channel(100);
//let (event_tx, event_rx) = mpsc::sync_channel(100);
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation.
let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx);
//let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx);
let (pus_test_tx, pus_test_rx) = mpsc::sync_channel(20);
let (pus_event_tx, pus_event_rx) = mpsc::sync_channel(10);
let (pus_sched_tx, pus_sched_rx) = mpsc::sync_channel(50);
let (pus_hk_tx, pus_hk_rx) = mpsc::sync_channel(50);
let (pus_action_tx, pus_action_rx) = mpsc::sync_channel(50);
let (pus_mode_tx, pus_mode_rx) = mpsc::sync_channel(50);
//let (pus_test_tx, pus_test_rx) = mpsc::sync_channel(20);
//let (pus_event_tx, pus_event_rx) = mpsc::sync_channel(10);
//let (pus_sched_tx, pus_sched_rx) = mpsc::sync_channel(50);
//let (pus_hk_tx, pus_hk_rx) = mpsc::sync_channel(50);
//let (pus_action_tx, pus_action_rx) = mpsc::sync_channel(50);
//let (pus_mode_tx, pus_mode_rx) = mpsc::sync_channel(50);
let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel();
//let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel();
let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::sync_channel(50);
let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::sync_channel(30);
//let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::sync_channel(30);
/*
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let tc_releaser = TcReleaser::Static(tc_sender_with_shared_pool.clone());
} else if #[cfg(feature = "heap_tmtc")] {
let tc_releaser = TcReleaser::Heap(tc_source_tx.clone());
}
}
*/
/*
let pus_router = PusTcMpscRouter {
test_tc_sender: pus_test_tx,
event_tc_sender: pus_event_tx,
sched_tc_sender: pus_sched_tx,
hk_tc_sender: pus_hk_tx,
action_tc_sender: pus_action_tx,
mode_tc_sender: pus_mode_tx,
};
let pus_test_service = create_test_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
event_tx.clone(),
pus_test_rx,
);
let pus_scheduler_service = create_scheduler_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
tc_releaser,
pus_sched_rx,
create_sched_tc_pool(),
);
let pus_event_service = create_event_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
pus_event_rx,
event_request_tx,
);
let pus_action_service = create_action_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
pus_action_rx,
request_map.clone(),
pus_action_reply_rx,
);
let pus_hk_service = create_hk_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
pus_hk_rx,
request_map.clone(),
pus_hk_reply_rx,
);
let pus_mode_service = create_mode_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
pus_mode_rx,
request_map,
pus_mode_reply_rx,
);
let mut pus_stack = PusStack::new(
pus_test_service,
pus_hk_service,
pus_event_service,
pus_action_service,
pus_scheduler_service,
pus_mode_service,
);
*/
/*
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let mut tmtc_task = TcSourceTask::Static(TcSourceTaskStatic::new(
shared_tc_pool_wrapper.clone(),
tc_source_rx,
PusTcDistributor::new(tm_sender.clone(), pus_router),
));
let tc_sender = TmTcSender::Static(tc_sender_with_shared_pool);
let udp_tm_handler = StaticUdpTmHandler {
tm_rx: tm_server_rx,
tm_store: shared_tm_pool.clone(),
};
} else if #[cfg(feature = "heap_tmtc")] {
let mut tmtc_task = TcSourceTask::Heap(TcSourceTaskDynamic::new(
tc_source_rx,
PusTcDistributor::new(tm_sender.clone(), pus_router),
));
let tc_sender = TmTcSender::Heap(tc_source_tx.clone());
let udp_tm_handler = DynamicUdpTmHandler {
tm_rx: tm_server_rx,
};
}
}
*/
let mut ccsds_distributor = CcsdsDistributor::default();
let mut tmtc_task = TcSourceTask::new(
tc_source_rx,
ccsds_distributor,
//PusTcDistributor::new(tm_sender.clone(), pus_router),
);
let tc_sender = TmTcSender::Heap(tc_source_tx.clone());
let udp_tm_handler = DynamicUdpTmHandler {
let tc_sender = TmTcSender::Normal(tc_source_tx.clone());
let udp_tm_handler = UdpTmHandlerWithChannel {
tm_rx: tm_server_rx,
};
@@ -281,7 +123,7 @@ fn main() {
.expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server,
tm_handler: udp_tm_handler,
tm_handler: udp_tm_handler.into(),
};
let tcp_server_cfg = ServerConfig::new(
@@ -300,32 +142,14 @@ fn main() {
)
.expect("tcp server creation failed");
/*
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let mut tm_sink = TmSink::Static(TmSinkStatic::new(
shared_tm_pool_wrapper,
sync_tm_tcp_source,
tm_sink_rx,
tm_server_tx,
));
} else if #[cfg(feature = "heap_tmtc")] {
let mut tm_sink = TmSink::Heap(TmSinkDynamic::new(
sync_tm_tcp_source,
tm_sink_rx,
tm_server_tx,
));
}
}
*/
let mut tm_sink = TmSink::new(sync_tm_tcp_source, tm_sink_rx, tm_server_tx);
let shared_switch_set = Arc::new(Mutex::default());
let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20);
let switch_helper = PowerSwitchHelper::new(switch_request_tx, shared_switch_set.clone());
let shared_mgm_0_set = Arc::default();
let shared_mgm_1_set = Arc::default();
//let shared_mgm_0_set = Arc::default();
//let shared_mgm_1_set = Arc::default();
let mgm_0_mode_node = ModeRequestHandlerMpscBounded::new(MGM0.into(), mgm_0_handler_mode_rx);
let mgm_1_mode_node = ModeRequestHandlerMpscBounded::new(MGM1.into(), mgm_1_handler_mode_rx);
/*
@@ -405,7 +229,7 @@ fn main() {
pcdu_handler_composite_rx,
pus_hk_reply_tx,
switch_request_rx,
tm_sender.clone(),
tm_sink_tx.clone(),
pcdu_serial_interface,
shared_switch_set,
);
@@ -507,8 +331,8 @@ fn main() {
let jh_pus_handler = thread::Builder::new()
.name("sat-rs pus".to_string())
.spawn(move || loop {
event_handler.periodic_operation();
pus_stack.periodic_operation();
//event_handler.periodic_operation();
//pus_stack.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
})
.unwrap();

View File

@@ -1,10 +1,8 @@
use std::{cell::RefCell, collections::VecDeque, sync::mpsc};
use satrs::{
pus::EcssTmSender,
queue::GenericSendError,
spacepackets::ecss::WritablePusPacket,
tmtc::{PacketAsVec, PacketHandler, PacketSenderWithSharedPool},
tmtc::{PacketAsVec, PacketHandler},
ComponentId,
};
@@ -14,8 +12,7 @@ pub struct MockSender(pub RefCell<VecDeque<PacketAsVec>>);
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum TmTcSender {
Static(PacketSenderWithSharedPool),
Heap(mpsc::SyncSender<PacketAsVec>),
Normal(mpsc::SyncSender<PacketAsVec>),
Mock(MockSender),
}
@@ -29,6 +26,7 @@ impl TmTcSender {
}
}
/*
impl EcssTmSender for TmTcSender {
fn send_tm(
&self,
@@ -36,7 +34,7 @@ impl EcssTmSender for TmTcSender {
tm: satrs::pus::PusTmVariant,
) -> Result<(), satrs::pus::EcssTmtcError> {
match self {
TmTcSender::Static(sync_sender) => sync_sender.send_tm(sender_id, tm),
//TmTcSender::Static(sync_sender) => sync_sender.send_tm(sender_id, tm),
TmTcSender::Heap(sync_sender) => match tm {
satrs::pus::PusTmVariant::InStore(_) => panic!("can not send TM in store"),
satrs::pus::PusTmVariant::Direct(pus_tm_creator) => sync_sender
@@ -47,19 +45,15 @@ impl EcssTmSender for TmTcSender {
}
}
}
*/
impl PacketHandler for TmTcSender {
type Error = GenericSendError;
fn handle_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> {
match self {
TmTcSender::Static(packet_sender_with_shared_pool) => {
if let Err(e) = packet_sender_with_shared_pool.handle_packet(sender_id, packet) {
log::error!("Error sending packet via Static TM/TC sender: {:?}", e);
}
}
TmTcSender::Heap(sync_sender) => {
if let Err(e) = sync_sender.handle_packet(sender_id, packet) {
TmTcSender::Normal(sync_sender) => {
if let Err(e) = sync_sender.send(PacketAsVec::new(sender_id, packet.to_vec())) {
log::error!("Error sending packet via Heap TM/TC sender: {:?}", e);
}
}

View File

@@ -1,5 +1,9 @@
use satrs::pus::HandlingStatus;
use satrs_example::{CcsdsTcPacketOwned, ComponentId};
use satrs::{
pus::HandlingStatus,
spacepackets::{CcsdsPacketReader, ChecksumType},
tmtc::PacketAsVec,
};
use satrs_example::{CcsdsTcPacketOwned, ComponentId, TcHeader};
use std::{
collections::HashMap,
sync::mpsc::{self, TryRecvError},
@@ -69,14 +73,14 @@ pub type CcsdsDistributor = HashMap<ComponentId, std::sync::mpsc::SyncSender<Ccs
// TC source components where the heap is the backing memory of the received telecommands.
pub struct TcSourceTask {
pub tc_receiver: mpsc::Receiver<CcsdsTcPacketOwned>,
pub tc_receiver: mpsc::Receiver<PacketAsVec>,
ccsds_distributor: CcsdsDistributor,
}
//#[allow(dead_code)]
impl TcSourceTask {
pub fn new(
tc_receiver: mpsc::Receiver<CcsdsTcPacketOwned>,
tc_receiver: mpsc::Receiver<PacketAsVec>,
ccsds_distributor: CcsdsDistributor,
) -> Self {
Self {
@@ -94,14 +98,39 @@ impl TcSourceTask {
// If packets like CFDP are expected, we might have to check the APID first.
match self.tc_receiver.try_recv() {
Ok(packet) => {
if let Some(sender) = self.ccsds_distributor.get(&packet.tc_header.target_id) {
sender.send(packet).ok();
} else {
let ccsds_tc_reader_result =
CcsdsPacketReader::new(&packet.packet, Some(ChecksumType::WithCrc16));
if ccsds_tc_reader_result.is_err() {
log::warn!(
"no TC handler for target ID {:?}",
packet.tc_header.target_id
"received invalid CCSDS TC packet: {:?}",
ccsds_tc_reader_result.err()
);
// TODO: Send a dedicated TM packet.
return HandlingStatus::HandledOne;
}
let ccsds_tc_reader = ccsds_tc_reader_result.unwrap();
let tc_header_result =
postcard::take_from_bytes::<TcHeader>(ccsds_tc_reader.user_data());
if tc_header_result.is_err() {
log::warn!(
"received CCSDS TC packet with invalid TC header: {:?}",
tc_header_result.err()
);
// TODO: Send a dedicated TM packet.
return HandlingStatus::HandledOne;
}
let (tc_header, payload) = tc_header_result.unwrap();
if let Some(sender) = self.ccsds_distributor.get(&tc_header.target_id) {
sender
.send(CcsdsTcPacketOwned {
sp_header: *ccsds_tc_reader.sp_header(),
tc_header,
payload: payload.to_vec(),
})
.ok();
} else {
log::warn!("no TC handler for target ID {:?}", tc_header.target_id);
// TODO: Send a dedicated TM packet.
}
HandlingStatus::HandledOne
}

View File

@@ -5,11 +5,13 @@ use std::{
use arbitrary_int::{u11, u14};
use log::info;
use satrs::spacepackets::{
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
seq_count::SequenceCounter,
seq_count::SequenceCounterCcsdsSimple,
CcsdsPacket,
use satrs::{
spacepackets::{
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
seq_count::{SequenceCounter, SequenceCounterCcsdsSimple},
CcsdsPacket,
},
tmtc::PacketAsVec,
};
use satrs_example::CcsdsTmPacketOwned;
@@ -165,7 +167,7 @@ impl TmSink {
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
*/
self.common.sync_tm_tcp_source.add_tm(&tm.to_vec().unwrap());
self.common.sync_tm_tcp_source.add_tm(&tm.to_vec());
self.tm_server_tx
.send(tm)
.expect("Sending TM to server failed");