Request and Reply Messaging Abstraction and first Consumers #136

Closed
muellerr wants to merge 95 commits from request-reply-messaging-mode-tree into main
4 changed files with 76 additions and 46 deletions
Showing only changes of commit fde8ab895c - Show all commits

View File

@ -14,9 +14,10 @@ use satrs::{
verification::{TcStateStarted, VerificationReportingProvider, VerificationToken}, verification::{TcStateStarted, VerificationReportingProvider, VerificationToken},
EcssTmSender, EcssTmSender,
}, },
spacepackets::time::cds::{self, TimeProvider}, ComponentId, spacepackets::time::cds::{self, TimeProvider},
ComponentId,
}; };
use satrs_example::config::{PUS_APID, ComponentIdList}; use satrs_example::config::{ComponentIdList, PUS_APID};
use crate::update_time; use crate::update_time;
@ -47,8 +48,11 @@ impl<VerificationReporter: VerificationReportingProvider> PusEventHandler<Verifi
let event_reporter = EventReporter::new(PUS_APID, 128).unwrap(); let event_reporter = EventReporter::new(PUS_APID, 128).unwrap();
let pus_event_dispatcher = let pus_event_dispatcher =
DefaultPusEventU32Dispatcher::new_with_default_backend(event_reporter); DefaultPusEventU32Dispatcher::new_with_default_backend(event_reporter);
let pus_event_man_send_provider = let pus_event_man_send_provider = EventU32SenderMpscBounded::new(
EventU32SenderMpscBounded::new(ComponentIdList::EventManagement as ComponentId, pus_event_man_tx, event_queue_cap); ComponentIdList::EventManagement as ComponentId,
pus_event_man_tx,
event_queue_cap,
);
event_manager.subscribe_all(pus_event_man_send_provider.target_id()); event_manager.subscribe_all(pus_event_man_send_provider.target_id());
event_manager.add_sender(pus_event_man_send_provider); event_manager.add_sender(pus_event_man_send_provider);

View File

