continue device handler
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good

This commit is contained in:
Robin Müller 2024-04-03 12:35:30 +02:00
parent 708050e5bd
commit 3e55fde3b4
Signed by: muellerr
GPG Key ID: A649FB78196E3849
3 changed files with 134 additions and 88 deletions

View File

@ -1,7 +1,6 @@
// TODO: Remove this at a later stage.
#![allow(dead_code)]
use derive_new::new; use derive_new::new;
use satrs::hk::HkRequestVariant; use satrs::hk::{HkRequest, HkRequestVariant};
use satrs::queue::{GenericSendError, GenericTargetedMessagingError};
use satrs::spacepackets::ecss::hk; use satrs::spacepackets::ecss::hk;
use satrs::spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader}; use satrs::spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader};
use satrs::spacepackets::SpHeader; use satrs::spacepackets::SpHeader;
@ -23,16 +22,23 @@ use serde::{Deserialize, Serialize};
pub trait SpiInterface { pub trait SpiInterface {
type Error; type Error;
fn transfer(&mut self, data: &mut [u8]) -> Result<(), Self::Error>; fn transfer(&mut self, tx: &[u8], rx: &mut [u8]) -> Result<(), Self::Error>;
} }
#[derive(Default)] #[derive(Default)]
pub struct SpiDummyInterface {} pub struct SpiDummyInterface {
dummy_val_0: i16,
dummy_val_1: i16,
dummy_val_2: i16,
}
impl SpiInterface for SpiDummyInterface { impl SpiInterface for SpiDummyInterface {
type Error = (); type Error = ();
fn transfer(&mut self, _data: &mut [u8]) -> Result<(), Self::Error> { fn transfer(&mut self, _tx: &[u8], rx: &mut [u8]) -> Result<(), Self::Error> {
rx[0..2].copy_from_slice(&self.dummy_val_0.to_be_bytes());
rx[2..4].copy_from_slice(&self.dummy_val_1.to_be_bytes());
rx[4..6].copy_from_slice(&self.dummy_val_2.to_be_bytes());
Ok(()) Ok(())
} }
} }
@ -64,81 +70,106 @@ pub struct MgmHandler<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> {
#[new(value = "ModeAndSubmode::new(satrs_example::DeviceMode::Off as u32, 0)")] #[new(value = "ModeAndSubmode::new(satrs_example::DeviceMode::Off as u32, 0)")]
mode: ModeAndSubmode, mode: ModeAndSubmode,
#[new(default)] #[new(default)]
tx_buf: [u8; 12],
#[new(default)]
rx_buf: [u8; 12],
#[new(default)]
stamp_helper: TimeStampHelper, stamp_helper: TimeStampHelper,
} }
impl<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> MgmHandler<ComInterface, TmSender> { impl<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> MgmHandler<ComInterface, TmSender> {
pub fn periodic_operation(&mut self) { pub fn periodic_operation(&mut self) {
self.stamp_helper.update_from_now(); self.stamp_helper.update_from_now();
// Handle messages. // Handle requests.
// TODO: Put this in loop/seprate function. self.handle_composite_requests();
match self.composite_request_receiver.try_recv() { self.handle_mode_requests();
Ok(ref msg) => match &msg.message { // Communicate with the device.
CompositeRequest::Hk(hk_req) => match hk_req.variant { let result = self.com_interface.transfer(&self.tx_buf, &mut self.rx_buf);
HkRequestVariant::OneShot => { assert!(result.is_ok());
self.hk_reply_sender // TODO: Convert the raw data back to floats.
.send(GenericMessage::new( }
msg.requestor_info,
HkReply::new(hk_req.unique_id, HkReplyVariant::Ack),
))
.expect("failed to send HK reply");
let mut sp_header = SpHeader::tm_unseg(self.id.apid, 0, 0).unwrap();
let sec_header = PusTmSecondaryHeader::new(
3,
hk::Subservice::TmHkPacket as u8,
0,
0,
Some(self.stamp_helper.stamp()),
);
// Let's serialize it as JSON for now.. This is a lot simpler than binary
// serialization.
let mgm_data_serialized =
serde_json::to_vec(&*self.shared_mgm_set.lock().unwrap()).unwrap();
let hk_tm = PusTmCreator::new(
&mut sp_header,
sec_header,
&mgm_data_serialized,
true,
);
self.tm_sender
.send_tm(self.id.id(), PusTmVariant::Direct(hk_tm))
.expect("failed to send HK TM");
}
HkRequestVariant::EnablePeriodic => todo!(),
HkRequestVariant::DisablePeriodic => todo!(),
HkRequestVariant::ModifyCollectionInterval(_) => todo!(),
},
// TODO: This object does not have actions (yet).. Still send back completion failure
// reply.
CompositeRequest::Action(_action_req) => {}
},
Err(e) => { pub fn handle_composite_requests(&mut self) {
if e != mpsc::TryRecvError::Empty { loop {
log::warn!( match self.composite_request_receiver.try_recv() {
"{}: failed to receive composite request: {:?}", Ok(ref msg) => match &msg.message {
self.dev_str, CompositeRequest::Hk(hk_request) => {
e self.handle_hk_request(&msg.requestor_info, hk_request)
); }
// TODO: This object does not have actions (yet).. Still send back completion failure
// reply.
CompositeRequest::Action(_action_req) => {}
},
Err(e) => {
if e != mpsc::TryRecvError::Empty {
log::warn!(
"{}: failed to receive composite request: {:?}",
self.dev_str,
e
);
} else {
break;
}
} }
} }
} }
// TODO: Put this in loop and only allow one set mode request per cycle? }
match self.mode_interface.request_rx.try_recv() {
Ok(msg) => { pub fn handle_hk_request(&mut self, requestor_info: &MessageMetadata, hk_request: &HkRequest) {
let result = self.handle_mode_request(msg); match hk_request.variant {
// TODO: Trigger event? HkRequestVariant::OneShot => {
if result.is_err() { self.hk_reply_sender
log::warn!( .send(GenericMessage::new(
"{}: mode request failed with error {:?}", *requestor_info,
self.dev_str, HkReply::new(hk_request.unique_id, HkReplyVariant::Ack),
result.err().unwrap() ))
); .expect("failed to send HK reply");
} let mut sp_header = SpHeader::tm_unseg(self.id.apid, 0, 0).unwrap();
let sec_header = PusTmSecondaryHeader::new(
3,
hk::Subservice::TmHkPacket as u8,
0,
0,
Some(self.stamp_helper.stamp()),
);
// Let's serialize it as JSON for now.. This is a lot simpler than binary
// serialization.
let mgm_data_serialized =
serde_json::to_vec(&*self.shared_mgm_set.lock().unwrap()).unwrap();
let hk_tm =
PusTmCreator::new(&mut sp_header, sec_header, &mgm_data_serialized, true);
self.tm_sender
.send_tm(self.id.id(), PusTmVariant::Direct(hk_tm))
.expect("failed to send HK TM");
} }
Err(e) => { HkRequestVariant::EnablePeriodic => todo!(),
if e != mpsc::TryRecvError::Empty { HkRequestVariant::DisablePeriodic => todo!(),
log::warn!("{}: failed to receive mode request: {:?}", self.dev_str, e); HkRequestVariant::ModifyCollectionInterval(_) => todo!(),
}
}
pub fn handle_mode_requests(&mut self) {
loop {
// TODO: Only allow one set mode request per cycle?
match self.mode_interface.request_rx.try_recv() {
Ok(msg) => {
let result = self.handle_mode_request(msg);
// TODO: Trigger event?
if result.is_err() {
log::warn!(
"{}: mode request failed with error {:?}",
self.dev_str,
result.err().unwrap()
);
}
}
Err(e) => {
if e != mpsc::TryRecvError::Empty {
log::warn!("{}: failed to receive mode request: {:?}", self.dev_str, e);
} else {
break;
}
} }
} }
} }
@ -167,19 +198,24 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> ModeRequestHandler
Ok(()) Ok(())
} }
fn announce_mode(&self, _requestor_info: MessageMetadata, _recursive: bool) { fn announce_mode(&self, _requestor_info: Option<MessageMetadata>, _recursive: bool) {
log::info!("{} announcing mode: {:?}", self.dev_str, self.mode); log::info!("{} announcing mode: {:?}", self.dev_str, self.mode);
// TODO: Trigger event.
} }
fn handle_mode_reached( fn handle_mode_reached(
&mut self, &mut self,
requestor: Option<MessageMetadata>, requestor: Option<MessageMetadata>,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
self.announce_mode(requestor, false);
if let Some(requestor) = requestor { if let Some(requestor) = requestor {
if requestor.sender_id() == PUS_MODE_SERVICE.raw() { if requestor.sender_id() != PUS_MODE_SERVICE.id() {
// self.mode_reply_sender_to_pus.send( log::warn!(
//GenericMessage::new(requestor.request_id, requestor.sender_id, ModeReply::ModeReply(self.mode)) "can not send back mode reply to sender {}",
// )?; requestor.sender_id()
);
} else {
self.send_mode_reply(requestor, ModeReply::ModeReply(self.mode_and_submode()))?;
} }
} }
Ok(()) Ok(())
@ -187,9 +223,19 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSenderCore> ModeRequestHandler
fn send_mode_reply( fn send_mode_reply(
&self, &self,
_requestor: MessageMetadata, requestor: MessageMetadata,
_reply: ModeReply, reply: ModeReply,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
if requestor.sender_id() != PUS_MODE_SERVICE.id() {
log::warn!(
"can not send back mode reply to sender {}",
requestor.sender_id()
);
}
self.mode_interface
.reply_tx_to_pus
.send(GenericMessage::new(requestor, reply))
.map_err(|_| GenericTargetedMessagingError::Send(GenericSendError::RxDisconnected))?;
Ok(()) Ok(())
} }

View File

@ -190,7 +190,7 @@ pub trait ModeRequestHandler: ModeProvider {
mode_and_submode: ModeAndSubmode, mode_and_submode: ModeAndSubmode,
) -> Result<(), Self::Error>; ) -> Result<(), Self::Error>;
fn announce_mode(&self, requestor_info: MessageMetadata, recursive: bool); fn announce_mode(&self, requestor_info: Option<MessageMetadata>, recursive: bool);
fn handle_mode_reached( fn handle_mode_reached(
&mut self, &mut self,
@ -222,11 +222,11 @@ pub trait ModeRequestHandler: ModeProvider {
ModeReply::ModeReply(self.mode_and_submode()), ModeReply::ModeReply(self.mode_and_submode()),
), ),
ModeRequest::AnnounceMode => { ModeRequest::AnnounceMode => {
self.announce_mode(request.requestor_info, false); self.announce_mode(Some(request.requestor_info), false);
Ok(()) Ok(())
} }
ModeRequest::AnnounceModeRecursive => { ModeRequest::AnnounceModeRecursive => {
self.announce_mode(request.requestor_info, true); self.announce_mode(Some(request.requestor_info), true);
Ok(()) Ok(())
} }
ModeRequest::ModeInfo(info) => self.handle_mode_info(request.requestor_info, info), ModeRequest::ModeInfo(info) => self.handle_mode_info(request.requestor_info, info),

View File

@ -79,7 +79,7 @@ impl ModeRequestHandler for TestDevice {
Ok(()) Ok(())
} }
fn announce_mode(&self, _requestor_info: MessageMetadata, _recursive: bool) { fn announce_mode(&self, _requestor_info: Option<MessageMetadata>, _recursive: bool) {
println!( println!(
"{}: announcing mode: {:?}", "{}: announcing mode: {:?}",
self.name, self.mode_and_submode self.name, self.mode_and_submode
@ -150,9 +150,11 @@ impl TestAssembly {
ModeReply::ModeReply(self.mode_and_submode), ModeReply::ModeReply(self.mode_and_submode),
) )
.unwrap(), .unwrap(),
ModeRequest::AnnounceMode => self.announce_mode(request.requestor_info, false), ModeRequest::AnnounceMode => {
self.announce_mode(Some(request.requestor_info), false)
}
ModeRequest::AnnounceModeRecursive => { ModeRequest::AnnounceModeRecursive => {
self.announce_mode(request.requestor_info, true) self.announce_mode(Some(request.requestor_info), true)
} }
ModeRequest::ModeInfo(_) => todo!(), ModeRequest::ModeInfo(_) => todo!(),
} }
@ -197,7 +199,7 @@ impl ModeRequestHandler for TestAssembly {
Ok(()) Ok(())
} }
fn announce_mode(&self, requestor_info: MessageMetadata, recursive: bool) { fn announce_mode(&self, requestor_info: Option<MessageMetadata>, recursive: bool) {
println!( println!(
"TestAssembly: Announcing mode (recursively: {}): {:?}", "TestAssembly: Announcing mode (recursively: {}): {:?}",
recursive, self.mode_and_submode recursive, self.mode_and_submode
@ -207,6 +209,7 @@ impl ModeRequestHandler for TestAssembly {
if recursive { if recursive {
mode_request = ModeRequest::AnnounceModeRecursive; mode_request = ModeRequest::AnnounceModeRecursive;
} }
let request_id = requestor_info.map_or(0, |info| info.request_id());
self.mode_node self.mode_node
.request_sender_map .request_sender_map
.0 .0
@ -214,10 +217,7 @@ impl ModeRequestHandler for TestAssembly {
.for_each(|(_, sender)| { .for_each(|(_, sender)| {
sender sender
.send(GenericMessage::new( .send(GenericMessage::new(
MessageMetadata::new( MessageMetadata::new(request_id, self.mode_node.local_channel_id_generic()),
requestor_info.request_id(),
self.mode_node.local_channel_id_generic(),
),
mode_request, mode_request,
)) ))
.expect("sending mode request failed"); .expect("sending mode request failed");