added pus components and ping

This commit is contained in:
lkoester
2024-04-09 11:18:54 +02:00
parent 0bcd0c2ca3
commit 3b94a125ef
23 changed files with 2081 additions and 15 deletions

View File

@ -2,6 +2,9 @@ use lazy_static::lazy_static;
use satrs::spacepackets::{PacketId, PacketType};
use std::{collections::HashSet, net::Ipv4Addr};
use strum::IntoEnumIterator;
use satrs_mib::resultcode;
use satrs_mib::res_code::ResultU16Info;
use num_enum::{IntoPrimitive, TryFromPrimitive};
pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
pub const SERVER_PORT: u16 = 7301;
@ -23,6 +26,56 @@ lazy_static! {
};
}
#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)]
pub enum CustomPusServiceId {
Mode = 200,
Health = 201,
}
#[derive(Debug)]
pub enum GroupId {
Tmtc = 0,
Hk = 1,
Mode = 2,
}
pub mod tmtc_err {
use satrs::res_code::ResultU16;
use super::*;
#[resultcode]
pub const INVALID_PUS_SERVICE: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 0);
#[resultcode]
pub const INVALID_PUS_SUBSERVICE: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 1);
#[resultcode]
pub const PUS_SERVICE_NOT_IMPLEMENTED: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 2);
#[resultcode]
pub const PUS_SUBSERVICE_NOT_IMPLEMENTED: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 3);
#[resultcode]
pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 4);
#[resultcode]
pub const ROUTING_ERROR: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 5);
#[resultcode(info = "Request timeout for targeted PUS request. P1: Request ID. P2: Target ID")]
pub const REQUEST_TIMEOUT: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 6);
#[resultcode(
info = "Not enough data inside the TC application data field. Optionally includes: \
8 bytes of failure data containing 2 failure parameters, \
P1 (u32 big endian): Expected data length, P2: Found data length"
)]
pub const NOT_ENOUGH_APP_DATA: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 2);
pub const TMTC_RESULTS: &[ResultU16Info] = &[
INVALID_PUS_SERVICE_EXT,
INVALID_PUS_SUBSERVICE_EXT,
PUS_SERVICE_NOT_IMPLEMENTED_EXT,
UNKNOWN_TARGET_ID_EXT,
ROUTING_ERROR_EXT,
NOT_ENOUGH_APP_DATA_EXT,
];
}
pub mod components {
use satrs::request::UniqueApidTargetId;
use strum::EnumIter;

2
src/interface/mod.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod tcp;
pub mod udp;

View File

@ -1,3 +1,34 @@
use std::net::Ipv4Addr;
use satrs::spacepackets::time::cds::CdsTime;
use satrs::spacepackets::time::TimeWriter;
pub mod config;
pub struct TimeStampHelper {
stamper: CdsTime,
time_stamp: [u8; 7],
}
impl TimeStampHelper {
pub fn stamp(&self) -> &[u8] {
&self.time_stamp
}
pub fn update_from_now(&mut self) {
self.stamper
.update_from_now()
.expect("Updating timestamp failed");
self.stamper
.write_to_bytes(&mut self.time_stamp)
.expect("Writing timestamp failed");
}
}
impl Default for TimeStampHelper {
fn default() -> Self {
Self {
stamper: CdsTime::now_with_u16_days().expect("creating time stamper failed"),
time_stamp: Default::default(),
}
}
}

View File

@ -13,37 +13,113 @@ use satrs::{
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
tmtc::CcsdsDistributor,
};
use ops_sat_rs::config::tasks::FREQ_MS_PUS_STACK;
use crate::{
ccsds::CcsdsReceiver,
logger::setup_logger,
tcp::{SyncTcpTmSource, TcpTask},
interface::tcp::{SyncTcpTmSource, TcpTask},
tmtc::PusTcSourceProviderDynamic,
udp::{DynamicUdpTmHandler, UdpTmtcServer},
interface::udp::{DynamicUdpTmHandler, UdpTmtcServer},
};
use crate::pus::{PusReceiver, PusTcMpscRouter};
use crate::pus::stack::PusStack;
use crate::pus::test::create_test_service_dynamic;
use crate::requests::GenericRequestRouter;
use crate::tm_funnel::TmFunnelDynamic;
use crate::tmtc::TcSourceTaskDynamic;
mod ccsds;
mod logger;
mod tcp;
mod tmtc;
mod udp;
mod requests;
mod pus;
mod tm_funnel;
mod interface;
#[allow(dead_code)]
fn main() {
setup_logger().expect("setting up logging with fern failed");
println!("OPS-SAT Rust experiment OBSW");
let (tc_source_tx, tc_source_rx) = mpsc::channel();
let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel();
let (tm_server_tx, tm_server_rx) = mpsc::channel();
let tc_source = PusTcSourceProviderDynamic(tc_source_tx);
let (pus_test_tx, pus_test_rx) = mpsc::channel();
// let (pus_event_tx, pus_event_rx) = mpsc::channel();
// let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
// let (pus_hk_tx, pus_hk_rx) = mpsc::channel();
// let (pus_action_tx, pus_action_rx) = mpsc::channel();
// let (pus_mode_tx, pus_mode_rx) = mpsc::channel();
// let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel();
// let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel();
// let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel();
let pus_router = PusTcMpscRouter {
test_tc_sender: pus_test_tx,
// event_tc_sender: pus_event_tx,
// sched_tc_sender: pus_sched_tx,
// hk_tc_sender: pus_hk_tx,
// action_tc_sender: pus_action_tx,
// mode_tc_sender: pus_mode_tx,
};
let pus_test_service = create_test_service_dynamic(
tm_funnel_tx.clone(),
// event_handler.clone_event_sender(),
pus_test_rx,
);
// let pus_scheduler_service = create_scheduler_service_dynamic(
// tm_funnel_tx.clone(),
// tc_source.0.clone(),
// pus_sched_rx,
// create_sched_tc_pool(),
// );
//
// let pus_event_service =
// create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
// let pus_action_service = create_action_service_dynamic(
// tm_funnel_tx.clone(),
// pus_action_rx,
// request_map.clone(),
// pus_action_reply_rx,
// );
// let pus_hk_service = create_hk_service_dynamic(
// tm_funnel_tx.clone(),
// pus_hk_rx,
// request_map.clone(),
// pus_hk_reply_rx,
// );
// let pus_mode_service = create_mode_service_dynamic(
// tm_funnel_tx.clone(),
// pus_mode_rx,
// request_map,
// pus_mode_reply_rx,
// );
let mut pus_stack = PusStack::new(
pus_test_service,
// pus_hk_service,
// pus_event_service,
// pus_action_service,
// pus_scheduler_service,
// pus_mode_service,
);
let ccsds_receiver = CcsdsReceiver { tc_source };
let mut tmtc_task = TcSourceTaskDynamic::new(
tc_source_rx,
PusReceiver::new(tm_funnel_tx.clone(), pus_router),
);
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone());
let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor))
.expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server,
tm_handler: DynamicUdpTmHandler {
@ -60,7 +136,9 @@ fn main() {
tcp_ccsds_distributor,
PACKET_ID_VALIDATOR.clone(),
)
.expect("tcp server creation failed");
.expect("tcp server creation failed");
let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx);
info!("Starting TMTC and UDP task");
let jh_udp_tmtc = thread::Builder::new()
@ -69,7 +147,7 @@ fn main() {
info!("Running UDP server on port {SERVER_PORT}");
loop {
udp_tmtc_server.periodic_operation();
// tmtc_task.periodic_operation();
tmtc_task.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC));
}
})
@ -86,10 +164,33 @@ fn main() {
})
.unwrap();
info!("Starting TM funnel task");
let jh_tm_funnel = thread::Builder::new()
.name("TM Funnel".to_string())
.spawn(move || loop {
tm_funnel.operation();
})
.unwrap();
info!("Starting PUS handler thread");
let jh_pus_handler = thread::Builder::new()
.name("PUS".to_string())
.spawn(move || loop {
pus_stack.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
})
.unwrap();
jh_udp_tmtc
.join()
.expect("Joining UDP TMTC server thread failed");
jh_tcp
.join()
.expect("Joining TCP TMTC server thread failed");
}
jh_tm_funnel
.join()
.expect("Joining TM Funnel thread failed");
jh_pus_handler
.join()
.expect("Joining PUS handler thread failed");
}