@ -178,26 +178,26 @@ mod tests {
request::GenericMessage, request::GenericMessage,
}; };
const TEST_CHANNEL_ID_0: u32 = 5; const TEST_COMPONENT_ID_0: u64 = 5;
const TEST_CHANNEL_ID_1: u32 = 6; const TEST_COMPONENT_ID_1: u64 = 6;
const TEST_CHANNEL_ID_2: u32 = 7; const TEST_COMPONENT_ID_2: u64 = 7;
#[test] #[test]
fn test_simple_mode_requestor() { fn test_simple_mode_requestor() {
let (reply_sender, reply_receiver) = mpsc::channel(); let (reply_sender, reply_receiver) = mpsc::channel();
let (request_sender, request_receiver) = mpsc::channel(); let (request_sender, request_receiver) = mpsc::channel();
let mut mode_requestor = ModeRequestorMpsc::new(TEST_CHANNEL_ID_0, reply_receiver); let mut mode_requestor = ModeRequestorMpsc::new(TEST_COMPONENT_ID_0, reply_receiver);
mode_requestor.add_message_target(TEST_CHANNEL_ID_1, request_sender); mode_requestor.add_message_target(TEST_COMPONENT_ID_1, request_sender);
// Send a request and verify it arrives at the receiver. // Send a request and verify it arrives at the receiver.
let request_id = 2; let request_id = 2;
let sent_request = ModeRequest::ReadMode; let sent_request = ModeRequest::ReadMode;
mode_requestor mode_requestor
.send_mode_request(request_id, TEST_CHANNEL_ID_1, sent_request) .send_mode_request(request_id, TEST_COMPONENT_ID_1, sent_request)
.expect("send failed"); .expect("send failed");
let request = request_receiver.recv().expect("recv failed"); let request = request_receiver.recv().expect("recv failed");
assert_eq!(request.request_id, 2); assert_eq!(request.request_id, 2);
assert_eq!(request.sender_id, TEST_CHANNEL_ID_0); assert_eq!(request.sender_id, TEST_COMPONENT_ID_0);
assert_eq!(request.message, sent_request); assert_eq!(request.message, sent_request);
// Send a reply and verify it arrives at the requestor. // Send a reply and verify it arrives at the requestor.
@ -205,14 +205,14 @@ mod tests {
reply_sender reply_sender
.send(GenericMessage::new( .send(GenericMessage::new(
request_id, request_id,
TEST_CHANNEL_ID_1, TEST_COMPONENT_ID_1,
mode_reply, mode_reply,
)) ))
.expect("send failed"); .expect("send failed");
let reply = mode_requestor.try_recv_mode_reply().expect("recv failed"); let reply = mode_requestor.try_recv_mode_reply().expect("recv failed");
assert!(reply.is_some()); assert!(reply.is_some());
let reply = reply.unwrap(); let reply = reply.unwrap();
assert_eq!(reply.sender_id, TEST_CHANNEL_ID_1); assert_eq!(reply.sender_id, TEST_COMPONENT_ID_1);
assert_eq!(reply.request_id, 2); assert_eq!(reply.request_id, 2);
assert_eq!(reply.message, mode_reply); assert_eq!(reply.message, mode_reply);
} }
@ -225,32 +225,35 @@ mod tests {
let (request_sender_to_channel_1, request_receiver_channel_1) = mpsc::channel(); let (request_sender_to_channel_1, request_receiver_channel_1) = mpsc::channel();
//let (reply_sender_to_channel_2, reply_receiver_channel_2) = mpsc::channel(); //let (reply_sender_to_channel_2, reply_receiver_channel_2) = mpsc::channel();
let mut mode_connector = ModeRequestorAndHandlerMpsc::new( let mut mode_connector = ModeRequestorAndHandlerMpsc::new(
TEST_CHANNEL_ID_0, TEST_COMPONENT_ID_0,
request_receiver_of_connector, request_receiver_of_connector,
reply_receiver_of_connector, reply_receiver_of_connector,
); );
assert_eq!( assert_eq!(
ModeRequestSender::local_channel_id(&mode_connector), ModeRequestSender::local_channel_id(&mode_connector),
TEST_CHANNEL_ID_0 TEST_COMPONENT_ID_0
); );
assert_eq!( assert_eq!(
ModeReplySender::local_channel_id(&mode_connector), ModeReplySender::local_channel_id(&mode_connector),
TEST_CHANNEL_ID_0 TEST_COMPONENT_ID_0
);
assert_eq!(
mode_connector.local_channel_id_generic(),
TEST_COMPONENT_ID_0
); );
assert_eq!(mode_connector.local_channel_id_generic(), TEST_CHANNEL_ID_0);
mode_connector.add_request_target(TEST_CHANNEL_ID_1, request_sender_to_channel_1); mode_connector.add_request_target(TEST_COMPONENT_ID_1, request_sender_to_channel_1);
// Send a request and verify it arrives at the receiver. // Send a request and verify it arrives at the receiver.
let request_id = 2; let request_id = 2;
let sent_request = ModeRequest::ReadMode; let sent_request = ModeRequest::ReadMode;
mode_connector mode_connector
.send_mode_request(request_id, TEST_CHANNEL_ID_1, sent_request) .send_mode_request(request_id, TEST_COMPONENT_ID_1, sent_request)
.expect("send failed"); .expect("send failed");
let request = request_receiver_channel_1.recv().expect("recv failed"); let request = request_receiver_channel_1.recv().expect("recv failed");
assert_eq!(request.request_id, 2); assert_eq!(request.request_id, 2);
assert_eq!(request.sender_id, TEST_CHANNEL_ID_0); assert_eq!(request.sender_id, TEST_COMPONENT_ID_0);
assert_eq!(request.message, ModeRequest::ReadMode); assert_eq!(request.message, ModeRequest::ReadMode);
} }
@ -261,21 +264,21 @@ mod tests {
let (reply_sender_to_channel_2, reply_receiver_channel_2) = mpsc::channel(); let (reply_sender_to_channel_2, reply_receiver_channel_2) = mpsc::channel();
let mut mode_connector = ModeRequestorAndHandlerMpsc::new( let mut mode_connector = ModeRequestorAndHandlerMpsc::new(
TEST_CHANNEL_ID_0, TEST_COMPONENT_ID_0,
request_receiver_of_connector, request_receiver_of_connector,
reply_receiver_of_connector, reply_receiver_of_connector,
); );
mode_connector.add_reply_target(TEST_CHANNEL_ID_2, reply_sender_to_channel_2); mode_connector.add_reply_target(TEST_COMPONENT_ID_2, reply_sender_to_channel_2);
// Send a request and verify it arrives at the receiver. // Send a request and verify it arrives at the receiver.
let request_id = 2; let request_id = 2;
let sent_reply = ModeReply::ModeInfo(ModeAndSubmode::new(3, 5)); let sent_reply = ModeReply::ModeInfo(ModeAndSubmode::new(3, 5));
mode_connector mode_connector
.send_mode_reply(request_id, TEST_CHANNEL_ID_2, sent_reply) .send_mode_reply(request_id, TEST_COMPONENT_ID_2, sent_reply)
.expect("send failed"); .expect("send failed");
let reply = reply_receiver_channel_2.recv().expect("recv failed"); let reply = reply_receiver_channel_2.recv().expect("recv failed");
assert_eq!(reply.request_id, 2); assert_eq!(reply.request_id, 2);
assert_eq!(reply.sender_id, TEST_CHANNEL_ID_0); assert_eq!(reply.sender_id, TEST_COMPONENT_ID_0);
assert_eq!(reply.message, sent_reply); assert_eq!(reply.message, sent_reply);
} }

