continue
This commit is contained in:
@@ -147,5 +147,14 @@ pub enum DeviceMode {
|
||||
Normal = 2,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[non_exhaustive]
|
||||
pub enum HkRequestType {
|
||||
OneShot,
|
||||
EnablePeriodic(core::time::Duration),
|
||||
DisablePeriodic,
|
||||
ModifyInterval(core::time::Duration),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {}
|
||||
|
||||
@@ -1,22 +1,51 @@
|
||||
pub mod request {
|
||||
use crate::HkRequestType;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy, serde::Serialize, serde::Deserialize)]
|
||||
pub enum HkId {
|
||||
Sensor,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy, serde::Serialize, serde::Deserialize)]
|
||||
pub struct HkRequest {
|
||||
pub id: HkId,
|
||||
pub req_type: HkRequestType,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug)]
|
||||
pub enum Request {
|
||||
Ping,
|
||||
Hk(HkRequest),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct MgmData {
|
||||
pub valid: bool,
|
||||
pub x: f32,
|
||||
pub y: f32,
|
||||
pub z: f32,
|
||||
}
|
||||
|
||||
pub mod response {
|
||||
use crate::Message;
|
||||
use crate::{Message, mgm::MgmData};
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug)]
|
||||
pub enum HkResponse {
|
||||
MgmData(MgmData),
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug)]
|
||||
pub enum Response {
|
||||
Ok,
|
||||
Hk(HkResponse),
|
||||
}
|
||||
|
||||
impl Message for Response {
|
||||
fn message_type(&self) -> crate::MessageType {
|
||||
match self {
|
||||
Response::Ok => crate::MessageType::Verification,
|
||||
Response::Hk(_hk_response) => crate::MessageType::Hk,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,9 +2,11 @@ use std::collections::HashMap;
|
||||
|
||||
use strum::IntoEnumIterator as _;
|
||||
|
||||
#[bitbybit::bitfield(u16, debug)]
|
||||
#[bitbybit::bitfield(u16, debug, default = 0x0)]
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
pub struct SwitchesBitfield {
|
||||
#[bit(2, rw)]
|
||||
magnetorquer: bool,
|
||||
#[bit(1, rw)]
|
||||
mgm1: bool,
|
||||
#[bit(0, rw)]
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use models::ccsds::{CcsdsTcPacketOwned, CcsdsTmPacketOwned};
|
||||
use models::mgm::MgmData;
|
||||
use models::pcdu::SwitchId;
|
||||
use models::{mgm, ComponentId};
|
||||
use satrs::hk::{HkRequest, HkRequestVariant};
|
||||
use models::{mgm, ComponentId, HkRequestType};
|
||||
use satrs::mode_tree::{ModeChild, ModeNode};
|
||||
use satrs::spacepackets::CcsdsPacketIdAndPsc;
|
||||
use satrs_example::{DeviceMode, TimestampHelper};
|
||||
@@ -26,8 +26,6 @@ use crate::ccsds::pack_ccsds_tm_packet_for_now;
|
||||
use crate::eps::PowerSwitchHelper;
|
||||
use crate::spi::SpiInterface;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const NR_OF_DATA_AND_CFG_REGISTERS: usize = 14;
|
||||
|
||||
// Register adresses to access various bytes from the raw reply.
|
||||
@@ -35,12 +33,6 @@ pub const X_LOWBYTE_IDX: usize = 9;
|
||||
pub const Y_LOWBYTE_IDX: usize = 11;
|
||||
pub const Z_LOWBYTE_IDX: usize = 13;
|
||||
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
|
||||
#[repr(u32)]
|
||||
pub enum SetId {
|
||||
SensorData = 0,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, PartialEq, Eq)]
|
||||
pub enum TransitionState {
|
||||
#[default]
|
||||
@@ -117,19 +109,10 @@ impl SpiInterface for SpiSimInterfaceWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Copy, Clone, Serialize, Deserialize)]
|
||||
pub struct MgmData {
|
||||
pub valid: bool,
|
||||
pub x: f32,
|
||||
pub y: f32,
|
||||
pub z: f32,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct BufWrapper {
|
||||
tx_buf: [u8; 32],
|
||||
rx_buf: [u8; 32],
|
||||
tm_buf: [u8; 32],
|
||||
}
|
||||
|
||||
pub struct ModeHelpers {
|
||||
@@ -223,6 +206,9 @@ impl<ComInterface: SpiInterface> MgmHandlerLis3Mdl<ComInterface> {
|
||||
mgm::request::Request::Ping => {
|
||||
self.send_telemetry(Some(tc_id), mgm::response::Response::Ok)
|
||||
}
|
||||
mgm::request::Request::Hk(hk_request) => {
|
||||
self.handle_hk_request(Some(tc_id), &hk_request)
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -257,42 +243,23 @@ impl<ComInterface: SpiInterface> MgmHandlerLis3Mdl<ComInterface> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_hk_request(&mut self, _requestor_info: &MessageMetadata, hk_request: &HkRequest) {
|
||||
match hk_request.variant {
|
||||
HkRequestVariant::OneShot => {
|
||||
/*
|
||||
pub fn handle_hk_request(
|
||||
&mut self,
|
||||
tc_id: Option<CcsdsPacketIdAndPsc>,
|
||||
hk_request: &models::mgm::request::HkRequest,
|
||||
) {
|
||||
match hk_request.req_type {
|
||||
HkRequestType::OneShot => {
|
||||
let mgm_snapshot = *self.shared_mgm_set.lock().unwrap();
|
||||
if let Ok(hk_tm) = self.hk_helper.generate_hk_report_packet(
|
||||
self.stamp_helper.stamp(),
|
||||
SetId::SensorData as u32,
|
||||
&mut |hk_buf| {
|
||||
hk_buf[0] = mgm_snapshot.valid as u8;
|
||||
hk_buf[1..5].copy_from_slice(&mgm_snapshot.x.to_be_bytes());
|
||||
hk_buf[5..9].copy_from_slice(&mgm_snapshot.y.to_be_bytes());
|
||||
hk_buf[9..13].copy_from_slice(&mgm_snapshot.z.to_be_bytes());
|
||||
Ok(13)
|
||||
},
|
||||
&mut self.bufs.tm_buf,
|
||||
) {
|
||||
// TODO: If sending the TM fails, we should also send a failure reply.
|
||||
self.tm_sender
|
||||
.send_tm(self.id.id(), PusTmVariant::Direct(hk_tm))
|
||||
.expect("failed to send HK TM");
|
||||
self.hk_reply_tx
|
||||
.send(GenericMessage::new(
|
||||
*requestor_info,
|
||||
HkReply::new(hk_request.unique_id, HkReplyVariant::Ack),
|
||||
))
|
||||
.expect("failed to send HK reply");
|
||||
} else {
|
||||
// TODO: Send back failure reply. Need result code for this.
|
||||
log::error!("TM buffer too small to generate HK data");
|
||||
}
|
||||
*/
|
||||
self.send_telemetry(
|
||||
tc_id,
|
||||
mgm::response::Response::Hk(mgm::response::HkResponse::MgmData(mgm_snapshot)),
|
||||
)
|
||||
}
|
||||
HkRequestVariant::EnablePeriodic => todo!(),
|
||||
HkRequestVariant::DisablePeriodic => todo!(),
|
||||
HkRequestVariant::ModifyCollectionInterval(_) => todo!(),
|
||||
HkRequestType::EnablePeriodic(_duration) => todo!(),
|
||||
HkRequestType::DisablePeriodic => todo!(),
|
||||
HkRequestType::ModifyInterval(_duration) => todo!(),
|
||||
_ => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -549,6 +516,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[allow(dead_code)]
|
||||
pub struct MgmAssemblyMock(
|
||||
pub HashMap<satrs::ComponentId, mpsc::SyncSender<GenericMessage<ModeRequest>>>,
|
||||
);
|
||||
@@ -568,6 +536,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[allow(dead_code)]
|
||||
pub struct GroundMock {
|
||||
pub request_sender_map:
|
||||
HashMap<satrs::ComponentId, mpsc::SyncSender<GenericMessage<ModeRequest>>>,
|
||||
|
||||
@@ -7,7 +7,10 @@ use std::{
|
||||
use derive_new::new;
|
||||
use models::{
|
||||
ccsds::{CcsdsTcPacketOwned, CcsdsTmPacketOwned},
|
||||
pcdu::{self, SwitchId, SwitchMapBinary, SwitchMapBinaryWrapper, SwitchRequest, SwitchState},
|
||||
pcdu::{
|
||||
self, SwitchId, SwitchMapBinary, SwitchMapBinaryWrapper, SwitchRequest, SwitchState,
|
||||
SwitchStateBinary, SwitchesBitfield,
|
||||
},
|
||||
ComponentId,
|
||||
};
|
||||
use num_enum::{IntoPrimitive, TryFromPrimitive};
|
||||
@@ -50,6 +53,22 @@ impl SwitchSet {
|
||||
Self::new(wrapper.0)
|
||||
}
|
||||
|
||||
pub fn as_bitfield(&self) -> Option<SwitchesBitfield> {
|
||||
for entry in SwitchId::iter() {
|
||||
if !self.switch_map.contains_key(&entry) {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
Some(
|
||||
SwitchesBitfield::builder()
|
||||
.with_magnetorquer(*self.switch_map.get(&SwitchId::Mgt).unwrap() == SwitchState::On)
|
||||
.with_mgm1(*self.switch_map.get(&SwitchId::Mgm1).unwrap() == SwitchState::On)
|
||||
.with_mgm0(*self.switch_map.get(&SwitchId::Mgm0).unwrap() == SwitchState::On)
|
||||
.build(),
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn set_switch_state(&mut self, switch_id: SwitchId, state: SwitchState) -> bool {
|
||||
if !self.switch_map.contains_key(&switch_id) {
|
||||
return false;
|
||||
@@ -74,6 +93,7 @@ impl Default for SwitchMapWrapper {
|
||||
}
|
||||
|
||||
impl SwitchMapWrapper {
|
||||
#[allow(dead_code)]
|
||||
pub fn new_with_init_switches_off() -> Self {
|
||||
let mut switch_map = SwitchMap::default();
|
||||
for entry in SwitchId::iter() {
|
||||
@@ -322,9 +342,28 @@ impl<ComInterface: SerialInterface> PcduHandler<ComInterface> {
|
||||
pcdu::request::Request::Ping => {
|
||||
self.send_tm(Some(tc_id), pcdu::response::Response::Ok)
|
||||
}
|
||||
pcdu::request::Request::GetSwitches => {}
|
||||
pcdu::request::Request::EnableSwitches(switches) => todo!(),
|
||||
pcdu::request::Request::DisableSwitches(switches) => todo!(),
|
||||
pcdu::request::Request::GetSwitches => self.send_tm(
|
||||
Some(tc_id),
|
||||
pcdu::response::Response::Switches(
|
||||
self.shared_switch_map
|
||||
.lock()
|
||||
.unwrap()
|
||||
.as_bitfield()
|
||||
.expect("could not build switches response"),
|
||||
),
|
||||
),
|
||||
pcdu::request::Request::EnableSwitches(switches) => {
|
||||
self.handle_switches_bitfield_request(
|
||||
switches,
|
||||
SwitchStateBinary::On,
|
||||
);
|
||||
}
|
||||
pcdu::request::Request::DisableSwitches(switches) => {
|
||||
self.handle_switches_bitfield_request(
|
||||
switches,
|
||||
SwitchStateBinary::Off,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -342,6 +381,22 @@ impl<ComInterface: SerialInterface> PcduHandler<ComInterface> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_switches_bitfield_request(
|
||||
&mut self,
|
||||
switches: SwitchesBitfield,
|
||||
state: SwitchStateBinary,
|
||||
) {
|
||||
if switches.mgm0() {
|
||||
self.handle_device_switching(SwitchId::Mgm0, state);
|
||||
}
|
||||
if switches.mgm1() {
|
||||
self.handle_device_switching(SwitchId::Mgm1, state);
|
||||
}
|
||||
if switches.magnetorquer() {
|
||||
self.handle_device_switching(SwitchId::Mgt, state);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_tm(&self, tc_id: Option<CcsdsPacketIdAndPsc>, response: pcdu::response::Response) {
|
||||
match pack_ccsds_tm_packet_for_now(ComponentId::Pcdu, tc_id, &response) {
|
||||
Ok(packet) => {
|
||||
@@ -394,18 +449,25 @@ impl<ComInterface: SerialInterface> PcduHandler<ComInterface> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_device_switching(&mut self, switch_id: SwitchId, state: SwitchStateBinary) {
|
||||
let pcdu_req = PcduRequest::SwitchDevice {
|
||||
switch: switch_id,
|
||||
state,
|
||||
};
|
||||
let pcdu_req_ser = serde_json::to_string(&pcdu_req).unwrap();
|
||||
self.com_interface
|
||||
.send(pcdu_req_ser.as_bytes())
|
||||
.expect("failed to send switch request to PCDU");
|
||||
}
|
||||
|
||||
pub fn handle_switch_requests(&mut self) {
|
||||
loop {
|
||||
match self.switch_request_rx.try_recv() {
|
||||
Ok(switch_req) => {
|
||||
let pcdu_req = PcduRequest::SwitchDevice {
|
||||
switch: switch_req.message.switch_id(),
|
||||
state: switch_req.message.target_state(),
|
||||
};
|
||||
let pcdu_req_ser = serde_json::to_string(&pcdu_req).unwrap();
|
||||
self.com_interface
|
||||
.send(pcdu_req_ser.as_bytes())
|
||||
.expect("failed to send switch request to PCDU");
|
||||
self.handle_device_switching(
|
||||
switch_req.message.switch_id(),
|
||||
switch_req.message.target_state(),
|
||||
);
|
||||
}
|
||||
Err(e) => match e {
|
||||
mpsc::TryRecvError::Empty => break,
|
||||
@@ -580,6 +642,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct PcduTestbench {
|
||||
pub mode_request_tx: mpsc::SyncSender<GenericMessage<ModeRequest>>,
|
||||
pub mode_reply_rx_to_pus: mpsc::Receiver<GenericMessage<ModeReply>>,
|
||||
|
||||
@@ -83,11 +83,11 @@ fn main() {
|
||||
let mut controller = Controller::new(controller_tc_rx, tm_sink_tx.clone());
|
||||
|
||||
let ccsds_distributor = CcsdsDistributor::default();
|
||||
let mut tmtc_task = TcSourceTask::new(tc_source_rx, ccsds_distributor);
|
||||
tmtc_task.add_target(ComponentId::EpsPcdu, pcdu_handler_tc_tx);
|
||||
tmtc_task.add_target(ComponentId::Controller, controller_tc_tx);
|
||||
tmtc_task.add_target(ComponentId::AcsMgm0, mgm_0_handler_tc_tx);
|
||||
tmtc_task.add_target(ComponentId::AcsMgm1, mgm_1_handler_tc_tx);
|
||||
let mut tc_source = TcSourceTask::new(tc_source_rx, ccsds_distributor);
|
||||
tc_source.add_target(ComponentId::EpsPcdu, pcdu_handler_tc_tx);
|
||||
tc_source.add_target(ComponentId::Controller, controller_tc_tx);
|
||||
tc_source.add_target(ComponentId::AcsMgm0, mgm_0_handler_tc_tx);
|
||||
tc_source.add_target(ComponentId::AcsMgm1, mgm_1_handler_tc_tx);
|
||||
|
||||
let tc_sender = TmTcSender::Normal(tc_source_tx.clone());
|
||||
let udp_tm_handler = UdpTmHandlerWithChannel {
|
||||
@@ -239,7 +239,7 @@ fn main() {
|
||||
info!("Running UDP server on port {SERVER_PORT}");
|
||||
loop {
|
||||
udp_tmtc_server.periodic_operation();
|
||||
tmtc_task.periodic_operation();
|
||||
tc_source.periodic_operation();
|
||||
thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC));
|
||||
}
|
||||
})
|
||||
|
||||
@@ -26,27 +26,6 @@ impl TmTcSender {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
impl EcssTmSender for TmTcSender {
|
||||
fn send_tm(
|
||||
&self,
|
||||
sender_id: satrs::ComponentId,
|
||||
tm: satrs::pus::PusTmVariant,
|
||||
) -> Result<(), satrs::pus::EcssTmtcError> {
|
||||
match self {
|
||||
//TmTcSender::Static(sync_sender) => sync_sender.send_tm(sender_id, tm),
|
||||
TmTcSender::Heap(sync_sender) => match tm {
|
||||
satrs::pus::PusTmVariant::InStore(_) => panic!("can not send TM in store"),
|
||||
satrs::pus::PusTmVariant::Direct(pus_tm_creator) => sync_sender
|
||||
.send(PacketAsVec::new(sender_id, pus_tm_creator.to_vec()?))
|
||||
.map_err(|_| GenericSendError::RxDisconnected.into()),
|
||||
},
|
||||
TmTcSender::Mock(_) => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
impl PacketHandler for TmTcSender {
|
||||
type Error = GenericSendError;
|
||||
|
||||
|
||||
@@ -9,66 +9,6 @@ use std::{
|
||||
sync::mpsc::{self, TryRecvError},
|
||||
};
|
||||
|
||||
// TC source components where static pools are the backing memory of the received telecommands.
|
||||
/*
|
||||
pub struct TcSourceTaskStatic {
|
||||
shared_tc_pool: SharedPacketPool,
|
||||
tc_receiver: mpsc::Receiver<PacketInPool>,
|
||||
/// We allocate this buffer from the heap to avoid a clippy warning on large enum variant
|
||||
/// differences.
|
||||
tc_buf: Box<[u8; 4096]>,
|
||||
pus_distributor: PusTcDistributor,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl TcSourceTaskStatic {
|
||||
pub fn new(
|
||||
shared_tc_pool: SharedPacketPool,
|
||||
tc_receiver: mpsc::Receiver<PacketInPool>,
|
||||
pus_receiver: PusTcDistributor,
|
||||
) -> Self {
|
||||
Self {
|
||||
shared_tc_pool,
|
||||
tc_receiver,
|
||||
tc_buf: Box::new([0; 4096]),
|
||||
pus_distributor: pus_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn periodic_operation(&mut self) {
|
||||
self.poll_tc();
|
||||
}
|
||||
|
||||
pub fn poll_tc(&mut self) -> HandlingStatus {
|
||||
// Right now, we only expect ECSS PUS packets.
|
||||
// If packets like CFDP are expected, we might have to check the APID first.
|
||||
match self.tc_receiver.try_recv() {
|
||||
Ok(packet_in_pool) => {
|
||||
let pool = self
|
||||
.shared_tc_pool
|
||||
.0
|
||||
.read()
|
||||
.expect("locking tc pool failed");
|
||||
pool.read(&packet_in_pool.store_addr, self.tc_buf.as_mut_slice())
|
||||
.expect("reading pool failed");
|
||||
drop(pool);
|
||||
self.pus_distributor
|
||||
.handle_tc_packet_in_store(packet_in_pool, self.tc_buf.as_slice())
|
||||
.ok();
|
||||
HandlingStatus::HandledOne
|
||||
}
|
||||
Err(e) => match e {
|
||||
TryRecvError::Empty => HandlingStatus::Empty,
|
||||
TryRecvError::Disconnected => {
|
||||
log::warn!("tmtc thread: sender disconnected");
|
||||
HandlingStatus::Empty
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
pub type CcsdsDistributor = HashMap<ComponentId, std::sync::mpsc::SyncSender<CcsdsTcPacketOwned>>;
|
||||
|
||||
// TC source components where the heap is the backing memory of the received telecommands.
|
||||
@@ -77,7 +17,6 @@ pub struct TcSourceTask {
|
||||
ccsds_distributor: CcsdsDistributor,
|
||||
}
|
||||
|
||||
//#[allow(dead_code)]
|
||||
impl TcSourceTask {
|
||||
pub fn new(
|
||||
tc_receiver: mpsc::Receiver<PacketAsVec>,
|
||||
|
||||
@@ -4,13 +4,8 @@ use std::{
|
||||
};
|
||||
|
||||
use arbitrary_int::{u11, u14};
|
||||
use log::info;
|
||||
use models::ccsds::CcsdsTmPacketOwned;
|
||||
use satrs::spacepackets::{
|
||||
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
|
||||
seq_count::{SequenceCounter, SequenceCounterCcsdsSimple},
|
||||
CcsdsPacket,
|
||||
};
|
||||
use satrs::spacepackets::seq_count::{SequenceCounter, SequenceCounterCcsdsSimple};
|
||||
|
||||
use crate::interface::tcp::SyncTcpTmSource;
|
||||
|
||||
@@ -29,113 +24,13 @@ impl CcsdsSeqCounterMap {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TmFunnelCommon {
|
||||
seq_counter_map: CcsdsSeqCounterMap,
|
||||
msg_counter_map: HashMap<u8, u16>,
|
||||
sync_tm_tcp_source: SyncTcpTmSource,
|
||||
}
|
||||
|
||||
impl TmFunnelCommon {
|
||||
pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self {
|
||||
Self {
|
||||
seq_counter_map: Default::default(),
|
||||
msg_counter_map: Default::default(),
|
||||
sync_tm_tcp_source,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Replace pus tm zero copy writer by CCSDS zero copy writer.
|
||||
// Applies common packet processing operations for PUS TM packets. This includes setting
|
||||
// a sequence counter
|
||||
fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) {
|
||||
// zero_copy_writer.set_apid(PUS_APID);
|
||||
zero_copy_writer.set_seq_count(
|
||||
self.seq_counter_map
|
||||
.get_and_increment(zero_copy_writer.apid()),
|
||||
);
|
||||
let entry = self
|
||||
.msg_counter_map
|
||||
.entry(zero_copy_writer.service_type_id())
|
||||
.or_insert(0);
|
||||
zero_copy_writer.set_msg_count(*entry);
|
||||
if *entry == u16::MAX {
|
||||
*entry = 0;
|
||||
} else {
|
||||
*entry += 1;
|
||||
}
|
||||
|
||||
Self::packet_printout(&zero_copy_writer);
|
||||
// This operation has to come last!
|
||||
zero_copy_writer.finish();
|
||||
}
|
||||
|
||||
fn packet_printout(tm: &PusTmZeroCopyWriter) {
|
||||
info!(
|
||||
"Sending PUS TM[{},{}] with APID {}",
|
||||
tm.service_type_id(),
|
||||
tm.message_subtype_id(),
|
||||
tm.apid()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
pub struct TmSinkStatic {
|
||||
common: TmFunnelCommon,
|
||||
shared_tm_store: SharedPacketPool,
|
||||
tm_funnel_rx: mpsc::Receiver<PacketInPool>,
|
||||
tm_server_tx: mpsc::SyncSender<PacketInPool>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl TmSinkStatic {
|
||||
pub fn new(
|
||||
shared_tm_store: SharedPacketPool,
|
||||
sync_tm_tcp_source: SyncTcpTmSource,
|
||||
tm_funnel_rx: mpsc::Receiver<PacketInPool>,
|
||||
tm_server_tx: mpsc::SyncSender<PacketInPool>,
|
||||
) -> Self {
|
||||
Self {
|
||||
common: TmFunnelCommon::new(sync_tm_tcp_source),
|
||||
shared_tm_store,
|
||||
tm_funnel_rx,
|
||||
tm_server_tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn operation(&mut self) {
|
||||
if let Ok(pus_tm_in_pool) = self.tm_funnel_rx.recv() {
|
||||
// Read the TM, set sequence counter and message counter, and finally update
|
||||
// the CRC.
|
||||
let shared_pool = self.shared_tm_store.0.clone();
|
||||
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
|
||||
let mut tm_copy = Vec::new();
|
||||
pool_guard
|
||||
.modify(&pus_tm_in_pool.store_addr, |buf| {
|
||||
let zero_copy_writer = PusTmZeroCopyWriter::new(buf, MIN_CDS_FIELD_LEN, true)
|
||||
.expect("Creating TM zero copy writer failed");
|
||||
self.common.apply_packet_processing(zero_copy_writer);
|
||||
tm_copy = buf.to_vec()
|
||||
})
|
||||
.expect("Reading TM from pool failed");
|
||||
self.tm_server_tx
|
||||
.send(pus_tm_in_pool)
|
||||
.expect("Sending TM to server failed");
|
||||
// We could also do this step in the update closure, but I'd rather avoid this, could
|
||||
// lead to nested locking.
|
||||
self.common.sync_tm_tcp_source.add_tm(&tm_copy);
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
pub struct TmSink {
|
||||
common: TmFunnelCommon,
|
||||
seq_counter_map: CcsdsSeqCounterMap,
|
||||
sync_tm_tcp_source: SyncTcpTmSource,
|
||||
tm_funnel_rx: mpsc::Receiver<CcsdsTmPacketOwned>,
|
||||
tm_server_tx: mpsc::SyncSender<CcsdsTmPacketOwned>,
|
||||
}
|
||||
|
||||
//#[allow(dead_code)]
|
||||
impl TmSink {
|
||||
pub fn new(
|
||||
sync_tm_tcp_source: SyncTcpTmSource,
|
||||
@@ -143,7 +38,8 @@ impl TmSink {
|
||||
tm_server_tx: mpsc::SyncSender<CcsdsTmPacketOwned>,
|
||||
) -> Self {
|
||||
Self {
|
||||
common: TmFunnelCommon::new(sync_tm_tcp_source),
|
||||
seq_counter_map: Default::default(),
|
||||
sync_tm_tcp_source,
|
||||
tm_funnel_rx,
|
||||
tm_server_tx,
|
||||
}
|
||||
@@ -151,41 +47,12 @@ impl TmSink {
|
||||
|
||||
pub fn operation(&mut self) {
|
||||
if let Ok(mut tm) = self.tm_funnel_rx.recv() {
|
||||
//let tm_raw = tm.to_vec();
|
||||
tm.sp_header.set_seq_count(
|
||||
self.common
|
||||
.seq_counter_map
|
||||
.get_and_increment(tm.sp_header.apid()),
|
||||
);
|
||||
// Read the TM, set sequence counter and message counter, and finally update
|
||||
// the CRC.
|
||||
/*
|
||||
let zero_copy_writer =
|
||||
PusTmZeroCopyWriter::new(&mut tm_raw, MIN_CDS_FIELD_LEN, true)
|
||||
.expect("Creating TM zero copy writer failed");
|
||||
*/
|
||||
//self.common.apply_packet_processing(zero_copy_writer);
|
||||
self.common.sync_tm_tcp_source.add_tm(&tm.to_vec());
|
||||
tm.sp_header
|
||||
.set_seq_count(self.seq_counter_map.get_and_increment(tm.sp_header.apid()));
|
||||
self.sync_tm_tcp_source.add_tm(&tm.to_vec());
|
||||
self.tm_server_tx
|
||||
.send(tm)
|
||||
.expect("Sending TM to server failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//#[allow(dead_code)]
|
||||
//pub enum TmSink {
|
||||
//Static(TmSinkStatic),
|
||||
//Heap(TmSinkDynamic),
|
||||
//}
|
||||
|
||||
/*
|
||||
impl TmSink {
|
||||
pub fn operation(&mut self) {
|
||||
match self {
|
||||
TmSink::Static(static_sink) => static_sink.operation(),
|
||||
TmSink::Heap(dynamic_sink) => dynamic_sink.operation(),
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user