707
src/pus/mod.rs Normal file
View File

@ -0,0 +1,707 @@
pub mod test;
pub mod stack;
use crate::requests::GenericRequestRouter;
use log::warn;
use satrs::pus::verification::{
self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter,
VerificationReporterCfg, VerificationReportingProvider, VerificationToken,
};
use satrs::pus::{
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
EcssTcReceiverCore, EcssTmSenderCore, EcssTmtcError, GenericConversionError,
GenericRoutingError, PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler,
PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory,
};
use satrs::queue::GenericReceiveError;
use satrs::request::{Apid, GenericMessage, MessageMetadata};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusServiceId;
use satrs::ComponentId;
use ops_sat_rs::config::{tmtc_err, CustomPusServiceId};
use ops_sat_rs::TimeStampHelper;
use std::fmt::Debug;
use std::sync::mpsc::{self, Sender};
use ops_sat_rs::config::components::PUS_ROUTING_SERVICE;
use crate::tmtc::MpscStoreAndSendError;
// pub mod action;
// pub mod event;
// pub mod hk;
// pub mod mode;
// pub mod scheduler;
// pub mod stack;
// pub mod test;
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum HandlingStatus {
Empty,
HandledOne,
}
pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> VerificationReporter {
let verif_cfg = VerificationReporterCfg::new(apid, 1, 2, 8).unwrap();
// Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter.
VerificationReporter::new(owner_id, &verif_cfg)
}
/// Simple router structure which forwards PUS telecommands to dedicated handlers.
pub struct PusTcMpscRouter {
pub test_tc_sender: Sender<EcssTcAndToken>,
// pub event_tc_sender: Sender<EcssTcAndToken>,
// pub sched_tc_sender: Sender<EcssTcAndToken>,
// pub hk_tc_sender: Sender<EcssTcAndToken>,
// pub action_tc_sender: Sender<EcssTcAndToken>,
// pub mode_tc_sender: Sender<EcssTcAndToken>,
}
pub struct PusReceiver<TmSender: EcssTmSenderCore> {
pub id: ComponentId,
pub tm_sender: TmSender,
pub verif_reporter: VerificationReporter,
pub pus_router: PusTcMpscRouter,
stamp_helper: TimeStampHelper,
}
impl<TmSender: EcssTmSenderCore> PusReceiver<TmSender> {
pub fn new(tm_sender: TmSender, pus_router: PusTcMpscRouter) -> Self {
Self {
id: PUS_ROUTING_SERVICE.raw(),
tm_sender,
verif_reporter: create_verification_reporter(
PUS_ROUTING_SERVICE.id(),
PUS_ROUTING_SERVICE.apid,
),
pus_router,
stamp_helper: TimeStampHelper::default(),
}
}
pub fn handle_tc_packet(
&mut self,
tc_in_memory: TcInMemory,
service: u8,
pus_tc: &PusTcReader,
) -> Result<PusPacketHandlerResult, MpscStoreAndSendError> {
let init_token = self.verif_reporter.add_tc(pus_tc);
self.stamp_helper.update_from_now();
let accepted_token = self
.verif_reporter
.acceptance_success(&self.tm_sender, init_token, self.stamp_helper.stamp())
.expect("Acceptance success failure");
let service = PusServiceId::try_from(service);
match service {
Ok(standard_service) => match standard_service {
PusServiceId::Test => self.pus_router.test_tc_sender.send(EcssTcAndToken {
tc_in_memory,
token: Some(accepted_token.into()),
})?,
// PusServiceId::Housekeeping => {
// self.pus_router.hk_tc_sender.send(EcssTcAndToken {
// tc_in_memory,
// token: Some(accepted_token.into()),
// })?
// }
// PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken {
// tc_in_memory,
// token: Some(accepted_token.into()),
// })?,
// PusServiceId::Scheduling => {
// self.pus_router.sched_tc_sender.send(EcssTcAndToken {
// tc_in_memory,
// token: Some(accepted_token.into()),
// })?
// }
_ => {
let result = self.verif_reporter.start_failure(
&self.tm_sender,
accepted_token,
FailParams::new(
self.stamp_helper.stamp(),
&tmtc_err::PUS_SERVICE_NOT_IMPLEMENTED,
&[standard_service as u8],
),
);
if result.is_err() {
warn!("Sending verification failure failed");
}
}
},
Err(e) => {
if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) {
match custom_service {
CustomPusServiceId::Mode => {
// self.pus_router.mode_tc_sender.send(EcssTcAndToken {
// tc_in_memory,
// token: Some(accepted_token.into()),
// })?
}
CustomPusServiceId::Health => {}
}
} else {
self.verif_reporter
.start_failure(
&self.tm_sender,
accepted_token,
FailParams::new(
self.stamp_helper.stamp(),
&tmtc_err::INVALID_PUS_SUBSERVICE,
&[e.number],
),
)
.expect("Start failure verification failed")
}
}
}
Ok(PusPacketHandlerResult::RequestHandled)
}
}
pub trait TargetedPusService {
/// Returns [true] interface the packet handling is finished.
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool;
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus;
fn check_for_request_timeouts(&mut self);
}
/// This is a generic handler class for all PUS services where a PUS telecommand is converted
/// to a targeted request.
///
/// The generic steps for this process are the following
///
/// 1. Poll for TC packets
/// 2. Convert the raw packets to a [PusTcReader].
/// 3. Convert the PUS TC to a typed request using the [PusTcToRequestConverter].
/// 4. Route the requests using the [GenericRequestRouter].
/// 5. Add the request to the active request map using the [ActiveRequestMapProvider] abstraction.
/// 6. Check for replies which complete the forwarded request. The handler takes care of
/// the verification process.
/// 7. Check for timeouts of active requests. Generally, the timeout on the service level should
/// be highest expected timeout for the given target.
///
/// The handler exposes the following API:
///
/// 1. [Self::handle_one_tc] which tries to poll and handle one TC packet, covering steps 1-5.
/// 2. [Self::check_one_reply] which tries to poll and handle one reply, covering step 6.
/// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7.
pub struct PusTargetedRequestService<
TcReceiver: EcssTcReceiverCore,
TmSender: EcssTmSenderCore,
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
RequestConverter: PusTcToRequestConverter<ActiveRequestInfo, RequestType, Error = GenericConversionError>,
ReplyHandler: PusReplyHandler<ActiveRequestInfo, ReplyType, Error = EcssTmtcError>,
ActiveRequestMap: ActiveRequestMapProvider<ActiveRequestInfo>,
ActiveRequestInfo: ActiveRequestProvider,
RequestType,
ReplyType,
> {
pub service_helper:
PusServiceHelper<TcReceiver, TmSender, TcInMemConverter, VerificationReporter>,
pub request_router: GenericRequestRouter,
pub request_converter: RequestConverter,
pub active_request_map: ActiveRequestMap,
pub reply_handler: ReplyHandler,
pub reply_receiver: mpsc::Receiver<GenericMessage<ReplyType>>,
phantom: std::marker::PhantomData<(RequestType, ActiveRequestInfo, ReplyType)>,
}
impl<
TcReceiver: EcssTcReceiverCore,
TmSender: EcssTmSenderCore,
TcInMemConverter: EcssTcInMemConverter,
VerificationReporter: VerificationReportingProvider,
RequestConverter: PusTcToRequestConverter<ActiveRequestInfo, RequestType, Error = GenericConversionError>,
ReplyHandler: PusReplyHandler<ActiveRequestInfo, ReplyType, Error = EcssTmtcError>,
ActiveRequestMap: ActiveRequestMapProvider<ActiveRequestInfo>,
ActiveRequestInfo: ActiveRequestProvider,
RequestType,
ReplyType,
>
PusTargetedRequestService<
TcReceiver,
TmSender,
TcInMemConverter,
VerificationReporter,
RequestConverter,
ReplyHandler,
ActiveRequestMap,
ActiveRequestInfo,
RequestType,
ReplyType,
>
where
GenericRequestRouter: PusRequestRouter<RequestType, Error = GenericRoutingError>,
{
pub fn new(
service_helper: PusServiceHelper<
TcReceiver,
TmSender,
TcInMemConverter,
VerificationReporter,
>,
request_converter: RequestConverter,
active_request_map: ActiveRequestMap,
reply_hook: ReplyHandler,
request_router: GenericRequestRouter,
reply_receiver: mpsc::Receiver<GenericMessage<ReplyType>>,
) -> Self {
Self {
service_helper,
request_converter,
active_request_map,
reply_handler: reply_hook,
request_router,
reply_receiver,
phantom: std::marker::PhantomData,
}
}
pub fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
if possible_packet.is_none() {
return Ok(PusPacketHandlerResult::Empty);
}
let ecss_tc_and_token = possible_packet.unwrap();
self.service_helper
.tc_in_mem_converter_mut()
.cache(&ecss_tc_and_token.tc_in_memory)?;
let tc = self.service_helper.tc_in_mem_converter().convert()?;
let (mut request_info, request) = match self.request_converter.convert(
ecss_tc_and_token.token,
&tc,
self.service_helper.tm_sender(),
&self.service_helper.common.verif_reporter,
time_stamp,
) {
Ok((info, req)) => (info, req),
Err(e) => {
self.handle_conversion_to_request_error(&e, ecss_tc_and_token.token, time_stamp);
return Err(e.into());
}
};
let accepted_token: VerificationToken<TcStateAccepted> = request_info
.token()
.try_into()
.expect("token not in expected accepted state");
let verif_request_id = verification::RequestId::new(&tc).raw();
match self.request_router.route(
MessageMetadata::new(verif_request_id, self.service_helper.id()),
request_info.target_id(),
request,
) {
Ok(()) => {
let started_token = self
.service_helper
.verif_reporter()
.start_success(
&self.service_helper.common.tm_sender,
accepted_token,
time_stamp,
)
.expect("Start success failure");
request_info.set_token(started_token.into());
self.active_request_map
.insert(&verif_request_id, request_info);
}
Err(e) => {
self.request_router.handle_error_generic(
&request_info,
&tc,
e.clone(),
self.service_helper.tm_sender(),
self.service_helper.verif_reporter(),
time_stamp,
);
return Err(e.into());
}
}
Ok(PusPacketHandlerResult::RequestHandled)
}
fn handle_conversion_to_request_error(
&mut self,
error: &GenericConversionError,
token: VerificationToken<TcStateAccepted>,
time_stamp: &[u8],
) {
match error {
GenericConversionError::WrongService(service) => {
let service_slice: [u8; 1] = [*service];
self.service_helper
.verif_reporter()
.completion_failure(
self.service_helper.tm_sender(),
token,
FailParams::new(time_stamp, &tmtc_err::INVALID_PUS_SERVICE, &service_slice),
)
.expect("Sending completion failure failed");
}
GenericConversionError::InvalidSubservice(subservice) => {
let subservice_slice: [u8; 1] = [*subservice];
self.service_helper
.verif_reporter()
.completion_failure(
self.service_helper.tm_sender(),
token,
FailParams::new(
time_stamp,
&tmtc_err::INVALID_PUS_SUBSERVICE,
&subservice_slice,
),
)
.expect("Sending completion failure failed");
}
GenericConversionError::NotEnoughAppData { expected, found } => {
let mut context_info = (*found as u32).to_be_bytes().to_vec();
context_info.extend_from_slice(&(*expected as u32).to_be_bytes());
self.service_helper
.verif_reporter()
.completion_failure(
self.service_helper.tm_sender(),
token,
FailParams::new(time_stamp, &tmtc_err::NOT_ENOUGH_APP_DATA, &context_info),
)
.expect("Sending completion failure failed");
}
// Do nothing.. this is service-level and can not be handled generically here.
GenericConversionError::InvalidAppData(_) => (),
}
}
pub fn poll_and_check_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError> {
match self.reply_receiver.try_recv() {
Ok(reply) => {
self.handle_reply(&reply, time_stamp)?;
Ok(HandlingStatus::HandledOne)
}
Err(e) => match e {
mpsc::TryRecvError::Empty => Ok(HandlingStatus::Empty),
mpsc::TryRecvError::Disconnected => Err(EcssTmtcError::Receive(
GenericReceiveError::TxDisconnected(None),
)),
},
}
}
pub fn handle_reply(
&mut self,
reply: &GenericMessage<ReplyType>,
time_stamp: &[u8],
) -> Result<(), EcssTmtcError> {
let active_req_opt = self.active_request_map.get(reply.request_id());
if active_req_opt.is_none() {
self.reply_handler
.handle_unrequested_reply(reply, &self.service_helper.common.tm_sender)?;
return Ok(());
}
let active_request = active_req_opt.unwrap();
let request_finished = self
.reply_handler
.handle_reply(
reply,
active_request,
&self.service_helper.common.tm_sender,
&self.service_helper.common.verif_reporter,
time_stamp,
)
.unwrap_or(false);
if request_finished {
self.active_request_map.remove(reply.request_id());
}
Ok(())
}
pub fn check_for_request_timeouts(&mut self) {
let mut requests_to_delete = Vec::new();
self.active_request_map
.for_each(|request_id, request_info| {
if request_info.has_timed_out() {
requests_to_delete.push(*request_id);
}
});
if !requests_to_delete.is_empty() {
for request_id in requests_to_delete {
self.active_request_map.remove(request_id);
}
}
}
}
/// Generic timeout handling: Handle the verification failure with a dedicated return code
/// and also log the error.
pub fn generic_pus_request_timeout_handler(
sender: &(impl EcssTmSenderCore + ?Sized),
active_request: &(impl ActiveRequestProvider + Debug),
verification_handler: &impl VerificationReportingProvider,
time_stamp: &[u8],
service_str: &'static str,
) -> Result<(), EcssTmtcError> {
log::warn!("timeout for active request {active_request:?} on {service_str} service");
let started_token: VerificationToken<TcStateStarted> = active_request
.token()
.try_into()
.expect("token not in expected started state");
verification_handler.completion_failure(
sender,
started_token,
FailParams::new(time_stamp, &tmtc_err::REQUEST_TIMEOUT, &[]),
)?;
Ok(())
}
#[cfg(test)]
pub(crate) mod tests {
use std::time::Duration;
use satrs::pus::test_util::TEST_COMPONENT_ID_0;
use satrs::pus::{MpscTmAsVecSender, PusTmAsVec, PusTmVariant};
use satrs::request::RequestId;
use satrs::{
pus::{
verification::test_util::TestVerificationReporter, ActivePusRequestStd,
ActiveRequestMapProvider, EcssTcInVecConverter, MpscTcReceiver,
},
request::UniqueApidTargetId,
spacepackets::{
ecss::{
tc::{PusTcCreator, PusTcSecondaryHeader},
WritablePusPacket,
},
SpHeader,
},
};
use crate::requests::CompositeRequest;
use super::*;
// Testbench dedicated to the testing of [PusReplyHandler]s
pub struct ReplyHandlerTestbench<
ReplyHandler: PusReplyHandler<ActiveRequestInfo, Reply, Error = EcssTmtcError>,
ActiveRequestInfo: ActiveRequestProvider,
Reply,
> {
pub id: ComponentId,
pub verif_reporter: TestVerificationReporter,
pub reply_handler: ReplyHandler,
pub tm_receiver: mpsc::Receiver<PusTmAsVec>,
pub default_timeout: Duration,
tm_sender: MpscTmAsVecSender,
phantom: std::marker::PhantomData<(ActiveRequestInfo, Reply)>,
}
impl<
ReplyHandler: PusReplyHandler<ActiveRequestInfo, Reply, Error = EcssTmtcError>,
ActiveRequestInfo: ActiveRequestProvider,
Reply,
> ReplyHandlerTestbench<ReplyHandler, ActiveRequestInfo, Reply>
{
pub fn new(owner_id: ComponentId, reply_handler: ReplyHandler) -> Self {
let test_verif_reporter = TestVerificationReporter::new(owner_id);
let (tm_sender, tm_receiver) = mpsc::channel();
Self {
id: TEST_COMPONENT_ID_0.raw(),
verif_reporter: test_verif_reporter,
reply_handler,
default_timeout: Duration::from_secs(30),
tm_sender,
tm_receiver,
phantom: std::marker::PhantomData,
}
}
pub fn add_tc(
&mut self,
apid: u16,
apid_target: u32,
time_stamp: &[u8],
) -> (verification::RequestId, ActivePusRequestStd) {
let sp_header = SpHeader::new_from_apid(apid);
let sec_header_dummy = PusTcSecondaryHeader::new_simple(0, 0);
let init = self.verif_reporter.add_tc(&PusTcCreator::new(
sp_header,
sec_header_dummy,
&[],
true,
));
let accepted = self
.verif_reporter
.acceptance_success(&self.tm_sender, init, time_stamp)
.expect("acceptance failed");
let started = self
.verif_reporter
.start_success(&self.tm_sender, accepted, time_stamp)
.expect("start failed");
(
started.request_id(),
ActivePusRequestStd::new(
UniqueApidTargetId::new(apid, apid_target).raw(),
started,
self.default_timeout,
),
)
}
pub fn handle_reply(
&mut self,
reply: &GenericMessage<Reply>,
active_request: &ActiveRequestInfo,
time_stamp: &[u8],
) -> Result<bool, ReplyHandler::Error> {
self.reply_handler.handle_reply(
reply,
active_request,
&self.tm_sender,
&self.verif_reporter,
time_stamp,
)
}
pub fn handle_unrequested_reply(
&mut self,
reply: &GenericMessage<Reply>,
) -> Result<(), ReplyHandler::Error> {
self.reply_handler
.handle_unrequested_reply(reply, &self.tm_sender)
}
pub fn handle_request_timeout(
&mut self,
active_request_info: &ActiveRequestInfo,
time_stamp: &[u8],
) -> Result<(), ReplyHandler::Error> {
self.reply_handler.handle_request_timeout(
active_request_info,
&self.tm_sender,
&self.verif_reporter,
time_stamp,
)
}
}
#[derive(Default)]
pub struct DummySender {}
/// Dummy sender component which does nothing on the [Self::send_tm] call.
///
/// Useful for unit tests.
impl EcssTmSenderCore for DummySender {
fn send_tm(&self, _source_id: ComponentId, _tm: PusTmVariant) -> Result<(), EcssTmtcError> {
Ok(())
}
}
// Testbench dedicated to the testing of [PusTcToRequestConverter]s
pub struct PusConverterTestbench<
Converter: PusTcToRequestConverter<ActiveRequestInfo, Request, Error = GenericConversionError>,
ActiveRequestInfo: ActiveRequestProvider,
Request,
> {
pub id: ComponentId,
pub verif_reporter: TestVerificationReporter,
pub converter: Converter,
dummy_sender: DummySender,
current_request_id: Option<verification::RequestId>,
current_packet: Option<Vec<u8>>,
phantom: std::marker::PhantomData<(ActiveRequestInfo, Request)>,
}
impl<
Converter: PusTcToRequestConverter<ActiveRequestInfo, Request, Error = GenericConversionError>,
ActiveRequestInfo: ActiveRequestProvider,
Request,
> PusConverterTestbench<Converter, ActiveRequestInfo, Request>
{
pub fn new(owner_id: ComponentId, converter: Converter) -> Self {
let test_verif_reporter = TestVerificationReporter::new(owner_id);
Self {
id: owner_id,
verif_reporter: test_verif_reporter,
converter,
dummy_sender: DummySender::default(),
current_request_id: None,
current_packet: None,
phantom: std::marker::PhantomData,
}
}
pub fn add_tc(&mut self, tc: &PusTcCreator) -> VerificationToken<TcStateAccepted> {
let token = self.verif_reporter.add_tc(tc);
self.current_request_id = Some(verification::RequestId::new(tc));
self.current_packet = Some(tc.to_vec().unwrap());
self.verif_reporter
.acceptance_success(&self.dummy_sender, token, &[])
.expect("acceptance failed")
}
pub fn request_id(&self) -> Option<verification::RequestId> {
self.current_request_id
}
pub fn convert(
&mut self,
token: VerificationToken<TcStateAccepted>,
time_stamp: &[u8],
expected_apid: u16,
expected_apid_target: u32,
) -> Result<(ActiveRequestInfo, Request), Converter::Error> {
if self.current_packet.is_none() {
return Err(GenericConversionError::InvalidAppData(
"call add_tc first".to_string(),
));
}
let current_packet = self.current_packet.take().unwrap();
let tc_reader = PusTcReader::new(&current_packet).unwrap();
let (active_info, request) = self.converter.convert(
token,
&tc_reader.0,
&self.dummy_sender,
&self.verif_reporter,
time_stamp,
)?;
assert_eq!(
active_info.token().request_id(),
self.request_id().expect("no request id is set")
);
assert_eq!(
active_info.target_id(),
UniqueApidTargetId::new(expected_apid, expected_apid_target).raw()
);
Ok((active_info, request))
}
}
pub struct TargetedPusRequestTestbench<
RequestConverter: PusTcToRequestConverter<ActiveRequestInfo, RequestType, Error = GenericConversionError>,
ReplyHandler: PusReplyHandler<ActiveRequestInfo, ReplyType, Error = EcssTmtcError>,
ActiveRequestMap: ActiveRequestMapProvider<ActiveRequestInfo>,
ActiveRequestInfo: ActiveRequestProvider,
RequestType,
ReplyType,
> {
pub service: PusTargetedRequestService<
MpscTcReceiver,
MpscTmAsVecSender,
EcssTcInVecConverter,
TestVerificationReporter,
RequestConverter,
ReplyHandler,
ActiveRequestMap,
ActiveRequestInfo,
RequestType,
ReplyType,
>,
pub request_id: Option<RequestId>,
pub tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
pub pus_packet_tx: mpsc::Sender<EcssTcAndToken>,
pub reply_tx: mpsc::Sender<GenericMessage<ReplyType>>,
pub request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
}
}

