Integration of the mini simulator into the sat-rs example
Some checks are pending
Rust/sat-rs/pipeline/pr-main Build started...

This commit is contained in:
2024-06-03 15:18:23 +02:00
parent a4c433a7be
commit 29167736db
42 changed files with 3578 additions and 697 deletions

View File

@ -1,52 +1,126 @@
use derive_new::new;
use satrs::hk::{HkRequest, HkRequestVariant};
use satrs::power::{PowerSwitchInfo, PowerSwitcherCommandSender};
use satrs::queue::{GenericSendError, GenericTargetedMessagingError};
use satrs::spacepackets::ecss::hk;
use satrs::spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader};
use satrs::spacepackets::SpHeader;
use satrs_example::{DeviceMode, TimeStampHelper};
use satrs_example::{DeviceMode, TimestampHelper};
use satrs_minisim::acs::lis3mdl::{
MgmLis3MdlReply, MgmLis3RawValues, FIELD_LSB_PER_GAUSS_4_SENS, GAUSS_TO_MICROTESLA_FACTOR,
};
use satrs_minisim::acs::MgmRequestLis3Mdl;
use satrs_minisim::eps::PcduSwitch;
use satrs_minisim::{SerializableSimMsgPayload, SimReply, SimRequest};
use std::fmt::Debug;
use std::sync::mpsc::{self};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use satrs::mode::{
ModeAndSubmode, ModeError, ModeProvider, ModeReply, ModeRequest, ModeRequestHandler,
};
use satrs::pus::{EcssTmSender, PusTmVariant};
use satrs::request::{GenericMessage, MessageMetadata, UniqueApidTargetId};
use satrs_example::config::components::PUS_MODE_SERVICE;
use satrs_example::config::components::{NO_SENDER, PUS_MODE_SERVICE};
use crate::hk::PusHkHelper;
use crate::pus::hk::{HkReply, HkReplyVariant};
use crate::requests::CompositeRequest;
use serde::{Deserialize, Serialize};
const GAUSS_TO_MICROTESLA_FACTOR: f32 = 100.0;
// This is the selected resoltion for the STM LIS3MDL device for the 4 Gauss sensitivity setting.
const FIELD_LSB_PER_GAUSS_4_SENS: f32 = 1.0 / 6842.0;
pub const NR_OF_DATA_AND_CFG_REGISTERS: usize = 14;
// Register adresses to access various bytes from the raw reply.
pub const X_LOWBYTE_IDX: usize = 9;
pub const Y_LOWBYTE_IDX: usize = 11;
pub const Z_LOWBYTE_IDX: usize = 13;
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[repr(u32)]
pub enum SetId {
SensorData = 0,
}
#[derive(Default, Debug, PartialEq, Eq)]
pub enum TransitionState {
#[default]
Idle,
PowerSwitching,
Done,
}
pub trait SpiInterface {
type Error;
type Error: Debug;
fn transfer(&mut self, tx: &[u8], rx: &mut [u8]) -> Result<(), Self::Error>;
}
#[derive(Default)]
pub struct SpiDummyInterface {
pub dummy_val_0: i16,
pub dummy_val_1: i16,
pub dummy_val_2: i16,
pub dummy_values: MgmLis3RawValues,
}
impl SpiInterface for SpiDummyInterface {
type Error = ();
fn transfer(&mut self, _tx: &[u8], rx: &mut [u8]) -> Result<(), Self::Error> {
rx[0..2].copy_from_slice(&self.dummy_val_0.to_be_bytes());
rx[2..4].copy_from_slice(&self.dummy_val_1.to_be_bytes());
rx[4..6].copy_from_slice(&self.dummy_val_2.to_be_bytes());
rx[X_LOWBYTE_IDX..X_LOWBYTE_IDX + 2].copy_from_slice(&self.dummy_values.x.to_le_bytes());
rx[Y_LOWBYTE_IDX..Y_LOWBYTE_IDX + 2].copy_from_slice(&self.dummy_values.y.to_be_bytes());
rx[Z_LOWBYTE_IDX..Z_LOWBYTE_IDX + 2].copy_from_slice(&self.dummy_values.z.to_be_bytes());
Ok(())
}
}
pub struct SpiSimInterface {
pub sim_request_tx: mpsc::Sender<SimRequest>,
pub sim_reply_rx: mpsc::Receiver<SimReply>,
}
impl SpiInterface for SpiSimInterface {
type Error = ();
// Right now, we only support requesting sensor data and not configuration of the sensor.
fn transfer(&mut self, _tx: &[u8], rx: &mut [u8]) -> Result<(), Self::Error> {
let mgm_sensor_request = MgmRequestLis3Mdl::RequestSensorData;
if let Err(e) = self
.sim_request_tx
.send(SimRequest::new_with_epoch_time(mgm_sensor_request))
{
log::error!("failed to send MGM LIS3 request: {}", e);
}
match self.sim_reply_rx.recv_timeout(Duration::from_millis(50)) {
Ok(sim_reply) => {
let sim_reply_lis3 = MgmLis3MdlReply::from_sim_message(&sim_reply)
.expect("failed to parse LIS3 reply");
rx[X_LOWBYTE_IDX..X_LOWBYTE_IDX + 2]
.copy_from_slice(&sim_reply_lis3.raw.x.to_le_bytes());
rx[Y_LOWBYTE_IDX..Y_LOWBYTE_IDX + 2]
.copy_from_slice(&sim_reply_lis3.raw.y.to_le_bytes());
rx[Z_LOWBYTE_IDX..Z_LOWBYTE_IDX + 2]
.copy_from_slice(&sim_reply_lis3.raw.z.to_le_bytes());
}
Err(e) => {
log::warn!("MGM LIS3 SIM reply timeout: {}", e);
}
}
Ok(())
}
}
pub enum SpiSimInterfaceWrapper {
Dummy(SpiDummyInterface),
Sim(SpiSimInterface),
}
impl SpiInterface for SpiSimInterfaceWrapper {
type Error = ();
fn transfer(&mut self, tx: &[u8], rx: &mut [u8]) -> Result<(), Self::Error> {
match self {
SpiSimInterfaceWrapper::Dummy(dummy) => dummy.transfer(tx, rx),
SpiSimInterfaceWrapper::Sim(sim_if) => sim_if.transfer(tx, rx),
}
}
}
#[derive(Default, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct MgmData {
pub valid: bool,
@ -57,61 +131,85 @@ pub struct MgmData {
pub struct MpscModeLeafInterface {
pub request_rx: mpsc::Receiver<GenericMessage<ModeRequest>>,
pub reply_tx_to_pus: mpsc::Sender<GenericMessage<ModeReply>>,
pub reply_tx_to_parent: mpsc::Sender<GenericMessage<ModeReply>>,
pub reply_to_pus_tx: mpsc::Sender<GenericMessage<ModeReply>>,
pub reply_to_parent_tx: mpsc::SyncSender<GenericMessage<ModeReply>>,
}
#[derive(Default)]
pub struct BufWrapper {
tx_buf: [u8; 32],
rx_buf: [u8; 32],
tm_buf: [u8; 32],
}
pub struct ModeHelpers {
current: ModeAndSubmode,
target: Option<ModeAndSubmode>,
requestor_info: Option<MessageMetadata>,
transition_state: TransitionState,
}
impl Default for ModeHelpers {
fn default() -> Self {
Self {
current: ModeAndSubmode::new(DeviceMode::Off as u32, 0),
target: Default::default(),
requestor_info: Default::default(),
transition_state: Default::default(),
}
}
}
/// Example MGM device handler strongly based on the LIS3MDL MEMS device.
#[derive(new)]
#[allow(clippy::too_many_arguments)]
pub struct MgmHandlerLis3Mdl<ComInterface: SpiInterface, TmSender: EcssTmSender> {
pub struct MgmHandlerLis3Mdl<
ComInterface: SpiInterface,
TmSender: EcssTmSender,
SwitchHelper: PowerSwitchInfo<PcduSwitch> + PowerSwitcherCommandSender<PcduSwitch>,
> {
id: UniqueApidTargetId,
dev_str: &'static str,
mode_interface: MpscModeLeafInterface,
composite_request_receiver: mpsc::Receiver<GenericMessage<CompositeRequest>>,
hk_reply_sender: mpsc::Sender<GenericMessage<HkReply>>,
composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
hk_reply_tx: mpsc::Sender<GenericMessage<HkReply>>,
switch_helper: SwitchHelper,
tm_sender: TmSender,
com_interface: ComInterface,
pub com_interface: ComInterface,
shared_mgm_set: Arc<Mutex<MgmData>>,
#[new(value = "ModeAndSubmode::new(satrs_example::DeviceMode::Off as u32, 0)")]
mode_and_submode: ModeAndSubmode,
#[new(value = "PusHkHelper::new(id)")]
hk_helper: PusHkHelper,
#[new(default)]
tx_buf: [u8; 12],
mode_helpers: ModeHelpers,
#[new(default)]
rx_buf: [u8; 12],
bufs: BufWrapper,
#[new(default)]
tm_buf: [u8; 16],
#[new(default)]
stamp_helper: TimeStampHelper,
stamp_helper: TimestampHelper,
}
impl<ComInterface: SpiInterface, TmSender: EcssTmSender> MgmHandlerLis3Mdl<ComInterface, TmSender> {
impl<
ComInterface: SpiInterface,
TmSender: EcssTmSender,
SwitchHelper: PowerSwitchInfo<PcduSwitch> + PowerSwitcherCommandSender<PcduSwitch>,
> MgmHandlerLis3Mdl<ComInterface, TmSender, SwitchHelper>
{
pub fn periodic_operation(&mut self) {
self.stamp_helper.update_from_now();
// Handle requests.
self.handle_composite_requests();
self.handle_mode_requests();
if let Some(target_mode_submode) = self.mode_helpers.target {
self.handle_mode_transition(target_mode_submode);
}
if self.mode() == DeviceMode::Normal as u32 {
log::trace!("polling LIS3MDL sensor {}", self.dev_str);
// Communicate with the device.
let result = self.com_interface.transfer(&self.tx_buf, &mut self.rx_buf);
assert!(result.is_ok());
// Actual data begins on the second byte, similarly to how a lot of SPI devices behave.
let x_raw = i16::from_be_bytes(self.rx_buf[1..3].try_into().unwrap());
let y_raw = i16::from_be_bytes(self.rx_buf[3..5].try_into().unwrap());
let z_raw = i16::from_be_bytes(self.rx_buf[5..7].try_into().unwrap());
// Simple scaling to retrieve the float value, assuming a sensor resolution of
let mut mgm_guard = self.shared_mgm_set.lock().unwrap();
mgm_guard.x = x_raw as f32 * GAUSS_TO_MICROTESLA_FACTOR * FIELD_LSB_PER_GAUSS_4_SENS;
mgm_guard.y = y_raw as f32 * GAUSS_TO_MICROTESLA_FACTOR * FIELD_LSB_PER_GAUSS_4_SENS;
mgm_guard.z = z_raw as f32 * GAUSS_TO_MICROTESLA_FACTOR * FIELD_LSB_PER_GAUSS_4_SENS;
drop(mgm_guard);
self.poll_sensor();
}
}
pub fn handle_composite_requests(&mut self) {
loop {
match self.composite_request_receiver.try_recv() {
match self.composite_request_rx.try_recv() {
Ok(ref msg) => match &msg.message {
CompositeRequest::Hk(hk_request) => {
self.handle_hk_request(&msg.requestor_info, hk_request)
@ -139,34 +237,33 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSender> MgmHandlerLis3Mdl<ComIn
pub fn handle_hk_request(&mut self, requestor_info: &MessageMetadata, hk_request: &HkRequest) {
match hk_request.variant {
HkRequestVariant::OneShot => {
self.hk_reply_sender
.send(GenericMessage::new(
*requestor_info,
HkReply::new(hk_request.unique_id, HkReplyVariant::Ack),
))
.expect("failed to send HK reply");
let sec_header = PusTmSecondaryHeader::new(
3,
hk::Subservice::TmHkPacket as u8,
0,
0,
self.stamp_helper.stamp(),
);
let mgm_snapshot = *self.shared_mgm_set.lock().unwrap();
// Use binary serialization here. We want the data to be tightly packed.
self.tm_buf[0] = mgm_snapshot.valid as u8;
self.tm_buf[1..5].copy_from_slice(&mgm_snapshot.x.to_be_bytes());
self.tm_buf[5..9].copy_from_slice(&mgm_snapshot.y.to_be_bytes());
self.tm_buf[9..13].copy_from_slice(&mgm_snapshot.z.to_be_bytes());
let hk_tm = PusTmCreator::new(
SpHeader::new_from_apid(self.id.apid),
sec_header,
&self.tm_buf[0..12],
true,
);
self.tm_sender
.send_tm(self.id.id(), PusTmVariant::Direct(hk_tm))
.expect("failed to send HK TM");
if let Ok(hk_tm) = self.hk_helper.generate_hk_report_packet(
self.stamp_helper.stamp(),
SetId::SensorData as u32,
&mut |hk_buf| {
hk_buf[0] = mgm_snapshot.valid as u8;
hk_buf[1..5].copy_from_slice(&mgm_snapshot.x.to_be_bytes());
hk_buf[5..9].copy_from_slice(&mgm_snapshot.y.to_be_bytes());
hk_buf[9..13].copy_from_slice(&mgm_snapshot.z.to_be_bytes());
Ok(13)
},
&mut self.bufs.tm_buf,
) {
// TODO: If sending the TM fails, we should also send a failure reply.
self.tm_sender
.send_tm(self.id.id(), PusTmVariant::Direct(hk_tm))
.expect("failed to send HK TM");
self.hk_reply_tx
.send(GenericMessage::new(
*requestor_info,
HkReply::new(hk_request.unique_id, HkReplyVariant::Ack),
))
.expect("failed to send HK reply");
} else {
// TODO: Send back failure reply. Need result code for this.
log::error!("TM buffer too small to generate HK data");
}
}
HkRequestVariant::EnablePeriodic => todo!(),
HkRequestVariant::DisablePeriodic => todo!(),
@ -199,20 +296,91 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSender> MgmHandlerLis3Mdl<ComIn
}
}
}
}
impl<ComInterface: SpiInterface, TmSender: EcssTmSender> ModeProvider
for MgmHandlerLis3Mdl<ComInterface, TmSender>
{
fn mode_and_submode(&self) -> ModeAndSubmode {
self.mode_and_submode
pub fn poll_sensor(&mut self) {
// Communicate with the device. This is actually how to read the data from the LIS3 device
// SPI interface.
self.com_interface
.transfer(
&self.bufs.tx_buf[0..NR_OF_DATA_AND_CFG_REGISTERS + 1],
&mut self.bufs.rx_buf[0..NR_OF_DATA_AND_CFG_REGISTERS + 1],
)
.expect("failed to transfer data");
let x_raw = i16::from_le_bytes(
self.bufs.rx_buf[X_LOWBYTE_IDX..X_LOWBYTE_IDX + 2]
.try_into()
.unwrap(),
);
let y_raw = i16::from_le_bytes(
self.bufs.rx_buf[Y_LOWBYTE_IDX..Y_LOWBYTE_IDX + 2]
.try_into()
.unwrap(),
);
let z_raw = i16::from_le_bytes(
self.bufs.rx_buf[Z_LOWBYTE_IDX..Z_LOWBYTE_IDX + 2]
.try_into()
.unwrap(),
);
// Simple scaling to retrieve the float value, assuming the best sensor resolution.
let mut mgm_guard = self.shared_mgm_set.lock().unwrap();
mgm_guard.x = x_raw as f32 * GAUSS_TO_MICROTESLA_FACTOR as f32 * FIELD_LSB_PER_GAUSS_4_SENS;
mgm_guard.y = y_raw as f32 * GAUSS_TO_MICROTESLA_FACTOR as f32 * FIELD_LSB_PER_GAUSS_4_SENS;
mgm_guard.z = z_raw as f32 * GAUSS_TO_MICROTESLA_FACTOR as f32 * FIELD_LSB_PER_GAUSS_4_SENS;
mgm_guard.valid = true;
drop(mgm_guard);
}
pub fn handle_mode_transition(&mut self, target_mode_submode: ModeAndSubmode) {
if target_mode_submode.mode() == DeviceMode::On as u32
|| target_mode_submode.mode() == DeviceMode::Normal as u32
{
if self.mode_helpers.transition_state == TransitionState::Idle {
let result = self
.switch_helper
.send_switch_on_cmd(MessageMetadata::new(0, self.id.id()), PcduSwitch::Mgm);
if result.is_err() {
// Could not send switch command.. still continue with transition.
log::error!("failed to send switch on command");
}
self.mode_helpers.transition_state = TransitionState::PowerSwitching;
}
if self.mode_helpers.transition_state == TransitionState::PowerSwitching
&& self
.switch_helper
.is_switch_on(PcduSwitch::Mgm)
.expect("switch info error")
{
self.mode_helpers.transition_state = TransitionState::Done;
}
if self.mode_helpers.transition_state == TransitionState::Done {
self.mode_helpers.current = self.mode_helpers.target.unwrap();
self.handle_mode_reached(self.mode_helpers.requestor_info)
.expect("failed to handle mode reached");
self.mode_helpers.transition_state = TransitionState::Idle;
}
}
}
}
impl<ComInterface: SpiInterface, TmSender: EcssTmSender> ModeRequestHandler
for MgmHandlerLis3Mdl<ComInterface, TmSender>
impl<
ComInterface: SpiInterface,
TmSender: EcssTmSender,
SwitchHelper: PowerSwitchInfo<PcduSwitch> + PowerSwitcherCommandSender<PcduSwitch>,
> ModeProvider for MgmHandlerLis3Mdl<ComInterface, TmSender, SwitchHelper>
{
fn mode_and_submode(&self) -> ModeAndSubmode {
self.mode_helpers.current
}
}
impl<
ComInterface: SpiInterface,
TmSender: EcssTmSender,
SwitchHelper: PowerSwitchInfo<PcduSwitch> + PowerSwitcherCommandSender<PcduSwitch>,
> ModeRequestHandler for MgmHandlerLis3Mdl<ComInterface, TmSender, SwitchHelper>
{
type Error = ModeError;
fn start_transition(
&mut self,
requestor: MessageMetadata,
@ -223,8 +391,18 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSender> ModeRequestHandler
self.dev_str,
mode_and_submode
);
self.mode_and_submode = mode_and_submode;
self.handle_mode_reached(Some(requestor))?;
self.mode_helpers.current = mode_and_submode;
if mode_and_submode.mode() == DeviceMode::Off as u32 {
self.shared_mgm_set.lock().unwrap().valid = false;
self.handle_mode_reached(Some(requestor))?;
} else if mode_and_submode.mode() == DeviceMode::Normal as u32
|| mode_and_submode.mode() == DeviceMode::On as u32
{
// TODO: Write helper method for the struct? Might help for other handlers as well..
self.mode_helpers.transition_state = TransitionState::Idle;
self.mode_helpers.requestor_info = Some(requestor);
self.mode_helpers.target = Some(mode_and_submode);
}
Ok(())
}
@ -232,7 +410,7 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSender> ModeRequestHandler
log::info!(
"{} announcing mode: {:?}",
self.dev_str,
self.mode_and_submode
self.mode_and_submode()
);
}
@ -240,11 +418,15 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSender> ModeRequestHandler
&mut self,
requestor: Option<MessageMetadata>,
) -> Result<(), Self::Error> {
self.mode_helpers.target = None;
self.announce_mode(requestor, false);
if let Some(requestor) = requestor {
if requestor.sender_id() == NO_SENDER {
return Ok(());
}
if requestor.sender_id() != PUS_MODE_SERVICE.id() {
log::warn!(
"can not send back mode reply to sender {}",
"can not send back mode reply to sender {:x}",
requestor.sender_id()
);
} else {
@ -266,7 +448,7 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSender> ModeRequestHandler
);
}
self.mode_interface
.reply_tx_to_pus
.reply_to_pus_tx
.send(GenericMessage::new(requestor, reply))
.map_err(|_| GenericTargetedMessagingError::Send(GenericSendError::RxDisconnected))?;
Ok(())
@ -280,3 +462,193 @@ impl<ComInterface: SpiInterface, TmSender: EcssTmSender> ModeRequestHandler
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::{mpsc, Arc};
use satrs::{
mode::{ModeReply, ModeRequest},
power::SwitchStateBinary,
request::{GenericMessage, UniqueApidTargetId},
tmtc::PacketAsVec,
};
use satrs_example::config::components::Apid;
use satrs_minisim::acs::lis3mdl::MgmLis3RawValues;
use crate::{eps::TestSwitchHelper, pus::hk::HkReply, requests::CompositeRequest};
use super::*;
#[derive(Default)]
pub struct TestSpiInterface {
pub call_count: u32,
pub next_mgm_data: MgmLis3RawValues,
}
impl SpiInterface for TestSpiInterface {
type Error = ();
fn transfer(&mut self, _tx: &[u8], rx: &mut [u8]) -> Result<(), Self::Error> {
rx[X_LOWBYTE_IDX..X_LOWBYTE_IDX + 2]
.copy_from_slice(&self.next_mgm_data.x.to_le_bytes());
rx[Y_LOWBYTE_IDX..Y_LOWBYTE_IDX + 2]
.copy_from_slice(&self.next_mgm_data.y.to_le_bytes());
rx[Z_LOWBYTE_IDX..Z_LOWBYTE_IDX + 2]
.copy_from_slice(&self.next_mgm_data.z.to_le_bytes());
self.call_count += 1;
Ok(())
}
}
pub struct MgmTestbench {
pub mode_request_tx: mpsc::Sender<GenericMessage<ModeRequest>>,
pub mode_reply_rx_to_pus: mpsc::Receiver<GenericMessage<ModeReply>>,
pub mode_reply_rx_to_parent: mpsc::Receiver<GenericMessage<ModeReply>>,
pub composite_request_tx: mpsc::Sender<GenericMessage<CompositeRequest>>,
pub hk_reply_rx: mpsc::Receiver<GenericMessage<HkReply>>,
pub tm_rx: mpsc::Receiver<PacketAsVec>,
pub handler:
MgmHandlerLis3Mdl<TestSpiInterface, mpsc::Sender<PacketAsVec>, TestSwitchHelper>,
}
impl MgmTestbench {
pub fn new() -> Self {
let (request_tx, request_rx) = mpsc::channel();
let (reply_tx_to_pus, reply_rx_to_pus) = mpsc::channel();
let (reply_tx_to_parent, reply_rx_to_parent) = mpsc::sync_channel(5);
let mode_interface = MpscModeLeafInterface {
request_rx,
reply_to_pus_tx: reply_tx_to_pus,
reply_to_parent_tx: reply_tx_to_parent,
};
let (composite_request_tx, composite_request_rx) = mpsc::channel();
let (hk_reply_tx, hk_reply_rx) = mpsc::channel();
let (tm_tx, tm_rx) = mpsc::channel::<PacketAsVec>();
let shared_mgm_set = Arc::default();
Self {
mode_request_tx: request_tx,
mode_reply_rx_to_pus: reply_rx_to_pus,
mode_reply_rx_to_parent: reply_rx_to_parent,
composite_request_tx,
tm_rx,
hk_reply_rx,
handler: MgmHandlerLis3Mdl::new(
UniqueApidTargetId::new(Apid::Acs as u16, 1),
"TEST_MGM",
mode_interface,
composite_request_rx,
hk_reply_tx,
TestSwitchHelper::default(),
tm_tx,
TestSpiInterface::default(),
shared_mgm_set,
),
}
}
}
#[test]
fn test_basic_handler() {
let mut testbench = MgmTestbench::new();
assert_eq!(testbench.handler.com_interface.call_count, 0);
assert_eq!(
testbench.handler.mode_and_submode().mode(),
DeviceMode::Off as u32
);
assert_eq!(testbench.handler.mode_and_submode().submode(), 0_u16);
testbench.handler.periodic_operation();
// Handler is OFF, no changes expected.
assert_eq!(testbench.handler.com_interface.call_count, 0);
assert_eq!(
testbench.handler.mode_and_submode().mode(),
DeviceMode::Off as u32
);
assert_eq!(testbench.handler.mode_and_submode().submode(), 0_u16);
}
#[test]
fn test_normal_handler() {
let mut testbench = MgmTestbench::new();
testbench
.mode_request_tx
.send(GenericMessage::new(
MessageMetadata::new(0, PUS_MODE_SERVICE.id()),
ModeRequest::SetMode(ModeAndSubmode::new(DeviceMode::Normal as u32, 0)),
))
.expect("failed to send mode request");
testbench.handler.periodic_operation();
assert_eq!(
testbench.handler.mode_and_submode().mode(),
DeviceMode::Normal as u32
);
assert_eq!(testbench.handler.mode_and_submode().submode(), 0);
// Verify power switch handling.
let mut switch_requests = testbench.handler.switch_helper.switch_requests.borrow_mut();
assert_eq!(switch_requests.len(), 1);
let switch_req = switch_requests.pop_front().expect("no switch request");
assert_eq!(switch_req.target_state, SwitchStateBinary::On);
assert_eq!(switch_req.switch_id, PcduSwitch::Mgm);
let mut switch_info_requests = testbench
.handler
.switch_helper
.switch_info_requests
.borrow_mut();
assert_eq!(switch_info_requests.len(), 1);
let switch_info_req = switch_info_requests.pop_front().expect("no switch request");
assert_eq!(switch_info_req, PcduSwitch::Mgm);
let mode_reply = testbench
.mode_reply_rx_to_pus
.try_recv()
.expect("no mode reply generated");
match mode_reply.message {
ModeReply::ModeReply(mode) => {
assert_eq!(mode.mode(), DeviceMode::Normal as u32);
assert_eq!(mode.submode(), 0);
}
_ => panic!("unexpected mode reply"),
}
// The device should have been polled once.
assert_eq!(testbench.handler.com_interface.call_count, 1);
let mgm_set = *testbench.handler.shared_mgm_set.lock().unwrap();
assert!(mgm_set.x < 0.001);
assert!(mgm_set.y < 0.001);
assert!(mgm_set.z < 0.001);
assert!(mgm_set.valid);
}
#[test]
fn test_normal_handler_mgm_set_conversion() {
let mut testbench = MgmTestbench::new();
let raw_values = MgmLis3RawValues {
x: 1000,
y: -1000,
z: 1000,
};
testbench.handler.com_interface.next_mgm_data = raw_values;
testbench
.mode_request_tx
.send(GenericMessage::new(
MessageMetadata::new(0, PUS_MODE_SERVICE.id()),
ModeRequest::SetMode(ModeAndSubmode::new(DeviceMode::Normal as u32, 0)),
))
.expect("failed to send mode request");
testbench.handler.periodic_operation();
let mgm_set = *testbench.handler.shared_mgm_set.lock().unwrap();
let expected_x =
raw_values.x as f32 * GAUSS_TO_MICROTESLA_FACTOR as f32 * FIELD_LSB_PER_GAUSS_4_SENS;
let expected_y =
raw_values.y as f32 * GAUSS_TO_MICROTESLA_FACTOR as f32 * FIELD_LSB_PER_GAUSS_4_SENS;
let expected_z =
raw_values.z as f32 * GAUSS_TO_MICROTESLA_FACTOR as f32 * FIELD_LSB_PER_GAUSS_4_SENS;
let x_diff = (mgm_set.x - expected_x).abs();
let y_diff = (mgm_set.y - expected_y).abs();
let z_diff = (mgm_set.z - expected_z).abs();
assert!(x_diff < 0.001, "x diff too large: {}", x_diff);
assert!(y_diff < 0.001, "y diff too large: {}", y_diff);
assert!(z_diff < 0.001, "z diff too large: {}", z_diff);
assert!(mgm_set.valid);
}
}

View File

@ -122,7 +122,7 @@ pub mod mode_err {
}
pub mod components {
use satrs::request::UniqueApidTargetId;
use satrs::{request::UniqueApidTargetId, ComponentId};
use strum::EnumIter;
#[derive(Copy, Clone, PartialEq, Eq, EnumIter)]
@ -132,6 +132,7 @@ pub mod components {
Acs = 3,
Cfdp = 4,
Tmtc = 5,
Eps = 6,
}
// Component IDs for components with the PUS APID.
@ -150,6 +151,11 @@ pub mod components {
Mgm0 = 0,
}
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum EpsId {
Pcdu = 0,
}
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum TmtcId {
UdpServer = 0,
@ -172,10 +178,13 @@ pub mod components {
UniqueApidTargetId::new(Apid::Sched as u16, 0);
pub const MGM_HANDLER_0: UniqueApidTargetId =
UniqueApidTargetId::new(Apid::Acs as u16, AcsId::Mgm0 as u32);
pub const PCDU_HANDLER: UniqueApidTargetId =
UniqueApidTargetId::new(Apid::Eps as u16, EpsId::Pcdu as u32);
pub const UDP_SERVER: UniqueApidTargetId =
UniqueApidTargetId::new(Apid::Tmtc as u16, TmtcId::UdpServer as u32);
pub const TCP_SERVER: UniqueApidTargetId =
UniqueApidTargetId::new(Apid::Tmtc as u16, TmtcId::TcpServer as u32);
pub const NO_SENDER: ComponentId = ComponentId::MAX;
}
pub mod pool {
@ -224,7 +233,7 @@ pub mod pool {
pub mod tasks {
pub const FREQ_MS_UDP_TMTC: u64 = 200;
pub const FREQ_MS_EVENT_HANDLING: u64 = 400;
pub const FREQ_MS_AOCS: u64 = 500;
pub const FREQ_MS_PUS_STACK: u64 = 200;
pub const SIM_CLIENT_IDLE_DELAY_MS: u64 = 5;
}

View File

@ -0,0 +1,195 @@
use derive_new::new;
use std::{cell::RefCell, collections::VecDeque, sync::mpsc, time::Duration};
use satrs::{
power::{
PowerSwitchInfo, PowerSwitcherCommandSender, SwitchRequest, SwitchState, SwitchStateBinary,
},
queue::GenericSendError,
request::{GenericMessage, MessageMetadata},
};
use satrs_minisim::eps::{PcduSwitch, SwitchMapWrapper};
use thiserror::Error;
use self::pcdu::SharedSwitchSet;
pub mod pcdu;
#[derive(new, Clone)]
pub struct PowerSwitchHelper {
switcher_tx: mpsc::SyncSender<GenericMessage<SwitchRequest>>,
shared_switch_set: SharedSwitchSet,
}
#[derive(Debug, Error, Copy, Clone, PartialEq, Eq)]
pub enum SwitchCommandingError {
#[error("send error: {0}")]
Send(#[from] GenericSendError),
}
#[derive(Debug, Error, Copy, Clone, PartialEq, Eq)]
pub enum SwitchInfoError {
/// This is a configuration error which should not occur.
#[error("switch ID not in map")]
SwitchIdNotInMap(PcduSwitch),
#[error("switch set invalid")]
SwitchSetInvalid,
}
impl PowerSwitchInfo<PcduSwitch> for PowerSwitchHelper {
type Error = SwitchInfoError;
fn switch_state(
&self,
switch_id: PcduSwitch,
) -> Result<satrs::power::SwitchState, Self::Error> {
let switch_set = self
.shared_switch_set
.lock()
.expect("failed to lock switch set");
if !switch_set.valid {
return Err(SwitchInfoError::SwitchSetInvalid);
}
if let Some(state) = switch_set.switch_map.get(&switch_id) {
return Ok(*state);
}
Err(SwitchInfoError::SwitchIdNotInMap(switch_id))
}
fn switch_delay_ms(&self) -> Duration {
// Here, we could set device specific switch delays theoretically. Set it to this value
// for now.
Duration::from_millis(1000)
}
}
impl PowerSwitcherCommandSender<PcduSwitch> for PowerSwitchHelper {
type Error = SwitchCommandingError;
fn send_switch_on_cmd(
&self,
requestor_info: satrs::request::MessageMetadata,
switch_id: PcduSwitch,
) -> Result<(), Self::Error> {
self.switcher_tx
.send_switch_on_cmd(requestor_info, switch_id)?;
Ok(())
}
fn send_switch_off_cmd(
&self,
requestor_info: satrs::request::MessageMetadata,
switch_id: PcduSwitch,
) -> Result<(), Self::Error> {
self.switcher_tx
.send_switch_off_cmd(requestor_info, switch_id)?;
Ok(())
}
}
#[derive(new)]
pub struct SwitchRequestInfo {
pub requestor_info: MessageMetadata,
pub switch_id: PcduSwitch,
pub target_state: satrs::power::SwitchStateBinary,
}
// Test switch helper which can be used for unittests.
pub struct TestSwitchHelper {
pub switch_requests: RefCell<VecDeque<SwitchRequestInfo>>,
pub switch_info_requests: RefCell<VecDeque<PcduSwitch>>,
pub switch_delay_request_count: u32,
pub next_switch_delay: Duration,
pub switch_map: RefCell<SwitchMapWrapper>,
pub switch_map_valid: bool,
}
impl Default for TestSwitchHelper {
fn default() -> Self {
Self {
switch_requests: Default::default(),
switch_info_requests: Default::default(),
switch_delay_request_count: Default::default(),
next_switch_delay: Duration::from_millis(1000),
switch_map: Default::default(),
switch_map_valid: true,
}
}
}
impl PowerSwitchInfo<PcduSwitch> for TestSwitchHelper {
type Error = SwitchInfoError;
fn switch_state(
&self,
switch_id: PcduSwitch,
) -> Result<satrs::power::SwitchState, Self::Error> {
let mut switch_info_requests_mut = self.switch_info_requests.borrow_mut();
switch_info_requests_mut.push_back(switch_id);
if !self.switch_map_valid {
return Err(SwitchInfoError::SwitchSetInvalid);
}
let switch_map_mut = self.switch_map.borrow_mut();
if let Some(state) = switch_map_mut.0.get(&switch_id) {
return Ok(*state);
}
Err(SwitchInfoError::SwitchIdNotInMap(switch_id))
}
fn switch_delay_ms(&self) -> Duration {
self.next_switch_delay
}
}
impl PowerSwitcherCommandSender<PcduSwitch> for TestSwitchHelper {
type Error = SwitchCommandingError;
fn send_switch_on_cmd(
&self,
requestor_info: MessageMetadata,
switch_id: PcduSwitch,
) -> Result<(), Self::Error> {
let mut switch_requests_mut = self.switch_requests.borrow_mut();
switch_requests_mut.push_back(SwitchRequestInfo {
requestor_info,
switch_id,
target_state: SwitchStateBinary::On,
});
// By default, the test helper immediately acknowledges the switch request by setting
// the appropriate switch state in the internal switch map.
let mut switch_map_mut = self.switch_map.borrow_mut();
if let Some(switch_state) = switch_map_mut.0.get_mut(&switch_id) {
*switch_state = SwitchState::On;
}
Ok(())
}
fn send_switch_off_cmd(
&self,
requestor_info: MessageMetadata,
switch_id: PcduSwitch,
) -> Result<(), Self::Error> {
let mut switch_requests_mut = self.switch_requests.borrow_mut();
switch_requests_mut.push_back(SwitchRequestInfo {
requestor_info,
switch_id,
target_state: SwitchStateBinary::Off,
});
// By default, the test helper immediately acknowledges the switch request by setting
// the appropriate switch state in the internal switch map.
let mut switch_map_mut = self.switch_map.borrow_mut();
if let Some(switch_state) = switch_map_mut.0.get_mut(&switch_id) {
*switch_state = SwitchState::Off;
}
Ok(())
}
}
#[allow(dead_code)]
impl TestSwitchHelper {
// Helper function which can be used to force a switch to another state for test purposes.
pub fn set_switch_state(&mut self, switch: PcduSwitch, state: SwitchState) {
self.switch_map.get_mut().0.insert(switch, state);
}
}

View File

@ -0,0 +1,722 @@
use std::{
cell::RefCell,
collections::VecDeque,
sync::{mpsc, Arc, Mutex},
};
use derive_new::new;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use satrs::{
hk::{HkRequest, HkRequestVariant},
mode::{ModeAndSubmode, ModeError, ModeProvider, ModeReply, ModeRequestHandler},
power::SwitchRequest,
pus::{EcssTmSender, PusTmVariant},
queue::{GenericSendError, GenericTargetedMessagingError},
request::{GenericMessage, MessageMetadata, UniqueApidTargetId},
spacepackets::ByteConversionError,
};
use satrs_example::{
config::components::{NO_SENDER, PUS_MODE_SERVICE},
DeviceMode, TimestampHelper,
};
use satrs_minisim::{
eps::{
PcduReply, PcduRequest, PcduSwitch, SwitchMap, SwitchMapBinaryWrapper, SwitchMapWrapper,
},
SerializableSimMsgPayload, SimReply, SimRequest,
};
use serde::{Deserialize, Serialize};
use crate::{
acs::mgm::MpscModeLeafInterface,
hk::PusHkHelper,
pus::hk::{HkReply, HkReplyVariant},
requests::CompositeRequest,
};
pub trait SerialInterface {
type Error: core::fmt::Debug;
/// Send some data via the serial interface.
fn send(&self, data: &[u8]) -> Result<(), Self::Error>;
/// Receive all replies received on the serial interface so far. This function takes a closure
/// and call its for each received packet, passing the received packet into it.
fn try_recv_replies<ReplyHandler: FnMut(&[u8])>(
&self,
f: ReplyHandler,
) -> Result<(), Self::Error>;
}
#[derive(new)]
pub struct SerialInterfaceToSim {
pub sim_request_tx: mpsc::Sender<SimRequest>,
pub sim_reply_rx: mpsc::Receiver<SimReply>,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, TryFromPrimitive, IntoPrimitive)]
#[repr(u32)]
pub enum SetId {
SwitcherSet = 0,
}
impl SerialInterface for SerialInterfaceToSim {
type Error = ();
fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
let request: PcduRequest = serde_json::from_slice(data).expect("expected a PCDU request");
self.sim_request_tx
.send(SimRequest::new_with_epoch_time(request))
.expect("failed to send request to simulation");
Ok(())
}
fn try_recv_replies<ReplyHandler: FnMut(&[u8])>(
&self,
mut f: ReplyHandler,
) -> Result<(), Self::Error> {
loop {
match self.sim_reply_rx.try_recv() {
Ok(reply) => {
let reply = serde_json::to_string(&reply).unwrap();
f(reply.as_bytes());
}
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
log::warn!("sim reply sender has disconnected");
break;
}
},
}
}
Ok(())
}
}
#[derive(Default)]
pub struct SerialInterfaceDummy {
// Need interior mutability here for both fields.
pub switch_map: RefCell<SwitchMapBinaryWrapper>,
pub reply_deque: RefCell<VecDeque<SimReply>>,
}
impl SerialInterface for SerialInterfaceDummy {
type Error = ();
fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
let pcdu_req: PcduRequest = serde_json::from_slice(data).unwrap();
let switch_map_mut = &mut self.switch_map.borrow_mut().0;
match pcdu_req {
PcduRequest::SwitchDevice { switch, state } => {
match switch_map_mut.entry(switch) {
std::collections::hash_map::Entry::Occupied(mut val) => {
*val.get_mut() = state;
}
std::collections::hash_map::Entry::Vacant(vacant) => {
vacant.insert(state);
}
};
}
PcduRequest::RequestSwitchInfo => {
let mut reply_deque_mut = self.reply_deque.borrow_mut();
reply_deque_mut.push_back(SimReply::new(&PcduReply::SwitchInfo(
switch_map_mut.clone(),
)));
}
};
Ok(())
}
fn try_recv_replies<ReplyHandler: FnMut(&[u8])>(
&self,
mut f: ReplyHandler,
) -> Result<(), Self::Error> {
if self.reply_queue_empty() {
return Ok(());
}
loop {
let reply = self.get_next_reply_as_string();
f(reply.as_bytes());
if self.reply_queue_empty() {
break;
}
}
Ok(())
}
}
impl SerialInterfaceDummy {
fn get_next_reply_as_string(&self) -> String {
let mut reply_deque_mut = self.reply_deque.borrow_mut();
let next_reply = reply_deque_mut.pop_front().unwrap();
serde_json::to_string(&next_reply).unwrap()
}
fn reply_queue_empty(&self) -> bool {
self.reply_deque.borrow().is_empty()
}
}
pub enum SerialSimInterfaceWrapper {
Dummy(SerialInterfaceDummy),
Sim(SerialInterfaceToSim),
}
impl SerialInterface for SerialSimInterfaceWrapper {
type Error = ();
fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
match self {
SerialSimInterfaceWrapper::Dummy(dummy) => dummy.send(data),
SerialSimInterfaceWrapper::Sim(sim) => sim.send(data),
}
}
fn try_recv_replies<ReplyHandler: FnMut(&[u8])>(
&self,
f: ReplyHandler,
) -> Result<(), Self::Error> {
match self {
SerialSimInterfaceWrapper::Dummy(dummy) => dummy.try_recv_replies(f),
SerialSimInterfaceWrapper::Sim(sim) => sim.try_recv_replies(f),
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum OpCode {
RegularOp = 0,
PollAndRecvReplies = 1,
}
#[derive(Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct SwitchSet {
pub valid: bool,
pub switch_map: SwitchMap,
}
pub type SharedSwitchSet = Arc<Mutex<SwitchSet>>;
/// Example PCDU device handler.
#[derive(new)]
#[allow(clippy::too_many_arguments)]
pub struct PcduHandler<ComInterface: SerialInterface, TmSender: EcssTmSender> {
id: UniqueApidTargetId,
dev_str: &'static str,
mode_interface: MpscModeLeafInterface,
composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
hk_reply_tx: mpsc::Sender<GenericMessage<HkReply>>,
switch_request_rx: mpsc::Receiver<GenericMessage<SwitchRequest>>,
tm_sender: TmSender,
pub com_interface: ComInterface,
shared_switch_map: Arc<Mutex<SwitchSet>>,
#[new(value = "PusHkHelper::new(id)")]
hk_helper: PusHkHelper,
#[new(value = "ModeAndSubmode::new(satrs_example::DeviceMode::Off as u32, 0)")]
mode_and_submode: ModeAndSubmode,
#[new(default)]
stamp_helper: TimestampHelper,
#[new(value = "[0; 256]")]
tm_buf: [u8; 256],
}
impl<ComInterface: SerialInterface, TmSender: EcssTmSender> PcduHandler<ComInterface, TmSender> {
pub fn periodic_operation(&mut self, op_code: OpCode) {
match op_code {
OpCode::RegularOp => {
self.stamp_helper.update_from_now();
// Handle requests.
self.handle_composite_requests();
self.handle_mode_requests();
self.handle_switch_requests();
// Poll the switch states and/or telemetry regularly here.
if self.mode() == DeviceMode::Normal as u32 || self.mode() == DeviceMode::On as u32
{
self.handle_periodic_commands();
}
}
OpCode::PollAndRecvReplies => {
self.poll_and_handle_replies();
}
}
}
pub fn handle_composite_requests(&mut self) {
loop {
match self.composite_request_rx.try_recv() {
Ok(ref msg) => match &msg.message {
CompositeRequest::Hk(hk_request) => {
self.handle_hk_request(&msg.requestor_info, hk_request)
}
// TODO: This object does not have actions (yet).. Still send back completion failure
// reply.
CompositeRequest::Action(_action_req) => {}
},
Err(e) => {
if e != mpsc::TryRecvError::Empty {
log::warn!(
"{}: failed to receive composite request: {:?}",
self.dev_str,
e
);
} else {
break;
}
}
}
}
}
pub fn handle_hk_request(&mut self, requestor_info: &MessageMetadata, hk_request: &HkRequest) {
match hk_request.variant {
HkRequestVariant::OneShot => {
if hk_request.unique_id == SetId::SwitcherSet as u32 {
if let Ok(hk_tm) = self.hk_helper.generate_hk_report_packet(
self.stamp_helper.stamp(),
SetId::SwitcherSet as u32,
&mut |hk_buf| {
// Send TM down as JSON.
let switch_map_snapshot = self
.shared_switch_map
.lock()
.expect("failed to lock switch map")
.clone();
let switch_map_json = serde_json::to_string(&switch_map_snapshot)
.expect("failed to serialize switch map");
if switch_map_json.len() > hk_buf.len() {
log::error!("switch map JSON too large for HK buffer");
return Err(ByteConversionError::ToSliceTooSmall {
found: hk_buf.len(),
expected: switch_map_json.len(),
});
}
Ok(switch_map_json.len())
},
&mut self.tm_buf,
) {
self.tm_sender
.send_tm(self.id.id(), PusTmVariant::Direct(hk_tm))
.expect("failed to send HK TM");
self.hk_reply_tx
.send(GenericMessage::new(
*requestor_info,
HkReply::new(hk_request.unique_id, HkReplyVariant::Ack),
))
.expect("failed to send HK reply");
}
}
}
HkRequestVariant::EnablePeriodic => todo!(),
HkRequestVariant::DisablePeriodic => todo!(),
HkRequestVariant::ModifyCollectionInterval(_) => todo!(),
}
}
pub fn handle_periodic_commands(&self) {
let pcdu_req = PcduRequest::RequestSwitchInfo;
let pcdu_req_ser = serde_json::to_string(&pcdu_req).unwrap();
if let Err(_e) = self.com_interface.send(pcdu_req_ser.as_bytes()) {
log::warn!("polling PCDU switch info failed");
}
}
pub fn handle_mode_requests(&mut self) {
loop {
// TODO: Only allow one set mode request per cycle?
match self.mode_interface.request_rx.try_recv() {
Ok(msg) => {
let result = self.handle_mode_request(msg);
// TODO: Trigger event?
if result.is_err() {
log::warn!(
"{}: mode request failed with error {:?}",
self.dev_str,
result.err().unwrap()
);
}
}
Err(e) => {
if e != mpsc::TryRecvError::Empty {
log::warn!("{}: failed to receive mode request: {:?}", self.dev_str, e);
} else {
break;
}
}
}
}
}
pub fn handle_switch_requests(&mut self) {
loop {
match self.switch_request_rx.try_recv() {
Ok(switch_req) => match PcduSwitch::try_from(switch_req.message.switch_id()) {
Ok(pcdu_switch) => {
let pcdu_req = PcduRequest::SwitchDevice {
switch: pcdu_switch,
state: switch_req.message.target_state(),
};
let pcdu_req_ser = serde_json::to_string(&pcdu_req).unwrap();
self.com_interface
.send(pcdu_req_ser.as_bytes())
.expect("failed to send switch request to PCDU");
}
Err(e) => todo!("failed to convert switch ID {:?} to typed PCDU switch", e),
},
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
log::warn!("switch request receiver has disconnected");
break;
}
},
};
}
}
pub fn poll_and_handle_replies(&mut self) {
if let Err(e) = self.com_interface.try_recv_replies(|reply| {
let sim_reply: SimReply = serde_json::from_slice(reply).expect("invalid reply format");
let pcdu_reply = PcduReply::from_sim_message(&sim_reply).expect("invalid reply format");
match pcdu_reply {
PcduReply::SwitchInfo(switch_info) => {
let switch_map_wrapper =
SwitchMapWrapper::from_binary_switch_map_ref(&switch_info);
let mut shared_switch_map = self
.shared_switch_map
.lock()
.expect("failed to lock switch map");
shared_switch_map.switch_map = switch_map_wrapper.0;
shared_switch_map.valid = true;
}
}
}) {
log::warn!("receiving PCDU replies failed: {:?}", e);
}
}
}
impl<ComInterface: SerialInterface, TmSender: EcssTmSender> ModeProvider
for PcduHandler<ComInterface, TmSender>
{
fn mode_and_submode(&self) -> ModeAndSubmode {
self.mode_and_submode
}
}
impl<ComInterface: SerialInterface, TmSender: EcssTmSender> ModeRequestHandler
for PcduHandler<ComInterface, TmSender>
{
type Error = ModeError;
fn start_transition(
&mut self,
requestor: MessageMetadata,
mode_and_submode: ModeAndSubmode,
) -> Result<(), satrs::mode::ModeError> {
log::info!(
"{}: transitioning to mode {:?}",
self.dev_str,
mode_and_submode
);
self.mode_and_submode = mode_and_submode;
if mode_and_submode.mode() == DeviceMode::Off as u32 {
self.shared_switch_map.lock().unwrap().valid = false;
}
self.handle_mode_reached(Some(requestor))?;
Ok(())
}
fn announce_mode(&self, _requestor_info: Option<MessageMetadata>, _recursive: bool) {
log::info!(
"{} announcing mode: {:?}",
self.dev_str,
self.mode_and_submode
);
}
fn handle_mode_reached(
&mut self,
requestor: Option<MessageMetadata>,
) -> Result<(), Self::Error> {
self.announce_mode(requestor, false);
if let Some(requestor) = requestor {
if requestor.sender_id() == NO_SENDER {
return Ok(());
}
if requestor.sender_id() != PUS_MODE_SERVICE.id() {
log::warn!(
"can not send back mode reply to sender {}",
requestor.sender_id()
);
} else {
self.send_mode_reply(requestor, ModeReply::ModeReply(self.mode_and_submode()))?;
}
}
Ok(())
}
fn send_mode_reply(
&self,
requestor: MessageMetadata,
reply: ModeReply,
) -> Result<(), Self::Error> {
if requestor.sender_id() != PUS_MODE_SERVICE.id() {
log::warn!(
"can not send back mode reply to sender {}",
requestor.sender_id()
);
}
self.mode_interface
.reply_to_pus_tx
.send(GenericMessage::new(requestor, reply))
.map_err(|_| GenericTargetedMessagingError::Send(GenericSendError::RxDisconnected))?;
Ok(())
}
fn handle_mode_info(
&mut self,
_requestor_info: MessageMetadata,
_info: ModeAndSubmode,
) -> Result<(), Self::Error> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::mpsc;
use satrs::{
mode::ModeRequest, power::SwitchStateBinary, request::GenericMessage, tmtc::PacketAsVec,
};
use satrs_example::config::components::{Apid, MGM_HANDLER_0};
use satrs_minisim::eps::SwitchMapBinary;
use super::*;
#[derive(Default)]
pub struct SerialInterfaceTest {
pub inner: SerialInterfaceDummy,
pub send_queue: RefCell<VecDeque<Vec<u8>>>,
pub reply_queue: RefCell<VecDeque<String>>,
}
impl SerialInterface for SerialInterfaceTest {
type Error = ();
fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
let mut send_queue_mut = self.send_queue.borrow_mut();
send_queue_mut.push_back(data.to_vec());
self.inner.send(data)
}
fn try_recv_replies<ReplyHandler: FnMut(&[u8])>(
&self,
mut f: ReplyHandler,
) -> Result<(), Self::Error> {
if self.inner.reply_queue_empty() {
return Ok(());
}
loop {
let reply = self.inner.get_next_reply_as_string();
self.reply_queue.borrow_mut().push_back(reply.clone());
f(reply.as_bytes());
if self.inner.reply_queue_empty() {
break;
}
}
Ok(())
}
}
pub struct PcduTestbench {
pub mode_request_tx: mpsc::Sender<GenericMessage<ModeRequest>>,
pub mode_reply_rx_to_pus: mpsc::Receiver<GenericMessage<ModeReply>>,
pub mode_reply_rx_to_parent: mpsc::Receiver<GenericMessage<ModeReply>>,
pub composite_request_tx: mpsc::Sender<GenericMessage<CompositeRequest>>,
pub hk_reply_rx: mpsc::Receiver<GenericMessage<HkReply>>,
pub tm_rx: mpsc::Receiver<PacketAsVec>,
pub switch_request_tx: mpsc::Sender<GenericMessage<SwitchRequest>>,
pub handler: PcduHandler<SerialInterfaceTest, mpsc::Sender<PacketAsVec>>,
}
impl PcduTestbench {
pub fn new() -> Self {
let (mode_request_tx, mode_request_rx) = mpsc::channel();
let (mode_reply_tx_to_pus, mode_reply_rx_to_pus) = mpsc::channel();
let (mode_reply_tx_to_parent, mode_reply_rx_to_parent) = mpsc::sync_channel(5);
let mode_interface = MpscModeLeafInterface {
request_rx: mode_request_rx,
reply_to_pus_tx: mode_reply_tx_to_pus,
reply_to_parent_tx: mode_reply_tx_to_parent,
};
let (composite_request_tx, composite_request_rx) = mpsc::channel();
let (hk_reply_tx, hk_reply_rx) = mpsc::channel();
let (tm_tx, tm_rx) = mpsc::channel::<PacketAsVec>();
let (switch_request_tx, switch_reqest_rx) = mpsc::channel();
let shared_switch_map = Arc::new(Mutex::new(SwitchSet::default()));
Self {
mode_request_tx,
mode_reply_rx_to_pus,
mode_reply_rx_to_parent,
composite_request_tx,
hk_reply_rx,
tm_rx,
switch_request_tx,
handler: PcduHandler::new(
UniqueApidTargetId::new(Apid::Eps as u16, 0),
"TEST_PCDU",
mode_interface,
composite_request_rx,
hk_reply_tx,
switch_reqest_rx,
tm_tx,
SerialInterfaceTest::default(),
shared_switch_map,
),
}
}
pub fn verify_switch_info_req_was_sent(&self, expected_queue_len: usize) {
// Check that there is now communication happening.
let mut send_queue_mut = self.handler.com_interface.send_queue.borrow_mut();
assert_eq!(send_queue_mut.len(), expected_queue_len);
let packet_sent = send_queue_mut.pop_front().unwrap();
drop(send_queue_mut);
let pcdu_req: PcduRequest = serde_json::from_slice(&packet_sent).unwrap();
assert_eq!(pcdu_req, PcduRequest::RequestSwitchInfo);
}
pub fn verify_switch_req_was_sent(
&self,
expected_queue_len: usize,
switch_id: PcduSwitch,
target_state: SwitchStateBinary,
) {
// Check that there is now communication happening.
let mut send_queue_mut = self.handler.com_interface.send_queue.borrow_mut();
assert_eq!(send_queue_mut.len(), expected_queue_len);
let packet_sent = send_queue_mut.pop_front().unwrap();
drop(send_queue_mut);
let pcdu_req: PcduRequest = serde_json::from_slice(&packet_sent).unwrap();
assert_eq!(
pcdu_req,
PcduRequest::SwitchDevice {
switch: switch_id,
state: target_state
}
)
}
pub fn verify_switch_reply_received(
&self,
expected_queue_len: usize,
expected_map: SwitchMapBinary,
) {
// Check that a switch reply was read back.
let mut reply_received_mut = self.handler.com_interface.reply_queue.borrow_mut();
assert_eq!(reply_received_mut.len(), expected_queue_len);
let reply_received = reply_received_mut.pop_front().unwrap();
let sim_reply: SimReply = serde_json::from_str(&reply_received).unwrap();
let pcdu_reply = PcduReply::from_sim_message(&sim_reply).unwrap();
assert_eq!(pcdu_reply, PcduReply::SwitchInfo(expected_map));
}
}
#[test]
fn test_basic_handler() {
let mut testbench = PcduTestbench::new();
assert_eq!(testbench.handler.com_interface.send_queue.borrow().len(), 0);
assert_eq!(
testbench.handler.com_interface.reply_queue.borrow().len(),
0
);
assert_eq!(
testbench.handler.mode_and_submode().mode(),
DeviceMode::Off as u32
);
assert_eq!(testbench.handler.mode_and_submode().submode(), 0_u16);
testbench.handler.periodic_operation(OpCode::RegularOp);
testbench
.handler
.periodic_operation(OpCode::PollAndRecvReplies);
// Handler is OFF, no changes expected.
assert_eq!(testbench.handler.com_interface.send_queue.borrow().len(), 0);
assert_eq!(
testbench.handler.com_interface.reply_queue.borrow().len(),
0
);
assert_eq!(
testbench.handler.mode_and_submode().mode(),
DeviceMode::Off as u32
);
assert_eq!(testbench.handler.mode_and_submode().submode(), 0_u16);
}
#[test]
fn test_normal_mode() {
let mut testbench = PcduTestbench::new();
testbench
.mode_request_tx
.send(GenericMessage::new(
MessageMetadata::new(0, PUS_MODE_SERVICE.id()),
ModeRequest::SetMode(ModeAndSubmode::new(DeviceMode::Normal as u32, 0)),
))
.expect("failed to send mode request");
let switch_map_shared = testbench.handler.shared_switch_map.lock().unwrap();
assert!(!switch_map_shared.valid);
drop(switch_map_shared);
testbench.handler.periodic_operation(OpCode::RegularOp);
testbench
.handler
.periodic_operation(OpCode::PollAndRecvReplies);
// Check correctness of mode.
assert_eq!(
testbench.handler.mode_and_submode().mode(),
DeviceMode::Normal as u32
);
assert_eq!(testbench.handler.mode_and_submode().submode(), 0);
testbench.verify_switch_info_req_was_sent(1);
testbench.verify_switch_reply_received(1, SwitchMapBinaryWrapper::default().0);
let switch_map_shared = testbench.handler.shared_switch_map.lock().unwrap();
assert!(switch_map_shared.valid);
drop(switch_map_shared);
}
#[test]
fn test_switch_request_handling() {
let mut testbench = PcduTestbench::new();
testbench
.mode_request_tx
.send(GenericMessage::new(
MessageMetadata::new(0, PUS_MODE_SERVICE.id()),
ModeRequest::SetMode(ModeAndSubmode::new(DeviceMode::Normal as u32, 0)),
))
.expect("failed to send mode request");
testbench
.switch_request_tx
.send(GenericMessage::new(
MessageMetadata::new(0, MGM_HANDLER_0.id()),
SwitchRequest::new(0, SwitchStateBinary::On),
))
.expect("failed to send switch request");
testbench.handler.periodic_operation(OpCode::RegularOp);
testbench
.handler
.periodic_operation(OpCode::PollAndRecvReplies);
testbench.verify_switch_req_was_sent(2, PcduSwitch::Mgm, SwitchStateBinary::On);
testbench.verify_switch_info_req_was_sent(1);
let mut switch_map = SwitchMapBinaryWrapper::default().0;
*switch_map
.get_mut(&PcduSwitch::Mgm)
.expect("switch state setting failed") = SwitchStateBinary::On;
testbench.verify_switch_reply_received(1, switch_map);
let switch_map_shared = testbench.handler.shared_switch_map.lock().unwrap();
assert!(switch_map_shared.valid);
drop(switch_map_shared);
}
}

View File

@ -1,7 +1,9 @@
use derive_new::new;
use satrs::hk::UniqueId;
use satrs::request::UniqueApidTargetId;
use satrs::spacepackets::ByteConversionError;
use satrs::spacepackets::ecss::hk;
use satrs::spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader};
use satrs::spacepackets::{ByteConversionError, SpHeader};
#[derive(Debug, new, Copy, Clone)]
pub struct HkUniqueId {
@ -33,3 +35,35 @@ impl HkUniqueId {
Ok(8)
}
}
#[derive(new)]
pub struct PusHkHelper {
component_id: UniqueApidTargetId,
}
impl PusHkHelper {
pub fn generate_hk_report_packet<
'a,
'b,
HkWriter: FnMut(&mut [u8]) -> Result<usize, ByteConversionError>,
>(
&self,
timestamp: &'a [u8],
set_id: u32,
hk_data_writer: &mut HkWriter,
buf: &'b mut [u8],
) -> Result<PusTmCreator<'a, 'b>, ByteConversionError> {
let sec_header =
PusTmSecondaryHeader::new(3, hk::Subservice::TmHkPacket as u8, 0, 0, timestamp);
buf[0..4].copy_from_slice(&self.component_id.unique_id.to_be_bytes());
buf[4..8].copy_from_slice(&set_id.to_be_bytes());
let (_, second_half) = buf.split_at_mut(8);
let hk_data_len = hk_data_writer(second_half)?;
Ok(PusTmCreator::new(
SpHeader::new_from_apid(self.component_id.apid),
sec_header,
&buf[0..8 + hk_data_len],
true,
))
}
}

View File

@ -1,3 +1,4 @@
//! This module contains all component related to the direct interface of the example.
pub mod sim_client_udp;
pub mod tcp;
pub mod udp;

View File

@ -0,0 +1,420 @@
use std::{
collections::HashMap,
net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket},
sync::mpsc,
time::Duration,
};
use satrs::pus::HandlingStatus;
use satrs_minisim::{
udp::SIM_CTRL_PORT, SerializableSimMsgPayload, SimComponent, SimMessageProvider, SimReply,
SimRequest,
};
use satrs_minisim::{SimCtrlReply, SimCtrlRequest};
struct SimReplyMap(pub HashMap<SimComponent, mpsc::Sender<SimReply>>);
pub fn create_sim_client(sim_request_rx: mpsc::Receiver<SimRequest>) -> Option<SimClientUdp> {
match SimClientUdp::new(
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, SIM_CTRL_PORT)),
sim_request_rx,
) {
Ok(sim_client) => {
log::info!("simulator client connection success");
return Some(sim_client);
}
Err(e) => {
log::warn!("sim client creation error: {}", e);
}
}
None
}
#[derive(thiserror::Error, Debug)]
pub enum SimClientCreationError {
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("timeout when trying to connect to sim UDP server")]
Timeout,
#[error("invalid ping reply when trying connection to UDP sim server")]
InvalidReplyJsonError(#[from] serde_json::Error),
#[error("invalid sim reply, not pong reply as expected: {0:?}")]
ReplyIsNotPong(SimReply),
}
pub struct SimClientUdp {
udp_client: UdpSocket,
simulator_addr: SocketAddr,
sim_request_rx: mpsc::Receiver<SimRequest>,
reply_map: SimReplyMap,
reply_buf: [u8; 4096],
}
impl SimClientUdp {
pub fn new(
simulator_addr: SocketAddr,
sim_request_rx: mpsc::Receiver<SimRequest>,
) -> Result<Self, SimClientCreationError> {
let mut reply_buf: [u8; 4096] = [0; 4096];
let mut udp_client = UdpSocket::bind("127.0.0.1:0")?;
udp_client.set_read_timeout(Some(Duration::from_millis(100)))?;
Self::attempt_connection(&mut udp_client, simulator_addr, &mut reply_buf)?;
udp_client.set_nonblocking(true)?;
Ok(Self {
udp_client,
simulator_addr,
sim_request_rx,
reply_map: SimReplyMap(HashMap::new()),
reply_buf,
})
}
pub fn attempt_connection(
udp_client: &mut UdpSocket,
simulator_addr: SocketAddr,
reply_buf: &mut [u8],
) -> Result<(), SimClientCreationError> {
let sim_req = SimRequest::new_with_epoch_time(SimCtrlRequest::Ping);
let sim_req_json = serde_json::to_string(&sim_req).expect("failed to serialize SimRequest");
udp_client.send_to(sim_req_json.as_bytes(), simulator_addr)?;
match udp_client.recv(reply_buf) {
Ok(reply_len) => {
let sim_reply: SimReply = serde_json::from_slice(&reply_buf[0..reply_len])?;
if sim_reply.component() != SimComponent::SimCtrl {
return Err(SimClientCreationError::ReplyIsNotPong(sim_reply));
}
let sim_ctrl_reply =
SimCtrlReply::from_sim_message(&sim_reply).expect("invalid SIM reply");
match sim_ctrl_reply {
SimCtrlReply::InvalidRequest(_) => {
panic!("received invalid request reply from UDP sim server")
}
SimCtrlReply::Pong => Ok(()),
}
}
Err(e) => {
if e.kind() == std::io::ErrorKind::TimedOut
|| e.kind() == std::io::ErrorKind::WouldBlock
{
Err(SimClientCreationError::Timeout)
} else {
Err(SimClientCreationError::Io(e))
}
}
}
}
pub fn operation(&mut self) -> HandlingStatus {
let mut no_sim_requests_handled = true;
let mut no_data_from_udp_server_received = true;
loop {
match self.sim_request_rx.try_recv() {
Ok(request) => {
let request_json =
serde_json::to_string(&request).expect("failed to serialize SimRequest");
if let Err(e) = self
.udp_client
.send_to(request_json.as_bytes(), self.simulator_addr)
{
log::error!("error sending data to UDP SIM server: {}", e);
break;
} else {
no_sim_requests_handled = false;
}
}
Err(e) => match e {
mpsc::TryRecvError::Empty => {
break;
}
mpsc::TryRecvError::Disconnected => {
log::warn!("SIM request sender disconnected");
break;
}
},
}
}
loop {
match self.udp_client.recv(&mut self.reply_buf) {
Ok(recvd_bytes) => {
no_data_from_udp_server_received = false;
let sim_reply_result: serde_json::Result<SimReply> =
serde_json::from_slice(&self.reply_buf[0..recvd_bytes]);
match sim_reply_result {
Ok(sim_reply) => {
if let Some(sender) = self.reply_map.0.get(&sim_reply.component()) {
sender.send(sim_reply).expect("failed to send SIM reply");
} else {
log::warn!(
"no recipient for SIM reply from component {:?}",
sim_reply.component()
);
}
}
Err(e) => {
log::warn!("failed to deserialize SIM reply: {}", e);
}
}
}
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock
|| e.kind() == std::io::ErrorKind::TimedOut
{
break;
}
log::error!("error receiving data from UDP SIM server: {}", e);
break;
}
}
}
if no_sim_requests_handled && no_data_from_udp_server_received {
return HandlingStatus::Empty;
}
HandlingStatus::HandledOne
}
pub fn add_reply_recipient(
&mut self,
component: SimComponent,
reply_sender: mpsc::Sender<SimReply>,
) {
self.reply_map.0.insert(component, reply_sender);
}
}
#[cfg(test)]
pub mod tests {
use std::{
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
mpsc, Arc,
},
time::Duration,
};
use satrs_minisim::{
eps::{PcduReply, PcduRequest},
SerializableSimMsgPayload, SimComponent, SimCtrlReply, SimCtrlRequest, SimMessageProvider,
SimReply, SimRequest,
};
use super::SimClientUdp;
struct UdpSimTestServer {
udp_server: UdpSocket,
request_tx: mpsc::Sender<SimRequest>,
reply_rx: mpsc::Receiver<SimReply>,
last_sender: Option<SocketAddr>,
stop_signal: Arc<AtomicBool>,
recv_buf: [u8; 1024],
}
impl UdpSimTestServer {
pub fn new(
request_tx: mpsc::Sender<SimRequest>,
reply_rx: mpsc::Receiver<SimReply>,
stop_signal: Arc<AtomicBool>,
) -> Self {
let udp_server = UdpSocket::bind("127.0.0.1:0").expect("creating UDP server failed");
udp_server
.set_nonblocking(true)
.expect("failed to set UDP server to non-blocking");
Self {
udp_server,
request_tx,
reply_rx,
last_sender: None,
stop_signal,
recv_buf: [0; 1024],
}
}
pub fn operation(&mut self) {
loop {
let mut no_sim_replies_handled = true;
let mut no_data_received = true;
if self.stop_signal.load(Ordering::Relaxed) {
break;
}
if let Some(last_sender) = self.last_sender {
loop {
match self.reply_rx.try_recv() {
Ok(sim_reply) => {
let sim_reply_json = serde_json::to_string(&sim_reply)
.expect("failed to serialize SimReply");
self.udp_server
.send_to(sim_reply_json.as_bytes(), last_sender)
.expect("failed to send reply to client from UDP server");
no_sim_replies_handled = false;
}
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
panic!("reply sender disconnected")
}
},
}
}
}
loop {
match self.udp_server.recv_from(&mut self.recv_buf) {
Ok((read_bytes, from)) => {
let sim_request: SimRequest =
serde_json::from_slice(&self.recv_buf[0..read_bytes])
.expect("failed to deserialize SimRequest");
if sim_request.component() == SimComponent::SimCtrl {
// For a ping, we perform the reply handling here directly
let sim_ctrl_request =
SimCtrlRequest::from_sim_message(&sim_request)
.expect("failed to convert SimRequest to SimCtrlRequest");
match sim_ctrl_request {
SimCtrlRequest::Ping => {
no_data_received = false;
self.last_sender = Some(from);
let sim_reply = SimReply::new(&SimCtrlReply::Pong);
let sim_reply_json = serde_json::to_string(&sim_reply)
.expect("failed to serialize SimReply");
self.udp_server
.send_to(sim_reply_json.as_bytes(), from)
.expect(
"failed to send reply to client from UDP server",
);
}
};
}
// Forward each SIM request for testing purposes.
self.request_tx
.send(sim_request)
.expect("failed to send request");
}
Err(e) => {
if e.kind() != std::io::ErrorKind::WouldBlock
&& e.kind() != std::io::ErrorKind::TimedOut
{
panic!("UDP server error: {}", e);
}
break;
}
}
}
if no_sim_replies_handled && no_data_received {
std::thread::sleep(Duration::from_millis(5));
}
}
}
pub fn local_addr(&self) -> SocketAddr {
self.udp_server.local_addr().unwrap()
}
}
#[test]
fn basic_connection_test() {
let (server_sim_request_tx, server_sim_request_rx) = mpsc::channel();
let (_server_sim_reply_tx, server_sim_reply_rx) = mpsc::channel();
let stop_signal = Arc::new(AtomicBool::new(false));
let mut udp_server = UdpSimTestServer::new(
server_sim_request_tx,
server_sim_reply_rx,
stop_signal.clone(),
);
let server_addr = udp_server.local_addr();
let (_client_sim_req_tx, client_sim_req_rx) = mpsc::channel();
// Need to spawn the simulator UDP server before calling the client constructor.
let jh0 = std::thread::spawn(move || {
udp_server.operation();
});
// Creating the client also performs the connection test.
SimClientUdp::new(server_addr, client_sim_req_rx).unwrap();
let sim_request = server_sim_request_rx
.recv_timeout(Duration::from_millis(50))
.expect("no SIM request received");
let ping_request = SimCtrlRequest::from_sim_message(&sim_request)
.expect("failed to create SimCtrlRequest");
assert_eq!(ping_request, SimCtrlRequest::Ping);
// Stop the server.
stop_signal.store(true, Ordering::Relaxed);
jh0.join().unwrap();
}
#[test]
fn basic_request_reply_test() {
let (server_sim_request_tx, server_sim_request_rx) = mpsc::channel();
let (server_sim_reply_tx, sever_sim_reply_rx) = mpsc::channel();
let stop_signal = Arc::new(AtomicBool::new(false));
let mut udp_server = UdpSimTestServer::new(
server_sim_request_tx,
sever_sim_reply_rx,
stop_signal.clone(),
);
let server_addr = udp_server.local_addr();
let (client_sim_req_tx, client_sim_req_rx) = mpsc::channel();
let (client_pcdu_reply_tx, client_pcdu_reply_rx) = mpsc::channel();
// Need to spawn the simulator UDP server before calling the client constructor.
let jh0 = std::thread::spawn(move || {
udp_server.operation();
});
// Creating the client also performs the connection test.
let mut client = SimClientUdp::new(server_addr, client_sim_req_rx).unwrap();
client.add_reply_recipient(SimComponent::Pcdu, client_pcdu_reply_tx);
let sim_request = server_sim_request_rx
.recv_timeout(Duration::from_millis(50))
.expect("no SIM request received");
let ping_request = SimCtrlRequest::from_sim_message(&sim_request)
.expect("failed to create SimCtrlRequest");
assert_eq!(ping_request, SimCtrlRequest::Ping);
let pcdu_req = PcduRequest::RequestSwitchInfo;
client_sim_req_tx
.send(SimRequest::new_with_epoch_time(pcdu_req))
.expect("send failed");
client.operation();
// Check that the request arrives properly at the server.
let sim_request = server_sim_request_rx
.recv_timeout(Duration::from_millis(50))
.expect("no SIM request received");
let req_recvd_on_server =
PcduRequest::from_sim_message(&sim_request).expect("failed to create SimCtrlRequest");
matches!(req_recvd_on_server, PcduRequest::RequestSwitchInfo);
// We inject the reply ourselves.
let pcdu_reply = PcduReply::SwitchInfo(HashMap::new());
server_sim_reply_tx
.send(SimReply::new(&pcdu_reply))
.expect("sending PCDU reply failed");
// Now we verify that the reply is sent by the UDP server back to the client, and then
// forwarded by the clients internal map.
let mut pcdu_reply_received = false;
for _ in 0..3 {
client.operation();
match client_pcdu_reply_rx.try_recv() {
Ok(sim_reply) => {
assert_eq!(sim_reply.component(), SimComponent::Pcdu);
let pcdu_reply_from_client = PcduReply::from_sim_message(&sim_reply)
.expect("failed to create PcduReply");
assert_eq!(pcdu_reply_from_client, pcdu_reply);
pcdu_reply_received = true;
break;
}
Err(e) => match e {
mpsc::TryRecvError::Empty => std::thread::sleep(Duration::from_millis(10)),
mpsc::TryRecvError::Disconnected => panic!("reply sender disconnected"),
},
}
}
if !pcdu_reply_received {
panic!("no reply received");
}
// Stop the server.
stop_signal.store(true, Ordering::Relaxed);
jh0.join().unwrap();
}
}

