Compare commits

...

21 Commits

Author SHA1 Message Date
5ec5124ea3 Updated events modules and docs
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good
2024-04-24 14:30:45 +02:00
bfaddd0ebb Merge pull request 'prep next release' (#171) from pre-v0.2.0-rc.4 into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
Reviewed-on: #171
2024-04-23 16:32:03 +02:00
423a068736 prep next release
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
2024-04-23 14:55:19 +02:00
8022af1bf2 Merge pull request 'update Python client for example' (#170) from update-example-pyclient into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
Reviewed-on: #170
2024-04-23 14:52:04 +02:00
acd2260dfd update Python client for example
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
2024-04-23 14:22:50 +02:00
e5ee698dc4 Merge pull request 'TCP server improvements' (#169) from tcp-ip-improvements into main
Some checks failed
Rust/sat-rs/pipeline/head There was a failure building this commit
Reviewed-on: #169
2024-04-23 13:21:41 +02:00
e8907c74d4 changelog
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
2024-04-23 11:23:00 +02:00
536051e05b improvements and fixes
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
2024-04-22 20:29:14 +02:00
701db659e9 Merge pull request 'formatting' (#168) from fmt into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
Reviewed-on: #168
2024-04-22 15:47:58 +02:00
4b8e54b91b formatting
Some checks are pending
Rust/sat-rs/pipeline/head This commit looks good
Rust/sat-rs/pipeline/pr-main Build started...
2024-04-22 10:42:49 +02:00
870d60cfd6 Merge pull request 'bugfix and improvements for CCSDS SP decoder' (#167) from ccsds-decoder-bugfix into main
Some checks failed
Rust/sat-rs/pipeline/head There was a failure building this commit
Reviewed-on: #167
2024-04-22 10:23:12 +02:00
9e62e4292c bugfix and improvements for CCSDS SP decoder
Some checks are pending
Rust/sat-rs/pipeline/pr-main Build started...
2024-04-20 11:19:46 +02:00
b2e77fbc09 Merge pull request 'requires another hotfix' (#166) from and-another-docs-rs-hotfix into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
Reviewed-on: #166
2024-04-17 20:42:09 +02:00
5371928496 docs_rs build argument hotfix
Some checks are pending
Rust/sat-rs/pipeline/pr-main Build queued...
2024-04-17 20:41:30 +02:00
31cddbd99b Merge pull request 'bump msrv version' (#165) from bump-msrv into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
Reviewed-on: #165
2024-04-17 18:56:21 +02:00
7c00e13e70 bump msrv version
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
2024-04-17 18:10:32 +02:00
aa72063454 Merge pull request 'prepare next release candidate' (#164) from prep-v0.2.0-rc.2 into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
Reviewed-on: #164
2024-04-17 18:03:28 +02:00
7b37b76695 prepare next release candidate
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
2024-04-17 17:19:38 +02:00
ea5d95c12d Merge pull request 'why is this an issue for docs-rs?' (#163) from fix-for-docs-build-docs-rs into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
Reviewed-on: #163
2024-04-17 17:09:36 +02:00
c62adbb300 Merge branch 'main' into fix-for-docs-build-docs-rs
Some checks are pending
Rust/sat-rs/pipeline/pr-main Build started...
2024-04-17 16:41:45 +02:00
4a27d2605d why is this an issue for docs-rs?
Some checks are pending
Rust/sat-rs/pipeline/head Build queued...
Rust/sat-rs/pipeline/pr-main This commit looks good
2024-04-17 16:34:56 +02:00
28 changed files with 397 additions and 333 deletions

View File

@@ -1,14 +1,17 @@
# Events
Events can be an extremely important mechanism used for remote systems to monitor unexpected
or expected anomalies and events occuring on these systems. They are oftentimes tied to
Events are an important mechanism used for remote systems to monitor unexpected
or expected anomalies and events occuring on these systems.
One common use case for events on remote systems is to offer a light-weight publish-subscribe
mechanism and IPC mechanism for software and hardware events which are also packaged as telemetry
(TM) or can trigger a system response. They can also be tied to
Fault Detection, Isolation and Recovery (FDIR) operations, which need to happen autonomously.
Events can also be used as a convenient Inter-Process Communication (IPC) mechansism, which is
also observable for the Ground segment. The PUS Service 5 standardizes how the ground interface
for events might look like, but does not specify how other software components might react
to those events. There is the PUS Service 19, which might be used for that purpose, but the
event components recommended by this framework do not really need this service.
The PUS Service 5 standardizes how the ground interface for events might look like, but does not
specify how other software components might react to those events. There is the PUS Service 19,
which might be used for that purpose, but the event components recommended by this framework do not
rely on the present of this service.
The following images shows how the flow of events could look like in a system where components
can generate events, and where other system components might be interested in those events:

View File

@@ -10,6 +10,7 @@ class Apid(enum.IntEnum):
GENERIC_PUS = 2
ACS = 3
CFDP = 4
TMTC = 5
class EventSeverity(enum.IntEnum):

View File

@@ -103,7 +103,9 @@ class PusHandler(GenericApidHandlerBase):
def handle_tm(self, apid: int, packet: bytes, _user_args: Any):
try:
pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
pus_tm = PusTelemetry.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
except ValueError as e:
_LOGGER.warning("Could not generate PUS TM object from raw data")
_LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
@@ -111,7 +113,7 @@ class PusHandler(GenericApidHandlerBase):
service = pus_tm.service
if service == 1:
tm_packet = Service1Tm.unpack(
data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2)
data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2)
)
res = self.verif_wrapper.add_tm(tm_packet)
if res is None:
@@ -128,7 +130,9 @@ class PusHandler(GenericApidHandlerBase):
elif service == 3:
_LOGGER.info("No handling for HK packets implemented")
_LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]")
pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
pus_tm = PusTelemetry.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
if pus_tm.subservice == 25:
if len(pus_tm.source_data) < 8:
raise ValueError("No addressable ID in HK packet")
@@ -136,16 +140,18 @@ class PusHandler(GenericApidHandlerBase):
_LOGGER.info(json_str)
elif service == 5:
tm_packet = PusTelemetry.unpack(
packet, time_reader=CdsShortTimestamp.empty()
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
src_data = tm_packet.source_data
event_u32 = EventU32.unpack(src_data)
_LOGGER.info(f"Received event packet. Event: {event_u32}")
_LOGGER.info(
f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}"
)
if event_u32.group_id == 0 and event_u32.unique_id == 0:
_LOGGER.info("Received test event")
elif service == 17:
tm_packet = Service17Tm.unpack(
packet, time_reader=CdsShortTimestamp.empty()
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
if tm_packet.subservice == 2:
self.file_logger.info("Received Ping Reply TM[17,2]")
@@ -162,7 +168,7 @@ class PusHandler(GenericApidHandlerBase):
f"The service {service} is not implemented in Telemetry Factory"
)
tm_packet = PusTelemetry.unpack(
packet, time_reader=CdsShortTimestamp.empty()
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
self.raw_logger.log_tm(pus_tm)
@@ -197,15 +203,15 @@ class TcHandler(TcHandlerBase):
_LOGGER.info(log_entry.log_str)
def queue_finished_cb(self, info: ProcedureWrapper):
if info.proc_type == TcProcedureType.DEFAULT:
def_proc = info.to_def_procedure()
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
_LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}")
def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
q = self.queue_helper
q.queue_wrapper = wrapper.queue_wrapper
if info.proc_type == TcProcedureType.DEFAULT:
def_proc = info.to_def_procedure()
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
assert def_proc.cmd_path is not None
pus_tc.pack_pus_telecommands(q, def_proc.cmd_path)
@@ -256,6 +262,7 @@ def main():
while True:
state = tmtc_backend.periodic_op(None)
if state.request == BackendRequest.TERMINATION_NO_ERROR:
tmtc_backend.close_com_if()
sys.exit(0)
elif state.request == BackendRequest.DELAY_IDLE:
_LOGGER.info("TMTC Client in IDLE mode")
@@ -270,6 +277,7 @@ def main():
elif state.request == BackendRequest.CALL_NEXT:
pass
except KeyboardInterrupt:
tmtc_backend.close_com_if()
sys.exit(0)

View File

@@ -1,2 +1,2 @@
tmtccmd == 8.0.0rc1
tmtccmd == 8.0.0rc2
# -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd

View File

@@ -8,13 +8,10 @@ use satrs::pus::verification::VerificationReporter;
use satrs::pus::EcssTmSender;
use satrs::request::UniqueApidTargetId;
use satrs::{
event_man::{
EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded,
MpscEventReceiver,
},
event_man::{EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded},
pus::{
event_man::{
DefaultPusEventU32Dispatcher, EventReporter, EventRequest, EventRequestWithToken,
DefaultPusEventU32TmCreator, EventReporter, EventRequest, EventRequestWithToken,
},
verification::{TcStateStarted, VerificationReportingProvider, VerificationToken},
},
@@ -40,13 +37,12 @@ impl EventTmHookProvider for EventApidSetter {
/// packets. It also handles the verification completion of PUS event service requests.
pub struct PusEventHandler<TmSender: EcssTmSender> {
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
pus_event_dispatcher: DefaultPusEventU32Dispatcher<()>,
pus_event_tm_creator: DefaultPusEventU32TmCreator<EventApidSetter>,
pus_event_man_rx: mpsc::Receiver<EventMessageU32>,
tm_sender: TmSender,
time_provider: CdsTime,
timestamp: [u8; 7],
verif_handler: VerificationReporter,
event_apid_setter: EventApidSetter,
}
impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
@@ -61,9 +57,16 @@ impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
// telemetry for each event.
let event_reporter = EventReporter::new(PUS_EVENT_MANAGEMENT.raw(), 0, 0, 128).unwrap();
let event_reporter = EventReporter::new_with_hook(
PUS_EVENT_MANAGEMENT.raw(),
0,
0,
128,
EventApidSetter::default(),
)
.unwrap();
let pus_event_dispatcher =
DefaultPusEventU32Dispatcher::new_with_default_backend(event_reporter);
DefaultPusEventU32TmCreator::new_with_default_backend(event_reporter);
let pus_event_man_send_provider = EventU32SenderMpscBounded::new(
PUS_EVENT_MANAGEMENT.raw(),
pus_event_man_tx,
@@ -75,13 +78,12 @@ impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
Self {
event_request_rx,
pus_event_dispatcher,
pus_event_tm_creator: pus_event_dispatcher,
pus_event_man_rx,
time_provider: CdsTime::new_with_u16_days(0, 0),
timestamp: [0; 7],
verif_handler,
tm_sender,
event_apid_setter: EventApidSetter::default(),
}
}
@@ -95,75 +97,105 @@ impl<TmSender: EcssTmSender> PusEventHandler<TmSender> {
.completion_success(&self.tm_sender, started_token, timestamp)
.expect("Sending completion success failed");
};
// handle event requests
if let Ok(event_req) = self.event_request_rx.try_recv() {
match event_req.request {
EventRequest::Enable(event) => {
self.pus_event_dispatcher
.enable_tm_for_event(&event)
.expect("Enabling TM failed");
update_time(&mut self.time_provider, &mut self.timestamp);
report_completion(event_req, &self.timestamp);
}
EventRequest::Disable(event) => {
self.pus_event_dispatcher
.disable_tm_for_event(&event)
.expect("Disabling TM failed");
update_time(&mut self.time_provider, &mut self.timestamp);
report_completion(event_req, &self.timestamp);
}
loop {
// handle event requests
match self.event_request_rx.try_recv() {
Ok(event_req) => match event_req.request {
EventRequest::Enable(event) => {
self.pus_event_tm_creator
.enable_tm_for_event(&event)
.expect("Enabling TM failed");
update_time(&mut self.time_provider, &mut self.timestamp);
report_completion(event_req, &self.timestamp);
}
EventRequest::Disable(event) => {
self.pus_event_tm_creator
.disable_tm_for_event(&event)
.expect("Disabling TM failed");
update_time(&mut self.time_provider, &mut self.timestamp);
report_completion(event_req, &self.timestamp);
}
},
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
log::warn!("all event request senders have disconnected");
break;
}
},
}
}
}
pub fn generate_pus_event_tm(&mut self) {
// Perform the generation of PUS event packets
if let Ok(event_msg) = self.pus_event_man_rx.try_recv() {
update_time(&mut self.time_provider, &mut self.timestamp);
let param_vec = event_msg.params().map_or(Vec::new(), |param| {
param.to_vec().expect("failed to convert params to vec")
});
self.event_apid_setter.next_apid = UniqueApidTargetId::from(event_msg.sender_id()).apid;
self.pus_event_dispatcher
.generate_pus_event_tm_generic(
&self.tm_sender,
&self.timestamp,
event_msg.event(),
Some(&param_vec),
)
.expect("Sending TM as event failed");
loop {
// Perform the generation of PUS event packets
match self.pus_event_man_rx.try_recv() {
Ok(event_msg) => {
update_time(&mut self.time_provider, &mut self.timestamp);
let param_vec = event_msg.params().map_or(Vec::new(), |param| {
param.to_vec().expect("failed to convert params to vec")
});
// We use the TM modification hook to set the sender APID for each event.
self.pus_event_tm_creator.reporter.tm_hook.next_apid =
UniqueApidTargetId::from(event_msg.sender_id()).apid;
self.pus_event_tm_creator
.generate_pus_event_tm_generic(
&self.tm_sender,
&self.timestamp,
event_msg.event(),
Some(&param_vec),
)
.expect("Sending TM as event failed");
}
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
log::warn!("All event senders have disconnected");
break;
}
},
}
}
}
}
/// This is a thin wrapper around the event manager which also caches the sender component
/// used to send events to the event manager.
pub struct EventManagerWrapper {
pub struct EventHandler<TmSender: EcssTmSender> {
pub pus_event_handler: PusEventHandler<TmSender>,
event_manager: EventManagerWithBoundedMpsc,
event_sender: mpsc::Sender<EventMessageU32>,
}
impl EventManagerWrapper {
pub fn new() -> Self {
// The sender handle is the primary sender handle for all components which want to create events.
// The event manager will receive the RX handle to receive all the events.
let (event_sender, event_man_rx) = mpsc::channel();
let event_recv = MpscEventReceiver::new(event_man_rx);
impl<TmSender: EcssTmSender> EventHandler<TmSender> {
pub fn new(
tm_sender: TmSender,
event_rx: mpsc::Receiver<EventMessageU32>,
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
) -> Self {
let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx);
let pus_event_handler = PusEventHandler::new(
tm_sender,
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
&mut event_manager,
event_request_rx,
);
Self {
event_manager: EventManagerWithBoundedMpsc::new(event_recv),
event_sender,
pus_event_handler,
event_manager,
}
}
// Returns a cached event sender to send events to the event manager for routing.
pub fn clone_event_sender(&self) -> mpsc::Sender<EventMessageU32> {
self.event_sender.clone()
}
#[allow(dead_code)]
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
&mut self.event_manager
}
pub fn periodic_operation(&mut self) {
self.pus_event_handler.handle_event_requests();
self.try_event_routing();
self.pus_event_handler.generate_pus_event_tm();
}
pub fn try_event_routing(&mut self) {
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
self.routing_error_handler(event_msg, error)
@@ -177,41 +209,5 @@ impl EventManagerWrapper {
}
}
pub struct EventHandler<TmSender: EcssTmSender> {
pub event_man_wrapper: EventManagerWrapper,
pub pus_event_handler: PusEventHandler<TmSender>,
}
impl<TmSender: EcssTmSender> EventHandler<TmSender> {
pub fn new(
tm_sender: TmSender,
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
) -> Self {
let mut event_man_wrapper = EventManagerWrapper::new();
let pus_event_handler = PusEventHandler::new(
tm_sender,
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
event_man_wrapper.event_manager(),
event_request_rx,
);
Self {
event_man_wrapper,
pus_event_handler,
}
}
pub fn clone_event_sender(&self) -> mpsc::Sender<EventMessageU32> {
self.event_man_wrapper.clone_event_sender()
}
#[allow(dead_code)]
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
self.event_man_wrapper.event_manager()
}
pub fn periodic_operation(&mut self) {
self.pus_event_handler.handle_event_requests();
self.event_man_wrapper.try_event_routing();
self.pus_event_handler.generate_pus_event_tm();
}
}
#[cfg(test)]
mod tests {}

View File

@@ -1,3 +1,4 @@
use std::time::Duration;
use std::{
collections::{HashSet, VecDeque},
fmt::Debug,
@@ -139,7 +140,9 @@ impl<TcSender: PacketSenderRaw<Error = SendError>, SendError: Debug + 'static>
pub fn periodic_operation(&mut self) {
loop {
let result = self.0.handle_all_connections(None);
let result = self
.0
.handle_all_connections(Some(Duration::from_millis(400)));
match result {
Ok(_conn_result) => (),
Err(e) => {

View File

@@ -114,6 +114,7 @@ impl<
#[cfg(test)]
mod tests {
use std::net::Ipv4Addr;
use std::{
cell::RefCell,
collections::VecDeque,
@@ -182,7 +183,7 @@ mod tests {
#[test]
fn test_transactions() {
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0);
let sock_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
let test_receiver = TestSender::default();
// let tc_queue = test_receiver.tc_vec.clone();
let udp_tc_server =
@@ -200,8 +201,8 @@ mod tests {
.unwrap();
let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed");
let client_addr = client.local_addr().unwrap();
client.connect(server_addr).unwrap();
client.send(&ping_tc).unwrap();
println!("{}", server_addr);
client.send_to(&ping_tc, server_addr).unwrap();
udp_dyn_server.periodic_operation();
{
let mut queue = udp_dyn_server.udp_tc_server.tc_sender.tc_vec.borrow_mut();

View File

@@ -11,7 +11,7 @@ use crate::events::EventHandler;
use crate::interface::udp::DynamicUdpTmHandler;
use crate::pus::stack::PusStack;
use crate::tmtc::tc_source::{TcSourceTaskDynamic, TcSourceTaskStatic};
use crate::tmtc::tm_sink::{TmFunnelDynamic, TmFunnelStatic};
use crate::tmtc::tm_sink::{TmSinkDynamic, TmSinkStatic};
use log::info;
use pus::test::create_test_service_dynamic;
use satrs::hal::std::tcp_server::ServerConfig;
@@ -54,11 +54,11 @@ fn static_tmtc_pool_main() {
let shared_tm_pool_wrapper = SharedPacketPool::new(&shared_tm_pool);
let shared_tc_pool_wrapper = SharedPacketPool::new(&shared_tc_pool);
let (tc_source_tx, tc_source_rx) = mpsc::sync_channel(50);
let (tm_funnel_tx, tm_funnel_rx) = mpsc::sync_channel(50);
let (tm_sink_tx, tm_sink_rx) = mpsc::sync_channel(50);
let (tm_server_tx, tm_server_rx) = mpsc::sync_channel(50);
let tm_funnel_tx_sender =
PacketSenderWithSharedPool::new(tm_funnel_tx.clone(), shared_tm_pool_wrapper.clone());
let tm_sink_tx_sender =
PacketSenderWithSharedPool::new(tm_sink_tx.clone(), shared_tm_pool_wrapper.clone());
let (mgm_handler_composite_tx, mgm_handler_composite_rx) =
mpsc::channel::<GenericMessage<CompositeRequest>>();
@@ -80,11 +80,12 @@ fn static_tmtc_pool_main() {
// Create event handling components
// These sender handles are used to send event requests, for example to enable or disable
// certain events.
let (event_tx, event_rx) = mpsc::sync_channel(100);
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation.
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_request_rx);
let mut event_handler = EventHandler::new(tm_sink_tx.clone(), event_rx, event_request_rx);
let (pus_test_tx, pus_test_rx) = mpsc::channel();
let (pus_event_tx, pus_event_rx) = mpsc::channel();
@@ -106,39 +107,39 @@ fn static_tmtc_pool_main() {
mode_tc_sender: pus_mode_tx,
};
let pus_test_service = create_test_service_static(
tm_funnel_tx_sender.clone(),
tm_sink_tx_sender.clone(),
shared_tc_pool.clone(),
event_handler.clone_event_sender(),
event_tx.clone(),
pus_test_rx,
);
let pus_scheduler_service = create_scheduler_service_static(
tm_funnel_tx_sender.clone(),
tm_sink_tx_sender.clone(),
tc_source.clone(),
pus_sched_rx,
create_sched_tc_pool(),
);
let pus_event_service = create_event_service_static(
tm_funnel_tx_sender.clone(),
tm_sink_tx_sender.clone(),
shared_tc_pool.clone(),
pus_event_rx,
event_request_tx,
);
let pus_action_service = create_action_service_static(
tm_funnel_tx_sender.clone(),
tm_sink_tx_sender.clone(),
shared_tc_pool.clone(),
pus_action_rx,
request_map.clone(),
pus_action_reply_rx,
);
let pus_hk_service = create_hk_service_static(
tm_funnel_tx_sender.clone(),
tm_sink_tx_sender.clone(),
shared_tc_pool.clone(),
pus_hk_rx,
request_map.clone(),
pus_hk_reply_rx,
);
let pus_mode_service = create_mode_service_static(
tm_funnel_tx_sender.clone(),
tm_sink_tx_sender.clone(),
shared_tc_pool.clone(),
pus_mode_rx,
request_map,
@@ -156,7 +157,7 @@ fn static_tmtc_pool_main() {
let mut tmtc_task = TcSourceTaskStatic::new(
shared_tc_pool_wrapper.clone(),
tc_source_rx,
PusTcDistributor::new(tm_funnel_tx_sender, pus_router),
PusTcDistributor::new(tm_sink_tx_sender, pus_router),
);
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
@@ -186,10 +187,10 @@ fn static_tmtc_pool_main() {
)
.expect("tcp server creation failed");
let mut tm_funnel = TmFunnelStatic::new(
let mut tm_sink = TmSinkStatic::new(
shared_tm_pool_wrapper,
sync_tm_tcp_source,
tm_funnel_rx,
tm_sink_rx,
tm_server_tx,
);
@@ -209,7 +210,7 @@ fn static_tmtc_pool_main() {
mode_leaf_interface,
mgm_handler_composite_rx,
pus_hk_reply_tx,
tm_funnel_tx,
tm_sink_tx,
dummy_spi_interface,
shared_mgm_set,
);
@@ -240,9 +241,9 @@ fn static_tmtc_pool_main() {
info!("Starting TM funnel task");
let jh_tm_funnel = thread::Builder::new()
.name("TM Funnel".to_string())
.name("tm sink".to_string())
.spawn(move || loop {
tm_funnel.operation();
tm_sink.operation();
})
.unwrap();
@@ -314,10 +315,11 @@ fn dyn_tmtc_pool_main() {
// Create event handling components
// These sender handles are used to send event requests, for example to enable or disable
// certain events.
let (event_tx, event_rx) = mpsc::sync_channel(100);
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation.
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_request_rx);
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx);
let (pus_test_tx, pus_test_rx) = mpsc::channel();
let (pus_event_tx, pus_event_rx) = mpsc::channel();
@@ -339,11 +341,8 @@ fn dyn_tmtc_pool_main() {
mode_tc_sender: pus_mode_tx,
};
let pus_test_service = create_test_service_dynamic(
tm_funnel_tx.clone(),
event_handler.clone_event_sender(),
pus_test_rx,
);
let pus_test_service =
create_test_service_dynamic(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx);
let pus_scheduler_service = create_scheduler_service_dynamic(
tm_funnel_tx.clone(),
tc_source_tx.clone(),
@@ -411,7 +410,7 @@ fn dyn_tmtc_pool_main() {
)
.expect("tcp server creation failed");
let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx);
let mut tm_funnel = TmSinkDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx);
let (mgm_handler_mode_reply_to_parent_tx, _mgm_handler_mode_reply_to_parent_rx) =
mpsc::channel();

View File

@@ -23,7 +23,7 @@ use super::HandlingStatus;
pub fn create_test_service_static(
tm_sender: PacketSenderWithSharedPool,
tc_pool: SharedStaticMemoryPool,
event_sender: mpsc::Sender<EventMessageU32>,
event_sender: mpsc::SyncSender<EventMessageU32>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> TestCustomServiceWrapper<PacketSenderWithSharedPool, EcssTcInSharedStoreConverter> {
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
@@ -41,7 +41,7 @@ pub fn create_test_service_static(
pub fn create_test_service_dynamic(
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
event_sender: mpsc::Sender<EventMessageU32>,
event_sender: mpsc::SyncSender<EventMessageU32>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> TestCustomServiceWrapper<MpscTmAsVecSender, EcssTcInVecConverter> {
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
@@ -61,7 +61,7 @@ pub struct TestCustomServiceWrapper<TmSender: EcssTmSender, TcInMemConverter: Ec
{
pub handler:
PusService17TestHandler<MpscTcReceiver, TmSender, TcInMemConverter, VerificationReporter>,
pub test_srv_event_sender: mpsc::Sender<EventMessageU32>,
pub test_srv_event_sender: mpsc::SyncSender<EventMessageU32>,
}
impl<TmSender: EcssTmSender, TcInMemConverter: EcssTcInMemConverter>

View File

@@ -70,18 +70,23 @@ impl TmFunnelCommon {
}
fn packet_printout(tm: &PusTmZeroCopyWriter) {
info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice());
info!(
"Sending PUS TM[{},{}] with APID {}",
tm.service(),
tm.subservice(),
tm.apid()
);
}
}
pub struct TmFunnelStatic {
pub struct TmSinkStatic {
common: TmFunnelCommon,
shared_tm_store: SharedPacketPool,
tm_funnel_rx: mpsc::Receiver<PacketInPool>,
tm_server_tx: mpsc::SyncSender<PacketInPool>,
}
impl TmFunnelStatic {
impl TmSinkStatic {
pub fn new(
shared_tm_store: SharedPacketPool,
sync_tm_tcp_source: SyncTcpTmSource,
@@ -121,13 +126,13 @@ impl TmFunnelStatic {
}
}
pub struct TmFunnelDynamic {
pub struct TmSinkDynamic {
common: TmFunnelCommon,
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,
tm_server_tx: mpsc::Sender<PacketAsVec>,
}
impl TmFunnelDynamic {
impl TmSinkDynamic {
pub fn new(
sync_tm_tcp_source: SyncTcpTmSource,
tm_funnel_rx: mpsc::Receiver<PacketAsVec>,

View File

@@ -8,6 +8,41 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
# [unreleased]
# [v0.2.0-rc.5] 2024-04-23
## Changed
- Removed `MpscEventReceiver`, the `EventReceiveProvider` trait is implemented directly
on `mpsc::Receiver<EventMessage<Event>>`
- Renamed `PusEventDispatcher` to `PusEventTmCreatorWithMap`.
- Renamed `DefaultPusEventU32Dispatcher` to `DefaultPusEventU32EventCreator`.
- Renamed `PusEventMgmtBackendProvider` renamed to `PusEventReportingMap`.
# [v0.2.0-rc.4] 2024-04-23
## Changed
- The `parse_for_ccsds_space_packets` method now expects a non-mutable slice and does not copy
broken tail packets anymore. It also does not expect a mutable `next_write_idx` argument anymore.
Instead, a `ParseResult` structure is returned which contains the `packets_found` and an
optional `incomplete_tail_start` value.
## Fixed
- `parse_for_ccsds_space_packets` did not detect CCSDS space packets at the buffer end with the
smallest possible size of 7 bytes.
- TCP server component now re-registers the internal `mio::Poll` object if the client reset
the connection unexpectedly. Not doing so prevented the server from functioning properly
after a re-connect.
# [v0.2.0-rc.3] 2024-04-17
docs-rs hotfix 2
# [v0.2.0-rc.2] 2024-04-17
docs-rs hotfix
# [v0.2.0-rc.1] 2024-04-17
- `spacepackets` v0.11

View File

@@ -1,8 +1,8 @@
[package]
name = "satrs"
version = "0.2.0-rc.1"
version = "0.2.0-rc.4"
edition = "2021"
rust-version = "1.61"
rust-version = "1.71.1"
authors = ["Robin Mueller <muellerr@irs.uni-stuttgart.de>"]
description = "A framework to build software for remote systems"
homepage = "https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/"
@@ -126,4 +126,4 @@ doc-images = []
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "doc_cfg", "--generate-link-to-definition"]
rustdoc-args = ["--cfg", "docs_rs", "--generate-link-to-definition"]

View File

@@ -21,6 +21,13 @@ pub trait SpacePacketValidator {
fn validate(&self, sp_header: &SpHeader, raw_buf: &[u8]) -> SpValidity;
}
#[derive(Default, Debug, PartialEq, Eq)]
pub struct ParseResult {
pub packets_found: u32,
/// If an incomplete space packet was found, its start index is indicated by this value.
pub incomplete_tail_start: Option<usize>,
}
/// This function parses a given buffer for tightly packed CCSDS space packets. It uses the
/// [spacepackets::SpHeader] of the CCSDS packets and a user provided [SpacePacketValidator]
/// to check whether a received space packet is relevant for processing.
@@ -41,35 +48,29 @@ pub trait SpacePacketValidator {
/// 3. [SpValidity::Skip]: The parser skips the packet using the packet length determined from the
/// space packet header.
pub fn parse_buffer_for_ccsds_space_packets<SendError>(
buf: &mut [u8],
buf: &[u8],
packet_validator: &(impl SpacePacketValidator + ?Sized),
sender_id: ComponentId,
packet_sender: &(impl PacketSenderRaw<Error = SendError> + ?Sized),
next_write_idx: &mut usize,
) -> Result<u32, SendError> {
*next_write_idx = 0;
let mut packets_found = 0;
) -> Result<ParseResult, SendError> {
let mut parse_result = ParseResult::default();
let mut current_idx = 0;
let buf_len = buf.len();
loop {
if current_idx + 7 >= buf.len() {
if current_idx + 7 > buf.len() {
break;
}
let sp_header = SpHeader::from_be_bytes(&buf[current_idx..]).unwrap().0;
// let packet_id = u16::from_be_bytes(buf[current_idx..current_idx + 2].try_into().unwrap());
match packet_validator.validate(&sp_header, &buf[current_idx..]) {
SpValidity::Valid => {
let packet_size = sp_header.total_len();
if (current_idx + packet_size) <= buf_len {
packet_sender
.send_packet(sender_id, &buf[current_idx..current_idx + packet_size])?;
packets_found += 1;
parse_result.packets_found += 1;
} else {
// Move packet to start of buffer if applicable.
if current_idx > 0 {
buf.copy_within(current_idx.., 0);
*next_write_idx = buf.len() - current_idx;
}
parse_result.incomplete_tail_start = Some(current_idx);
}
current_idx += packet_size;
continue;
@@ -83,14 +84,14 @@ pub fn parse_buffer_for_ccsds_space_packets<SendError>(
}
}
}
Ok(packets_found)
Ok(parse_result)
}
#[cfg(test)]
mod tests {
use spacepackets::{
ecss::{tc::PusTcCreator, WritablePusPacket},
CcsdsPacket, PacketId, SpHeader,
CcsdsPacket, PacketId, PacketSequenceCtrl, PacketType, SequenceFlags, SpHeader,
};
use crate::{encoding::tests::TcCacher, ComponentId};
@@ -136,17 +137,15 @@ mod tests {
.write_to_bytes(&mut buffer)
.expect("writing packet failed");
let tc_cacher = TcCacher::default();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer,
&buffer,
&SimpleVerificator::default(),
PARSER_ID,
&tc_cacher,
&mut next_write_idx,
);
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 1);
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 1);
let mut queue = tc_cacher.tc_queue.borrow_mut();
assert_eq!(queue.len(), 1);
let packet_with_sender = queue.pop_front().unwrap();
@@ -167,17 +166,15 @@ mod tests {
.write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed");
let tc_cacher = TcCacher::default();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer,
&buffer,
&SimpleVerificator::default(),
PARSER_ID,
&tc_cacher,
&mut next_write_idx,
);
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 2);
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 2);
let mut queue = tc_cacher.tc_queue.borrow_mut();
assert_eq!(queue.len(), 2);
let packet_with_addr = queue.pop_front().unwrap();
@@ -205,18 +202,12 @@ mod tests {
.write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed");
let tc_cacher = TcCacher::default();
let mut next_write_idx = 0;
let verificator = SimpleVerificator::new_with_second_id();
let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer,
&verificator,
PARSER_ID,
&tc_cacher,
&mut next_write_idx,
);
let parse_result =
parse_buffer_for_ccsds_space_packets(&buffer, &verificator, PARSER_ID, &tc_cacher);
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 2);
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 2);
let mut queue = tc_cacher.tc_queue.borrow_mut();
assert_eq!(queue.len(), 2);
let packet_with_addr = queue.pop_front().unwrap();
@@ -242,24 +233,24 @@ mod tests {
.write_to_bytes(&mut buffer[packet_len_ping..])
.expect("writing packet failed");
let tc_cacher = TcCacher::default();
let mut next_write_idx = 0;
let verificator = SimpleVerificator::new_with_second_id();
let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer[..packet_len_ping + packet_len_action - 4],
&buffer[..packet_len_ping + packet_len_action - 4],
&verificator,
PARSER_ID,
&tc_cacher,
&mut next_write_idx,
);
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 1);
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 1);
assert!(parse_result.incomplete_tail_start.is_some());
let incomplete_tail_idx = parse_result.incomplete_tail_start.unwrap();
assert_eq!(incomplete_tail_idx, packet_len_ping);
let queue = tc_cacher.tc_queue.borrow();
assert_eq!(queue.len(), 1);
// The broken packet was moved to the start, so the next write index should be after the
// last segment missing 4 bytes.
assert_eq!(next_write_idx, packet_len_action - 4);
}
#[test]
@@ -273,19 +264,36 @@ mod tests {
let tc_cacher = TcCacher::default();
let verificator = SimpleVerificator::new_with_second_id();
let mut next_write_idx = 0;
let parse_result = parse_buffer_for_ccsds_space_packets(
&mut buffer[..packet_len_ping - 4],
&buffer[..packet_len_ping - 4],
&verificator,
PARSER_ID,
&tc_cacher,
&mut next_write_idx,
);
assert_eq!(next_write_idx, 0);
assert!(parse_result.is_ok());
let parsed_packets = parse_result.unwrap();
assert_eq!(parsed_packets, 0);
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 0);
let queue = tc_cacher.tc_queue.borrow();
assert_eq!(queue.len(), 0);
}
#[test]
fn test_smallest_packet() {
let ccsds_header_only = SpHeader::new(
PacketId::new(PacketType::Tc, true, TEST_APID_0),
PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0),
0,
);
let mut buf: [u8; 7] = [0; 7];
ccsds_header_only
.write_to_be_bytes(&mut buf)
.expect("writing failed");
let verificator = SimpleVerificator::default();
let tc_cacher = TcCacher::default();
let parse_result =
parse_buffer_for_ccsds_space_packets(&buf, &verificator, PARSER_ID, &tc_cacher);
assert!(parse_result.is_ok());
let parse_result = parse_result.unwrap();
assert_eq!(parse_result.packets_found, 1);
}
}

View File

@@ -1,14 +1,12 @@
//! Event management and forwarding
//!
//! This module provides components to perform event routing. The most important component for this
//! task is the [EventManager]. It receives all events and then routes them to event subscribers
//! where appropriate. One common use case for satellite systems is to offer a light-weight
//! publish-subscribe mechanism and IPC mechanism for software and hardware events which are also
//! packaged as telemetry (TM) or can trigger a system response.
//!
//! It is recommended to read the
//! [sat-rs book chapter](https://absatsw.irs.uni-stuttgart.de/projects/sat-rs/book/events.html)
//! about events first:
//! about events first.
//!
//! This module provides components to perform event routing. The most important component for this
//! task is the [EventManager]. It receives all events and then routes them to event subscribers
//! where appropriate.
//!
//! The event manager has a listener table abstracted by the [ListenerMapProvider], which maps
//! listener groups identified by [ListenerKey]s to a [listener ID][ComponentId].
@@ -21,8 +19,8 @@
//!
//! 1. Provide a concrete [EventReceiveProvider] implementation. This abstraction allow to use different
//! message queue backends. A straightforward implementation where dynamic memory allocation is
//! not a big concern could use [std::sync::mpsc::channel] to do this and is provided in
//! form of the [MpscEventReceiver].
//! not a big concern would be to use the [std::sync::mpsc::Receiver] handle. The trait is
//! already implemented for this type.
//! 2. To set up event creators, create channel pairs using some message queue implementation.
//! Each event creator gets a (cloned) sender component which allows it to send events to the
//! manager.
@@ -44,6 +42,12 @@
//! You can check [integration test](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/pus_events.rs)
//! for a concrete example using multi-threading where events are routed to
//! different threads.
//!
//! The [satrs-example](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example)
//! also contains a full event manager instance and exposes a test event via the PUS test service.
//! The [PUS event](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/pus/event.rs)
//! module and the generic [events module](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs-example/src/events.rs)
//! show how the event management modules can be integrated into a more complex software.
use crate::events::{EventU16, EventU32, GenericEvent, LargestEventRaw, LargestGroupIdRaw};
use crate::params::Params;
use crate::queue::GenericSendError;
@@ -157,9 +161,10 @@ pub trait SenderMapProvider<
/// * `SenderMap`: [SenderMapProvider] which maps channel IDs to send providers.
/// * `ListenerMap`: [ListenerMapProvider] which maps listener keys to channel IDs.
/// * `EventSender`: [EventSendProvider] contained within the sender map which sends the events.
/// * `Ev`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32]
/// * `Event`: The event type. This type must implement the [GenericEvent]. Currently only [EventU32]
/// and [EventU16] are supported.
/// * `Data`: Auxiliary data which is sent with the event to provide optional context information
/// * `ParamProvider`: Auxiliary data which is sent with the event to provide optional context
/// information
pub struct EventManager<
EventReceiver: EventReceiveProvider<Event, ParamProvider>,
SenderMap: SenderMapProvider<EventSender, Event, ParamProvider>,
@@ -331,11 +336,11 @@ pub mod alloc_mod {
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
/// and the [DefaultListenerMap]. It uses regular mpsc channels as the message queue backend.
pub type EventManagerWithMpsc<EV = EventU32, AUX = Params> = EventManager<
MpscEventReceiver,
DefaultSenderMap<EventSenderMpsc<EV>, EV, AUX>,
pub type EventManagerWithMpsc<Event = EventU32, ParamProvider = Params> = EventManager<
EventU32ReceiverMpsc<ParamProvider>,
DefaultSenderMap<EventSenderMpsc<Event>, Event, ParamProvider>,
DefaultListenerMap,
EventSenderMpsc<EV>,
EventSenderMpsc<Event>,
>;
/// Helper type which constrains the sender map and listener map generics to the [DefaultSenderMap]
@@ -343,7 +348,7 @@ pub mod alloc_mod {
/// [bounded mpsc senders](https://doc.rust-lang.org/std/sync/mpsc/struct.SyncSender.html) as the
/// message queue backend.
pub type EventManagerWithBoundedMpsc<Event = EventU32, ParamProvider = Params> = EventManager<
MpscEventReceiver,
EventU32ReceiverMpsc<ParamProvider>,
DefaultSenderMap<EventSenderMpscBounded<Event>, Event, ParamProvider>,
DefaultListenerMap,
EventSenderMpscBounded<Event>,
@@ -479,20 +484,16 @@ pub mod std_mod {
use super::*;
use std::sync::mpsc;
pub struct MpscEventReceiver<Event: GenericEvent + Send = EventU32> {
receiver: mpsc::Receiver<EventMessage<Event>>,
}
impl<Event: GenericEvent + Send> MpscEventReceiver<Event> {
pub fn new(receiver: mpsc::Receiver<EventMessage<Event>>) -> Self {
Self { receiver }
}
}
impl<Event: GenericEvent + Send> EventReceiveProvider<Event> for MpscEventReceiver<Event> {
impl<Event: GenericEvent + Send, ParamProvider: Debug>
EventReceiveProvider<Event, ParamProvider>
for mpsc::Receiver<EventMessage<Event, ParamProvider>>
{
type Error = GenericReceiveError;
fn try_recv_event(&self) -> Result<Option<EventMessage<Event>>, Self::Error> {
match self.receiver.try_recv() {
fn try_recv_event(
&self,
) -> Result<Option<EventMessage<Event, ParamProvider>>, Self::Error> {
match self.try_recv() {
Ok(msg) => Ok(Some(msg)),
Err(e) => match e {
mpsc::TryRecvError::Empty => Ok(None),
@@ -504,8 +505,10 @@ pub mod std_mod {
}
}
pub type MpscEventU32Receiver = MpscEventReceiver<EventU32>;
pub type MpscEventU16Receiver = MpscEventReceiver<EventU16>;
pub type EventU32ReceiverMpsc<ParamProvider = Params> =
mpsc::Receiver<EventMessage<EventU32, ParamProvider>>;
pub type EventU16ReceiverMpsc<ParamProvider = Params> =
mpsc::Receiver<EventMessage<EventU16, ParamProvider>>;
/// Generic event sender which uses a regular [mpsc::Sender] as the messaging backend to
/// send events.
@@ -624,9 +627,8 @@ mod tests {
}
fn generic_event_man() -> (mpsc::Sender<EventMessageU32>, EventManagerWithMpsc) {
let (event_sender, manager_queue) = mpsc::channel();
let event_man_receiver = MpscEventReceiver::new(manager_queue);
(event_sender, EventManager::new(event_man_receiver))
let (event_sender, event_receiver) = mpsc::channel();
(event_sender, EventManager::new(event_receiver))
}
#[test]
@@ -793,9 +795,8 @@ mod tests {
let error_handler = |event_msg: &EventMessageU32, e: EventRoutingError| {
panic!("routing error occurred for event {:?}: {:?}", event_msg, e);
};
let (event_sender, manager_queue) = mpsc::channel();
let event_man_receiver = MpscEventReceiver::new(manager_queue);
let mut event_man = EventManagerWithMpsc::new(event_man_receiver);
let (event_sender, event_receiver) = mpsc::channel();
let mut event_man = EventManagerWithMpsc::new(event_receiver);
let event_0 = EventU32::new(Severity::INFO, 0, 5).unwrap();
let event_1 = EventU32::new(Severity::HIGH, 1, 0).unwrap();
let (event_0_tx_0, all_events_rx) = mpsc::channel();

View File

@@ -9,8 +9,6 @@ use mio::{Events, Interest, Poll, Token};
use socket2::{Domain, Socket, Type};
use std::io::{self, Read};
use std::net::SocketAddr;
// use std::net::TcpListener;
// use std::net::{SocketAddr, TcpStream};
use std::thread;
use crate::tmtc::{PacketSenderRaw, PacketSource};
@@ -244,13 +242,16 @@ impl<
// Create a poll instance.
let poll = Poll::new()?;
// Create storage for events.
let events = Events::with_capacity(10);
let events = Events::with_capacity(32);
let listener: std::net::TcpListener = socket.into();
let mut mio_listener = TcpListener::from_std(listener);
// Start listening for incoming connections.
poll.registry()
.register(&mut mio_listener, Token(0), Interest::READABLE)?;
poll.registry().register(
&mut mio_listener,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)?;
Ok(Self {
id: cfg.id,
@@ -280,11 +281,11 @@ impl<
self.listener.local_addr()
}
/// This call is used to handle the next connection to a client. Right now, it performs
/// This call is used to handle all connection from clients. Right now, it performs
/// the following steps:
///
/// 1. It calls the [std::net::TcpListener::accept] method internally using the blocking API
/// until a client connects.
/// 1. It calls the [std::net::TcpListener::accept] method until a client connects. An optional
/// timeout can be specified for non-blocking acceptance.
/// 2. It reads all the telecommands from the client and parses all received data using the
/// user specified [TcpTcParser].
/// 3. After reading and parsing all telecommands, it sends back all telemetry using the
@@ -317,11 +318,17 @@ impl<
loop {
match self.listener.accept() {
Ok((stream, addr)) => {
self.handle_accepted_connection(stream, addr)?;
if let Err(e) = self.handle_accepted_connection(stream, addr) {
self.reregister_poll_interest()?;
return Err(e);
}
handled_connections += 1;
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break,
Err(err) => return Err(TcpTmtcError::Io(err)),
Err(err) => {
self.reregister_poll_interest()?;
return Err(TcpTmtcError::Io(err));
}
}
}
}
@@ -331,6 +338,14 @@ impl<
Ok(ConnectionResult::AcceptTimeout)
}
fn reregister_poll_interest(&mut self) -> io::Result<()> {
self.poll.registry().reregister(
&mut self.listener,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)
}
fn handle_accepted_connection(
&mut self,
mut stream: TcpStream,

View File

@@ -26,14 +26,19 @@ impl<T: SpacePacketValidator, TmError, TcError: 'static> TcpTcParser<TmError, Tc
next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>> {
// Reader vec full, need to parse for packets.
conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets(
&mut tc_buffer[..current_write_idx],
let parse_result = parse_buffer_for_ccsds_space_packets(
&tc_buffer[..current_write_idx],
self,
sender_id,
tc_sender,
next_write_idx,
)
.map_err(|e| TcpTmtcError::TcError(e))?;
if let Some(broken_tail_start) = parse_result.incomplete_tail_start {
// Copy broken tail to front of buffer.
tc_buffer.copy_within(broken_tail_start..current_write_idx, 0);
*next_write_idx = current_write_idx - broken_tail_start;
}
conn_result.num_received_tcs += parse_result.packets_found;
Ok(())
}
}

View File

@@ -7,11 +7,9 @@ use crate::{
use satrs_shared::res_code::ResultU16;
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub use std_mod::*;
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
#[allow(unused_imports)]
pub use alloc_mod::*;
@@ -65,7 +63,6 @@ impl GenericActionReplyPus {
}
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub mod alloc_mod {
use crate::{
action::ActionRequest,
@@ -127,7 +124,6 @@ pub mod alloc_mod {
}
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub mod std_mod {
use std::sync::mpsc;

View File

@@ -13,10 +13,8 @@ use crate::pus::verification::TcStateToken;
use crate::pus::EcssTmSender;
use crate::pus::EcssTmtcError;
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub use alloc_mod::*;
#[cfg(feature = "heapless")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "heapless")))]
pub use heapless_mod::*;
/// This trait allows the PUS event manager implementation to stay generic over various types
@@ -30,7 +28,7 @@ pub use heapless_mod::*;
/// structure to track disabled events. A more primitive and embedded friendly
/// solution could track this information in a static or pre-allocated list which contains
/// the disabled events.
pub trait PusEventMgmtBackendProvider<Event: GenericEvent> {
pub trait PusEventReportingMapProvider<Event: GenericEvent> {
type Error;
fn event_enabled(&self, event: &Event) -> bool;
@@ -44,7 +42,6 @@ pub mod heapless_mod {
use crate::events::LargestEventRaw;
use core::marker::PhantomData;
#[cfg_attr(doc_cfg, doc(cfg(feature = "heapless")))]
// TODO: After a new version of heapless is released which uses hash32 version 0.3, try using
// regular Event type again.
#[derive(Default)]
@@ -59,7 +56,7 @@ pub mod heapless_mod {
{
}
impl<const N: usize, Provider: GenericEvent> PusEventMgmtBackendProvider<Provider>
impl<const N: usize, Provider: GenericEvent> PusEventReportingMapProvider<Provider>
for HeaplessPusMgmtBackendProvider<N, Provider>
{
type Error = ();
@@ -108,20 +105,23 @@ impl From<EcssTmtcError> for EventManError {
pub mod alloc_mod {
use core::marker::PhantomData;
use crate::events::EventU16;
use crate::{
events::EventU16,
pus::event::{DummyEventHook, EventTmHookProvider},
};
use super::*;
/// Default backend provider which uses a hash set as the event reporting status container
/// like mentioned in the example of the [PusEventMgmtBackendProvider] documentation.
/// like mentioned in the example of the [PusEventReportingMapProvider] documentation.
///
/// This provider is a good option for host systems or larger embedded systems where
/// the expected occasional memory allocation performed by the [HashSet] is not an issue.
pub struct DefaultPusEventMgmtBackend<Event: GenericEvent = EventU32> {
pub struct DefaultPusEventReportingMap<Event: GenericEvent = EventU32> {
disabled: HashSet<Event>,
}
impl<Event: GenericEvent> Default for DefaultPusEventMgmtBackend<Event> {
impl<Event: GenericEvent> Default for DefaultPusEventReportingMap<Event> {
fn default() -> Self {
Self {
disabled: HashSet::default(),
@@ -129,51 +129,54 @@ pub mod alloc_mod {
}
}
impl<EV: GenericEvent + PartialEq + Eq + Hash + Copy + Clone> PusEventMgmtBackendProvider<EV>
for DefaultPusEventMgmtBackend<EV>
impl<Event: GenericEvent + PartialEq + Eq + Hash + Copy + Clone>
PusEventReportingMapProvider<Event> for DefaultPusEventReportingMap<Event>
{
type Error = ();
fn event_enabled(&self, event: &EV) -> bool {
fn event_enabled(&self, event: &Event) -> bool {
!self.disabled.contains(event)
}
fn enable_event_reporting(&mut self, event: &EV) -> Result<bool, Self::Error> {
fn enable_event_reporting(&mut self, event: &Event) -> Result<bool, Self::Error> {
Ok(self.disabled.remove(event))
}
fn disable_event_reporting(&mut self, event: &EV) -> Result<bool, Self::Error> {
fn disable_event_reporting(&mut self, event: &Event) -> Result<bool, Self::Error> {
Ok(self.disabled.insert(*event))
}
}
pub struct PusEventDispatcher<
B: PusEventMgmtBackendProvider<EV, Error = E>,
EV: GenericEvent,
E,
pub struct PusEventTmCreatorWithMap<
ReportingMap: PusEventReportingMapProvider<Event>,
Event: GenericEvent,
EventTmHook: EventTmHookProvider = DummyEventHook,
> {
reporter: EventReporter,
backend: B,
phantom: PhantomData<(E, EV)>,
pub reporter: EventReporter<EventTmHook>,
reporting_map: ReportingMap,
phantom: PhantomData<Event>,
}
impl<B: PusEventMgmtBackendProvider<Event, Error = E>, Event: GenericEvent, E>
PusEventDispatcher<B, Event, E>
impl<
ReportingMap: PusEventReportingMapProvider<Event>,
Event: GenericEvent,
EventTmHook: EventTmHookProvider,
> PusEventTmCreatorWithMap<ReportingMap, Event, EventTmHook>
{
pub fn new(reporter: EventReporter, backend: B) -> Self {
pub fn new(reporter: EventReporter<EventTmHook>, backend: ReportingMap) -> Self {
Self {
reporter,
backend,
reporting_map: backend,
phantom: PhantomData,
}
}
pub fn enable_tm_for_event(&mut self, event: &Event) -> Result<bool, E> {
self.backend.enable_event_reporting(event)
pub fn enable_tm_for_event(&mut self, event: &Event) -> Result<bool, ReportingMap::Error> {
self.reporting_map.enable_event_reporting(event)
}
pub fn disable_tm_for_event(&mut self, event: &Event) -> Result<bool, E> {
self.backend.disable_event_reporting(event)
pub fn disable_tm_for_event(&mut self, event: &Event) -> Result<bool, ReportingMap::Error> {
self.reporting_map.disable_event_reporting(event)
}
pub fn generate_pus_event_tm_generic(
@@ -183,7 +186,7 @@ pub mod alloc_mod {
event: Event,
params: Option<&[u8]>,
) -> Result<bool, EventManError> {
if !self.backend.event_enabled(&event) {
if !self.reporting_map.event_enabled(&event) {
return Ok(false);
}
match event.severity() {
@@ -211,31 +214,33 @@ pub mod alloc_mod {
}
}
impl<EV: GenericEvent + Copy + PartialEq + Eq + Hash>
PusEventDispatcher<DefaultPusEventMgmtBackend<EV>, EV, ()>
impl<Event: GenericEvent + Copy + PartialEq + Eq + Hash, EventTmHook: EventTmHookProvider>
PusEventTmCreatorWithMap<DefaultPusEventReportingMap<Event>, Event, EventTmHook>
{
pub fn new_with_default_backend(reporter: EventReporter) -> Self {
pub fn new_with_default_backend(reporter: EventReporter<EventTmHook>) -> Self {
Self {
reporter,
backend: DefaultPusEventMgmtBackend::default(),
reporting_map: DefaultPusEventReportingMap::default(),
phantom: PhantomData,
}
}
}
impl<B: PusEventMgmtBackendProvider<EventU32, Error = E>, E> PusEventDispatcher<B, EventU32, E> {
impl<ReportingMap: PusEventReportingMapProvider<EventU32>>
PusEventTmCreatorWithMap<ReportingMap, EventU32>
{
pub fn enable_tm_for_event_with_sev<Severity: HasSeverity>(
&mut self,
event: &EventU32TypedSev<Severity>,
) -> Result<bool, E> {
self.backend.enable_event_reporting(event.as_ref())
) -> Result<bool, ReportingMap::Error> {
self.reporting_map.enable_event_reporting(event.as_ref())
}
pub fn disable_tm_for_event_with_sev<Severity: HasSeverity>(
&mut self,
event: &EventU32TypedSev<Severity>,
) -> Result<bool, E> {
self.backend.disable_event_reporting(event.as_ref())
) -> Result<bool, ReportingMap::Error> {
self.reporting_map.disable_event_reporting(event.as_ref())
}
pub fn generate_pus_event_tm<Severity: HasSeverity>(
@@ -249,10 +254,10 @@ pub mod alloc_mod {
}
}
pub type DefaultPusEventU16Dispatcher<E> =
PusEventDispatcher<DefaultPusEventMgmtBackend<EventU16>, EventU16, E>;
pub type DefaultPusEventU32Dispatcher<E> =
PusEventDispatcher<DefaultPusEventMgmtBackend<EventU32>, EventU32, E>;
pub type DefaultPusEventU16TmCreator<EventTmHook = DummyEventHook> =
PusEventTmCreatorWithMap<DefaultPusEventReportingMap<EventU16>, EventU16, EventTmHook>;
pub type DefaultPusEventU32TmCreator<EventTmHook = DummyEventHook> =
PusEventTmCreatorWithMap<DefaultPusEventReportingMap<EventU32>, EventU32, EventTmHook>;
}
#[cfg(test)]
mod tests {
@@ -268,16 +273,16 @@ mod tests {
const TEST_APID: u16 = 0x02;
const TEST_ID: UniqueApidTargetId = UniqueApidTargetId::new(TEST_APID, 0x05);
fn create_basic_man_1() -> DefaultPusEventU32Dispatcher<()> {
fn create_basic_man_1() -> DefaultPusEventU32TmCreator {
let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128)
.expect("Creating event repoter failed");
PusEventDispatcher::new_with_default_backend(reporter)
PusEventTmCreatorWithMap::new_with_default_backend(reporter)
}
fn create_basic_man_2() -> DefaultPusEventU32Dispatcher<()> {
fn create_basic_man_2() -> DefaultPusEventU32TmCreator {
let reporter = EventReporter::new(TEST_ID.raw(), TEST_APID, 0, 128)
.expect("Creating event repoter failed");
let backend = DefaultPusEventMgmtBackend::default();
PusEventDispatcher::new(reporter, backend)
let backend = DefaultPusEventReportingMap::default();
PusEventTmCreatorWithMap::new(reporter, backend)
}
#[test]

View File

@@ -368,7 +368,6 @@ pub mod alloc_mod {
/// [DynClone] allows cloning the trait object as long as the boxed object implements
/// [Clone].
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub trait EcssTmSenderExt: EcssTmSender + Downcast + DynClone {
// Remove this once trait upcasting coercion has been implemented.
// Tracking issue: https://github.com/rust-lang/rust/issues/65991
@@ -409,7 +408,6 @@ pub mod alloc_mod {
/// [DynClone] allows cloning the trait object as long as the boxed object implements
/// [Clone].
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub trait EcssTcSenderExt: EcssTcSender + Downcast + DynClone {}
/// Blanket implementation for all types which implement [EcssTcSender] and are clonable.
@@ -429,7 +427,6 @@ pub mod alloc_mod {
/// [DynClone] allows cloning the trait object as long as the boxed object implements
/// [Clone].
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub trait EcssTcReceiverExt: EcssTcReceiver + Downcast {}
/// Blanket implementation for all types which implement [EcssTcReceiver] and are clonable.
@@ -551,7 +548,6 @@ pub mod alloc_mod {
>
{
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn new_from_now(
active_request_map: ActiveRequestMap,
fail_data_buf_size: usize,
@@ -638,7 +634,6 @@ pub mod alloc_mod {
/// Update the current time used for timeout checks based on the current OS time.
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn update_time_from_now(&mut self) -> Result<(), std::time::SystemTimeError> {
self.current_time = UnixTimestamp::from_now()?;
Ok(())
@@ -653,7 +648,6 @@ pub mod alloc_mod {
}
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub mod std_mod {
use crate::pool::{
PoolAddr, PoolError, PoolProvider, PoolProviderWithGuards, SharedStaticMemoryPool,

View File

@@ -26,11 +26,9 @@ pub enum Subservice {
}
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub mod alloc_mod {}
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub mod std_mod {}
#[cfg(test)]

View File

@@ -423,7 +423,6 @@ pub mod alloc_mod {
/// Like [Self::new], but sets the `init_current_time` parameter to the current system time.
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn new_with_current_init_time(time_margin: Duration) -> Result<Self, SystemTimeError> {
Ok(Self::new(UnixTime::now()?, time_margin))
}
@@ -667,7 +666,6 @@ pub mod alloc_mod {
}
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn update_time_from_now(&mut self) -> Result<(), SystemTimeError> {
self.current_time = UnixTime::now()?;
Ok(())

View File

@@ -98,18 +98,11 @@ pub use crate::seq_count::SeqCountProviderSimple;
pub use spacepackets::ecss::verification::*;
#[cfg(feature = "alloc")]
#[cfg_attr(feature = "doc_cfg", doc(cfg(feature = "alloc")))]
pub use alloc_mod::*;
use crate::request::Apid;
use crate::ComponentId;
/*
#[cfg(feature = "std")]
#[cfg_attr(feature = "doc_cfg", doc(cfg(feature = "std")))]
pub use std_mod::*;
*/
/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard.
///
/// This field equivalent to the first two bytes of the CCSDS space packet header.

View File

@@ -1,11 +1,11 @@
use satrs::event_man::{
EventManagerWithMpsc, EventMessage, EventMessageU32, EventRoutingError, EventSendProvider,
EventU32SenderMpsc, MpscEventU32Receiver,
EventU32SenderMpsc,
};
use satrs::events::{EventU32, EventU32TypedSev, Severity, SeverityInfo};
use satrs::params::U32Pair;
use satrs::params::{Params, ParamsHeapless, WritableToBeBytes};
use satrs::pus::event_man::{DefaultPusEventMgmtBackend, EventReporter, PusEventDispatcher};
use satrs::pus::event_man::{DefaultPusEventReportingMap, EventReporter, PusEventTmCreatorWithMap};
use satrs::pus::test_util::TEST_COMPONENT_ID_0;
use satrs::request::UniqueApidTargetId;
use satrs::tmtc::PacketAsVec;
@@ -29,18 +29,18 @@ pub enum CustomTmSenderError {
#[test]
fn test_threaded_usage() {
let (event_sender, event_man_receiver) = mpsc::channel();
let event_receiver = MpscEventU32Receiver::new(event_man_receiver);
let mut event_man = EventManagerWithMpsc::new(event_receiver);
let (event_tx, event_rx) = mpsc::sync_channel(100);
let mut event_man = EventManagerWithMpsc::new(event_rx);
let (pus_event_man_tx, pus_event_man_rx) = mpsc::channel();
let pus_event_man_send_provider = EventU32SenderMpsc::new(1, pus_event_man_tx);
event_man.subscribe_all(pus_event_man_send_provider.target_id());
event_man.add_sender(pus_event_man_send_provider);
let (event_tx, event_rx) = mpsc::channel::<PacketAsVec>();
let (event_packet_tx, event_packet_rx) = mpsc::channel::<PacketAsVec>();
let reporter =
EventReporter::new(TEST_ID.raw(), 0x02, 0, 128).expect("Creating event reporter failed");
let pus_event_man = PusEventDispatcher::new(reporter, DefaultPusEventMgmtBackend::default());
let pus_event_man =
PusEventTmCreatorWithMap::new(reporter, DefaultPusEventReportingMap::default());
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
panic!("received routing error for event {event_msg:?}: {error:?}");
};
@@ -54,7 +54,7 @@ fn test_threaded_usage() {
Ok(event_msg) => {
let gen_event = |aux_data| {
pus_event_man.generate_pus_event_tm_generic(
&event_tx,
&event_packet_tx,
&EMPTY_STAMP,
event_msg.event(),
aux_data,
@@ -100,14 +100,14 @@ fn test_threaded_usage() {
// Event sender and TM checker thread
let jh1 = thread::spawn(move || {
event_sender
event_tx
.send(EventMessage::new(
TEST_COMPONENT_ID_0.id(),
INFO_EVENT.into(),
))
.expect("Sending info event failed");
loop {
match event_rx.try_recv() {
match event_packet_rx.try_recv() {
// Event TM received successfully
Ok(event_tm) => {
let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)
@@ -129,7 +129,7 @@ fn test_threaded_usage() {
}
}
}
event_sender
event_tx
.send(EventMessage::new_with_params(
TEST_COMPONENT_ID_0.id(),
LOW_SEV_EVENT,
@@ -137,7 +137,7 @@ fn test_threaded_usage() {
))
.expect("Sending low severity event failed");
loop {
match event_rx.try_recv() {
match event_packet_rx.try_recv() {
// Event TM received successfully
Ok(event_tm) => {
let tm = PusTmReader::new(event_tm.packet.as_slice(), 7)