75
src/pus/stack.rs Normal file
View File

@ -0,0 +1,75 @@
// use crate::pus::mode::ModeServiceWrapper;
use derive_new::new;
use satrs::{
pus::{EcssTcInMemConverter, EcssTmSenderCore},
spacepackets::time::{cds, TimeWriter},
};
use crate::pus::HandlingStatus;
use crate::pus::test::TestCustomServiceWrapper;
// use super::{
// action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
// scheduler::SchedulingServiceWrapper, test::TestCustomServiceWrapper, HandlingStatus,
// TargetedPusService,
// };
#[derive(new)]
pub struct PusStack<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter> {
test_srv: TestCustomServiceWrapper<TmSender, TcInMemConverter>,
// hk_srv_wrapper: HkServiceWrapper<TmSender, TcInMemConverter>,
// event_srv: EventServiceWrapper<TmSender, TcInMemConverter>,
// action_srv_wrapper: ActionServiceWrapper<TmSender, TcInMemConverter>,
// schedule_srv: SchedulingServiceWrapper<TmSender, TcInMemConverter>,
// mode_srv: ModeServiceWrapper<TmSender, TcInMemConverter>,
}
impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
PusStack<TmSender, TcInMemConverter>
{
pub fn periodic_operation(&mut self) {
// Release all telecommands which reached their release time before calling the service
// handlers.
// self.schedule_srv.release_tcs();
let time_stamp = cds::CdsTime::now_with_u16_days()
.expect("time stamp generation error")
.to_vec()
.unwrap();
loop {
let mut nothing_to_do = true;
let mut is_srv_finished =
|tc_handling_done: bool, reply_handling_done: Option<HandlingStatus>| {
if !tc_handling_done
|| (reply_handling_done.is_some()
&& reply_handling_done.unwrap() == HandlingStatus::Empty)
{
nothing_to_do = false;
}
};
is_srv_finished(self.test_srv.poll_and_handle_next_packet(&time_stamp), None);
// is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None);
// is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
// is_srv_finished(
// self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
// Some(
// self.action_srv_wrapper
// .poll_and_handle_next_reply(&time_stamp),
// ),
// );
// is_srv_finished(
// self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
// Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)),
// );
// is_srv_finished(
// self.mode_srv.poll_and_handle_next_tc(&time_stamp),
// Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)),
// );
if nothing_to_do {
// Timeout checking is only done once.
// self.action_srv_wrapper.check_for_request_timeouts();
// self.hk_srv_wrapper.check_for_request_timeouts();
// self.mode_srv.check_for_request_timeouts();
break;
}
}
}
}

