move to CCSDS + serde for sat-rs example

This commit is contained in:
2025-11-17 11:13:49 +01:00
parent d7e6732888
commit 033c995138
61 changed files with 1649 additions and 2482 deletions
+169 -179
View File
@@ -1,14 +1,14 @@
use derive_new::new;
use satrs::hk::{HkRequest, HkRequestVariant};
use models::ccsds::{CcsdsTcPacketOwned, CcsdsTmPacketOwned};
use models::mgm::MgmData;
use models::pcdu::SwitchId;
use models::{mgm, ComponentId, HkRequestType};
use satrs::mode_tree::{ModeChild, ModeNode};
use satrs::power::{PowerSwitchInfo, PowerSwitcherCommandSender};
use satrs_example::ids::generic_pus::PUS_MODE;
use satrs::spacepackets::CcsdsPacketIdAndPsc;
use satrs_example::{DeviceMode, TimestampHelper};
use satrs_minisim::acs::lis3mdl::{
MgmLis3MdlReply, MgmLis3RawValues, FIELD_LSB_PER_GAUSS_4_SENS, GAUSS_TO_MICROTESLA_FACTOR,
};
use satrs_minisim::acs::MgmRequestLis3Mdl;
use satrs_minisim::eps::PcduSwitch;
use satrs_minisim::{SerializableSimMsgPayload, SimReply, SimRequest};
use std::fmt::Debug;
use std::sync::mpsc::{self};
@@ -19,17 +19,12 @@ use satrs::mode::{
ModeAndSubmode, ModeError, ModeProvider, ModeReply, ModeRequestHandler,
ModeRequestHandlerMpscBounded,
};
use satrs::pus::{EcssTmSender, PusTmVariant};
use satrs::request::{GenericMessage, MessageMetadata, UniqueApidTargetId};
use satrs::request::{GenericMessage, MessageMetadata};
use satrs_example::config::components::NO_SENDER;
use crate::hk::PusHkHelper;
use crate::pus::hk::{HkReply, HkReplyVariant};
use crate::requests::CompositeRequest;
use crate::ccsds::pack_ccsds_tm_packet_for_now;
use crate::eps::PowerSwitchHelper;
use crate::spi::SpiInterface;
use crate::tmtc::sender::TmTcSender;
use serde::{Deserialize, Serialize};
pub const NR_OF_DATA_AND_CFG_REGISTERS: usize = 14;
@@ -38,12 +33,6 @@ pub const X_LOWBYTE_IDX: usize = 9;
pub const Y_LOWBYTE_IDX: usize = 11;
pub const Z_LOWBYTE_IDX: usize = 13;
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[repr(u32)]
pub enum SetId {
SensorData = 0,
}
#[derive(Default, Debug, PartialEq, Eq)]
pub enum TransitionState {
#[default]
@@ -120,19 +109,10 @@ impl SpiInterface for SpiSimInterfaceWrapper {
}
}
#[derive(Default, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct MgmData {
pub valid: bool,
pub x: f32,
pub y: f32,
pub z: f32,
}
#[derive(Default)]
pub struct BufWrapper {
tx_buf: [u8; 32],
rx_buf: [u8; 32],
tm_buf: [u8; 32],
}
pub struct ModeHelpers {
@@ -154,40 +134,52 @@ impl Default for ModeHelpers {
}
/// Example MGM device handler strongly based on the LIS3MDL MEMS device.
#[derive(new)]
#[allow(clippy::too_many_arguments)]
pub struct MgmHandlerLis3Mdl<
ComInterface: SpiInterface,
SwitchHelper: PowerSwitchInfo<PcduSwitch> + PowerSwitcherCommandSender<PcduSwitch>,
> {
id: UniqueApidTargetId,
pub struct MgmHandlerLis3Mdl<ComInterface: SpiInterface> {
id: ComponentId,
dev_str: &'static str,
mode_node: ModeRequestHandlerMpscBounded,
composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
hk_reply_tx: mpsc::SyncSender<GenericMessage<HkReply>>,
switch_helper: SwitchHelper,
tm_sender: TmTcSender,
tc_rx: mpsc::Receiver<CcsdsTcPacketOwned>,
tm_tx: mpsc::SyncSender<CcsdsTmPacketOwned>,
switch_helper: PowerSwitchHelper,
pub com_interface: ComInterface,
shared_mgm_set: Arc<Mutex<MgmData>>,
#[new(value = "PusHkHelper::new(id)")]
hk_helper: PusHkHelper,
#[new(default)]
//hk_helper: PusHkHelper,
mode_helpers: ModeHelpers,
#[new(default)]
bufs: BufWrapper,
#[new(default)]
stamp_helper: TimestampHelper,
}
impl<
ComInterface: SpiInterface,
SwitchHelper: PowerSwitchInfo<PcduSwitch> + PowerSwitcherCommandSender<PcduSwitch>,
> MgmHandlerLis3Mdl<ComInterface, SwitchHelper>
{
impl<ComInterface: SpiInterface> MgmHandlerLis3Mdl<ComInterface> {
#[allow(clippy::too_many_arguments)]
pub fn new(
id: ComponentId,
dev_str: &'static str,
mode_node: ModeRequestHandlerMpscBounded,
tc_rx: mpsc::Receiver<CcsdsTcPacketOwned>,
tm_tx: mpsc::SyncSender<CcsdsTmPacketOwned>,
switch_helper: PowerSwitchHelper,
com_interface: ComInterface,
shared_mgm_set: Arc<Mutex<MgmData>>,
) -> Self {
Self {
id,
dev_str,
mode_node,
tc_rx,
tm_tx,
switch_helper,
com_interface,
shared_mgm_set,
mode_helpers: ModeHelpers::default(),
bufs: BufWrapper::default(),
stamp_helper: TimestampHelper::default(),
}
}
pub fn periodic_operation(&mut self) {
self.stamp_helper.update_from_now();
// Handle requests.
self.handle_composite_requests();
self.handle_telecommands();
self.handle_mode_requests();
if let Some(target_mode_submode) = self.mode_helpers.target {
self.handle_mode_transition(target_mode_submode);
@@ -198,67 +190,76 @@ impl<
}
}
pub fn handle_composite_requests(&mut self) {
pub fn handle_telecommands(&mut self) {
loop {
match self.composite_request_rx.try_recv() {
Ok(ref msg) => match &msg.message {
CompositeRequest::Hk(hk_request) => {
self.handle_hk_request(&msg.requestor_info, hk_request)
}
// TODO: This object does not have actions (yet).. Still send back completion failure
// reply.
CompositeRequest::Action(_action_req) => {}
},
Err(e) => {
if e != mpsc::TryRecvError::Empty {
log::warn!(
"{}: failed to receive composite request: {:?}",
self.dev_str,
e
);
} else {
break;
match self.tc_rx.try_recv() {
Ok(packet) => {
let tc_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&packet.sp_header);
match postcard::from_bytes::<mgm::request::Request>(&packet.payload) {
Ok(request) => {
log::info!(
"received request {:?} with TC ID {:#010x}",
request,
tc_id.raw()
);
match request {
mgm::request::Request::Ping => {
self.send_telemetry(Some(tc_id), mgm::response::Response::Ok)
}
mgm::request::Request::Hk(hk_request) => {
self.handle_hk_request(Some(tc_id), &hk_request)
}
}
}
Err(e) => {
log::warn!("failed to deserialize request: {}", e);
}
}
}
Err(e) => match e {
std::sync::mpsc::TryRecvError::Empty => break,
std::sync::mpsc::TryRecvError::Disconnected => {
log::warn!("packet sender disconnected")
}
},
}
}
}
pub fn handle_hk_request(&mut self, requestor_info: &MessageMetadata, hk_request: &HkRequest) {
match hk_request.variant {
HkRequestVariant::OneShot => {
let mgm_snapshot = *self.shared_mgm_set.lock().unwrap();
if let Ok(hk_tm) = self.hk_helper.generate_hk_report_packet(
self.stamp_helper.stamp(),
SetId::SensorData as u32,
&mut |hk_buf| {
hk_buf[0] = mgm_snapshot.valid as u8;
hk_buf[1..5].copy_from_slice(&mgm_snapshot.x.to_be_bytes());
hk_buf[5..9].copy_from_slice(&mgm_snapshot.y.to_be_bytes());
hk_buf[9..13].copy_from_slice(&mgm_snapshot.z.to_be_bytes());
Ok(13)
},
&mut self.bufs.tm_buf,
) {
// TODO: If sending the TM fails, we should also send a failure reply.
self.tm_sender
.send_tm(self.id.id(), PusTmVariant::Direct(hk_tm))
.expect("failed to send HK TM");
self.hk_reply_tx
.send(GenericMessage::new(
*requestor_info,
HkReply::new(hk_request.unique_id, HkReplyVariant::Ack),
))
.expect("failed to send HK reply");
} else {
// TODO: Send back failure reply. Need result code for this.
log::error!("TM buffer too small to generate HK data");
pub fn send_telemetry(
&self,
tc_id: Option<CcsdsPacketIdAndPsc>,
response: mgm::response::Response,
) {
match pack_ccsds_tm_packet_for_now(self.id, tc_id, &response) {
Ok(packet) => {
if let Err(e) = self.tm_tx.send(packet) {
log::warn!("failed to send TM packet: {}", e);
}
}
HkRequestVariant::EnablePeriodic => todo!(),
HkRequestVariant::DisablePeriodic => todo!(),
HkRequestVariant::ModifyCollectionInterval(_) => todo!(),
Err(e) => {
log::warn!("failed to pack TM packet: {}", e);
}
}
}
pub fn handle_hk_request(
&mut self,
tc_id: Option<CcsdsPacketIdAndPsc>,
hk_request: &models::mgm::request::HkRequest,
) {
match hk_request.req_type {
HkRequestType::OneShot => {
let mgm_snapshot = *self.shared_mgm_set.lock().unwrap();
self.send_telemetry(
tc_id,
mgm::response::Response::Hk(mgm::response::HkResponse::MgmData(mgm_snapshot)),
)
}
HkRequestType::EnablePeriodic(_duration) => todo!(),
HkRequestType::DisablePeriodic => todo!(),
HkRequestType::ModifyInterval(_duration) => todo!(),
_ => todo!(),
}
}
@@ -331,7 +332,7 @@ impl<
if self.mode_helpers.transition_state == TransitionState::Idle {
let result = self
.switch_helper
.send_switch_on_cmd(MessageMetadata::new(0, self.id.id()), PcduSwitch::Mgm);
.send_switch_on_cmd(MessageMetadata::new(0, self.id as u32), SwitchId::Mgm0);
if result.is_err() {
// Could not send switch command.. still continue with transition.
log::error!("failed to send switch on command");
@@ -339,10 +340,7 @@ impl<
self.mode_helpers.transition_state = TransitionState::PowerSwitching;
}
if self.mode_helpers.transition_state == TransitionState::PowerSwitching
&& self
.switch_helper
.is_switch_on(PcduSwitch::Mgm)
.expect("switch info error")
&& self.switch_helper.is_switch_on(SwitchId::Mgm0)
{
self.mode_helpers.transition_state = TransitionState::Done;
}
@@ -356,21 +354,13 @@ impl<
}
}
impl<
ComInterface: SpiInterface,
SwitchHelper: PowerSwitchInfo<PcduSwitch> + PowerSwitcherCommandSender<PcduSwitch>,
> ModeProvider for MgmHandlerLis3Mdl<ComInterface, SwitchHelper>
{
impl<ComInterface: SpiInterface> ModeProvider for MgmHandlerLis3Mdl<ComInterface> {
fn mode_and_submode(&self) -> ModeAndSubmode {
self.mode_helpers.current
}
}
impl<
ComInterface: SpiInterface,
SwitchHelper: PowerSwitchInfo<PcduSwitch> + PowerSwitcherCommandSender<PcduSwitch>,
> ModeRequestHandler for MgmHandlerLis3Mdl<ComInterface, SwitchHelper>
{
impl<ComInterface: SpiInterface> ModeRequestHandler for MgmHandlerLis3Mdl<ComInterface> {
type Error = ModeError;
fn start_transition(
@@ -417,7 +407,7 @@ impl<
if requestor.sender_id() == NO_SENDER {
return Ok(());
}
if requestor.sender_id() != PUS_MODE.id() {
if requestor.sender_id() != ComponentId::Ground as u32 {
log::warn!(
"can not send back mode reply to sender {:x}",
requestor.sender_id()
@@ -434,7 +424,7 @@ impl<
requestor: MessageMetadata,
reply: ModeReply,
) -> Result<(), Self::Error> {
if requestor.sender_id() != PUS_MODE.id() {
if requestor.sender_id() != ComponentId::Ground as u32 {
log::warn!(
"can not send back mode reply to sender {}",
requestor.sender_id()
@@ -455,21 +445,13 @@ impl<
}
}
impl<
ComInterface: SpiInterface,
SwitchHelper: PowerSwitchInfo<PcduSwitch> + PowerSwitcherCommandSender<PcduSwitch>,
> ModeNode for MgmHandlerLis3Mdl<ComInterface, SwitchHelper>
{
impl<ComInterface: SpiInterface> ModeNode for MgmHandlerLis3Mdl<ComInterface> {
fn id(&self) -> satrs::ComponentId {
self.id.into()
self.id as u32
}
}
impl<
ComInterface: SpiInterface,
SwitchHelper: PowerSwitchInfo<PcduSwitch> + PowerSwitcherCommandSender<PcduSwitch>,
> ModeChild for MgmHandlerLis3Mdl<ComInterface, SwitchHelper>
{
impl<ComInterface: SpiInterface> ModeChild for MgmHandlerLis3Mdl<ComInterface> {
type Sender = mpsc::SyncSender<GenericMessage<ModeReply>>;
fn add_mode_parent(&mut self, id: satrs::ComponentId, reply_sender: Self::Sender) {
@@ -484,19 +466,19 @@ mod tests {
sync::{mpsc, Arc},
};
use arbitrary_int::u21;
use models::{
pcdu::{SwitchRequest, SwitchState, SwitchStateBinary},
ComponentId,
};
use satrs::{
mode::{ModeReply, ModeRequest},
mode_tree::ModeParent,
power::SwitchStateBinary,
request::{GenericMessage, UniqueApidTargetId},
request::GenericMessage,
tmtc::PacketAsVec,
ComponentId,
};
use satrs_example::ids::{acs::ASSEMBLY, Apid};
use satrs_minisim::acs::lis3mdl::MgmLis3RawValues;
use crate::{eps::TestSwitchHelper, pus::hk::HkReply, requests::CompositeRequest};
use crate::eps::pcdu::{SharedSwitchSet, SwitchMap, SwitchSet};
use super::*;
@@ -524,22 +506,24 @@ mod tests {
#[allow(dead_code)]
pub struct MgmTestbench {
pub mode_request_tx: mpsc::SyncSender<GenericMessage<ModeRequest>>,
pub mode_reply_rx_to_pus: mpsc::Receiver<GenericMessage<ModeReply>>,
pub mode_reply_rx_to_ground: mpsc::Receiver<GenericMessage<ModeReply>>,
pub mode_reply_rx_to_parent: mpsc::Receiver<GenericMessage<ModeReply>>,
pub composite_request_tx: mpsc::Sender<GenericMessage<CompositeRequest>>,
pub hk_reply_rx: mpsc::Receiver<GenericMessage<HkReply>>,
pub shared_switch_set: SharedSwitchSet,
pub tc_tx: mpsc::SyncSender<CcsdsTcPacketOwned>,
pub tm_rx: mpsc::Receiver<PacketAsVec>,
pub handler: MgmHandlerLis3Mdl<TestSpiInterface, TestSwitchHelper>,
pub switch_rx: mpsc::Receiver<GenericMessage<SwitchRequest>>,
pub handler: MgmHandlerLis3Mdl<TestSpiInterface>,
}
#[derive(Default)]
#[allow(dead_code)]
pub struct MgmAssemblyMock(
pub HashMap<ComponentId, mpsc::SyncSender<GenericMessage<ModeRequest>>>,
pub HashMap<satrs::ComponentId, mpsc::SyncSender<GenericMessage<ModeRequest>>>,
);
impl ModeNode for MgmAssemblyMock {
fn id(&self) -> satrs::ComponentId {
PUS_MODE.into()
ComponentId::AcsMgmAssembly as u32
}
}
@@ -552,17 +536,19 @@ mod tests {
}
#[derive(Default)]
pub struct PusMock {
pub request_sender_map: HashMap<ComponentId, mpsc::SyncSender<GenericMessage<ModeRequest>>>,
#[allow(dead_code)]
pub struct GroundMock {
pub request_sender_map:
HashMap<satrs::ComponentId, mpsc::SyncSender<GenericMessage<ModeRequest>>>,
}
impl ModeNode for PusMock {
impl ModeNode for GroundMock {
fn id(&self) -> satrs::ComponentId {
PUS_MODE.into()
ComponentId::Ground as u32
}
}
impl ModeParent for PusMock {
impl ModeParent for GroundMock {
type Sender = mpsc::SyncSender<GenericMessage<ModeRequest>>;
fn add_mode_child(&mut self, id: satrs::ComponentId, request_sender: Self::Sender) {
@@ -573,36 +559,40 @@ mod tests {
impl MgmTestbench {
pub fn new() -> Self {
let (request_tx, request_rx) = mpsc::sync_channel(5);
let (reply_tx_to_pus, reply_rx_to_pus) = mpsc::sync_channel(5);
let (reply_tx_to_ground, reply_rx_to_ground) = mpsc::sync_channel(5);
let (reply_tx_to_parent, reply_rx_to_parent) = mpsc::sync_channel(5);
let id = UniqueApidTargetId::new(Apid::Acs.raw_value(), u21::new(1));
let mode_node = ModeRequestHandlerMpscBounded::new(id.into(), request_rx);
let (composite_request_tx, composite_request_rx) = mpsc::channel();
let (hk_reply_tx, hk_reply_rx) = mpsc::sync_channel(10);
let (tm_tx, tm_rx) = mpsc::sync_channel(10);
let tm_sender = TmTcSender::Heap(tm_tx);
let mode_node =
ModeRequestHandlerMpscBounded::new(ComponentId::Ground as u32, request_rx);
let (tc_tx, tc_rx) = mpsc::sync_channel(10);
let (hk_reply_tx, _hk_reply_rx) = mpsc::sync_channel(10);
let (_tm_tx, tm_rx) = mpsc::sync_channel(10);
let (switcher_tx, switch_rx) = mpsc::sync_channel(10);
let shared_mgm_set = Arc::default();
let mut switch_map = SwitchMap::new();
switch_map.insert(SwitchId::Mgm0, SwitchState::Off);
let switch_map = SwitchSet::new(switch_map);
let shared_switch_set = SharedSwitchSet::new(Mutex::new(switch_map));
let mut handler = MgmHandlerLis3Mdl::new(
id,
ComponentId::AcsMgm0,
"TEST_MGM",
mode_node,
composite_request_rx,
tc_rx,
hk_reply_tx,
TestSwitchHelper::default(),
tm_sender,
PowerSwitchHelper::new(switcher_tx, shared_switch_set.clone()),
TestSpiInterface::default(),
shared_mgm_set,
);
handler.add_mode_parent(PUS_MODE.into(), reply_tx_to_pus);
handler.add_mode_parent(ASSEMBLY.into(), reply_tx_to_parent);
handler.add_mode_parent(ComponentId::Ground as u32, reply_tx_to_ground);
handler.add_mode_parent(ComponentId::AcsMgmAssembly as u32, reply_tx_to_parent);
Self {
mode_request_tx: request_tx,
mode_reply_rx_to_pus: reply_rx_to_pus,
mode_reply_rx_to_ground: reply_rx_to_ground,
mode_reply_rx_to_parent: reply_rx_to_parent,
composite_request_tx,
shared_switch_set,
switch_rx,
handler,
tm_rx,
hk_reply_rx,
tc_tx,
}
}
}
@@ -632,7 +622,7 @@ mod tests {
testbench
.mode_request_tx
.send(GenericMessage::new(
MessageMetadata::new(0, PUS_MODE.id()),
MessageMetadata::new(0, ComponentId::Ground as u32),
ModeRequest::SetMode {
mode_and_submode: ModeAndSubmode::new(DeviceMode::Normal as u32, 0),
forced: false,
@@ -647,22 +637,22 @@ mod tests {
assert_eq!(testbench.handler.mode_and_submode().submode(), 0);
// Verify power switch handling.
let mut switch_requests = testbench.handler.switch_helper.switch_requests.borrow_mut();
assert_eq!(switch_requests.len(), 1);
let switch_req = switch_requests.pop_front().expect("no switch request");
assert_eq!(switch_req.target_state, SwitchStateBinary::On);
assert_eq!(switch_req.switch_id, PcduSwitch::Mgm);
let mut switch_info_requests = testbench
.handler
.switch_helper
.switch_info_requests
.borrow_mut();
assert_eq!(switch_info_requests.len(), 1);
let switch_info_req = switch_info_requests.pop_front().expect("no switch request");
assert_eq!(switch_info_req, PcduSwitch::Mgm);
let switch_req = testbench.switch_rx.try_recv().expect("no switch request");
assert_eq!(switch_req.message.switch_id, SwitchId::Mgm0);
assert_eq!(switch_req.message.target_state, SwitchStateBinary::On);
// This simulates one cycle for the power switch to update.
testbench
.shared_switch_set
.lock()
.unwrap()
.set_switch_state(SwitchId::Mgm0, SwitchState::On);
// Now the power switch is updated and the mode request should be completed.
testbench.handler.periodic_operation();
let mode_reply = testbench
.mode_reply_rx_to_pus
.mode_reply_rx_to_ground
.try_recv()
.expect("no mode reply generated");
match mode_reply.message {
@@ -673,7 +663,7 @@ mod tests {
_ => panic!("unexpected mode reply"),
}
// The device should have been polled once.
assert_eq!(testbench.handler.com_interface.call_count, 1);
assert_eq!(testbench.handler.com_interface.call_count, 2);
let mgm_set = *testbench.handler.shared_mgm_set.lock().unwrap();
assert!(mgm_set.x < 0.001);
assert!(mgm_set.y < 0.001);
@@ -693,7 +683,7 @@ mod tests {
testbench
.mode_request_tx
.send(GenericMessage::new(
MessageMetadata::new(0, PUS_MODE.id()),
MessageMetadata::new(0, ComponentId::Ground as u32),
ModeRequest::SetMode {
mode_and_submode: ModeAndSubmode::new(DeviceMode::Normal as u32, 0),
forced: false,
-66
View File
@@ -1,66 +0,0 @@
#![allow(dead_code)]
use crossbeam_channel::{bounded, Receiver, Sender};
use std::sync::atomic::{AtomicU16, Ordering};
use std::thread;
use zerocopy::{FromBytes, Immutable, IntoBytes, NetworkEndian, Unaligned, U16};
/// Abstraction over producers of raw field data; `Send` so providers can be
/// handed to worker threads through a channel.
trait FieldDataProvider: Send {
    /// Borrow the raw byte representation of the field data.
    fn get_data(&self) -> &[u8];
}

/// Provider backed by a fixed inline 8-byte buffer.
struct FixedFieldDataWrapper {
    data: [u8; 8],
}

impl FixedFieldDataWrapper {
    /// Pack two `u32` values big-endian into the 8-byte buffer
    /// (`p0` occupies bytes 0..4, `p1` bytes 4..8).
    pub fn from_two_u32(p0: u32, p1: u32) -> Self {
        let mut buffer = [0u8; 8];
        let (first_half, second_half) = buffer.split_at_mut(4);
        first_half.copy_from_slice(&p0.to_be_bytes());
        second_half.copy_from_slice(&p1.to_be_bytes());
        Self { data: buffer }
    }
}

impl FieldDataProvider for FixedFieldDataWrapper {
    fn get_data(&self) -> &[u8] {
        &self.data
    }
}
// Boxed trait object so heterogeneous providers can travel over one channel type.
type FieldDataTraitObj = Box<dyn FieldDataProvider>;

// Host-side representation of an example MGM (magnetometer) dataset.
struct ExampleMgmSet {
    mgm_vec: [f32; 3],
    temperature: u16,
}

// Zero-copy wire representation: raw bytes plus an explicitly network-endian
// (big-endian) temperature field, laid out with C repr for byte-level access.
// NOTE(review): `temperatur` looks like a typo for `temperature` — renaming is
// safe for the wire layout (field names are not serialized) but verify callers.
#[derive(FromBytes, IntoBytes, Immutable, Unaligned)]
#[repr(C)]
struct ExampleMgmSetZc {
    mgm_vec: [u8; 12],
    temperatur: U16<NetworkEndian>,
}
/// Scratch example: ships a boxed trait object through a bounded crossbeam
/// channel to a worker thread, then demonstrates wrapping `AtomicU16`
/// arithmetic on overflow.
fn main() {
    // Bounded channel carrying boxed field-data providers between threads.
    let (s0, r0): (Sender<FieldDataTraitObj>, Receiver<FieldDataTraitObj>) = bounded(5);
    let data_wrapper = FixedFieldDataWrapper::from_two_u32(2, 3);
    s0.send(Box::new(data_wrapper)).unwrap();
    // Worker thread receives the provider and prints its raw bytes.
    let jh0 = thread::spawn(move || {
        let data = r0.recv().unwrap();
        let raw = data.get_data();
        println!("Received data {raw:?}");
    });
    // Empty second thread; exists only to demonstrate joining multiple handles.
    let jh1 = thread::spawn(|| {});
    jh0.join().unwrap();
    jh1.join().unwrap();
    //let mut max_val: u16 = u16::MAX;
    //max_val += 1;
    //println!("Max val: {}", max_val);
    // Unlike the commented-out plain u16 addition above (which panics on
    // overflow in debug builds), atomic fetch_add wraps around silently, so
    // this prints 0.
    let atomic_u16: AtomicU16 = AtomicU16::new(u16::MAX);
    atomic_u16.fetch_add(1, Ordering::SeqCst);
    println!(
        "atomic after overflow: {}",
        atomic_u16.load(Ordering::SeqCst)
    );
}
+34
View File
@@ -0,0 +1,34 @@
use arbitrary_int::u11;
use models::{ccsds::CcsdsTmPacketOwned, Apid, ComponentId, Message, TmHeader};
use satrs::spacepackets::{
time::{cds::CdsTime, StdTimestampError},
CcsdsPacketIdAndPsc, SpHeader,
};
use serde::Serialize;
/// Errors that can occur while building a CCSDS TM packet.
#[derive(Debug, thiserror::Error)]
pub enum CcsdsTmCreationError {
    /// Serializing the payload with postcard failed.
    #[error("postcard error: {0}")]
    Postcard(#[from] postcard::Error),
    /// Retrieving the current timestamp from the system clock failed.
    #[error("timestamp error: {0}")]
    Time(#[from] StdTimestampError),
}
/// Build a CCSDS TM packet carrying `payload`, timestamped with the current
/// time.
///
/// The TM header is addressed from `sender_id` to the ground component and
/// optionally carries the ID of the telecommand it responds to (`tc_id`).
///
/// # Errors
///
/// Fails if the current CDS timestamp cannot be retrieved or if serializing
/// the payload with postcard fails.
pub fn pack_ccsds_tm_packet_for_now(
    sender_id: ComponentId,
    tc_id: Option<CcsdsPacketIdAndPsc>,
    payload: &(impl Serialize + Message),
) -> Result<CcsdsTmPacketOwned, CcsdsTmCreationError> {
    let timestamp = CdsTime::now_with_u16_days()?;
    let tm_header = TmHeader::new(
        sender_id,
        ComponentId::Ground,
        payload.message_type(),
        tc_id,
        &timestamp,
    );
    // All TM goes out under the common TMTC APID.
    let space_packet_header = SpHeader::new_from_apid(u11::new(Apid::Tmtc as u16));
    Ok(CcsdsTmPacketOwned::new_with_serde_payload(
        space_packet_header,
        &tm_header,
        payload,
    )?)
}
+5 -10
View File
@@ -7,13 +7,10 @@ use satrs::{
use satrs_mib::res_code::ResultU16Info;
use satrs_mib::resultcode;
use std::{collections::HashSet, net::Ipv4Addr};
use strum::IntoEnumIterator;
use strum::IntoEnumIterator as _;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use satrs::{
events_legacy::{EventU32TypedSev, SeverityInfo},
pool::{StaticMemoryPool, StaticPoolConfig},
};
use satrs::pool::{StaticMemoryPool, StaticPoolConfig};
#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)]
@@ -39,19 +36,17 @@ pub enum GroupId {
pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
pub const SERVER_PORT: u16 = 7301;
pub const TEST_EVENT: EventU32TypedSev<SeverityInfo> = EventU32TypedSev::<SeverityInfo>::new(0, 0);
lazy_static! {
pub static ref PACKET_ID_VALIDATOR: HashSet<PacketId> = {
let mut set = HashSet::new();
for id in crate::ids::Apid::iter() {
for id in models::Apid::iter() {
set.insert(PacketId::new(PacketType::Tc, true, u11::new(id as u16)));
}
set
};
pub static ref APID_VALIDATOR: HashSet<u16> = {
let mut set = HashSet::new();
for id in crate::ids::Apid::iter() {
for id in models::Apid::iter() {
set.insert(id as u16);
}
set
@@ -175,6 +170,6 @@ pub mod pool {
pub mod tasks {
pub const FREQ_MS_UDP_TMTC: u64 = 200;
pub const FREQ_MS_AOCS: u64 = 500;
pub const FREQ_MS_PUS_STACK: u64 = 200;
pub const FREQ_MS_CONTROLLER: u64 = 200;
pub const SIM_CLIENT_IDLE_DELAY_MS: u64 = 5;
}
+83
View File
@@ -0,0 +1,83 @@
use models::{
ccsds::{CcsdsTcPacketOwned, CcsdsTmPacketOwned},
control, ComponentId,
};
use satrs::spacepackets::CcsdsPacketIdAndPsc;
use crate::ccsds::pack_ccsds_tm_packet_for_now;
/// Controller component: consumes telecommand packets, emits telemetry
/// packets, and forwards control events (e.g. `TestEvent`) on a dedicated
/// channel.
pub struct Controller {
    // Incoming telecommand packets routed to this component.
    pub tc_rx: std::sync::mpsc::Receiver<CcsdsTcPacketOwned>,
    // Outgoing telemetry packets towards the TM sink.
    pub tm_tx: std::sync::mpsc::SyncSender<CcsdsTmPacketOwned>,
    // Channel used to trigger control events — presumably consumed by an
    // event-handling component; verify against the wiring code.
    pub event_ctrl_tx: std::sync::mpsc::SyncSender<control::Event>,
}
impl Controller {
    /// Create a controller from its TC input, TM output and event channels.
    pub fn new(
        tc_rx: std::sync::mpsc::Receiver<CcsdsTcPacketOwned>,
        tm_tx: std::sync::mpsc::SyncSender<CcsdsTmPacketOwned>,
        event_ctrl_tx: std::sync::mpsc::SyncSender<control::Event>,
    ) -> Self {
        Self {
            tc_rx,
            tm_tx,
            event_ctrl_tx,
        }
    }

    /// Called cyclically by the scheduling code; currently only drains
    /// pending telecommands.
    pub fn periodic_operation(&mut self) {
        self.handle_telecommands();
    }

    /// Drain all currently queued telecommand packets and dispatch them.
    ///
    /// Each packet payload is deserialized with postcard; malformed payloads
    /// are logged and dropped.
    pub fn handle_telecommands(&mut self) {
        loop {
            match self.tc_rx.try_recv() {
                Ok(packet) => {
                    let tc_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&packet.sp_header);
                    match postcard::from_bytes::<control::request::Request>(&packet.payload) {
                        Ok(request) => {
                            log::info!(
                                "received request {:?} with TC ID {:#010x}",
                                request,
                                tc_id.raw()
                            );
                            match request {
                                control::request::Request::Ping => self
                                    .send_telemetry(Some(tc_id), control::response::Response::Ok),
                                control::request::Request::TestEvent => {
                                    // Panics only if the event channel is broken, which
                                    // is a wiring bug rather than a runtime condition.
                                    self.event_ctrl_tx.send(control::Event::TestEvent).unwrap()
                                }
                            }
                        }
                        Err(e) => {
                            log::warn!("failed to deserialize request: {}", e);
                        }
                    }
                }
                Err(e) => match e {
                    std::sync::mpsc::TryRecvError::Empty => break,
                    std::sync::mpsc::TryRecvError::Disconnected => {
                        // BUGFIX: previously this arm only logged, so a
                        // disconnected sender made the loop spin forever
                        // (try_recv keeps returning Disconnected). Log once
                        // and leave the loop.
                        log::warn!("packet sender disconnected");
                        break;
                    }
                },
            }
        }
    }

    /// Pack `response` into a TM packet (optionally referencing the TC it
    /// answers) and send it. Packing or send failures are logged, not
    /// propagated.
    pub fn send_telemetry(
        &self,
        tc_id: Option<CcsdsPacketIdAndPsc>,
        response: control::response::Response,
    ) {
        match pack_ccsds_tm_packet_for_now(ComponentId::Controller, tc_id, &response) {
            Ok(packet) => {
                if let Err(e) = self.tm_tx.send(packet) {
                    log::warn!("failed to send TM packet: {}", e);
                }
            }
            Err(e) => {
                log::warn!("failed to pack TM packet: {}", e);
            }
        }
    }
}
+90 -29
View File
@@ -1,16 +1,15 @@
use derive_new::new;
use models::pcdu::{SwitchId, SwitchRequest, SwitchState, SwitchStateBinary};
use std::{cell::RefCell, collections::VecDeque, sync::mpsc, time::Duration};
use satrs::{
power::{
PowerSwitchInfo, PowerSwitcherCommandSender, SwitchRequest, SwitchState, SwitchStateBinary,
},
queue::GenericSendError,
request::{GenericMessage, MessageMetadata},
};
use satrs_minisim::eps::{PcduSwitch, SwitchMapWrapper};
use thiserror::Error;
use crate::eps::pcdu::SwitchMapWrapper;
use self::pcdu::SharedSwitchSet;
pub mod pcdu;
@@ -22,6 +21,7 @@ pub struct PowerSwitchHelper {
}
#[derive(Debug, Error, Copy, Clone, PartialEq, Eq)]
#[allow(dead_code)]
pub enum SwitchCommandingError {
#[error("send error: {0}")]
Send(#[from] GenericSendError),
@@ -31,18 +31,72 @@ pub enum SwitchCommandingError {
pub enum SwitchInfoError {
/// This is a configuration error which should not occur.
#[error("switch ID not in map")]
SwitchIdNotInMap(PcduSwitch),
SwitchIdNotInMap(SwitchId),
#[error("switch set invalid")]
SwitchSetInvalid,
}
impl PowerSwitchInfo<PcduSwitch> for PowerSwitchHelper {
impl PowerSwitchHelper {
pub fn send_switch_on_cmd(
&self,
requestor_info: satrs::request::MessageMetadata,
switch_id: SwitchId,
) -> Result<(), GenericSendError> {
self.switcher_tx.send(GenericMessage::new(
requestor_info,
SwitchRequest::new(switch_id, SwitchStateBinary::On),
))?;
Ok(())
}
#[allow(dead_code)]
pub fn send_switch_off_cmd(
&self,
requestor_info: satrs::request::MessageMetadata,
switch_id: SwitchId,
) -> Result<(), GenericSendError> {
self.switcher_tx.send(GenericMessage::new(
requestor_info,
SwitchRequest::new(switch_id, SwitchStateBinary::Off),
))?;
Ok(())
}
pub fn switch_state(&self, switch_id: SwitchId) -> Result<SwitchState, SwitchInfoError> {
let switch_set = self
.shared_switch_set
.lock()
.expect("failed to lock switch set");
if !switch_set.valid {
return Err(SwitchInfoError::SwitchSetInvalid);
}
if let Some(state) = switch_set.switch_map.get(&switch_id) {
return Ok(*state);
}
Err(SwitchInfoError::SwitchIdNotInMap(switch_id))
}
#[allow(dead_code)]
fn switch_delay_ms(&self) -> Duration {
// Here, we could set device specific switch delays theoretically. Set it to this value
// for now.
Duration::from_millis(1000)
}
pub fn is_switch_on(&self, switch_id: SwitchId) -> bool {
if let Ok(state) = self.switch_state(switch_id) {
state == SwitchState::On
} else {
false
}
}
}
/*
impl PowerSwitchInfo<SwitchId> for PowerSwitchHelper {
type Error = SwitchInfoError;
fn switch_state(
&self,
switch_id: PcduSwitch,
) -> Result<satrs::power::SwitchState, Self::Error> {
fn switch_state(&self, switch_id: SwitchId) -> Result<SwitchState, Self::Error> {
let switch_set = self
.shared_switch_set
.lock()
@@ -63,43 +117,51 @@ impl PowerSwitchInfo<PcduSwitch> for PowerSwitchHelper {
Duration::from_millis(1000)
}
}
*/
impl PowerSwitcherCommandSender<PcduSwitch> for PowerSwitchHelper {
/*
impl PowerSwitcherCommandSender<SwitchId> for PowerSwitchHelper {
type Error = SwitchCommandingError;
fn send_switch_on_cmd(
&self,
requestor_info: satrs::request::MessageMetadata,
switch_id: PcduSwitch,
switch_id: SwitchId,
) -> Result<(), Self::Error> {
self.switcher_tx
.send_switch_on_cmd(requestor_info, switch_id)?;
self.switcher_tx.send(GenericMessage::new(
requestor_info,
SwitchRequest::new(switch_id, SwitchStateBinary::On),
));
Ok(())
}
fn send_switch_off_cmd(
&self,
requestor_info: satrs::request::MessageMetadata,
switch_id: PcduSwitch,
switch_id: SwitchId,
) -> Result<(), Self::Error> {
self.switcher_tx
.send_switch_off_cmd(requestor_info, switch_id)?;
self.switcher_tx.send(GenericMessage::new(
requestor_info,
SwitchRequest::new(switch_id, SwitchStateBinary::Off),
));
Ok(())
}
}
*/
#[allow(dead_code)]
#[derive(new)]
pub struct SwitchRequestInfo {
pub requestor_info: MessageMetadata,
pub switch_id: PcduSwitch,
pub target_state: satrs::power::SwitchStateBinary,
pub switch_id: SwitchId,
pub target_state: SwitchStateBinary,
}
// Test switch helper which can be used for unittests.
#[allow(dead_code)]
pub struct TestSwitchHelper {
pub switch_requests: RefCell<VecDeque<SwitchRequestInfo>>,
pub switch_info_requests: RefCell<VecDeque<PcduSwitch>>,
pub switch_info_requests: RefCell<VecDeque<SwitchId>>,
#[allow(dead_code)]
pub switch_delay_request_count: u32,
pub next_switch_delay: Duration,
@@ -120,13 +182,11 @@ impl Default for TestSwitchHelper {
}
}
impl PowerSwitchInfo<PcduSwitch> for TestSwitchHelper {
/*
impl PowerSwitchInfo<SwitchId> for TestSwitchHelper {
type Error = SwitchInfoError;
fn switch_state(
&self,
switch_id: PcduSwitch,
) -> Result<satrs::power::SwitchState, Self::Error> {
fn switch_state(&self, switch_id: SwitchId) -> Result<satrs::power::SwitchState, Self::Error> {
let mut switch_info_requests_mut = self.switch_info_requests.borrow_mut();
switch_info_requests_mut.push_back(switch_id);
if !self.switch_map_valid {
@@ -144,13 +204,13 @@ impl PowerSwitchInfo<PcduSwitch> for TestSwitchHelper {
}
}
impl PowerSwitcherCommandSender<PcduSwitch> for TestSwitchHelper {
impl PowerSwitcherCommandSender<SwitchId> for TestSwitchHelper {
type Error = SwitchCommandingError;
fn send_switch_on_cmd(
&self,
requestor_info: MessageMetadata,
switch_id: PcduSwitch,
switch_id: SwitchId,
) -> Result<(), Self::Error> {
let mut switch_requests_mut = self.switch_requests.borrow_mut();
switch_requests_mut.push_back(SwitchRequestInfo {
@@ -170,7 +230,7 @@ impl PowerSwitcherCommandSender<PcduSwitch> for TestSwitchHelper {
fn send_switch_off_cmd(
&self,
requestor_info: MessageMetadata,
switch_id: PcduSwitch,
switch_id: SwitchId,
) -> Result<(), Self::Error> {
let mut switch_requests_mut = self.switch_requests.borrow_mut();
switch_requests_mut.push_back(SwitchRequestInfo {
@@ -187,11 +247,12 @@ impl PowerSwitcherCommandSender<PcduSwitch> for TestSwitchHelper {
Ok(())
}
}
*/
#[allow(dead_code)]
impl TestSwitchHelper {
// Helper function which can be used to force a switch to another state for test purposes.
pub fn set_switch_state(&mut self, switch: PcduSwitch, state: SwitchState) {
pub fn set_switch_state(&mut self, switch: SwitchId, state: SwitchState) {
self.switch_map.get_mut().0.insert(switch, state);
}
}
+235 -148
View File
@@ -1,43 +1,118 @@
use std::{
cell::RefCell,
collections::VecDeque,
collections::{HashMap, VecDeque},
sync::{mpsc, Arc, Mutex},
};
use derive_new::new;
use models::{
ccsds::{CcsdsTcPacketOwned, CcsdsTmPacketOwned},
pcdu::{
self, SwitchId, SwitchMapBinary, SwitchMapBinaryWrapper, SwitchRequest, SwitchState,
SwitchStateBinary, SwitchesBitfield,
},
ComponentId,
};
use num_enum::{IntoPrimitive, TryFromPrimitive};
use satrs::{
hk::{HkRequest, HkRequestVariant},
mode::{
ModeAndSubmode, ModeError, ModeProvider, ModeReply, ModeRequestHandler,
ModeRequestHandlerMpscBounded,
},
mode_tree::{ModeChild, ModeNode},
power::SwitchRequest,
pus::{EcssTmSender, PusTmVariant},
queue::GenericSendError,
request::{GenericMessage, MessageMetadata, UniqueApidTargetId},
spacepackets::ByteConversionError,
};
use satrs_example::{
config::components::NO_SENDER,
ids::{eps::PCDU, generic_pus::PUS_MODE},
DeviceMode, TimestampHelper,
request::{GenericMessage, MessageMetadata},
spacepackets::CcsdsPacketIdAndPsc,
};
use satrs_example::{config::components::NO_SENDER, DeviceMode, TimestampHelper};
use satrs_minisim::{
eps::{
PcduReply, PcduRequest, PcduSwitch, SwitchMap, SwitchMapBinaryWrapper, SwitchMapWrapper,
},
eps::{PcduReply, PcduRequest},
SerializableSimMsgPayload, SimReply, SimRequest,
};
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator as _;
use crate::{
hk::PusHkHelper,
pus::hk::{HkReply, HkReplyVariant},
requests::CompositeRequest,
tmtc::sender::TmTcSender,
};
use crate::ccsds::pack_ccsds_tm_packet_for_now;
/// Snapshot of all PCDU switch states plus a validity flag.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SwitchSet {
    // False until the map contents can be trusted (e.g. before the first
    // device report) — assumption from usage; confirm against the PCDU handler.
    pub valid: bool,
    pub switch_map: SwitchMap,
}

impl SwitchSet {
    /// Wrap an explicit switch map; the resulting set is marked valid.
    pub fn new(switch_map: SwitchMap) -> Self {
        Self {
            valid: true,
            switch_map,
        }
    }
    /// Valid set with every known switch initialized to the Unknown state.
    pub fn new_with_init_switches_unknown() -> Self {
        let wrapper = SwitchMapWrapper::default();
        Self::new(wrapper.0)
    }
    /// Condense the map into the bitfield representation.
    ///
    /// Returns None if any known switch ID is missing from the map; a bit is
    /// set iff the corresponding switch is On.
    pub fn as_bitfield(&self) -> Option<SwitchesBitfield> {
        for entry in SwitchId::iter() {
            if !self.switch_map.contains_key(&entry) {
                return None;
            }
        }
        // unwraps are safe: presence of every ID was checked above.
        Some(
            SwitchesBitfield::builder()
                .with_magnetorquer(*self.switch_map.get(&SwitchId::Mgt).unwrap() == SwitchState::On)
                .with_mgm1(*self.switch_map.get(&SwitchId::Mgm1).unwrap() == SwitchState::On)
                .with_mgm0(*self.switch_map.get(&SwitchId::Mgm0).unwrap() == SwitchState::On)
                .build(),
        )
    }
    /// Set the state of a single switch. Returns false (and changes nothing)
    /// if the switch ID is not present in the map.
    #[allow(dead_code)]
    pub fn set_switch_state(&mut self, switch_id: SwitchId, state: SwitchState) -> bool {
        if !self.switch_map.contains_key(&switch_id) {
            return false;
        }
        *self.switch_map.get_mut(&switch_id).unwrap() = state;
        true
    }
}
/// Maps each switch ID to its last known state.
pub type SwitchMap = HashMap<SwitchId, SwitchState>;

/// Newtype around a [SwitchMap] providing convenience constructors.
pub struct SwitchMapWrapper(pub SwitchMap);

impl Default for SwitchMapWrapper {
    /// Map with every known switch ID present, initialized to Unknown.
    fn default() -> Self {
        let mut switch_map = SwitchMap::default();
        for entry in SwitchId::iter() {
            switch_map.insert(entry, SwitchState::Unknown);
        }
        Self(switch_map)
    }
}
impl SwitchMapWrapper {
#[allow(dead_code)]
pub fn new_with_init_switches_off() -> Self {
let mut switch_map = SwitchMap::default();
for entry in SwitchId::iter() {
switch_map.insert(entry, SwitchState::Off);
}
Self(switch_map)
}
pub fn from_binary_switch_map_ref(switch_map: &SwitchMapBinary) -> Self {
Self(
switch_map
.iter()
.map(|(key, value)| (*key, SwitchState::from(*value)))
.collect(),
)
}
}
pub type SharedSwitchSet = Arc<Mutex<SwitchSet>>;
pub trait SerialInterface {
type Error: core::fmt::Debug;
@@ -194,44 +269,47 @@ pub enum OpCode {
PollAndRecvReplies = 1,
}
#[derive(Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct SwitchSet {
pub valid: bool,
pub switch_map: SwitchMap,
}
pub type SharedSwitchSet = Arc<Mutex<SwitchSet>>;
/// Example PCDU device handler.
#[derive(new)]
#[allow(clippy::too_many_arguments)]
pub struct PcduHandler<ComInterface: SerialInterface> {
id: UniqueApidTargetId,
dev_str: &'static str,
mode_node: ModeRequestHandlerMpscBounded,
composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
hk_reply_tx: mpsc::SyncSender<GenericMessage<HkReply>>,
switch_request_rx: mpsc::Receiver<GenericMessage<SwitchRequest>>,
tm_sender: TmTcSender,
tc_rx: std::sync::mpsc::Receiver<CcsdsTcPacketOwned>,
tm_tx: mpsc::SyncSender<CcsdsTmPacketOwned>,
pub com_interface: ComInterface,
shared_switch_map: Arc<Mutex<SwitchSet>>,
#[new(value = "PusHkHelper::new(id)")]
hk_helper: PusHkHelper,
#[new(value = "ModeAndSubmode::new(satrs_example::DeviceMode::Off as u32, 0)")]
mode_and_submode: ModeAndSubmode,
#[new(default)]
stamp_helper: TimestampHelper,
#[new(value = "[0; 256]")]
tm_buf: [u8; 256],
}
impl<ComInterface: SerialInterface> PcduHandler<ComInterface> {
pub fn new(
mode_node: ModeRequestHandlerMpscBounded,
tc_rx: std::sync::mpsc::Receiver<CcsdsTcPacketOwned>,
tm_tx: std::sync::mpsc::SyncSender<CcsdsTmPacketOwned>,
switch_request_rx: mpsc::Receiver<GenericMessage<SwitchRequest>>,
com_interface: ComInterface,
shared_switch_map: Arc<Mutex<SwitchSet>>,
) -> Self {
Self {
dev_str: "PCDU",
mode_node,
tc_rx,
switch_request_rx,
tm_tx,
com_interface,
shared_switch_map,
mode_and_submode: ModeAndSubmode::new(0, 0),
stamp_helper: TimestampHelper::default(),
}
}
pub fn periodic_operation(&mut self, op_code: OpCode) {
match op_code {
OpCode::RegularOp => {
self.stamp_helper.update_from_now();
// Handle requests.
self.handle_composite_requests();
self.handle_telecommands();
self.handle_mode_requests();
self.handle_switch_requests();
// Poll the switch states and/or telemetry regularly here.
@@ -246,75 +324,87 @@ impl<ComInterface: SerialInterface> PcduHandler<ComInterface> {
}
}
pub fn handle_composite_requests(&mut self) {
pub fn handle_telecommands(&mut self) {
loop {
match self.composite_request_rx.try_recv() {
Ok(ref msg) => match &msg.message {
CompositeRequest::Hk(hk_request) => {
self.handle_hk_request(&msg.requestor_info, hk_request)
}
// TODO: This object does not have actions (yet).. Still send back completion failure
// reply.
CompositeRequest::Action(_action_req) => {}
},
Err(e) => {
if e != mpsc::TryRecvError::Empty {
log::warn!(
"{}: failed to receive composite request: {:?}",
self.dev_str,
e
);
} else {
break;
match self.tc_rx.try_recv() {
Ok(packet) => {
let tc_id = CcsdsPacketIdAndPsc::new_from_ccsds_packet(&packet.sp_header);
match postcard::from_bytes::<pcdu::request::Request>(&packet.payload) {
Ok(request) => {
log::info!(
"received request {:?} with TC ID {:#010x}",
request,
tc_id.raw()
);
match request {
pcdu::request::Request::Ping => {
self.send_tm(Some(tc_id), pcdu::response::Response::Ok)
}
pcdu::request::Request::GetSwitches => self.send_tm(
Some(tc_id),
pcdu::response::Response::Switches(
self.shared_switch_map
.lock()
.unwrap()
.as_bitfield()
.expect("could not build switches response"),
),
),
pcdu::request::Request::EnableSwitches(switches) => {
self.handle_switches_bitfield_request(
switches,
SwitchStateBinary::On,
);
}
pcdu::request::Request::DisableSwitches(switches) => {
self.handle_switches_bitfield_request(
switches,
SwitchStateBinary::Off,
);
}
}
}
Err(e) => {
log::warn!("failed to deserialize request: {}", e);
}
}
}
Err(e) => match e {
std::sync::mpsc::TryRecvError::Empty => break,
std::sync::mpsc::TryRecvError::Disconnected => {
log::warn!("packet sender disconnected")
}
},
}
}
}
pub fn handle_hk_request(&mut self, requestor_info: &MessageMetadata, hk_request: &HkRequest) {
match hk_request.variant {
HkRequestVariant::OneShot => {
if hk_request.unique_id == SetId::SwitcherSet as u32 {
if let Ok(hk_tm) = self.hk_helper.generate_hk_report_packet(
self.stamp_helper.stamp(),
SetId::SwitcherSet as u32,
&mut |hk_buf| {
// Send TM down as JSON.
let switch_map_snapshot = self
.shared_switch_map
.lock()
.expect("failed to lock switch map")
.clone();
let switch_map_json = serde_json::to_string(&switch_map_snapshot)
.expect("failed to serialize switch map");
if switch_map_json.len() > hk_buf.len() {
log::error!("switch map JSON too large for HK buffer");
return Err(ByteConversionError::ToSliceTooSmall {
found: hk_buf.len(),
expected: switch_map_json.len(),
});
}
Ok(switch_map_json.len())
},
&mut self.tm_buf,
) {
self.tm_sender
.send_tm(self.id.id(), PusTmVariant::Direct(hk_tm))
.expect("failed to send HK TM");
self.hk_reply_tx
.send(GenericMessage::new(
*requestor_info,
HkReply::new(hk_request.unique_id, HkReplyVariant::Ack),
))
.expect("failed to send HK reply");
}
pub fn handle_switches_bitfield_request(
&mut self,
switches: SwitchesBitfield,
state: SwitchStateBinary,
) {
if switches.mgm0() {
self.handle_device_switching(SwitchId::Mgm0, state);
}
if switches.mgm1() {
self.handle_device_switching(SwitchId::Mgm1, state);
}
if switches.magnetorquer() {
self.handle_device_switching(SwitchId::Mgt, state);
}
}
pub fn send_tm(&self, tc_id: Option<CcsdsPacketIdAndPsc>, response: pcdu::response::Response) {
match pack_ccsds_tm_packet_for_now(ComponentId::Pcdu, tc_id, &response) {
Ok(packet) => {
if let Err(e) = self.tm_tx.send(packet) {
log::warn!("failed to send TM packet: {}", e);
}
}
HkRequestVariant::EnablePeriodic => todo!(),
HkRequestVariant::DisablePeriodic => todo!(),
HkRequestVariant::ModifyCollectionInterval(_) => todo!(),
Err(e) => {
log::warn!("failed to pack TM packet: {}", e);
}
}
}
@@ -357,22 +447,26 @@ impl<ComInterface: SerialInterface> PcduHandler<ComInterface> {
}
}
pub fn handle_device_switching(&mut self, switch_id: SwitchId, state: SwitchStateBinary) {
let pcdu_req = PcduRequest::SwitchDevice {
switch: switch_id,
state,
};
let pcdu_req_ser = serde_json::to_string(&pcdu_req).unwrap();
self.com_interface
.send(pcdu_req_ser.as_bytes())
.expect("failed to send switch request to PCDU");
}
pub fn handle_switch_requests(&mut self) {
loop {
match self.switch_request_rx.try_recv() {
Ok(switch_req) => match PcduSwitch::try_from(switch_req.message.switch_id()) {
Ok(pcdu_switch) => {
let pcdu_req = PcduRequest::SwitchDevice {
switch: pcdu_switch,
state: switch_req.message.target_state(),
};
let pcdu_req_ser = serde_json::to_string(&pcdu_req).unwrap();
self.com_interface
.send(pcdu_req_ser.as_bytes())
.expect("failed to send switch request to PCDU");
}
Err(e) => todo!("failed to convert switch ID {:?} to typed PCDU switch", e),
},
Ok(switch_req) => {
self.handle_device_switching(
switch_req.message.switch_id(),
switch_req.message.target_state(),
);
}
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
@@ -450,7 +544,7 @@ impl<ComInterface: SerialInterface> ModeRequestHandler for PcduHandler<ComInterf
if requestor.sender_id() == NO_SENDER {
return Ok(());
}
if requestor.sender_id() != PUS_MODE.id() {
if requestor.sender_id() != ComponentId::Ground as u32 {
log::warn!(
"can not send back mode reply to sender {}",
requestor.sender_id()
@@ -467,7 +561,7 @@ impl<ComInterface: SerialInterface> ModeRequestHandler for PcduHandler<ComInterf
requestor: MessageMetadata,
reply: ModeReply,
) -> Result<(), Self::Error> {
if requestor.sender_id() != PUS_MODE.id() {
if requestor.sender_id() != ComponentId::Ground as u32 {
log::warn!(
"can not send back mode reply to sender {}",
requestor.sender_id()
@@ -490,7 +584,7 @@ impl<ComInterface: SerialInterface> ModeRequestHandler for PcduHandler<ComInterf
impl<ComInterface: SerialInterface> ModeNode for PcduHandler<ComInterface> {
fn id(&self) -> satrs::ComponentId {
PCDU.into()
ComponentId::EpsPcdu as u32
}
}
@@ -506,12 +600,8 @@ impl<ComInterface: SerialInterface> ModeChild for PcduHandler<ComInterface> {
mod tests {
use std::sync::mpsc;
use arbitrary_int::u21;
use satrs::{
mode::ModeRequest, power::SwitchStateBinary, request::GenericMessage, tmtc::PacketAsVec,
};
use satrs_example::ids::{self, Apid};
use satrs_minisim::eps::SwitchMapBinary;
use models::pcdu::{SwitchMapBinary, SwitchStateBinary};
use satrs::{mode::ModeRequest, request::GenericMessage};
use super::*;
@@ -550,13 +640,13 @@ mod tests {
}
}
#[allow(dead_code)]
pub struct PcduTestbench {
pub mode_request_tx: mpsc::SyncSender<GenericMessage<ModeRequest>>,
pub mode_reply_rx_to_pus: mpsc::Receiver<GenericMessage<ModeReply>>,
pub mode_reply_rx_to_parent: mpsc::Receiver<GenericMessage<ModeReply>>,
pub composite_request_tx: mpsc::Sender<GenericMessage<CompositeRequest>>,
pub hk_reply_rx: mpsc::Receiver<GenericMessage<HkReply>>,
pub tm_rx: mpsc::Receiver<PacketAsVec>,
pub tc_tx: mpsc::SyncSender<CcsdsTcPacketOwned>,
pub tm_rx: mpsc::Receiver<CcsdsTmPacketOwned>,
pub switch_request_tx: mpsc::Sender<GenericMessage<SwitchRequest>>,
pub handler: PcduHandler<SerialInterfaceTest>,
}
@@ -564,33 +654,30 @@ mod tests {
impl PcduTestbench {
pub fn new() -> Self {
let (mode_request_tx, mode_request_rx) = mpsc::sync_channel(5);
let (mode_reply_tx_to_pus, mode_reply_rx_to_pus) = mpsc::sync_channel(5);
let (_mode_reply_tx_to_pus, mode_reply_rx_to_pus) = mpsc::sync_channel(5);
let (mode_reply_tx_to_parent, mode_reply_rx_to_parent) = mpsc::sync_channel(5);
let mode_node = ModeRequestHandlerMpscBounded::new(PCDU.into(), mode_request_rx);
let (composite_request_tx, composite_request_rx) = mpsc::channel();
let (hk_reply_tx, hk_reply_rx) = mpsc::sync_channel(10);
let (tm_tx, tm_rx) = mpsc::sync_channel::<PacketAsVec>(5);
let mode_node =
ModeRequestHandlerMpscBounded::new(ComponentId::EpsPcdu as u32, mode_request_rx);
let (tc_tx, tc_rx) = mpsc::sync_channel(5);
let (tm_tx, tm_rx) = mpsc::sync_channel(5);
let (switch_request_tx, switch_reqest_rx) = mpsc::channel();
let shared_switch_map = Arc::new(Mutex::new(SwitchSet::default()));
let shared_switch_map =
Arc::new(Mutex::new(SwitchSet::new_with_init_switches_unknown()));
let mut handler = PcduHandler::new(
UniqueApidTargetId::new(Apid::Eps.raw_value(), u21::new(0)),
"TEST_PCDU",
mode_node,
composite_request_rx,
hk_reply_tx,
tc_rx,
tm_tx.clone(),
switch_reqest_rx,
TmTcSender::Heap(tm_tx.clone()),
SerialInterfaceTest::default(),
shared_switch_map,
);
handler.add_mode_parent(ids::eps::SUBSYSTEM.into(), mode_reply_tx_to_parent);
handler.add_mode_parent(PUS_MODE.into(), mode_reply_tx_to_pus);
handler.add_mode_parent(ComponentId::EpsSubsystem as u32, mode_reply_tx_to_parent);
//handler.add_mode_parent(PUS_MODE.into(), mode_reply_tx_to_pus);
Self {
mode_request_tx,
mode_reply_rx_to_pus,
mode_reply_rx_to_parent,
composite_request_tx,
hk_reply_rx,
tc_tx,
tm_rx,
switch_request_tx,
handler,
@@ -610,7 +697,7 @@ mod tests {
pub fn verify_switch_req_was_sent(
&self,
expected_queue_len: usize,
switch_id: PcduSwitch,
switch_id: SwitchId,
target_state: SwitchStateBinary,
) {
// Check that there is now communication happening.
@@ -679,7 +766,7 @@ mod tests {
testbench
.mode_request_tx
.send(GenericMessage::new(
MessageMetadata::new(0, PUS_MODE.id()),
MessageMetadata::new(0, ComponentId::Ground as u32),
ModeRequest::SetMode {
mode_and_submode: ModeAndSubmode::new(DeviceMode::Normal as u32, 0),
forced: false,
@@ -687,7 +774,7 @@ mod tests {
))
.expect("failed to send mode request");
let switch_map_shared = testbench.handler.shared_switch_map.lock().unwrap();
assert!(!switch_map_shared.valid);
assert!(switch_map_shared.valid);
drop(switch_map_shared);
testbench.handler.periodic_operation(OpCode::RegularOp);
testbench
@@ -714,7 +801,7 @@ mod tests {
testbench
.mode_request_tx
.send(GenericMessage::new(
MessageMetadata::new(0, PUS_MODE.id()),
MessageMetadata::new(0, ComponentId::Ground as u32),
ModeRequest::SetMode {
mode_and_submode: ModeAndSubmode::new(DeviceMode::Normal as u32, 0),
forced: false,
@@ -724,8 +811,8 @@ mod tests {
testbench
.switch_request_tx
.send(GenericMessage::new(
MessageMetadata::new(0, ids::acs::MGM0.id()),
SwitchRequest::new(0, SwitchStateBinary::On),
MessageMetadata::new(0, ComponentId::AcsMgm0 as u32),
SwitchRequest::new(SwitchId::Mgm0, SwitchStateBinary::On),
))
.expect("failed to send switch request");
testbench.handler.periodic_operation(OpCode::RegularOp);
@@ -733,11 +820,11 @@ mod tests {
.handler
.periodic_operation(OpCode::PollAndRecvReplies);
testbench.verify_switch_req_was_sent(2, PcduSwitch::Mgm, SwitchStateBinary::On);
testbench.verify_switch_req_was_sent(2, SwitchId::Mgm0, SwitchStateBinary::On);
testbench.verify_switch_info_req_was_sent(1);
let mut switch_map = SwitchMapBinaryWrapper::default().0;
*switch_map
.get_mut(&PcduSwitch::Mgm)
.get_mut(&SwitchId::Mgm0)
.expect("switch state setting failed") = SwitchStateBinary::On;
testbench.verify_switch_reply_received(1, switch_map);
+33
View File
@@ -0,0 +1,33 @@
use models::{ccsds::CcsdsTmPacketOwned, control, ComponentId, Event, Message};
use crate::ccsds::pack_ccsds_tm_packet_for_now;
pub struct EventManager {
pub ctrl_rx: std::sync::mpsc::Receiver<control::Event>,
pub tm_tx: std::sync::mpsc::SyncSender<CcsdsTmPacketOwned>,
}
impl EventManager {
pub fn periodic_operation(&mut self) {
if let Ok(event) = self.ctrl_rx.try_recv() {
self.event_to_tm(ComponentId::Controller, &Event::ControllerEvent(event));
}
}
pub fn event_to_tm(
&mut self,
sender_id: ComponentId,
event: &(impl serde::Serialize + Message),
) {
match pack_ccsds_tm_packet_for_now(sender_id, None, event) {
Ok(packet) => {
if let Err(e) = self.tm_tx.send(packet) {
log::warn!("error sending event TM packet: {:?}", e);
}
}
Err(e) => {
log::warn!("error packing event TM packet: {:?}", e);
}
}
}
}
-109
View File
@@ -1,109 +0,0 @@
//! This is an auto-generated configuration module.
use satrs::request::UniqueApidTargetId;
#[derive(Debug, PartialEq, Eq, strum::EnumIter)]
#[bitbybit::bitenum(u11)]
pub enum Apid {
Sched = 1,
GenericPus = 2,
Acs = 3,
Cfdp = 4,
Tmtc = 5,
Eps = 6,
}
pub mod acs {
#[derive(Debug, PartialEq, Eq)]
#[bitbybit::bitenum(u21, exhaustive = false)]
pub enum Id {
Subsystem = 1,
Assembly = 2,
Mgm0 = 3,
Mgm1 = 4,
}
pub const SUBSYSTEM: super::UniqueApidTargetId =
super::UniqueApidTargetId::new(super::Apid::Acs.raw_value(), Id::Subsystem.raw_value());
pub const ASSEMBLY: super::UniqueApidTargetId =
super::UniqueApidTargetId::new(super::Apid::Acs.raw_value(), Id::Assembly.raw_value());
pub const MGM0: super::UniqueApidTargetId =
super::UniqueApidTargetId::new(super::Apid::Acs.raw_value(), Id::Mgm0.raw_value());
pub const MGM1: super::UniqueApidTargetId =
super::UniqueApidTargetId::new(super::Apid::Acs.raw_value(), Id::Mgm1.raw_value());
}
pub mod eps {
#[derive(Debug, PartialEq, Eq)]
#[bitbybit::bitenum(u21, exhaustive = false)]
pub enum Id {
Pcdu = 0,
Subsystem = 1,
}
pub const PCDU: super::UniqueApidTargetId =
super::UniqueApidTargetId::new(super::Apid::Eps.raw_value(), Id::Pcdu.raw_value());
pub const SUBSYSTEM: super::UniqueApidTargetId =
super::UniqueApidTargetId::new(super::Apid::Eps.raw_value(), Id::Subsystem.raw_value());
}
pub mod generic_pus {
#[derive(Debug, PartialEq, Eq)]
#[bitbybit::bitenum(u21, exhaustive = false)]
pub enum Id {
PusEventManagement = 0,
PusRouting = 1,
PusTest = 2,
PusAction = 3,
PusMode = 4,
PusHk = 5,
}
pub const PUS_EVENT_MANAGEMENT: super::UniqueApidTargetId = super::UniqueApidTargetId::new(
super::Apid::GenericPus.raw_value(),
Id::PusEventManagement.raw_value(),
);
pub const PUS_ROUTING: super::UniqueApidTargetId = super::UniqueApidTargetId::new(
super::Apid::GenericPus.raw_value(),
Id::PusRouting.raw_value(),
);
pub const PUS_TEST: super::UniqueApidTargetId = super::UniqueApidTargetId::new(
super::Apid::GenericPus.raw_value(),
Id::PusTest.raw_value(),
);
pub const PUS_ACTION: super::UniqueApidTargetId = super::UniqueApidTargetId::new(
super::Apid::GenericPus.raw_value(),
Id::PusAction.raw_value(),
);
pub const PUS_MODE: super::UniqueApidTargetId = super::UniqueApidTargetId::new(
super::Apid::GenericPus.raw_value(),
Id::PusMode.raw_value(),
);
pub const PUS_HK: super::UniqueApidTargetId =
super::UniqueApidTargetId::new(super::Apid::GenericPus.raw_value(), Id::PusHk.raw_value());
}
pub mod sched {
#[derive(Debug, PartialEq, Eq)]
#[bitbybit::bitenum(u21, exhaustive = false)]
pub enum Id {
PusSched = 0,
}
pub const PUS_SCHED: super::UniqueApidTargetId =
super::UniqueApidTargetId::new(super::Apid::Sched.raw_value(), Id::PusSched.raw_value());
}
pub mod tmtc {
#[derive(Debug, PartialEq, Eq)]
#[bitbybit::bitenum(u21, exhaustive = false)]
pub enum Id {
UdpServer = 0,
TcpServer = 1,
}
pub const UDP_SERVER: super::UniqueApidTargetId =
super::UniqueApidTargetId::new(super::Apid::Tmtc.raw_value(), Id::UdpServer.raw_value());
pub const TCP_SERVER: super::UniqueApidTargetId =
super::UniqueApidTargetId::new(super::Apid::Tmtc.raw_value(), Id::TcpServer.raw_value());
}
+57 -72
View File
@@ -1,66 +1,29 @@
#![allow(dead_code)]
use std::collections::VecDeque;
use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc;
use std::sync::{mpsc, Arc, Mutex};
use log::{info, warn};
use log::warn;
use models::ccsds::CcsdsTmPacketOwned;
use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer};
use satrs::pus::HandlingStatus;
use satrs::queue::GenericSendError;
use satrs::tmtc::PacketAsVec;
use satrs::pool::{PoolProviderWithGuards, SharedStaticMemoryPool};
use satrs::tmtc::PacketInPool;
use crate::tmtc::sender::TmTcSender;
pub trait UdpTmHandler {
pub trait UdpTmHandlerProvider {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr);
}
pub struct StaticUdpTmHandler {
pub tm_rx: mpsc::Receiver<PacketInPool>,
pub tm_store: SharedStaticMemoryPool,
pub struct UdpTmHandlerWithChannel {
pub tm_rx: mpsc::Receiver<CcsdsTmPacketOwned>,
}
impl UdpTmHandler for StaticUdpTmHandler {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, &recv_addr: &SocketAddr) {
while let Ok(pus_tm_in_pool) = self.tm_rx.try_recv() {
let store_lock = self.tm_store.write();
if store_lock.is_err() {
warn!("Locking TM store failed");
continue;
}
let mut store_lock = store_lock.unwrap();
let pg = store_lock.read_with_guard(pus_tm_in_pool.store_addr);
let read_res = pg.read_as_vec();
if read_res.is_err() {
warn!("Error reading TM pool data");
continue;
}
let buf = read_res.unwrap();
let result = socket.send_to(&buf, recv_addr);
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
}
}
}
pub struct DynamicUdpTmHandler {
pub tm_rx: mpsc::Receiver<PacketAsVec>,
}
impl UdpTmHandler for DynamicUdpTmHandler {
impl UdpTmHandlerProvider for UdpTmHandlerWithChannel {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) {
while let Ok(tm) = self.tm_rx.try_recv() {
if tm.packet.len() > 9 {
let service = tm.packet[7];
let subservice = tm.packet[8];
info!("Sending PUS TM[{service},{subservice}]")
} else {
info!("Sending PUS TM");
}
let result = socket.send_to(&tm.packet, recv_addr);
log::debug!("sending TM from sender {:?}", tm.tm_header.sender_id);
let result = socket.send_to(&tm.to_vec(), recv_addr);
if let Err(e) = result {
warn!("Sending TM with UDP socket failed: {e}")
}
@@ -68,12 +31,49 @@ impl UdpTmHandler for DynamicUdpTmHandler {
}
}
pub struct UdpTmtcServer<TmHandler: UdpTmHandler> {
pub udp_tc_server: UdpTcServer<TmTcSender, GenericSendError>,
pub tm_handler: TmHandler,
#[derive(Default, Debug, Clone)]
pub struct TestTmHandler {
addrs_to_send_to: Arc<Mutex<VecDeque<SocketAddr>>>,
}
impl<TmHandler: UdpTmHandler> UdpTmtcServer<TmHandler> {
impl UdpTmHandlerProvider for TestTmHandler {
fn send_tm_to_udp_client(&mut self, _socket: &UdpSocket, recv_addr: &SocketAddr) {
self.addrs_to_send_to.lock().unwrap().push_back(*recv_addr);
}
}
pub enum UdpTmHandler {
Normal(UdpTmHandlerWithChannel),
Test(TestTmHandler),
}
impl From<UdpTmHandlerWithChannel> for UdpTmHandler {
fn from(handler: UdpTmHandlerWithChannel) -> Self {
UdpTmHandler::Normal(handler)
}
}
impl From<TestTmHandler> for UdpTmHandler {
fn from(handler: TestTmHandler) -> Self {
UdpTmHandler::Test(handler)
}
}
impl UdpTmHandlerProvider for UdpTmHandler {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) {
match self {
UdpTmHandler::Normal(handler) => handler.send_tm_to_udp_client(socket, recv_addr),
UdpTmHandler::Test(handler) => handler.send_tm_to_udp_client(socket, recv_addr),
}
}
}
pub struct UdpTmtcServer {
pub udp_tc_server: UdpTcServer<TmTcSender, GenericSendError>,
pub tm_handler: UdpTmHandler,
}
impl UdpTmtcServer {
pub fn periodic_operation(&mut self) {
loop {
if self.poll_tc_server() == HandlingStatus::Empty {
@@ -107,15 +107,12 @@ impl<TmHandler: UdpTmHandler> UdpTmtcServer<TmHandler> {
#[cfg(test)]
mod tests {
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::{
collections::VecDeque,
net::IpAddr,
sync::{Arc, Mutex},
};
use arbitrary_int::traits::Integer as _;
use arbitrary_int::u14;
use models::Apid;
use satrs::spacepackets::ecss::{CreatorConfig, MessageTypeId};
use satrs::{
spacepackets::{
@@ -125,25 +122,13 @@ mod tests {
ComponentId,
};
use satrs_example::config::OBSW_SERVER_ADDR;
use satrs_example::ids;
use crate::tmtc::sender::{MockSender, TmTcSender};
use crate::tmtc::sender::MockSender;
use super::*;
const UDP_SERVER_ID: ComponentId = 0x05;
#[derive(Default, Debug, Clone)]
pub struct TestTmHandler {
addrs_to_send_to: Arc<Mutex<VecDeque<SocketAddr>>>,
}
impl UdpTmHandler for TestTmHandler {
fn send_tm_to_udp_client(&mut self, _socket: &UdpSocket, recv_addr: &SocketAddr) {
self.addrs_to_send_to.lock().unwrap().push_back(*recv_addr);
}
}
#[test]
fn test_basic() {
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
@@ -154,7 +139,7 @@ mod tests {
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let mut udp_dyn_server = UdpTmtcServer {
udp_tc_server,
tm_handler,
tm_handler: tm_handler.into(),
};
udp_dyn_server.periodic_operation();
let queue = udp_dyn_server
@@ -179,9 +164,9 @@ mod tests {
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let mut udp_dyn_server = UdpTmtcServer {
udp_tc_server,
tm_handler,
tm_handler: tm_handler.into(),
};
let sph = SpHeader::new_for_unseg_tc(ids::Apid::GenericPus.raw_value(), u14::ZERO, 0);
let sph = SpHeader::new_for_unseg_tc(Apid::Tmtc.raw_value(), u14::ZERO, 0);
let ping_tc = PusTcCreator::new_simple(
sph,
MessageTypeId::new(17, 1),
@@ -1,6 +1,3 @@
use std::sync::mpsc::{self};
use crate::pus::create_verification_reporter;
use arbitrary_int::traits::Integer as _;
use arbitrary_int::u11;
use satrs::event_man_legacy::{EventMessageU32, EventRoutingError};
@@ -34,6 +31,7 @@ impl EventTmHook for EventApidSetter {
}
}
/*
/// The PUS event handler subscribes for all events and converts them into ECSS PUS 5 event
/// packets. It also handles the verification completion of PUS event service requests.
pub struct PusEventHandler<TmSender: EcssTmSender> {
@@ -292,3 +290,4 @@ mod tests {
// TODO: Add test.
}
}
*/
@@ -39,6 +39,7 @@ pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> Verifi
VerificationReporter::new(owner_id, &verif_cfg)
}
/*
/// Simple router structure which forwards PUS telecommands to dedicated handlers.
pub struct PusTcMpscRouter {
pub test_tc_sender: mpsc::SyncSender<EcssTcAndToken>,
@@ -187,6 +188,7 @@ impl PusTcDistributor {
Ok(HandlingStatus::HandledOne)
}
}
*/
pub trait TargetedPusService {
const SERVICE_ID: u8;
@@ -15,8 +15,8 @@ use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusPacket;
use satrs::ComponentId;
use satrs_example::config::tmtc_err;
use satrs_example::ids;
/*
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum CompositeRequest {
@@ -153,3 +153,4 @@ impl PusRequestRouter<ModeRequest> for GenericRequestRouter {
Err(GenericRoutingError::UnknownTargetId(target_id))
}
}
*/
+17 -1
View File
@@ -1,7 +1,23 @@
extern crate alloc;
pub use models::ComponentId;
use satrs::spacepackets::time::cds::CdsTime;
pub mod config;
pub mod ids;
/// Simple type modelling packet stored in the heap. This structure is intended to
/// be used when sending a packet via a message queue, so it also contains the sender ID.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PacketAsVec {
pub sender_id: ComponentId,
pub packet: Vec<u8>,
}
impl PacketAsVec {
pub fn new(sender_id: ComponentId, packet: Vec<u8>) -> Self {
Self { sender_id, packet }
}
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum DeviceMode {
+1 -1
View File
@@ -9,7 +9,7 @@ pub fn setup_logger() -> Result<(), fern::InitError> {
message
))
})
.level(log::LevelFilter::Debug)
.level(log::LevelFilter::Info)
.chain(std::io::stdout())
.chain(fern::log_file("output.log")?)
.apply()?;
+83 -251
View File
@@ -5,12 +5,10 @@ use std::{
time::Duration,
};
use acs::mgm::{MgmHandlerLis3Mdl, SpiDummyInterface, SpiSimInterface, SpiSimInterfaceWrapper};
use eps::{
pcdu::{PcduHandler, SerialInterfaceDummy, SerialInterfaceToSim, SerialSimInterfaceWrapper},
PowerSwitchHelper,
};
use events::EventHandler;
use interface::{
sim_client_udp::create_sim_client,
tcp::{SyncTcpTmSource, TcpTask},
@@ -18,256 +16,102 @@ use interface::{
};
use log::info;
use logger::setup_logger;
use pus::{
action::create_action_service,
event::create_event_service,
hk::create_hk_service,
mode::create_mode_service,
scheduler::{create_scheduler_service, TcReleaser},
stack::PusStack,
test::create_test_service,
PusTcDistributor, PusTcMpscRouter,
};
use requests::GenericRequestRouter;
use models::ComponentId;
use satrs::{
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
mode::{Mode, ModeAndSubmode, ModeRequest, ModeRequestHandlerMpscBounded},
mode_tree::connect_mode_nodes,
pus::{event_man::EventRequestWithToken, EcssTcCacher, HandlingStatus},
pus::HandlingStatus,
request::{GenericMessage, MessageMetadata},
spacepackets::time::cds::CdsTime,
};
use satrs_example::{
config::{
components::NO_SENDER,
pool::create_sched_tc_pool,
tasks::{FREQ_MS_AOCS, FREQ_MS_PUS_STACK, FREQ_MS_UDP_TMTC, SIM_CLIENT_IDLE_DELAY_MS},
tasks::{FREQ_MS_AOCS, FREQ_MS_CONTROLLER, FREQ_MS_UDP_TMTC, SIM_CLIENT_IDLE_DELAY_MS},
OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT,
},
ids::{
acs::*,
eps::*,
tmtc::{TCP_SERVER, UDP_SERVER},
},
DeviceMode,
};
use tmtc::sender::TmTcSender;
use tmtc::{tc_source::TcSourceTask, tm_sink::TmSink};
cfg_if::cfg_if! {
if #[cfg(feature = "heap_tmtc")] {
use interface::udp::DynamicUdpTmHandler;
use satrs::pus::EcssTcVecCacher;
use tmtc::{tc_source::TcSourceTaskDynamic, tm_sink::TmSinkDynamic};
} else {
use std::sync::RwLock;
use interface::udp::StaticUdpTmHandler;
use satrs::pus::EcssTcInSharedPoolCacher;
use satrs::tmtc::{PacketSenderWithSharedPool, SharedPacketPool};
use satrs_example::config::pool::create_static_pools;
use tmtc::{
tc_source::TcSourceTaskStatic,
tm_sink::TmSinkStatic,
};
}
}
use crate::{
acs::mgm::{MgmHandlerLis3Mdl, SpiDummyInterface, SpiSimInterface, SpiSimInterfaceWrapper},
control::Controller,
eps::pcdu::SwitchSet,
event_manager::EventManager,
interface::udp::UdpTmHandlerWithChannel,
tmtc::tc_source::CcsdsDistributor,
};
mod acs;
mod ccsds;
mod control;
mod eps;
mod events;
mod hk;
mod event_manager;
mod interface;
mod logger;
mod pus;
mod requests;
mod spi;
mod tmtc;
fn main() {
setup_logger().expect("setting up logging with fern failed");
println!("Running OBSW example");
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let (tm_pool, tc_pool) = create_static_pools();
let shared_tm_pool = Arc::new(RwLock::new(tm_pool));
let shared_tc_pool = Arc::new(RwLock::new(tc_pool));
let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool);
let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool);
}
}
println!("Runng OBSW example");
let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50);
let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50);
let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50);
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let tm_sender = TmTcSender::Static(
PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone())
);
} else if #[cfg(feature = "heap_tmtc")] {
let tm_sender = TmTcSender::Heap(tm_sink_tx.clone());
}
}
let (sim_request_tx, sim_request_rx) = mpsc::channel();
let (mgm_0_sim_reply_tx, mgm_0_sim_reply_rx) = mpsc::channel();
let (mgm_1_sim_reply_tx, mgm_1_sim_reply_rx) = mpsc::channel();
let (pcdu_sim_reply_tx, pcdu_sim_reply_rx) = mpsc::channel();
let mut opt_sim_client = create_sim_client(sim_request_rx);
let (mgm_0_handler_composite_tx, mgm_0_handler_composite_rx) = mpsc::sync_channel(10);
let (mgm_1_handler_composite_tx, mgm_1_handler_composite_rx) = mpsc::sync_channel(10);
let (pcdu_handler_composite_tx, pcdu_handler_composite_rx) = mpsc::sync_channel(30);
let (mgm_0_handler_mode_tx, mgm_0_handler_mode_rx) = mpsc::sync_channel(5);
let (mgm_1_handler_mode_tx, mgm_1_handler_mode_rx) = mpsc::sync_channel(5);
let (mgm_0_handler_tc_tx, mgm_0_handler_tc_rx) = mpsc::sync_channel(10);
let (mgm_1_handler_tc_tx, mgm_1_handler_tc_rx) = mpsc::sync_channel(10);
let (pcdu_handler_tc_tx, pcdu_handler_tc_rx) = mpsc::sync_channel(30);
let (controller_tc_tx, controller_tc_rx) = mpsc::sync_channel(10);
let (_mgm_0_handler_mode_tx, mgm_0_handler_mode_rx) = mpsc::sync_channel(5);
let (_mgm_1_handler_mode_tx, mgm_1_handler_mode_rx) = mpsc::sync_channel(5);
let (pcdu_handler_mode_tx, pcdu_handler_mode_rx) = mpsc::sync_channel(5);
// Some request are targetable. This map is used to retrieve sender handles based on a target ID.
let mut request_map = GenericRequestRouter::default();
request_map
.composite_router_map
.insert(MGM0.id(), mgm_0_handler_composite_tx);
request_map
.composite_router_map
.insert(MGM1.id(), mgm_1_handler_composite_tx);
request_map
.composite_router_map
.insert(PCDU.id(), pcdu_handler_composite_tx);
// This helper structure is used by all telecommand providers which need to send telecommands
// to the TC source.
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let tc_sender_with_shared_pool =
PacketSenderWithSharedPool::new(tc_source_tx, shared_tc_pool_wrapper.clone());
let tc_in_mem_converter =
EcssTcCacher::Static(EcssTcInSharedPoolCacher::new(shared_tc_pool, 4096));
} else if #[cfg(feature = "heap_tmtc")] {
let tc_in_mem_converter = EcssTcCacher::Heap(EcssTcVecCacher::default());
}
}
// Create event handling components
// These sender handles are used to send event requests, for example to enable or disable
// certain events.
let (event_tx, event_rx) = mpsc::sync_channel(100);
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation.
let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx);
let (pus_test_tx, pus_test_rx) = mpsc::sync_channel(20);
let (pus_event_tx, pus_event_rx) = mpsc::sync_channel(10);
let (pus_sched_tx, pus_sched_rx) = mpsc::sync_channel(50);
let (pus_hk_tx, pus_hk_rx) = mpsc::sync_channel(50);
let (pus_action_tx, pus_action_rx) = mpsc::sync_channel(50);
let (pus_mode_tx, pus_mode_rx) = mpsc::sync_channel(50);
let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel();
let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::sync_channel(50);
let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::sync_channel(30);
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let tc_releaser = TcReleaser::Static(tc_sender_with_shared_pool.clone());
} else if #[cfg(feature = "heap_tmtc")] {
let tc_releaser = TcReleaser::Heap(tc_source_tx.clone());
}
}
let pus_router = PusTcMpscRouter {
test_tc_sender: pus_test_tx,
event_tc_sender: pus_event_tx,
sched_tc_sender: pus_sched_tx,
hk_tc_sender: pus_hk_tx,
action_tc_sender: pus_action_tx,
mode_tc_sender: pus_mode_tx,
let (event_ctrl_tx, event_ctrl_rx) = mpsc::sync_channel(10);
let mut event_manager = EventManager {
ctrl_rx: event_ctrl_rx,
tm_tx: tm_sink_tx.clone(),
};
let pus_test_service = create_test_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
event_tx.clone(),
pus_test_rx,
);
let pus_scheduler_service = create_scheduler_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
tc_releaser,
pus_sched_rx,
create_sched_tc_pool(),
);
let pus_event_service = create_event_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
pus_event_rx,
event_request_tx,
);
let pus_action_service = create_action_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
pus_action_rx,
request_map.clone(),
pus_action_reply_rx,
);
let pus_hk_service = create_hk_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
pus_hk_rx,
request_map.clone(),
pus_hk_reply_rx,
);
let pus_mode_service = create_mode_service(
tm_sender.clone(),
tc_in_mem_converter.clone(),
pus_mode_rx,
request_map,
pus_mode_reply_rx,
);
let mut pus_stack = PusStack::new(
pus_test_service,
pus_hk_service,
pus_event_service,
pus_action_service,
pus_scheduler_service,
pus_mode_service,
);
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let mut tmtc_task = TcSourceTask::Static(TcSourceTaskStatic::new(
shared_tc_pool_wrapper.clone(),
tc_source_rx,
PusTcDistributor::new(tm_sender.clone(), pus_router),
));
let tc_sender = TmTcSender::Static(tc_sender_with_shared_pool);
let udp_tm_handler = StaticUdpTmHandler {
tm_rx: tm_server_rx,
tm_store: shared_tm_pool.clone(),
};
} else if #[cfg(feature = "heap_tmtc")] {
let mut tmtc_task = TcSourceTask::Heap(TcSourceTaskDynamic::new(
tc_source_rx,
PusTcDistributor::new(tm_sender.clone(), pus_router),
));
let tc_sender = TmTcSender::Heap(tc_source_tx.clone());
let udp_tm_handler = DynamicUdpTmHandler {
tm_rx: tm_server_rx,
};
}
}
let mut controller = Controller::new(controller_tc_rx, tm_sink_tx.clone(), event_ctrl_tx);
let ccsds_distributor = CcsdsDistributor::default();
let mut tc_source = TcSourceTask::new(tc_source_rx, ccsds_distributor);
tc_source.add_target(ComponentId::EpsPcdu, pcdu_handler_tc_tx);
tc_source.add_target(ComponentId::Controller, controller_tc_tx);
tc_source.add_target(ComponentId::AcsMgm0, mgm_0_handler_tc_tx);
tc_source.add_target(ComponentId::AcsMgm1, mgm_1_handler_tc_tx);
let tc_sender = TmTcSender::Normal(tc_source_tx.clone());
let udp_tm_handler = UdpTmHandlerWithChannel {
tm_rx: tm_server_rx,
};
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_sender.clone())
.expect("creating UDP TMTC server failed");
let udp_tc_server = UdpTcServer::new(
ComponentId::UdpServer as u32,
sock_addr,
2048,
tc_sender.clone(),
)
.expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server,
tm_handler: udp_tm_handler,
tm_handler: udp_tm_handler.into(),
};
let tcp_server_cfg = ServerConfig::new(
TCP_SERVER.id(),
ComponentId::TcpServer as u32,
sock_addr,
Duration::from_millis(400),
4096,
@@ -282,31 +126,18 @@ fn main() {
)
.expect("tcp server creation failed");
cfg_if::cfg_if! {
if #[cfg(not(feature = "heap_tmtc"))] {
let mut tm_sink = TmSink::Static(TmSinkStatic::new(
shared_tm_pool_wrapper,
sync_tm_tcp_source,
tm_sink_rx,
tm_server_tx,
));
} else if #[cfg(feature = "heap_tmtc")] {
let mut tm_sink = TmSink::Heap(TmSinkDynamic::new(
sync_tm_tcp_source,
tm_sink_rx,
tm_server_tx,
));
}
}
let mut tm_sink = TmSink::new(sync_tm_tcp_source, tm_sink_rx, tm_server_tx);
let shared_switch_set = Arc::new(Mutex::default());
let shared_switch_set = Arc::new(Mutex::new(SwitchSet::new_with_init_switches_unknown()));
let (switch_request_tx, switch_request_rx) = mpsc::sync_channel(20);
let switch_helper = PowerSwitchHelper::new(switch_request_tx, shared_switch_set.clone());
let shared_mgm_0_set = Arc::default();
let shared_mgm_1_set = Arc::default();
let mgm_0_mode_node = ModeRequestHandlerMpscBounded::new(MGM0.into(), mgm_0_handler_mode_rx);
let mgm_1_mode_node = ModeRequestHandlerMpscBounded::new(MGM1.into(), mgm_1_handler_mode_rx);
let mgm_0_mode_node =
ModeRequestHandlerMpscBounded::new(ComponentId::AcsMgm0 as u32, mgm_0_handler_mode_rx);
let mgm_1_mode_node =
ModeRequestHandlerMpscBounded::new(ComponentId::AcsMgm1 as u32, mgm_1_handler_mode_rx);
let (mgm_0_spi_interface, mgm_1_spi_interface) =
if let Some(sim_client) = opt_sim_client.as_mut() {
sim_client
@@ -330,28 +161,27 @@ fn main() {
)
};
let mut mgm_0_handler = MgmHandlerLis3Mdl::new(
MGM0,
ComponentId::AcsMgm0,
"MGM_0",
mgm_0_mode_node,
mgm_0_handler_composite_rx,
pus_hk_reply_tx.clone(),
mgm_0_handler_tc_rx,
tm_sink_tx.clone(),
switch_helper.clone(),
tm_sender.clone(),
mgm_0_spi_interface,
shared_mgm_0_set,
);
let mut mgm_1_handler = MgmHandlerLis3Mdl::new(
MGM1,
ComponentId::AcsMgm1,
"MGM_1",
mgm_1_mode_node,
mgm_1_handler_composite_rx,
pus_hk_reply_tx.clone(),
mgm_1_handler_tc_rx,
tm_sink_tx.clone(),
switch_helper.clone(),
tm_sender.clone(),
mgm_1_spi_interface,
shared_mgm_1_set,
);
// Connect PUS service to device handlers.
/*
connect_mode_nodes(
&mut pus_stack.mode_srv,
mgm_0_handler_mode_tx,
@@ -364,6 +194,7 @@ fn main() {
&mut mgm_1_handler,
pus_mode_reply_tx.clone(),
);
*/
let pcdu_serial_interface = if let Some(sim_client) = opt_sim_client.as_mut() {
sim_client.add_reply_recipient(satrs_minisim::SimComponent::Pcdu, pcdu_sim_reply_tx);
@@ -374,24 +205,24 @@ fn main() {
} else {
SerialSimInterfaceWrapper::Dummy(SerialInterfaceDummy::default())
};
let pcdu_mode_node = ModeRequestHandlerMpscBounded::new(PCDU.into(), pcdu_handler_mode_rx);
let pcdu_mode_node =
ModeRequestHandlerMpscBounded::new(ComponentId::Pcdu as u32, pcdu_handler_mode_rx);
let mut pcdu_handler = PcduHandler::new(
PCDU,
"PCDU",
pcdu_mode_node,
pcdu_handler_composite_rx,
pus_hk_reply_tx,
pcdu_handler_tc_rx,
tm_sink_tx.clone(),
switch_request_rx,
tm_sender.clone(),
pcdu_serial_interface,
shared_switch_set,
);
/*
connect_mode_nodes(
&mut pus_stack.mode_srv,
pcdu_handler_mode_tx.clone(),
&mut pcdu_handler,
pus_mode_reply_tx,
);
*/
// The PCDU is a critical component which should be in normal mode immediately.
pcdu_handler_mode_tx
@@ -406,12 +237,12 @@ fn main() {
info!("Starting TMTC and UDP task");
let jh_udp_tmtc = thread::Builder::new()
.name("SATRS tmtc-udp".to_string())
.name("TMTC & UDP".to_string())
.spawn(move || {
info!("Running UDP server on port {SERVER_PORT}");
loop {
udp_tmtc_server.periodic_operation();
tmtc_task.periodic_operation();
tc_source.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC));
}
})
@@ -419,7 +250,7 @@ fn main() {
info!("Starting TCP task");
let jh_tcp = thread::Builder::new()
.name("sat-rs tcp".to_string())
.name("TCP".to_string())
.spawn(move || {
info!("Running TCP server on port {SERVER_PORT}");
loop {
@@ -430,7 +261,7 @@ fn main() {
info!("Starting TM funnel task");
let jh_tm_funnel = thread::Builder::new()
.name("tm sink".to_string())
.name("TM SINK".to_string())
.spawn(move || loop {
tm_sink.operation();
})
@@ -441,7 +272,7 @@ fn main() {
info!("Starting UDP sim client task");
opt_jh_sim_client = Some(
thread::Builder::new()
.name("sat-rs sim adapter".to_string())
.name("SIM ADAPTER".to_string())
.spawn(move || loop {
if sim_client.operation() == HandlingStatus::Empty {
std::thread::sleep(Duration::from_millis(SIM_CLIENT_IDLE_DELAY_MS));
@@ -453,7 +284,7 @@ fn main() {
info!("Starting AOCS thread");
let jh_aocs = thread::Builder::new()
.name("sat-rs aocs".to_string())
.name("AOCS".to_string())
.spawn(move || loop {
mgm_0_handler.periodic_operation();
mgm_1_handler.periodic_operation();
@@ -463,12 +294,13 @@ fn main() {
info!("Starting EPS thread");
let jh_eps = thread::Builder::new()
.name("sat-rs eps".to_string())
.name("EPS".to_string())
.spawn(move || loop {
// TODO: We should introduce something like a fixed timeslot helper to allow a more
// declarative API. It would also be very useful for the AOCS task.
//
// TODO: The fixed timeslot handler exists.. use it.
// TODO: Why not just use sync code in the PCDU handler, and fully delay there?
pcdu_handler.periodic_operation(crate::eps::pcdu::OpCode::RegularOp);
thread::sleep(Duration::from_millis(50));
pcdu_handler.periodic_operation(crate::eps::pcdu::OpCode::PollAndRecvReplies);
@@ -478,13 +310,13 @@ fn main() {
})
.unwrap();
info!("Starting PUS handler thread");
let jh_pus_handler = thread::Builder::new()
.name("sat-rs pus".to_string())
info!("Starting controller thread");
let jh_controller_thread = thread::Builder::new()
.name("CTRL".to_string())
.spawn(move || loop {
event_handler.periodic_operation();
pus_stack.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
controller.periodic_operation();
event_manager.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_CONTROLLER));
})
.unwrap();
@@ -504,7 +336,7 @@ fn main() {
}
jh_aocs.join().expect("Joining AOCS thread failed");
jh_eps.join().expect("Joining EPS thread failed");
jh_pus_handler
jh_controller_thread
.join()
.expect("Joining PUS handler thread failed");
}
+4 -31
View File
@@ -1,10 +1,8 @@
use std::{cell::RefCell, collections::VecDeque, sync::mpsc};
use satrs::{
pus::EcssTmSender,
queue::GenericSendError,
spacepackets::ecss::WritablePusPacket,
tmtc::{PacketAsVec, PacketHandler, PacketSenderWithSharedPool},
tmtc::{PacketAsVec, PacketHandler},
ComponentId,
};
@@ -14,8 +12,7 @@ pub struct MockSender(pub RefCell<VecDeque<PacketAsVec>>);
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum TmTcSender {
Static(PacketSenderWithSharedPool),
Heap(mpsc::SyncSender<PacketAsVec>),
Normal(mpsc::SyncSender<PacketAsVec>),
Mock(MockSender),
}
@@ -29,37 +26,13 @@ impl TmTcSender {
}
}
impl EcssTmSender for TmTcSender {
fn send_tm(
&self,
sender_id: satrs::ComponentId,
tm: satrs::pus::PusTmVariant,
) -> Result<(), satrs::pus::EcssTmtcError> {
match self {
TmTcSender::Static(sync_sender) => sync_sender.send_tm(sender_id, tm),
TmTcSender::Heap(sync_sender) => match tm {
satrs::pus::PusTmVariant::InStore(_) => panic!("can not send TM in store"),
satrs::pus::PusTmVariant::Direct(pus_tm_creator) => sync_sender
.send(PacketAsVec::new(sender_id, pus_tm_creator.to_vec()?))
.map_err(|_| GenericSendError::RxDisconnected.into()),
},
TmTcSender::Mock(_) => Ok(()),
}
}
}
impl PacketHandler for TmTcSender {
type Error = GenericSendError;
fn handle_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> {
match self {
TmTcSender::Static(packet_sender_with_shared_pool) => {
if let Err(e) = packet_sender_with_shared_pool.handle_packet(sender_id, packet) {
log::error!("Error sending packet via Static TM/TC sender: {:?}", e);
}
}
TmTcSender::Heap(sync_sender) => {
if let Err(e) = sync_sender.handle_packet(sender_id, packet) {
TmTcSender::Normal(sync_sender) => {
if let Err(e) = sync_sender.send(PacketAsVec::new(sender_id, packet.to_vec())) {
log::error!("Error sending packet via Heap TM/TC sender: {:?}", e);
}
}
+83 -107
View File
@@ -1,121 +1,97 @@
use models::{ccsds::CcsdsTcPacketOwned, ComponentId, TcHeader};
use satrs::{
pool::PoolProvider,
pus::HandlingStatus,
tmtc::{PacketAsVec, PacketInPool, SharedPacketPool},
spacepackets::{CcsdsPacketReader, ChecksumType},
tmtc::PacketAsVec,
};
use std::{
collections::HashMap,
sync::mpsc::{self, TryRecvError},
};
use std::sync::mpsc::{self, TryRecvError};
use crate::pus::PusTcDistributor;
// TC source components where static pools are the backing memory of the received telecommands.
pub struct TcSourceTaskStatic {
shared_tc_pool: SharedPacketPool,
tc_receiver: mpsc::Receiver<PacketInPool>,
/// We allocate this buffer from the heap to avoid a clippy warning on large enum variant
/// differences.
tc_buf: Box<[u8; 4096]>,
pus_distributor: PusTcDistributor,
}
#[allow(dead_code)]
impl TcSourceTaskStatic {
pub fn new(
shared_tc_pool: SharedPacketPool,
tc_receiver: mpsc::Receiver<PacketInPool>,
pus_receiver: PusTcDistributor,
) -> Self {
Self {
shared_tc_pool,
tc_receiver,
tc_buf: Box::new([0; 4096]),
pus_distributor: pus_receiver,
}
}
pub fn periodic_operation(&mut self) {
self.poll_tc();
}
pub fn poll_tc(&mut self) -> HandlingStatus {
// Right now, we only expect ECSS PUS packets.
// If packets like CFDP are expected, we might have to check the APID first.
match self.tc_receiver.try_recv() {
Ok(packet_in_pool) => {
let pool = self
.shared_tc_pool
.0
.read()
.expect("locking tc pool failed");
pool.read(&packet_in_pool.store_addr, self.tc_buf.as_mut_slice())
.expect("reading pool failed");
drop(pool);
self.pus_distributor
.handle_tc_packet_in_store(packet_in_pool, self.tc_buf.as_slice())
.ok();
HandlingStatus::HandledOne
}
Err(e) => match e {
TryRecvError::Empty => HandlingStatus::Empty,
TryRecvError::Disconnected => {
log::warn!("tmtc thread: sender disconnected");
HandlingStatus::Empty
}
},
}
}
}
pub type CcsdsDistributor = HashMap<ComponentId, std::sync::mpsc::SyncSender<CcsdsTcPacketOwned>>;
// TC source components where the heap is the backing memory of the received telecommands.
pub struct TcSourceTaskDynamic {
pub struct TcSourceTask {
pub tc_receiver: mpsc::Receiver<PacketAsVec>,
pus_distributor: PusTcDistributor,
}
#[allow(dead_code)]
impl TcSourceTaskDynamic {
pub fn new(tc_receiver: mpsc::Receiver<PacketAsVec>, pus_receiver: PusTcDistributor) -> Self {
Self {
tc_receiver,
pus_distributor: pus_receiver,
}
}
pub fn periodic_operation(&mut self) {
self.poll_tc();
}
pub fn poll_tc(&mut self) -> HandlingStatus {
// Right now, we only expect ECSS PUS packets.
// If packets like CFDP are expected, we might have to check the APID first.
match self.tc_receiver.try_recv() {
Ok(packet_as_vec) => {
self.pus_distributor
.handle_tc_packet_vec(packet_as_vec)
.ok();
HandlingStatus::HandledOne
}
Err(e) => match e {
TryRecvError::Empty => HandlingStatus::Empty,
TryRecvError::Disconnected => {
log::warn!("tmtc thread: sender disconnected");
HandlingStatus::Empty
}
},
}
}
}
#[allow(dead_code)]
pub enum TcSourceTask {
Static(TcSourceTaskStatic),
Heap(TcSourceTaskDynamic),
ccsds_distributor: CcsdsDistributor,
}
impl TcSourceTask {
pub fn new(
tc_receiver: mpsc::Receiver<PacketAsVec>,
ccsds_distributor: CcsdsDistributor,
) -> Self {
Self {
tc_receiver,
ccsds_distributor,
}
}
pub fn add_target(
&mut self,
target_id: ComponentId,
sender: mpsc::SyncSender<CcsdsTcPacketOwned>,
) {
self.ccsds_distributor.insert(target_id, sender);
}
pub fn periodic_operation(&mut self) {
match self {
TcSourceTask::Static(task) => task.periodic_operation(),
TcSourceTask::Heap(task) => task.periodic_operation(),
loop {
if self.poll_tc() == HandlingStatus::Empty {
break;
}
}
}
pub fn poll_tc(&mut self) -> HandlingStatus {
match self.tc_receiver.try_recv() {
Ok(packet) => {
log::debug!("received raw packet: {:?}", packet);
let ccsds_tc_reader_result =
CcsdsPacketReader::new(&packet.packet, Some(ChecksumType::WithCrc16));
if ccsds_tc_reader_result.is_err() {
log::warn!(
"received invalid CCSDS TC packet: {:?}",
ccsds_tc_reader_result.err()
);
// TODO: Send a dedicated TM packet.
return HandlingStatus::HandledOne;
}
let ccsds_tc_reader = ccsds_tc_reader_result.unwrap();
let tc_header_result =
postcard::take_from_bytes::<TcHeader>(ccsds_tc_reader.user_data());
if tc_header_result.is_err() {
log::warn!(
"received CCSDS TC packet with invalid TC header: {:?}",
tc_header_result.err()
);
// TODO: Send a dedicated TM packet.
return HandlingStatus::HandledOne;
}
let (tc_header, payload) = tc_header_result.unwrap();
if let Some(sender) = self.ccsds_distributor.get(&tc_header.target_id) {
log::debug!("sending TC packet to target ID: {:?}", tc_header.target_id);
sender
.send(CcsdsTcPacketOwned {
sp_header: *ccsds_tc_reader.sp_header(),
tc_header,
payload: payload.to_vec(),
})
.ok();
} else {
log::warn!("no TC handler for target ID {:?}", tc_header.target_id);
// TODO: Send a dedicated TM packet.
}
HandlingStatus::HandledOne
}
Err(e) => match e {
TryRecvError::Empty => HandlingStatus::Empty,
TryRecvError::Disconnected => {
log::warn!("tmtc thread: sender disconnected");
HandlingStatus::Empty
}
},
}
}
}
+14 -139
View File
@@ -4,18 +4,8 @@ use std::{
};
use arbitrary_int::{u11, u14};
use log::info;
use satrs::{
pool::PoolProvider,
spacepackets::{
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
seq_count::SequenceCounter,
seq_count::SequenceCounterCcsdsSimple,
time::cds::MIN_CDS_FIELD_LEN,
CcsdsPacket,
},
tmtc::{PacketAsVec, PacketInPool, SharedPacketPool},
};
use models::ccsds::CcsdsTmPacketOwned;
use satrs::spacepackets::seq_count::{SequenceCounter, SequenceCounterCcsdsSimple};
use crate::interface::tcp::SyncTcpTmSource;
@@ -34,118 +24,22 @@ impl CcsdsSeqCounterMap {
}
}
pub struct TmFunnelCommon {
pub struct TmSink {
seq_counter_map: CcsdsSeqCounterMap,
msg_counter_map: HashMap<u8, u16>,
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<CcsdsTmPacketOwned>,
tm_server_tx: mpsc::SyncSender<CcsdsTmPacketOwned>,
}
impl TmFunnelCommon {
pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self {
impl TmSink {
pub fn new(
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<CcsdsTmPacketOwned>,
tm_server_tx: mpsc::SyncSender<CcsdsTmPacketOwned>,
) -> Self {
Self {
seq_counter_map: Default::default(),
msg_counter_map: Default::default(),
sync_tm_tcp_source,
}
}
// Applies common packet processing operations for PUS TM packets. This includes setting
// a sequence counter
fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) {
// zero_copy_writer.set_apid(PUS_APID);
zero_copy_writer.set_seq_count(
self.seq_counter_map
.get_and_increment(zero_copy_writer.apid()),
);
let entry = self
.msg_counter_map
.entry(zero_copy_writer.service_type_id())
.or_insert(0);
zero_copy_writer.set_msg_count(*entry);
if *entry == u16::MAX {
*entry = 0;
} else {
*entry += 1;
}
Self::packet_printout(&zero_copy_writer);
// This operation has to come last!
zero_copy_writer.finish();
}
fn packet_printout(tm: &PusTmZeroCopyWriter) {
info!(
"Sending PUS TM[{},{}] with APID {}",
tm.service_type_id(),
tm.message_subtype_id(),
tm.apid()
);
}
}
pub struct TmSinkStatic {
common: TmFunnelCommon,
shared_tm_store: SharedPacketPool,
tm_funnel_rx: mpsc::Receiver<PacketInPool>,
tm_server_tx: mpsc::SyncSender<PacketInPool>,
}
#[allow(dead_code)]
impl TmSinkStatic {
pub fn new(
shared_tm_store: SharedPacketPool,
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<PacketInPool>,
tm_server_tx: mpsc::SyncSender<PacketInPool>,
) -> Self {
Self {
common: TmFunnelCommon::new(sync_tm_tcp_source),
shared_tm_store,
tm_funnel_rx,
tm_server_tx,
}
}
pub fn operation(&mut self) {
if let Ok(pus_tm_in_pool) = self.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let shared_pool = self.shared_tm_store.0.clone();
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
let mut tm_copy = Vec::new();
pool_guard
.modify(&pus_tm_in_pool.store_addr, |buf| {
let zero_copy_writer = PusTmZeroCopyWriter::new(buf, MIN_CDS_FIELD_LEN, true)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
tm_copy = buf.to_vec()
})
.expect("Reading TM from pool failed");
self.tm_server_tx
.send(pus_tm_in_pool)
.expect("Sending TM to server failed");
// We could also do this step in the update closure, but I'd rather avoid this, could
// lead to nested locking.
self.common.sync_tm_tcp_source.add_tm(&tm_copy);
}
}
}
pub struct TmSinkDynamic {
common: TmFunnelCommon,
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
tm_server_tx: mpsc::SyncSender<PacketAsVec>,
}
#[allow(dead_code)]
impl TmSinkDynamic {
pub fn new(
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
tm_server_tx: mpsc::SyncSender<PacketAsVec>,
) -> Self {
Self {
common: TmFunnelCommon::new(sync_tm_tcp_source),
tm_funnel_rx,
tm_server_tx,
}
@@ -153,31 +47,12 @@ impl TmSinkDynamic {
pub fn operation(&mut self) {
if let Ok(mut tm) = self.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let zero_copy_writer =
PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN, true)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
self.common.sync_tm_tcp_source.add_tm(&tm.packet);
tm.sp_header
.set_seq_count(self.seq_counter_map.get_and_increment(tm.sp_header.apid()));
self.sync_tm_tcp_source.add_tm(&tm.to_vec());
self.tm_server_tx
.send(tm)
.expect("Sending TM to server failed");
}
}
}
#[allow(dead_code)]
pub enum TmSink {
Static(TmSinkStatic),
Heap(TmSinkDynamic),
}
impl TmSink {
pub fn operation(&mut self) {
match self {
TmSink::Static(static_sink) => static_sink.operation(),
TmSink::Heap(dynamic_sink) => dynamic_sink.operation(),
}
}
}