View File

@ -9,12 +9,12 @@ pub enum DeviceMode {
Normal = 2,
}
pub struct TimeStampHelper {
pub struct TimestampHelper {
stamper: CdsTime,
time_stamp: [u8; 7],
}
impl TimeStampHelper {
impl TimestampHelper {
pub fn stamp(&self) -> &[u8] {
&self.time_stamp
}
@ -29,7 +29,7 @@ impl TimeStampHelper {
}
}
impl Default for TimeStampHelper {
impl Default for TimestampHelper {
fn default() -> Self {
Self {
stamper: CdsTime::now_with_u16_days().expect("creating time stamper failed"),

View File

@ -1,4 +1,5 @@
mod acs;
mod eps;
mod events;
mod hk;
mod interface;
@ -7,6 +8,10 @@ mod pus;
mod requests;
mod tmtc;
use crate::eps::pcdu::{
PcduHandler, SerialInterfaceDummy, SerialInterfaceToSim, SerialSimInterfaceWrapper,
};
use crate::eps::PowerSwitchHelper;
use crate::events::EventHandler;
use crate::interface::udp::DynamicUdpTmHandler;
use crate::pus::stack::PusStack;
@ -16,15 +21,21 @@ use log::info;
use pus::test::create_test_service_dynamic;
use satrs::hal::std::tcp_server::ServerConfig;
use satrs::hal::std::udp_server::UdpTcServer;
use satrs::request::GenericMessage;
use satrs::pus::HandlingStatus;
use satrs::request::{GenericMessage, MessageMetadata};
use satrs::tmtc::{PacketSenderWithSharedPool, SharedPacketPool};
use satrs_example::config::pool::{create_sched_tc_pool, create_static_pools};
use satrs_example::config::tasks::{
FREQ_MS_AOCS, FREQ_MS_EVENT_HANDLING, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC,
FREQ_MS_AOCS, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, SIM_CLIENT_IDLE_DELAY_MS,
};
use satrs_example::config::{OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT};
use satrs_example::DeviceMode;
use crate::acs::mgm::{MgmHandlerLis3Mdl, MpscModeLeafInterface, SpiDummyInterface};
use crate::acs::mgm::{
MgmHandlerLis3Mdl, MpscModeLeafInterface, SpiDummyInterface, SpiSimInterface,
SpiSimInterfaceWrapper,
};
use crate::interface::sim_client_udp::create_sim_client;
use crate::interface::tcp::{SyncTcpTmSource, TcpTask};
use crate::interface::udp::{StaticUdpTmHandler, UdpTmtcServer};
use crate::logger::setup_logger;
@ -36,12 +47,14 @@ use crate::pus::scheduler::{create_scheduler_service_dynamic, create_scheduler_s
use crate::pus::test::create_test_service_static;
use crate::pus::{PusTcDistributor, PusTcMpscRouter};
use crate::requests::{CompositeRequest, GenericRequestRouter};
use satrs::mode::ModeRequest;
use satrs::mode::{Mode, ModeAndSubmode, ModeRequest};
use satrs::pus::event_man::EventRequestWithToken;
use satrs::spacepackets::{time::cds::CdsTime, time::TimeWriter};
use satrs_example::config::components::{MGM_HANDLER_0, TCP_SERVER, UDP_SERVER};
use satrs_example::config::components::{
MGM_HANDLER_0, NO_SENDER, PCDU_HANDLER, TCP_SERVER, UDP_SERVER,
};
use std::net::{IpAddr, SocketAddr};
use std::sync::mpsc;
use std::sync::{mpsc, Mutex};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
@ -60,9 +73,20 @@ fn static_tmtc_pool_main() {
let tm_sink_tx_sender =
PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone());
let (sim_request_tx, sim_request_rx) = mpsc::channel();
let (mgm_sim_reply_tx, mgm_sim_reply_rx) = mpsc::channel();
let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel();
let mut opt_sim_client = create_sim_client(sim_request_rx);
let (mgm_handler_composite_tx, mgm_handler_composite_rx) =
mpsc::channel::<GenericMessage<CompositeRequest>>();
let (mgm_handler_mode_tx, mgm_handler_mode_rx) = mpsc::channel::<GenericMessage<ModeRequest>>();
mpsc::sync_channel::<GenericMessage<CompositeRequest>>(10);
let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) =
mpsc::sync_channel::<GenericMessage<CompositeRequest>>(30);
let (mgm_handler_mode_tx, mgm_handler_mode_rx) =
mpsc::sync_channel::<GenericMessage<ModeRequest>>(5);
let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) =
mpsc::sync_channel::<GenericMessage<ModeRequest>>(5);
// Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = GenericRequestRouter::default();
@ -72,6 +96,12 @@ fn static_tmtc_pool_main() {
request_map
.mode_router_map
.insert(MGM_HANDLER_0.id(), mgm_handler_mode_tx);
request_map
.composite_router_map
.insert(PCDU_HANDLER.id(), pcdu_handler_composite_tx);
request_map
.mode_router_map
.insert(PCDU_HANDLER.id(), pcdu_handler_mode_tx.clone());
// This helper structure is used by all telecommand providers which need to send telecommands
// to the TC source.
@ -195,26 +225,76 @@ fn static_tmtc_pool_main() {
);
let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) =
mpsc::channel();
mpsc::sync_channel(5);
let shared_switch_set = Arc::new(Mutex::default());
let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20);
let switch_helper = PowerSwitchHelper::new(switch_request_tx, shared_switch_set.clone());
let dummy_spi_interface = SpiDummyInterface::default();
let shared_mgm_set = Arc::default();
let mode_leaf_interface = MpscModeLeafInterface {
let mgm_mode_leaf_interface = MpscModeLeafInterface {
request_rx: mgm_handler_mode_rx,
reply_tx_to_pus: pus_mode_reply_tx,
reply_tx_to_parent: mgm_handler_mode_reply_to_parent_tx,
reply_to_pus_tx: pus_mode_reply_tx.clone(),
reply_to_parent_tx: mgm_handler_mode_reply_to_parent_tx,
};
let mgm_spi_interface = if let Some(sim_client) = opt_sim_client.as_mut() {
sim_client.add_reply_recipient(satrs_minisim::SimComponent::MgmLis3Mdl, mgm_sim_reply_tx);
SpiSimInterfaceWrapper::Sim(SpiSimInterface {
sim_request_tx: sim_request_tx.clone(),
sim_reply_rx: mgm_sim_reply_rx,
})
} else {
SpiSimInterfaceWrapper::Dummy(SpiDummyInterface::default())
};
let mut mgm_handler = MgmHandlerLis3Mdl::new(
MGM_HANDLER_0,
"MGM_0",
mode_leaf_interface,
mgm_mode_leaf_interface,
mgm_handler_composite_rx,
pus_hk_reply_tx,
tm_sink_tx,
dummy_spi_interface,
pus_hk_reply_tx.clone(),
switch_helper.clone(),
tm_sink_tx.clone(),
mgm_spi_interface,
shared_mgm_set,
);
let (pcdu_handler_mode_reply_to_parent_tx, _pcdu_handler_mode_reply_to_parent_rx) =
mpsc::sync_channel(10);
let pcdu_mode_leaf_interface = MpscModeLeafInterface {
request_rx: pcdu_handler_mode_rx,
reply_to_pus_tx: pus_mode_reply_tx,
reply_to_parent_tx: pcdu_handler_mode_reply_to_parent_tx,
};
let pcdu_serial_interface = if let Some(sim_client) = opt_sim_client.as_mut() {
sim_client.add_reply_recipient(satrs_minisim::SimComponent::Pcdu, pcdu_sim_reply_tx);
SerialSimInterfaceWrapper::Sim(SerialInterfaceToSim::new(
sim_request_tx.clone(),
pcdu_sim_reply_rx,
))
} else {
SerialSimInterfaceWrapper::Dummy(SerialInterfaceDummy::default())
};
let mut pcdu_handler = PcduHandler::new(
PCDU_HANDLER,
"PCDU",
pcdu_mode_leaf_interface,
pcdu_handler_composite_rx,
pus_hk_reply_tx,
switch_request_rx,
tm_sink_tx,
pcdu_serial_interface,
shared_switch_set,
);
// The PCDU is a critical component which should be in normal mode immediately.
pcdu_handler_mode_tx
.send(GenericMessage::new(
MessageMetadata::new(0, NO_SENDER),
ModeRequest::SetMode(ModeAndSubmode::new(DeviceMode::Normal as Mode, 0)),
))
.expect("sending initial mode request failed");
info!("Starting TMTC and UDP task");
let jh_udp_tmtc = thread::Builder::new()
.name("SATRS tmtc-udp".to_string())
@ -247,14 +327,20 @@ fn static_tmtc_pool_main() {
})
.unwrap();
info!("Starting event handling task");
let jh_event_handling = thread::Builder::new()
.name("sat-rs events".to_string())
.spawn(move || loop {
event_handler.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
})
.unwrap();
let mut opt_jh_sim_client = None;
if let Some(mut sim_client) = opt_sim_client {
info!("Starting UDP sim client task");
opt_jh_sim_client = Some(
thread::Builder::new()
.name("sat-rs sim adapter".to_string())
.spawn(move || loop {
if sim_client.operation() == HandlingStatus::Empty {
std::thread::sleep(Duration::from_millis(SIM_CLIENT_IDLE_DELAY_MS));
}
})
.unwrap(),
);
}
info!("Starting AOCS thread");
let jh_aocs = thread::Builder::new()
@ -265,10 +351,26 @@ fn static_tmtc_pool_main() {
})
.unwrap();
info!("Starting EPS thread");
let jh_eps = thread::Builder::new()
.name("sat-rs eps".to_string())
.spawn(move || loop {
// TODO: We should introduce something like a fixed timeslot helper to allow a more
// declarative API. It would also be very useful for the AOCS task.
pcdu_handler.periodic_operation(eps::pcdu::OpCode::RegularOp);
thread::sleep(Duration::from_millis(50));
pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies);
thread::sleep(Duration::from_millis(50));
pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies);
thread::sleep(Duration::from_millis(300));
})
.unwrap();
info!("Starting PUS handler thread");
let jh_pus_handler = thread::Builder::new()
.name("sat-rs pus".to_string())
.spawn(move || loop {
event_handler.periodic_operation();
pus_stack.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
})
@ -283,10 +385,13 @@ fn static_tmtc_pool_main() {
jh_tm_funnel
.join()
.expect("Joining TM Funnel thread failed");
jh_event_handling
.join()
.expect("Joining Event Manager thread failed");
if let Some(jh_sim_client) = opt_jh_sim_client {
jh_sim_client
.join()
.expect("Joining SIM client thread failed");
}
jh_aocs.join().expect("Joining AOCS thread failed");
jh_eps.join().expect("Joining EPS thread failed");
jh_pus_handler
.join()
.expect("Joining PUS handler thread failed");
@ -295,22 +400,38 @@ fn static_tmtc_pool_main() {
#[allow(dead_code)]
fn dyn_tmtc_pool_main() {
let (tc_source_tx, tc_source_rx) = mpsc::channel();
let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel();
let (tm_sink_tx, tm_sink_rx) = mpsc::channel();
let (tm_server_tx, tm_server_rx) = mpsc::channel();
let (sim_request_tx, sim_request_rx) = mpsc::channel();
let (mgm_sim_reply_tx, mgm_sim_reply_rx) = mpsc::channel();
let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel();
let mut opt_sim_client = create_sim_client(sim_request_rx);
// Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let (mgm_handler_composite_tx, mgm_handler_composite_rx) =
mpsc::channel::<GenericMessage<CompositeRequest>>();
let (mgm_handler_mode_tx, mgm_handler_mode_rx) = mpsc::channel::<GenericMessage<ModeRequest>>();
mpsc::sync_channel::<GenericMessage<CompositeRequest>>(5);
let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) =
mpsc::sync_channel::<GenericMessage<CompositeRequest>>(10);
let (mgm_handler_mode_tx, mgm_handler_mode_rx) =
mpsc::sync_channel::<GenericMessage<ModeRequest>>(5);
let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) =
mpsc::sync_channel::<GenericMessage<ModeRequest>>(10);
// Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = GenericRequestRouter::default();
request_map
.composite_router_map
.insert(MGM_HANDLER_0.raw(), mgm_handler_composite_tx);
.insert(MGM_HANDLER_0.id(), mgm_handler_composite_tx);
request_map
.mode_router_map
.insert(MGM_HANDLER_0.raw(), mgm_handler_mode_tx);
.insert(MGM_HANDLER_0.id(), mgm_handler_mode_tx);
request_map
.composite_router_map
.insert(PCDU_HANDLER.id(), pcdu_handler_composite_tx);
request_map
.mode_router_map
.insert(PCDU_HANDLER.id(), pcdu_handler_mode_tx.clone());
// Create event handling components
// These sender handles are used to send event requests, for example to enable or disable
@ -319,7 +440,7 @@ fn dyn_tmtc_pool_main() {
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation.
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx);
let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx);
let (pus_test_tx, pus_test_rx) = mpsc::channel();
let (pus_event_tx, pus_event_rx) = mpsc::channel();
@ -342,30 +463,30 @@ fn dyn_tmtc_pool_main() {
};
let pus_test_service =
create_test_service_dynamic(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx);
create_test_service_dynamic(tm_sink_tx.clone(), event_tx.clone(), pus_test_rx);
let pus_scheduler_service = create_scheduler_service_dynamic(
tm_funnel_tx.clone(),
tm_sink_tx.clone(),
tc_source_tx.clone(),
pus_sched_rx,
create_sched_tc_pool(),
);
let pus_event_service =
create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
create_event_service_dynamic(tm_sink_tx.clone(), pus_event_rx, event_request_tx);
let pus_action_service = create_action_service_dynamic(
tm_funnel_tx.clone(),
tm_sink_tx.clone(),
pus_action_rx,
request_map.clone(),
pus_action_reply_rx,
);
let pus_hk_service = create_hk_service_dynamic(
tm_funnel_tx.clone(),
tm_sink_tx.clone(),
pus_hk_rx,
request_map.clone(),
pus_hk_reply_rx,
);
let pus_mode_service = create_mode_service_dynamic(
tm_funnel_tx.clone(),
tm_sink_tx.clone(),
pus_mode_rx,
request_map,
pus_mode_reply_rx,
@ -381,7 +502,7 @@ fn dyn_tmtc_pool_main() {
let mut tmtc_task = TcSourceTaskDynamic::new(
tc_source_rx,
PusTcDistributor::new(tm_funnel_tx.clone(), pus_router),
PusTcDistributor::new(tm_sink_tx.clone(), pus_router),
);
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
@ -410,28 +531,77 @@ fn dyn_tmtc_pool_main() {
)
.expect("tcp server creation failed");
let mut tm_funnel = TmSinkDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx);
let mut tm_funnel = TmSinkDynamic::new(sync_tm_tcp_source, tm_sink_rx, tm_server_tx);
let shared_switch_set = Arc::new(Mutex::default());
let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20);
let switch_helper = PowerSwitchHelper::new(switch_request_tx, shared_switch_set.clone());
let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) =
mpsc::channel();
let dummy_spi_interface = SpiDummyInterface::default();
mpsc::sync_channel(5);
let shared_mgm_set = Arc::default();
let mode_leaf_interface = MpscModeLeafInterface {
request_rx: mgm_handler_mode_rx,
reply_tx_to_pus: pus_mode_reply_tx,
reply_tx_to_parent: mgm_handler_mode_reply_to_parent_tx,
reply_to_pus_tx: pus_mode_reply_tx.clone(),
reply_to_parent_tx: mgm_handler_mode_reply_to_parent_tx,
};
let mgm_spi_interface = if let Some(sim_client) = opt_sim_client.as_mut() {
sim_client.add_reply_recipient(satrs_minisim::SimComponent::MgmLis3Mdl, mgm_sim_reply_tx);
SpiSimInterfaceWrapper::Sim(SpiSimInterface {
sim_request_tx: sim_request_tx.clone(),
sim_reply_rx: mgm_sim_reply_rx,
})
} else {
SpiSimInterfaceWrapper::Dummy(SpiDummyInterface::default())
};
let mut mgm_handler = MgmHandlerLis3Mdl::new(
MGM_HANDLER_0,
"MGM_0",
mode_leaf_interface,
mgm_handler_composite_rx,
pus_hk_reply_tx,
tm_funnel_tx,
dummy_spi_interface,
pus_hk_reply_tx.clone(),
switch_helper.clone(),
tm_sink_tx.clone(),
mgm_spi_interface,
shared_mgm_set,
);
let (pcdu_handler_mode_reply_to_parent_tx, _pcdu_handler_mode_reply_to_parent_rx) =
mpsc::sync_channel(10);
let pcdu_mode_leaf_interface = MpscModeLeafInterface {
request_rx: pcdu_handler_mode_rx,
reply_to_pus_tx: pus_mode_reply_tx,
reply_to_parent_tx: pcdu_handler_mode_reply_to_parent_tx,
};
let pcdu_serial_interface = if let Some(sim_client) = opt_sim_client.as_mut() {
sim_client.add_reply_recipient(satrs_minisim::SimComponent::Pcdu, pcdu_sim_reply_tx);
SerialSimInterfaceWrapper::Sim(SerialInterfaceToSim::new(
sim_request_tx.clone(),
pcdu_sim_reply_rx,
))
} else {
SerialSimInterfaceWrapper::Dummy(SerialInterfaceDummy::default())
};
let mut pcdu_handler = PcduHandler::new(
PCDU_HANDLER,
"PCDU",
pcdu_mode_leaf_interface,
pcdu_handler_composite_rx,
pus_hk_reply_tx,
switch_request_rx,
tm_sink_tx,
pcdu_serial_interface,
shared_switch_set,
);
// The PCDU is a critical component which should be in normal mode immediately.
pcdu_handler_mode_tx
.send(GenericMessage::new(
MessageMetadata::new(0, NO_SENDER),
ModeRequest::SetMode(ModeAndSubmode::new(DeviceMode::Normal as Mode, 0)),
))
.expect("sending initial mode request failed");
info!("Starting TMTC and UDP task");
let jh_udp_tmtc = thread::Builder::new()
.name("sat-rs tmtc-udp".to_string())
@ -464,14 +634,20 @@ fn dyn_tmtc_pool_main() {
})
.unwrap();
info!("Starting event handling task");
let jh_event_handling = thread::Builder::new()
.name("sat-rs events".to_string())
.spawn(move || loop {
event_handler.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
})
.unwrap();
let mut opt_jh_sim_client = None;
if let Some(mut sim_client) = opt_sim_client {
info!("Starting UDP sim client task");
opt_jh_sim_client = Some(
thread::Builder::new()
.name("sat-rs sim adapter".to_string())
.spawn(move || loop {
if sim_client.operation() == HandlingStatus::Empty {
std::thread::sleep(Duration::from_millis(SIM_CLIENT_IDLE_DELAY_MS));
}
})
.unwrap(),
);
}
info!("Starting AOCS thread");
let jh_aocs = thread::Builder::new()
@ -482,11 +658,27 @@ fn dyn_tmtc_pool_main() {
})
.unwrap();
info!("Starting EPS thread");
let jh_eps = thread::Builder::new()
.name("sat-rs eps".to_string())
.spawn(move || loop {
// TODO: We should introduce something like a fixed timeslot helper to allow a more
// declarative API. It would also be very useful for the AOCS task.
pcdu_handler.periodic_operation(eps::pcdu::OpCode::RegularOp);
thread::sleep(Duration::from_millis(50));
pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies);
thread::sleep(Duration::from_millis(50));
pcdu_handler.periodic_operation(eps::pcdu::OpCode::PollAndRecvReplies);
thread::sleep(Duration::from_millis(300));
})
.unwrap();
info!("Starting PUS handler thread");
let jh_pus_handler = thread::Builder::new()
.name("sat-rs pus".to_string())
.spawn(move || loop {
pus_stack.periodic_operation();
event_handler.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
})
.unwrap();
@ -500,10 +692,13 @@ fn dyn_tmtc_pool_main() {
jh_tm_funnel
.join()
.expect("Joining TM Funnel thread failed");
jh_event_handling
.join()
.expect("Joining Event Manager thread failed");
if let Some(jh_sim_client) = opt_jh_sim_client {
jh_sim_client
.join()
.expect("Joining SIM client thread failed");
}
jh_aocs.join().expect("Joining AOCS thread failed");
jh_eps.join().expect("Joining EPS thread failed");
jh_pus_handler
.join()
.expect("Joining PUS handler thread failed");

View File

@ -341,7 +341,7 @@ mod tests {
let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel();
let (pus_action_tx, pus_action_rx) = mpsc::channel();
let (action_reply_tx, action_reply_rx) = mpsc::channel();
let (action_req_tx, action_req_rx) = mpsc::channel();
let (action_req_tx, action_req_rx) = mpsc::sync_channel(10);
let verif_reporter = TestVerificationReporter::new(owner_id);
let mut generic_req_router = GenericRequestRouter::default();
generic_req_router

View File

@ -12,6 +12,7 @@ use satrs::pus::{
PusPacketHandlingError, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
};
use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::res_code::ResultU16;
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::{hk, PusPacket, PusServiceId};
use satrs::tmtc::{PacketAsVec, PacketSenderWithSharedPool};
@ -32,8 +33,10 @@ pub struct HkReply {
}
#[derive(Clone, PartialEq, Debug)]
#[allow(dead_code)]
pub enum HkReplyVariant {
Ack,
Failed(ResultU16),
}
#[derive(Default)]
@ -69,6 +72,15 @@ impl PusReplyHandler<ActivePusRequestStd, HkReply> for HkReplyHandler {
.completion_success(tm_sender, started_token, time_stamp)
.expect("sending completion success verification failed");
}
HkReplyVariant::Failed(failure_code) => {
verification_handler
.completion_failure(
tm_sender,
started_token,
FailParams::new(time_stamp, &failure_code, &[]),
)
.expect("sending completion success verification failed");
}
};
Ok(true)
}