127
src/pus/test.rs Normal file
View File

@ -0,0 +1,127 @@
use crate::pus::create_verification_reporter;
use log::{info, warn};
use satrs::event_man::{EventMessage, EventMessageU32};
use satrs::pool::SharedStaticMemoryPool;
use satrs::pus::test::PusService17TestHandler;
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
use satrs::pus::EcssTcInSharedStoreConverter;
use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSenderCore, MpscTcReceiver,
MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, PusPacketHandlerResult, PusServiceHelper,
PusTmAsVec, PusTmInPool, TmInSharedPoolSender,
};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusPacket;
use satrs::spacepackets::time::cds::CdsTime;
use satrs::spacepackets::time::TimeWriter;
use std::sync::mpsc;
use std::sync::mpsc::Sender;
use ops_sat_rs::config::components::PUS_TEST_SERVICE;
use ops_sat_rs::config::tmtc_err;
pub fn create_test_service_dynamic(
tm_funnel_tx: mpsc::Sender<PusTmAsVec>,
// event_sender: mpsc::Sender<EventMessageU32>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> TestCustomServiceWrapper<Sender<PusTmAsVec>, EcssTcInVecConverter> {
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
PUS_TEST_SERVICE.id(),
pus_test_rx,
tm_funnel_tx,
create_verification_reporter(PUS_TEST_SERVICE.id(), PUS_TEST_SERVICE.apid),
EcssTcInVecConverter::default(),
));
TestCustomServiceWrapper {
handler: pus17_handler,
// test_srv_event_sender: event_sender,
}
}
pub struct TestCustomServiceWrapper<
TmSender: EcssTmSenderCore,
TcInMemConverter: EcssTcInMemConverter,
> {
pub handler:
PusService17TestHandler<MpscTcReceiver, TmSender, TcInMemConverter, VerificationReporter>,
// pub test_srv_event_sender: mpsc::Sender<EventMessageU32>,
}
impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
TestCustomServiceWrapper<TmSender, TcInMemConverter>
{
pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> bool {
let res = self.handler.poll_and_handle_next_tc(time_stamp);
if res.is_err() {
warn!("PUS17 handler failed with error {:?}", res.unwrap_err());
return true;
}
match res.unwrap() {
PusPacketHandlerResult::RequestHandled => {
info!("Received PUS ping command TC[17,1]");
info!("Sent ping reply PUS TM[17,2]");
}
PusPacketHandlerResult::RequestHandledPartialSuccess(partial_err) => {
warn!(
"Handled PUS ping command with partial success: {:?}",
partial_err
);
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS17: Subservice {subservice} not implemented")
}
// TODO: adapt interface events are implemented
PusPacketHandlerResult::CustomSubservice(subservice, token) => {
let (tc, _) = PusTcReader::new(
self.handler
.service_helper
.tc_in_mem_converter
.tc_slice_raw(),
)
.unwrap();
let time_stamper = CdsTime::now_with_u16_days().unwrap();
let mut stamp_buf: [u8; 7] = [0; 7];
time_stamper.write_to_bytes(&mut stamp_buf).unwrap();
if subservice == 128 {
info!("Generating test event");
// self.test_srv_event_sender
// .send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into()))
// .expect("Sending test event failed");
let start_token = self
.handler
.service_helper
.verif_reporter()
.start_success(self.handler.service_helper.tm_sender(), token, &stamp_buf)
.expect("Error sending start success");
self.handler
.service_helper
.verif_reporter()
.completion_success(
self.handler.service_helper.tm_sender(),
start_token,
&stamp_buf,
)
.expect("Error sending completion success");
} else {
let fail_data = [tc.subservice()];
self.handler
.service_helper
.verif_reporter()
.start_failure(
self.handler.service_helper.tm_sender(),
token,
FailParams::new(
&stamp_buf,
&tmtc_err::INVALID_PUS_SUBSERVICE,
&fail_data,
),
)
.expect("Sending start failure verification failed");
}
}
PusPacketHandlerResult::Empty => {
return true;
}
}
false
}
}