View File

@ -7,7 +7,6 @@ use satrs::mode::{
ModeRequestHandlerMpscBounded, ModeRequestReceiver, ModeRequestorAndHandlerMpscBounded, ModeRequestHandlerMpscBounded, ModeRequestReceiver, ModeRequestorAndHandlerMpscBounded,
ModeRequestorBoundedMpsc, ModeRequestorBoundedMpsc,
}; };
use satrs::pus::action::ActionRequestWithId;
use satrs::request::RequestId; use satrs::request::RequestId;
use satrs::{ use satrs::{
mode::{ModeAndSubmode, ModeReply, ModeRequest}, mode::{ModeAndSubmode, ModeReply, ModeRequest},
@ -17,7 +16,7 @@ use satrs::{
}; };
use std::string::{String, ToString}; use std::string::{String, ToString};
pub enum TestChannelId { pub enum TestComponentId {
Device1 = 1, Device1 = 1,
Device2 = 2, Device2 = 2,
Assembly = 3, Assembly = 3,
@ -34,7 +33,7 @@ impl PusModeService {
self.mode_node self.mode_node
.send_mode_request( .send_mode_request(
self.request_id_counter.get(), self.request_id_counter.get(),
TestChannelId::Assembly as u32, TestComponentId::Assembly as ComponentId,
ModeRequest::AnnounceModeRecursive, ModeRequest::AnnounceModeRecursive,
) )
.unwrap(); .unwrap();
@ -48,7 +47,7 @@ struct TestDevice {
pub mode_node: ModeRequestHandlerMpscBounded, pub mode_node: ModeRequestHandlerMpscBounded,
pub mode_and_submode: ModeAndSubmode, pub mode_and_submode: ModeAndSubmode,
pub mode_requestor_info: Option<(RequestId, ComponentId)>, pub mode_requestor_info: Option<(RequestId, ComponentId)>,
pub action_queue: mpsc::Receiver<GenericMessage<ActionRequest>>, // pub action_queue: mpsc::Receiver<GenericMessage<ActionRequest>>,
} }
pub struct ModeLeafDeviceHelper {} pub struct ModeLeafDeviceHelper {}
@ -249,45 +248,69 @@ fn main() {
// Mode requestors and handlers. // Mode requestors and handlers.
let mut mode_node_assy = ModeRequestorAndHandlerMpscBounded::new( let mut mode_node_assy = ModeRequestorAndHandlerMpscBounded::new(
TestChannelId::Assembly as u32, TestComponentId::Assembly as ComponentId,
request_receiver_assy, request_receiver_assy,
reply_receiver_assy, reply_receiver_assy,
); );
// Mode requestors only. // Mode requestors only.
let mut mode_node_pus = let mut mode_node_pus = ModeRequestorBoundedMpsc::new(
ModeRequestorBoundedMpsc::new(TestChannelId::PusModeService as u32, reply_receiver_pus); TestComponentId::PusModeService as ComponentId,
reply_receiver_pus,
);
// Request handlers only. // Request handlers only.
let mut mode_node_dev1 = let mut mode_node_dev1 = ModeRequestHandlerMpscBounded::new(
ModeRequestHandlerMpscBounded::new(TestChannelId::Device1 as u32, request_receiver_dev1); TestComponentId::Device1 as ComponentId,
let mut mode_node_dev2 = request_receiver_dev1,
ModeRequestHandlerMpscBounded::new(TestChannelId::Device2 as u32, request_receiver_dev2); );
let mut mode_node_dev2 = ModeRequestHandlerMpscBounded::new(
TestComponentId::Device2 as ComponentId,
request_receiver_dev2,
);
// Set up mode request senders first. // Set up mode request senders first.
mode_node_pus.add_message_target(TestChannelId::Assembly as u32, request_sender_to_assy);
mode_node_pus.add_message_target( mode_node_pus.add_message_target(
TestChannelId::Device1 as u32, TestComponentId::Assembly as ComponentId,
request_sender_to_assy,
);
mode_node_pus.add_message_target(
TestComponentId::Device1 as ComponentId,
request_sender_to_dev1.clone(), request_sender_to_dev1.clone(),
); );
mode_node_pus.add_message_target( mode_node_pus.add_message_target(
TestChannelId::Device2 as u32, TestComponentId::Device2 as ComponentId,
request_sender_to_dev2.clone(), request_sender_to_dev2.clone(),
); );
mode_node_assy.add_request_target(TestChannelId::Device1 as u32, request_sender_to_dev1); mode_node_assy.add_request_target(
mode_node_assy.add_request_target(TestChannelId::Device2 as u32, request_sender_to_dev2); TestComponentId::Device1 as ComponentId,
request_sender_to_dev1,
);
mode_node_assy.add_request_target(
TestComponentId::Device2 as ComponentId,
request_sender_to_dev2,
);
// Set up mode reply senders. // Set up mode reply senders.
mode_node_dev1.add_message_target(TestChannelId::Assembly as u32, reply_sender_to_assy.clone());
mode_node_dev1.add_message_target( mode_node_dev1.add_message_target(
TestChannelId::PusModeService as u32, TestComponentId::Assembly as ComponentId,
reply_sender_to_assy.clone(),
);
mode_node_dev1.add_message_target(
TestComponentId::PusModeService as ComponentId,
reply_sender_to_pus.clone(), reply_sender_to_pus.clone(),
); );
mode_node_dev2.add_message_target(TestChannelId::Assembly as u32, reply_sender_to_assy);
mode_node_dev2.add_message_target( mode_node_dev2.add_message_target(
TestChannelId::PusModeService as u32, TestComponentId::Assembly as ComponentId,
reply_sender_to_assy,
);
mode_node_dev2.add_message_target(
TestComponentId::PusModeService as ComponentId,
reply_sender_to_pus.clone(), reply_sender_to_pus.clone(),
); );
mode_node_assy.add_reply_target(TestChannelId::PusModeService as u32, reply_sender_to_pus); mode_node_assy.add_reply_target(
TestComponentId::PusModeService as ComponentId,
reply_sender_to_pus,
);
let mut device1 = TestDevice { let mut device1 = TestDevice {
name: "Test Device 1".to_string(), name: "Test Device 1".to_string(),

View File

@ -30,7 +30,7 @@ fn test_threaded_usage() {
let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel(); let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel();
let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx); let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx);
event_man.subscribe_all(pus_event_man_send_provider.channel_id()); event_man.subscribe_all(pus_event_man_send_provider.target_id());
event_man.add_sender(pus_event_man_send_provider); event_man.add_sender(pus_event_man_send_provider);
let (event_tx, event_rx) = mpsc::channel(); let (event_tx, event_rx) = mpsc::channel();
let reporter = EventReporter::new(0x02, 128).expect("Creating event reporter failed"); let reporter = EventReporter::new(0x02, 128).expect("Creating event reporter failed");