View File

@ -19,7 +19,7 @@ use satrs::tmtc::{PacketAsVec, PacketInPool};
use satrs::ComponentId;
use satrs_example::config::components::PUS_ROUTING_SERVICE;
use satrs_example::config::{tmtc_err, CustomPusServiceId};
use satrs_example::TimeStampHelper;
use satrs_example::TimestampHelper;
use std::fmt::Debug;
use std::sync::mpsc::{self, Sender};
@ -53,7 +53,7 @@ pub struct PusTcDistributor<TmSender: EcssTmSender> {
pub tm_sender: TmSender,
pub verif_reporter: VerificationReporter,
pub pus_router: PusTcMpscRouter,
stamp_helper: TimeStampHelper,
stamp_helper: TimestampHelper,
}
impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
@ -66,7 +66,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
PUS_ROUTING_SERVICE.apid,
),
pus_router,
stamp_helper: TimeStampHelper::default(),
stamp_helper: TimestampHelper::default(),
}
}

View File

@ -28,8 +28,9 @@ pub enum CompositeRequest {
pub struct GenericRequestRouter {
pub id: ComponentId,
// All messages which do not have a dedicated queue.
pub composite_router_map: HashMap<ComponentId, mpsc::Sender<GenericMessage<CompositeRequest>>>,
pub mode_router_map: HashMap<ComponentId, mpsc::Sender<GenericMessage<ModeRequest>>>,
pub composite_router_map:
HashMap<ComponentId, mpsc::SyncSender<GenericMessage<CompositeRequest>>>,
pub mode_router_map: HashMap<ComponentId, mpsc::SyncSender<GenericMessage<ModeRequest>>>,
}
impl Default for GenericRequestRouter {