152
src/requests.rs Normal file
View File

@ -0,0 +1,152 @@
use std::collections::HashMap;
use std::sync::mpsc;
use log::warn;
use satrs::action::ActionRequest;
use satrs::hk::HkRequest;
use satrs::mode::ModeRequest;
use satrs::pus::verification::{
FailParams, TcStateAccepted, VerificationReportingProvider, VerificationToken,
};
use satrs::pus::{ActiveRequestProvider, EcssTmSenderCore, GenericRoutingError, PusRequestRouter};
use satrs::queue::GenericSendError;
use satrs::request::{GenericMessage, MessageMetadata, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusPacket;
use satrs::ComponentId;
use ops_sat_rs::config::components::PUS_ROUTING_SERVICE;
use ops_sat_rs::config::tmtc_err;
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum CompositeRequest {
Hk(HkRequest),
Action(ActionRequest),
}
#[derive(Clone)]
pub struct GenericRequestRouter {
pub id: ComponentId,
// All messages which do not have a dedicated queue.
pub composite_router_map: HashMap<ComponentId, mpsc::Sender<GenericMessage<CompositeRequest>>>,
pub mode_router_map: HashMap<ComponentId, mpsc::Sender<GenericMessage<ModeRequest>>>,
}
impl Default for GenericRequestRouter {
fn default() -> Self {
Self {
id: PUS_ROUTING_SERVICE.raw(),
composite_router_map: Default::default(),
mode_router_map: Default::default(),
}
}
}
impl GenericRequestRouter {
pub(crate) fn handle_error_generic(
&self,
active_request: &impl ActiveRequestProvider,
tc: &PusTcReader,
error: GenericRoutingError,
tm_sender: &(impl EcssTmSenderCore + ?Sized),
verif_reporter: &impl VerificationReportingProvider,
time_stamp: &[u8],
) {
warn!(
"Routing request for service {} failed: {error:?}",
tc.service()
);
let accepted_token: VerificationToken<TcStateAccepted> = active_request
.token()
.try_into()
.expect("token is not in accepted state");
match error {
GenericRoutingError::UnknownTargetId(id) => {
let apid_target_id = UniqueApidTargetId::from(id);
warn!("Target APID for request: {}", apid_target_id.apid);
warn!("Target Unique ID for request: {}", apid_target_id.unique_id);
let mut fail_data: [u8; 8] = [0; 8];
fail_data.copy_from_slice(&id.to_be_bytes());
verif_reporter
.completion_failure(
tm_sender,
accepted_token,
FailParams::new(time_stamp, &tmtc_err::UNKNOWN_TARGET_ID, &fail_data),
)
.expect("Sending start failure failed");
}
GenericRoutingError::Send(_) => {
let mut fail_data: [u8; 8] = [0; 8];
fail_data.copy_from_slice(&active_request.target_id().to_be_bytes());
verif_reporter
.completion_failure(
tm_sender,
accepted_token,
FailParams::new(time_stamp, &tmtc_err::ROUTING_ERROR, &fail_data),
)
.expect("Sending start failure failed");
}
}
}
}
impl PusRequestRouter<HkRequest> for GenericRequestRouter {
type Error = GenericRoutingError;
fn route(
&self,
requestor_info: MessageMetadata,
target_id: ComponentId,
hk_request: HkRequest,
) -> Result<(), Self::Error> {
if let Some(sender) = self.composite_router_map.get(&target_id) {
sender
.send(GenericMessage::new(
requestor_info,
CompositeRequest::Hk(hk_request),
))
.map_err(|_| GenericRoutingError::Send(GenericSendError::RxDisconnected))?;
return Ok(());
}
Err(GenericRoutingError::UnknownTargetId(target_id))
}
}
impl PusRequestRouter<ActionRequest> for GenericRequestRouter {
type Error = GenericRoutingError;
fn route(
&self,
requestor_info: MessageMetadata,
target_id: ComponentId,
action_request: ActionRequest,
) -> Result<(), Self::Error> {
if let Some(sender) = self.composite_router_map.get(&target_id) {
sender
.send(GenericMessage::new(
requestor_info,
CompositeRequest::Action(action_request),
))
.map_err(|_| GenericRoutingError::Send(GenericSendError::RxDisconnected))?;
return Ok(());
}
Err(GenericRoutingError::UnknownTargetId(target_id))
}
}
impl PusRequestRouter<ModeRequest> for GenericRequestRouter {
type Error = GenericRoutingError;
fn route(
&self,
requestor_info: MessageMetadata,
target_id: ComponentId,
request: ModeRequest,
) -> Result<(), Self::Error> {
if let Some(sender) = self.mode_router_map.get(&target_id) {
sender
.send(GenericMessage::new(requestor_info, request))
.map_err(|_| GenericRoutingError::Send(GenericSendError::RxDisconnected))?;
return Ok(());
}
Err(GenericRoutingError::UnknownTargetId(target_id))
}
}

157
src/tm_funnel.rs Normal file
View File

@ -0,0 +1,157 @@
use std::{
collections::HashMap,
sync::mpsc::{self},
};
use log::info;
use satrs::pus::{PusTmAsVec, PusTmInPool};
use satrs::{
pool::PoolProvider,
seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore},
spacepackets::{
ecss::{tm::PusTmZeroCopyWriter, PusPacket},
time::cds::MIN_CDS_FIELD_LEN,
CcsdsPacket,
},
tmtc::tm_helper::SharedTmPool,
};
use crate::interface::tcp::SyncTcpTmSource;
#[derive(Default)]
pub struct CcsdsSeqCounterMap {
apid_seq_counter_map: HashMap<u16, CcsdsSimpleSeqCountProvider>,
}
impl CcsdsSeqCounterMap {
pub fn get_and_increment(&mut self, apid: u16) -> u16 {
self.apid_seq_counter_map
.entry(apid)
.or_default()
.get_and_increment()
}
}
pub struct TmFunnelCommon {
seq_counter_map: CcsdsSeqCounterMap,
msg_counter_map: HashMap<u8, u16>,
sync_tm_tcp_source: SyncTcpTmSource,
}
impl TmFunnelCommon {
pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self {
Self {
seq_counter_map: Default::default(),
msg_counter_map: Default::default(),
sync_tm_tcp_source,
}
}
// Applies common packet processing operations for PUS TM packets. This includes setting
// a sequence counter
fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) {
// zero_copy_writer.set_apid(PUS_APID);
zero_copy_writer.set_seq_count(
self.seq_counter_map
.get_and_increment(zero_copy_writer.apid()),
);
let entry = self
.msg_counter_map
.entry(zero_copy_writer.service())
.or_insert(0);
zero_copy_writer.set_msg_count(*entry);
if *entry == u16::MAX {
*entry = 0;
} else {
*entry += 1;
}
Self::packet_printout(&zero_copy_writer);
// This operation has to come last!
zero_copy_writer.finish();
}
fn packet_printout(tm: &PusTmZeroCopyWriter) {
info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice());
}
}
pub struct TmFunnelStatic {
common: TmFunnelCommon,
shared_tm_store: SharedTmPool,
tm_funnel_rx: mpsc::Receiver<PusTmInPool>,
tm_server_tx: mpsc::SyncSender<PusTmInPool>,
}
impl TmFunnelStatic {
pub fn new(
shared_tm_store: SharedTmPool,
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<PusTmInPool>,
tm_server_tx: mpsc::SyncSender<PusTmInPool>,
) -> Self {
Self {
common: TmFunnelCommon::new(sync_tm_tcp_source),
shared_tm_store,
tm_funnel_rx,
tm_server_tx,
}
}
pub fn operation(&mut self) {
if let Ok(pus_tm_in_pool) = self.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let shared_pool = self.shared_tm_store.clone_backing_pool();
let mut pool_guard = shared_pool.write().expect("Locking TM pool failed");
let mut tm_copy = Vec::new();
pool_guard
.modify(&pus_tm_in_pool.store_addr, |buf| {
let zero_copy_writer = PusTmZeroCopyWriter::new(buf, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
tm_copy = buf.to_vec()
})
.expect("Reading TM from pool failed");
self.tm_server_tx
.send(pus_tm_in_pool)
.expect("Sending TM to server failed");
// We could also do this step in the update closure, but I'd rather avoid this, could
// lead to nested locking.
self.common.sync_tm_tcp_source.add_tm(&tm_copy);
}
}
}
pub struct TmFunnelDynamic {
common: TmFunnelCommon,
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
tm_server_tx: mpsc::Sender<PusTmAsVec>,
}
impl TmFunnelDynamic {
pub fn new(
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<PusTmAsVec>,
tm_server_tx: mpsc::Sender<PusTmAsVec>,
) -> Self {
Self {
common: TmFunnelCommon::new(sync_tm_tcp_source),
tm_funnel_rx,
tm_server_tx,
}
}
pub fn operation(&mut self) {
if let Ok(mut tm) = self.tm_funnel_rx.recv() {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.
let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN)
.expect("Creating TM zero copy writer failed");
self.common.apply_packet_processing(zero_copy_writer);
self.common.sync_tm_tcp_source.add_tm(&tm.packet);
self.tm_server_tx
.send(tm)
.expect("Sending TM to server failed");
}
}
}

