Compare commits

..

1 Commits

Author SHA1 Message Date
e238b5d377 Mode Tree Feature Update 2025-02-07 15:16:43 +01:00
16 changed files with 142 additions and 117 deletions

View File

@ -26,7 +26,7 @@ use crate::hk::PusHkHelper;
use crate::pus::hk::{HkReply, HkReplyVariant};
use crate::requests::CompositeRequest;
use crate::spi::SpiInterface;
use crate::tm_sender::TmSender;
use crate::tmtc::sender::TmTcSender;
use serde::{Deserialize, Serialize};
@ -165,7 +165,7 @@ pub struct MgmHandlerLis3Mdl<
composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
hk_reply_tx: mpsc::SyncSender<GenericMessage<HkReply>>,
switch_helper: SwitchHelper,
tm_sender: TmSender,
tm_sender: TmTcSender,
pub com_interface: ComInterface,
shared_mgm_set: Arc<Mutex<MgmData>>,
#[new(value = "PusHkHelper::new(id)")]
@ -576,9 +576,9 @@ mod tests {
let id = UniqueApidTargetId::new(Apid::Acs as u16, 1);
let mode_node = ModeRequestHandlerMpscBounded::new(id.into(), request_rx);
let (composite_request_tx, composite_request_rx) = mpsc::channel();
let (hk_reply_tx, hk_reply_rx) = mpsc::channel();
let (tm_tx, tm_rx) = mpsc::channel();
let tm_sender = TmSender::new_heap(tm_tx);
let (hk_reply_tx, hk_reply_rx) = mpsc::sync_channel(10);
let (tm_tx, tm_rx) = mpsc::sync_channel(10);
let tm_sender = TmTcSender::Heap(tm_tx);
let shared_mgm_set = Arc::default();
let mut handler = MgmHandlerLis3Mdl::new(
id,

View File

@ -574,7 +574,7 @@ mod tests {
let mode_node =
ModeRequestHandlerMpscBounded::new(PCDU_HANDLER.into(), mode_request_rx);
let (composite_request_tx, composite_request_rx) = mpsc::channel();
let (hk_reply_tx, hk_reply_rx) = mpsc::channel();
let (hk_reply_tx, hk_reply_rx) = mpsc::sync_channel(10);
let (tm_tx, tm_rx) = mpsc::channel::<PacketAsVec>();
let (switch_request_tx, switch_reqest_rx) = mpsc::channel();
let shared_switch_map = Arc::new(Mutex::new(SwitchSet::default()));

View File

@ -5,7 +5,7 @@ use std::{
};
use log::{info, warn};
use satrs::tmtc::{StoreAndSendError, TcSender};
use satrs::tmtc::StoreAndSendError;
use satrs::{
encoding::ccsds::{SpValidity, SpacePacketValidator},
hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer},
@ -13,6 +13,8 @@ use satrs::{
tmtc::PacketSource,
};
use crate::tmtc::sender::TmTcSender;
#[derive(Default)]
pub struct ConnectionFinishedHandler {}
@ -110,13 +112,13 @@ pub type TcpServer<ReceivesTc, SendError> = TcpSpacepacketsServer<
SendError,
>;
pub struct TcpTask(pub TcpServer<TcSender, StoreAndSendError>);
pub struct TcpTask(pub TcpServer<TmTcSender, StoreAndSendError>);
impl TcpTask {
pub fn new(
cfg: ServerConfig,
tm_source: SyncTcpTmSource,
tc_sender: TcSender,
tc_sender: TmTcSender,
valid_ids: HashSet<PacketId>,
) -> Result<Self, std::io::Error> {
Ok(Self(TcpSpacepacketsServer::new(

View File

@ -3,12 +3,14 @@ use std::sync::mpsc;
use log::{info, warn};
use satrs::pus::HandlingStatus;
use satrs::tmtc::{PacketAsVec, PacketInPool, StoreAndSendError, TcSender};
use satrs::tmtc::{PacketAsVec, PacketInPool, StoreAndSendError};
use satrs::{
hal::std::udp_server::{ReceiveResult, UdpTcServer},
pool::{PoolProviderWithGuards, SharedStaticMemoryPool},
};
use crate::tmtc::sender::TmTcSender;
pub trait UdpTmHandler {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr);
}
@ -65,7 +67,7 @@ impl UdpTmHandler for DynamicUdpTmHandler {
}
pub struct UdpTmtcServer<TmHandler: UdpTmHandler> {
pub udp_tc_server: UdpTcServer<TcSender, StoreAndSendError>,
pub udp_tc_server: UdpTcServer<TmTcSender, StoreAndSendError>,
pub tm_handler: TmHandler,
}
@ -105,7 +107,6 @@ impl<TmHandler: UdpTmHandler> UdpTmtcServer<TmHandler> {
mod tests {
use std::net::Ipv4Addr;
use std::{
cell::RefCell,
collections::VecDeque,
net::IpAddr,
sync::{Arc, Mutex},
@ -116,30 +117,16 @@ mod tests {
ecss::{tc::PusTcCreator, WritablePusPacket},
SpHeader,
},
tmtc::PacketSenderRaw,
ComponentId,
};
use satrs_example::config::{components, OBSW_SERVER_ADDR};
use crate::tmtc::sender::{MockSender, TmTcSender};
use super::*;
const UDP_SERVER_ID: ComponentId = 0x05;
#[derive(Default, Debug)]
pub struct TestSender {
tc_vec: RefCell<VecDeque<PacketAsVec>>,
}
impl PacketSenderRaw for TestSender {
type Error = ();
fn send_packet(&self, sender_id: ComponentId, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut mut_queue = self.tc_vec.borrow_mut();
mut_queue.push_back(PacketAsVec::new(sender_id, tc_raw.to_vec()));
Ok(())
}
}
#[derive(Default, Debug, Clone)]
pub struct TestTmHandler {
addrs_to_send_to: Arc<Mutex<VecDeque<SocketAddr>>>,
@ -154,8 +141,7 @@ mod tests {
#[test]
fn test_basic() {
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
let test_receiver = TestSender::default();
// let tc_queue = test_receiver.tc_vec.clone();
let test_receiver = TmTcSender::Mock(MockSender::default());
let udp_tc_server =
UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, test_receiver).unwrap();
let tm_handler = TestTmHandler::default();
@ -165,7 +151,13 @@ mod tests {
tm_handler,
};
udp_dyn_server.periodic_operation();
let queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow();
let queue = udp_dyn_server
.udp_tc_server
.tc_sender
.get_mock_sender()
.unwrap()
.0
.borrow();
assert!(queue.is_empty());
assert!(tm_handler_calls.lock().unwrap().is_empty());
}
@ -173,8 +165,7 @@ mod tests {
#[test]
fn test_transactions() {
let sock_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
let test_receiver = TestSender::default();
// let tc_queue = test_receiver.tc_vec.clone();
let test_receiver = TmTcSender::Mock(MockSender::default());
let udp_tc_server =
UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, test_receiver).unwrap();
let server_addr = udp_tc_server.socket.local_addr().unwrap();
@ -194,7 +185,13 @@ mod tests {
client.send_to(&ping_tc, server_addr).unwrap();
udp_dyn_server.periodic_operation();
{
let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut();
let mut queue = udp_dyn_server
.udp_tc_server
.tc_sender
.get_mock_sender()
.unwrap()
.0
.borrow_mut();
assert!(!queue.is_empty());
let packet_with_sender = queue.pop_front().unwrap();
assert_eq!(packet_with_sender.packet, ping_tc);
@ -209,7 +206,13 @@ mod tests {
assert_eq!(received_addr, client_addr);
}
udp_dyn_server.periodic_operation();
let queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow();
let queue = udp_dyn_server
.udp_tc_server
.tc_sender
.get_mock_sender()
.unwrap()
.0
.borrow();
assert!(queue.is_empty());
drop(queue);
// Still tries to send to the same client.

View File

@ -36,7 +36,6 @@ use satrs::{
pus::{event_man::EventRequestWithToken, EcssTcInMemConverter, HandlingStatus},
request::{GenericMessage, MessageMetadata},
spacepackets::time::{cds::CdsTime, TimeWriter},
tmtc::TcSender,
};
use satrs_example::{
config::{
@ -47,7 +46,7 @@ use satrs_example::{
},
DeviceMode,
};
use tm_sender::TmSender;
use tmtc::sender::TmTcSender;
use tmtc::{tc_source::TcSourceTask, tm_sink::TmSink};
cfg_if::cfg_if! {
@ -77,7 +76,6 @@ mod logger;
mod pus;
mod requests;
mod spi;
mod tm_sender;
mod tmtc;
fn main() {
@ -100,11 +98,11 @@ fn main() {
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let tm_sender = TmSender::Static(
let tm_sender = TmTcSender::Static(
PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone())
);
} else if #[cfg(feature = "heap_tmtc")] {
let tm_sender = TmSender::Heap(tm_sink_tx.clone());
let tm_sender = TmTcSender::Heap(tm_sink_tx.clone());
}
}
@ -232,7 +230,7 @@ fn main() {
tc_source_rx,
PusTcDistributor::new(tm_sender.clone(), pus_router),
));
let tc_sender = TcSender::StaticStore(tc_sender_with_shared_pool);
let tc_sender = TmTcSender::Static(tc_sender_with_shared_pool);
let udp_tm_handler = StaticUdpTmHandler {
tm_rx: tm_server_rx,
tm_store: shared_tm_pool.clone(),
@ -242,7 +240,7 @@ fn main() {
tc_source_rx,
PusTcDistributor::new(tm_sender.clone(), pus_router),
));
let tc_sender = TcSender::HeapStore(tc_source_tx.clone());
let tc_sender = TmTcSender::Heap(tc_source_tx.clone());
let udp_tm_handler = DynamicUdpTmHandler {
tm_rx: tm_server_rx,
};

View File

@ -22,7 +22,7 @@ use std::sync::mpsc;
use std::time::Duration;
use crate::requests::GenericRequestRouter;
use crate::tm_sender::TmSender;
use crate::tmtc::sender::TmTcSender;
use super::{
create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus,
@ -206,7 +206,7 @@ impl PusTcToRequestConverter<ActivePusActionRequestStd, ActionRequest> for Actio
}
pub fn create_action_service(
tm_sender: TmSender,
tm_sender: TmTcSender,
tc_in_mem_converter: EcssTcInMemConverter,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
action_router: GenericRequestRouter,
@ -236,7 +236,7 @@ pub fn create_action_service(
pub struct ActionServiceWrapper {
pub(crate) service: PusTargetedRequestService<
MpscTcReceiver,
TmSender,
TmTcSender,
EcssTcInMemConverter,
VerificationReporter,
ActionRequestConverter,

View File

@ -1,7 +1,7 @@
use std::sync::mpsc;
use crate::pus::create_verification_reporter;
use crate::tm_sender::TmSender;
use crate::tmtc::sender::TmTcSender;
use satrs::pus::event_man::EventRequestWithToken;
use satrs::pus::event_srv::PusEventServiceHandler;
use satrs::pus::verification::VerificationReporter;
@ -15,7 +15,7 @@ use satrs_example::config::components::PUS_EVENT_MANAGEMENT;
use super::{DirectPusService, HandlingStatus};
pub fn create_event_service(
tm_sender: TmSender,
tm_sender: TmTcSender,
tm_in_pool_converter: EcssTcInMemConverter,
pus_event_rx: mpsc::Receiver<EcssTcAndToken>,
event_request_tx: mpsc::Sender<EventRequestWithToken>,
@ -38,7 +38,7 @@ pub fn create_event_service(
pub struct EventServiceWrapper {
pub handler: PusEventServiceHandler<
MpscTcReceiver,
TmSender,
TmTcSender,
EcssTcInMemConverter,
VerificationReporter,
>,

View File

@ -20,7 +20,7 @@ use std::time::Duration;
use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler};
use crate::requests::GenericRequestRouter;
use crate::tm_sender::TmSender;
use crate::tmtc::sender::TmTcSender;
use super::{HandlingStatus, PusTargetedRequestService, TargetedPusService};
@ -241,7 +241,7 @@ impl PusTcToRequestConverter<ActivePusRequestStd, HkRequest> for HkRequestConver
}
pub fn create_hk_service(
tm_sender: TmSender,
tm_sender: TmTcSender,
tc_in_mem_converter: EcssTcInMemConverter,
pus_hk_rx: mpsc::Receiver<EcssTcAndToken>,
request_router: GenericRequestRouter,
@ -269,7 +269,7 @@ pub fn create_hk_service(
pub struct HkServiceWrapper {
pub(crate) service: PusTargetedRequestService<
MpscTcReceiver,
TmSender,
TmTcSender,
EcssTcInMemConverter,
VerificationReporter,
HkRequestConverter,

View File

@ -1,5 +1,5 @@
use crate::requests::GenericRequestRouter;
use crate::tm_sender::TmSender;
use crate::tmtc::sender::TmTcSender;
use log::warn;
use satrs::pool::PoolAddr;
use satrs::pus::verification::{
@ -53,14 +53,14 @@ pub struct PusTcMpscRouter {
pub struct PusTcDistributor {
#[allow(dead_code)]
pub id: ComponentId,
pub tm_sender: TmSender,
pub tm_sender: TmTcSender,
pub verif_reporter: VerificationReporter,
pub pus_router: PusTcMpscRouter,
stamp_helper: TimestampHelper,
}
impl PusTcDistributor {
pub fn new(tm_sender: TmSender, pus_router: PusTcMpscRouter) -> Self {
pub fn new(tm_sender: TmTcSender, pus_router: PusTcMpscRouter) -> Self {
Self {
id: PUS_ROUTING_SERVICE.raw(),
tm_sender,

View File

@ -4,7 +4,7 @@ use std::sync::mpsc;
use std::time::Duration;
use crate::requests::GenericRequestRouter;
use crate::tm_sender::TmSender;
use crate::tmtc::sender::TmTcSender;
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
DefaultActiveRequestMap, EcssTcAndToken, EcssTcInMemConverter, MpscTcReceiver,
@ -209,7 +209,7 @@ impl PusTcToRequestConverter<ActivePusRequestStd, ModeRequest> for ModeRequestCo
}
pub fn create_mode_service(
tm_sender: TmSender,
tm_sender: TmTcSender,
tc_in_mem_converter: EcssTcInMemConverter,
pus_action_rx: mpsc::Receiver<EcssTcAndToken>,
mode_router: GenericRequestRouter,
@ -237,7 +237,7 @@ pub fn create_mode_service(
pub struct ModeServiceWrapper {
pub(crate) service: PusTargetedRequestService<
MpscTcReceiver,
TmSender,
TmTcSender,
EcssTcInMemConverter,
VerificationReporter,
ModeRequestConverter,

View File

@ -2,7 +2,7 @@ use std::sync::mpsc;
use std::time::Duration;
use crate::pus::create_verification_reporter;
use crate::tm_sender::TmSender;
use crate::tmtc::sender::TmTcSender;
use log::info;
use satrs::pool::{PoolProvider, StaticMemoryPool};
use satrs::pus::scheduler::{PusScheduler, TcInfo};
@ -83,7 +83,7 @@ impl TcReleaseProvider for TcReleaser {
pub struct SchedulingServiceWrapper {
pub pus_11_handler: PusSchedServiceHandler<
MpscTcReceiver,
TmSender,
TmTcSender,
EcssTcInMemConverter,
VerificationReporter,
PusScheduler,
@ -173,7 +173,7 @@ impl SchedulingServiceWrapper {
}
pub fn create_scheduler_service(
tm_sender: TmSender,
tm_sender: TmTcSender,
tc_in_mem_converter: EcssTcInMemConverter,
tc_releaser: TcReleaser,
pus_sched_rx: mpsc::Receiver<EcssTcAndToken>,

View File

@ -1,5 +1,5 @@
use crate::pus::create_verification_reporter;
use crate::tm_sender::TmSender;
use crate::tmtc::sender::TmTcSender;
use log::info;
use satrs::event_man::{EventMessage, EventMessageU32};
use satrs::pus::test::PusService17TestHandler;
@ -18,7 +18,7 @@ use std::sync::mpsc;
use super::{DirectPusService, HandlingStatus};
pub fn create_test_service(
tm_sender: TmSender,
tm_sender: TmTcSender,
tc_in_mem_converter: EcssTcInMemConverter,
event_sender: mpsc::SyncSender<EventMessageU32>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
@ -39,7 +39,7 @@ pub fn create_test_service(
pub struct TestCustomServiceWrapper {
pub handler: PusService17TestHandler<
MpscTcReceiver,
TmSender,
TmTcSender,
EcssTcInMemConverter,
VerificationReporter,
>,

View File

@ -1,33 +0,0 @@
use std::sync::mpsc;
use satrs::{
pus::EcssTmSender,
queue::GenericSendError,
spacepackets::ecss::WritablePusPacket,
tmtc::{PacketAsVec, PacketSenderWithSharedPool},
};
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum TmSender {
Static(PacketSenderWithSharedPool),
Heap(mpsc::SyncSender<PacketAsVec>),
}
impl EcssTmSender for TmSender {
fn send_tm(
&self,
sender_id: satrs::ComponentId,
tm: satrs::pus::PusTmVariant,
) -> Result<(), satrs::pus::EcssTmtcError> {
match self {
TmSender::Static(sync_sender) => sync_sender.send_tm(sender_id, tm),
TmSender::Heap(sync_sender) => match tm {
satrs::pus::PusTmVariant::InStore(_) => panic!("can not send TM in store"),
satrs::pus::PusTmVariant::Direct(pus_tm_creator) => sync_sender
.send(PacketAsVec::new(sender_id, pus_tm_creator.to_vec()?))
.map_err(|_| GenericSendError::RxDisconnected.into()),
},
}
}
}

View File

@ -1,2 +1,3 @@
pub mod sender;
pub mod tc_source;
pub mod tm_sink;

View File

@ -0,0 +1,75 @@
use std::{cell::RefCell, collections::VecDeque, sync::mpsc};
use satrs::{
pus::EcssTmSender,
queue::GenericSendError,
spacepackets::ecss::WritablePusPacket,
tmtc::{PacketAsVec, PacketSenderRaw, PacketSenderWithSharedPool, StoreAndSendError},
ComponentId,
};
#[derive(Default, Debug, Clone)]
pub struct MockSender(pub RefCell<VecDeque<PacketAsVec>>);
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum TmTcSender {
Static(PacketSenderWithSharedPool),
Heap(mpsc::SyncSender<PacketAsVec>),
Mock(MockSender),
}
impl TmTcSender {
#[allow(dead_code)]
pub fn get_mock_sender(&mut self) -> Option<&mut MockSender> {
match self {
TmTcSender::Mock(sender) => Some(sender),
_ => None,
}
}
}
impl EcssTmSender for TmTcSender {
fn send_tm(
&self,
sender_id: satrs::ComponentId,
tm: satrs::pus::PusTmVariant,
) -> Result<(), satrs::pus::EcssTmtcError> {
match self {
TmTcSender::Static(sync_sender) => sync_sender.send_tm(sender_id, tm),
TmTcSender::Heap(sync_sender) => match tm {
satrs::pus::PusTmVariant::InStore(_) => panic!("can not send TM in store"),
satrs::pus::PusTmVariant::Direct(pus_tm_creator) => sync_sender
.send(PacketAsVec::new(sender_id, pus_tm_creator.to_vec()?))
.map_err(|_| GenericSendError::RxDisconnected.into()),
},
TmTcSender::Mock(_) => Ok(()),
}
}
}
impl PacketSenderRaw for TmTcSender {
type Error = StoreAndSendError;
fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> {
match self {
TmTcSender::Static(packet_sender_with_shared_pool) => {
packet_sender_with_shared_pool.send_packet(sender_id, packet)
}
TmTcSender::Heap(sync_sender) => sync_sender
.send_packet(sender_id, packet)
.map_err(StoreAndSendError::Send),
TmTcSender::Mock(sender) => sender.send_packet(sender_id, packet),
}
}
}
impl PacketSenderRaw for MockSender {
type Error = StoreAndSendError;
fn send_packet(&self, sender_id: ComponentId, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut mut_queue = self.0.borrow_mut();
mut_queue.push_back(PacketAsVec::new(sender_id, tc_raw.to_vec()));
Ok(())
}
}

View File

@ -477,27 +477,6 @@ pub mod std_mod {
}
}
}
#[derive(Debug, Clone)]
pub enum TcSender {
StaticStore(PacketSenderWithSharedPool),
HeapStore(mpsc::SyncSender<PacketAsVec>),
}
impl PacketSenderRaw for TcSender {
type Error = StoreAndSendError;
fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> {
match self {
TcSender::StaticStore(packet_sender_with_shared_pool) => {
packet_sender_with_shared_pool.send_packet(sender_id, packet)
}
TcSender::HeapStore(sync_sender) => sync_sender
.send_packet(sender_id, packet)
.map_err(StoreAndSendError::Send),
}
}
}
}
#[cfg(test)]