View File

@ -4,6 +4,21 @@ use satrs::{
tmtc::ReceivesCcsdsTc,
};
use std::sync::mpsc::{self, SendError, Sender, TryRecvError};
use satrs::pool::{StoreAddr, StoreError};
use satrs::pus::{EcssTcAndToken, MpscTmAsVecSender};
use satrs::spacepackets::ecss::PusPacket;
use crate::pus::PusReceiver;
use thiserror::Error;
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum MpscStoreAndSendError {
#[error("Store error: {0}")]
Store(#[from] StoreError),
#[error("TC send error: {0}")]
TcSend(#[from] SendError<EcssTcAndToken>),
#[error("TMTC send error: {0}")]
TmTcSend(#[from] SendError<StoreAddr>),
}
// Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules.
#[derive(Clone)]
@ -30,17 +45,17 @@ impl ReceivesCcsdsTc for PusTcSourceProviderDynamic {
// TC source components where the heap is the backing memory of the received telecommands.
pub struct TcSourceTaskDynamic {
pub tc_receiver: mpsc::Receiver<Vec<u8>>,
// pus_receiver: PusReceiver<MpscTmAsVecSender>,
pus_receiver: PusReceiver<MpscTmAsVecSender>,
}
impl TcSourceTaskDynamic {
pub fn new(
tc_receiver: mpsc::Receiver<Vec<u8>>,
// pus_receiver: PusReceiver<MpscTmAsVecSender>,
pus_receiver: PusReceiver<MpscTmAsVecSender>,
) -> Self {
Self {
tc_receiver,
// pus_receiver,
pus_receiver,
}
}
@ -52,7 +67,6 @@ impl TcSourceTaskDynamic {
match self.tc_receiver.try_recv() {
Ok(tc) => match PusTcReader::new(&tc) {
Ok((pus_tc, _)) => {
/*
self.pus_receiver
.handle_tc_packet(
satrs::pus::TcInMemory::Vec(tc.clone()),
@ -60,7 +74,6 @@ impl TcSourceTaskDynamic {
&pus_tc,
)
.ok();
*/
true
}
Err(e) => {