Merge pull request 'Refactor and improve TCP servers' (#150) from refactor-tcp-server into main
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
Reviewed-on: #150
This commit is contained in:
commit
3375780e00
@ -225,7 +225,7 @@ fn static_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting TMTC and UDP task");
|
info!("Starting TMTC and UDP task");
|
||||||
let jh_udp_tmtc = thread::Builder::new()
|
let jh_udp_tmtc = thread::Builder::new()
|
||||||
.name("TMTC and UDP".to_string())
|
.name("SATRS tmtc-udp".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
info!("Running UDP server on port {SERVER_PORT}");
|
info!("Running UDP server on port {SERVER_PORT}");
|
||||||
loop {
|
loop {
|
||||||
@ -238,7 +238,7 @@ fn static_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting TCP task");
|
info!("Starting TCP task");
|
||||||
let jh_tcp = thread::Builder::new()
|
let jh_tcp = thread::Builder::new()
|
||||||
.name("TCP".to_string())
|
.name("sat-rs tcp".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
info!("Running TCP server on port {SERVER_PORT}");
|
info!("Running TCP server on port {SERVER_PORT}");
|
||||||
loop {
|
loop {
|
||||||
@ -257,7 +257,7 @@ fn static_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting event handling task");
|
info!("Starting event handling task");
|
||||||
let jh_event_handling = thread::Builder::new()
|
let jh_event_handling = thread::Builder::new()
|
||||||
.name("Event".to_string())
|
.name("sat-rs events".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
event_handler.periodic_operation();
|
event_handler.periodic_operation();
|
||||||
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
|
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
|
||||||
@ -266,7 +266,7 @@ fn static_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting AOCS thread");
|
info!("Starting AOCS thread");
|
||||||
let jh_aocs = thread::Builder::new()
|
let jh_aocs = thread::Builder::new()
|
||||||
.name("AOCS".to_string())
|
.name("sat-rs aocs".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
mgm_handler.periodic_operation();
|
mgm_handler.periodic_operation();
|
||||||
thread::sleep(Duration::from_millis(FREQ_MS_AOCS));
|
thread::sleep(Duration::from_millis(FREQ_MS_AOCS));
|
||||||
@ -275,7 +275,7 @@ fn static_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting PUS handler thread");
|
info!("Starting PUS handler thread");
|
||||||
let jh_pus_handler = thread::Builder::new()
|
let jh_pus_handler = thread::Builder::new()
|
||||||
.name("PUS".to_string())
|
.name("sat-rs pus".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
pus_stack.periodic_operation();
|
pus_stack.periodic_operation();
|
||||||
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
|
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
|
||||||
@ -444,7 +444,7 @@ fn dyn_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting TMTC and UDP task");
|
info!("Starting TMTC and UDP task");
|
||||||
let jh_udp_tmtc = thread::Builder::new()
|
let jh_udp_tmtc = thread::Builder::new()
|
||||||
.name("TMTC and UDP".to_string())
|
.name("sat-rs tmtc-udp".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
info!("Running UDP server on port {SERVER_PORT}");
|
info!("Running UDP server on port {SERVER_PORT}");
|
||||||
loop {
|
loop {
|
||||||
@ -457,7 +457,7 @@ fn dyn_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting TCP task");
|
info!("Starting TCP task");
|
||||||
let jh_tcp = thread::Builder::new()
|
let jh_tcp = thread::Builder::new()
|
||||||
.name("TCP".to_string())
|
.name("sat-rs tcp".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
info!("Running TCP server on port {SERVER_PORT}");
|
info!("Running TCP server on port {SERVER_PORT}");
|
||||||
loop {
|
loop {
|
||||||
@ -468,7 +468,7 @@ fn dyn_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting TM funnel task");
|
info!("Starting TM funnel task");
|
||||||
let jh_tm_funnel = thread::Builder::new()
|
let jh_tm_funnel = thread::Builder::new()
|
||||||
.name("TM Funnel".to_string())
|
.name("sat-rs tm-funnel".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
tm_funnel.operation();
|
tm_funnel.operation();
|
||||||
})
|
})
|
||||||
@ -476,7 +476,7 @@ fn dyn_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting event handling task");
|
info!("Starting event handling task");
|
||||||
let jh_event_handling = thread::Builder::new()
|
let jh_event_handling = thread::Builder::new()
|
||||||
.name("Event".to_string())
|
.name("sat-rs events".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
event_handler.periodic_operation();
|
event_handler.periodic_operation();
|
||||||
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
|
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
|
||||||
@ -485,7 +485,7 @@ fn dyn_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting AOCS thread");
|
info!("Starting AOCS thread");
|
||||||
let jh_aocs = thread::Builder::new()
|
let jh_aocs = thread::Builder::new()
|
||||||
.name("AOCS".to_string())
|
.name("sat-rs aocs".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
mgm_handler.periodic_operation();
|
mgm_handler.periodic_operation();
|
||||||
thread::sleep(Duration::from_millis(FREQ_MS_AOCS));
|
thread::sleep(Duration::from_millis(FREQ_MS_AOCS));
|
||||||
@ -494,7 +494,7 @@ fn dyn_tmtc_pool_main() {
|
|||||||
|
|
||||||
info!("Starting PUS handler thread");
|
info!("Starting PUS handler thread");
|
||||||
let jh_pus_handler = thread::Builder::new()
|
let jh_pus_handler = thread::Builder::new()
|
||||||
.name("PUS".to_string())
|
.name("sat-rs pus".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
pus_stack.periodic_operation();
|
pus_stack.periodic_operation();
|
||||||
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
|
thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK));
|
||||||
|
@ -267,7 +267,7 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter> Targete
|
|||||||
for ActionServiceWrapper<TmSender, TcInMemConverter>
|
for ActionServiceWrapper<TmSender, TcInMemConverter>
|
||||||
{
|
{
|
||||||
/// Returns [true] if the packet handling is finished.
|
/// Returns [true] if the packet handling is finished.
|
||||||
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool {
|
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
match self.service.poll_and_handle_next_tc(time_stamp) {
|
match self.service.poll_and_handle_next_tc(time_stamp) {
|
||||||
Ok(result) => match result {
|
Ok(result) => match result {
|
||||||
PusPacketHandlerResult::RequestHandled => {}
|
PusPacketHandlerResult::RequestHandled => {}
|
||||||
@ -280,15 +280,13 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter> Targete
|
|||||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||||
warn!("PUS 8 subservice {subservice} not implemented");
|
warn!("PUS 8 subservice {subservice} not implemented");
|
||||||
}
|
}
|
||||||
PusPacketHandlerResult::Empty => {
|
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
|
||||||
return true;
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
error!("PUS packet handling error: {error:?}")
|
error!("PUS packet handling error: {error:?}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
false
|
HandlingStatus::HandledOne
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
|
@ -13,6 +13,8 @@ use satrs::pus::{
|
|||||||
};
|
};
|
||||||
use satrs_example::config::components::PUS_EVENT_MANAGEMENT;
|
use satrs_example::config::components::PUS_EVENT_MANAGEMENT;
|
||||||
|
|
||||||
|
use super::HandlingStatus;
|
||||||
|
|
||||||
pub fn create_event_service_static(
|
pub fn create_event_service_static(
|
||||||
tm_sender: TmInSharedPoolSender<mpsc::SyncSender<PusTmInPool>>,
|
tm_sender: TmInSharedPoolSender<mpsc::SyncSender<PusTmInPool>>,
|
||||||
tc_pool: SharedStaticMemoryPool,
|
tc_pool: SharedStaticMemoryPool,
|
||||||
@ -62,7 +64,7 @@ pub struct EventServiceWrapper<TmSender: EcssTmSenderCore, TcInMemConverter: Ecs
|
|||||||
impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
||||||
EventServiceWrapper<TmSender, TcInMemConverter>
|
EventServiceWrapper<TmSender, TcInMemConverter>
|
||||||
{
|
{
|
||||||
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool {
|
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
match self.handler.poll_and_handle_next_tc(time_stamp) {
|
match self.handler.poll_and_handle_next_tc(time_stamp) {
|
||||||
Ok(result) => match result {
|
Ok(result) => match result {
|
||||||
PusPacketHandlerResult::RequestHandled => {}
|
PusPacketHandlerResult::RequestHandled => {}
|
||||||
@ -75,14 +77,12 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
|||||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||||
warn!("PUS 5 subservice {subservice} not implemented");
|
warn!("PUS 5 subservice {subservice} not implemented");
|
||||||
}
|
}
|
||||||
PusPacketHandlerResult::Empty => {
|
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
|
||||||
return true;
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
error!("PUS packet handling error: {error:?}")
|
error!("PUS packet handling error: {error:?}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
false
|
HandlingStatus::HandledOne
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -300,7 +300,7 @@ pub struct HkServiceWrapper<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTc
|
|||||||
impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
||||||
HkServiceWrapper<TmSender, TcInMemConverter>
|
HkServiceWrapper<TmSender, TcInMemConverter>
|
||||||
{
|
{
|
||||||
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool {
|
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
match self.service.poll_and_handle_next_tc(time_stamp) {
|
match self.service.poll_and_handle_next_tc(time_stamp) {
|
||||||
Ok(result) => match result {
|
Ok(result) => match result {
|
||||||
PusPacketHandlerResult::RequestHandled => {}
|
PusPacketHandlerResult::RequestHandled => {}
|
||||||
@ -313,15 +313,13 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
|||||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||||
warn!("PUS 3 subservice {subservice} not implemented");
|
warn!("PUS 3 subservice {subservice} not implemented");
|
||||||
}
|
}
|
||||||
PusPacketHandlerResult::Empty => {
|
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
|
||||||
return true;
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
error!("PUS packet handling error: {error:?}")
|
error!("PUS packet handling error: {error:?}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
false
|
HandlingStatus::HandledOne
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
pub fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
|
@ -157,7 +157,7 @@ impl<TmSender: EcssTmSenderCore> PusReceiver<TmSender> {
|
|||||||
|
|
||||||
pub trait TargetedPusService {
|
pub trait TargetedPusService {
|
||||||
/// Returns [true] if the packet handling is finished.
|
/// Returns [true] if the packet handling is finished.
|
||||||
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool;
|
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus;
|
||||||
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus;
|
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus;
|
||||||
fn check_for_request_timeouts(&mut self);
|
fn check_for_request_timeouts(&mut self);
|
||||||
}
|
}
|
||||||
|
@ -272,7 +272,7 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter> Targete
|
|||||||
for ModeServiceWrapper<TmSender, TcInMemConverter>
|
for ModeServiceWrapper<TmSender, TcInMemConverter>
|
||||||
{
|
{
|
||||||
/// Returns [true] if the packet handling is finished.
|
/// Returns [true] if the packet handling is finished.
|
||||||
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool {
|
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
match self.service.poll_and_handle_next_tc(time_stamp) {
|
match self.service.poll_and_handle_next_tc(time_stamp) {
|
||||||
Ok(result) => match result {
|
Ok(result) => match result {
|
||||||
PusPacketHandlerResult::RequestHandled => {}
|
PusPacketHandlerResult::RequestHandled => {}
|
||||||
@ -285,15 +285,13 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter> Targete
|
|||||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||||
warn!("PUS mode service: {subservice} not implemented");
|
warn!("PUS mode service: {subservice} not implemented");
|
||||||
}
|
}
|
||||||
PusPacketHandlerResult::Empty => {
|
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
|
||||||
return true;
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
error!("PUS mode service: packet handling error: {error:?}")
|
error!("PUS mode service: packet handling error: {error:?}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
false
|
HandlingStatus::HandledOne
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
|
@ -16,6 +16,8 @@ use satrs_example::config::components::PUS_SCHED_SERVICE;
|
|||||||
|
|
||||||
use crate::tmtc::PusTcSourceProviderSharedPool;
|
use crate::tmtc::PusTcSourceProviderSharedPool;
|
||||||
|
|
||||||
|
use super::HandlingStatus;
|
||||||
|
|
||||||
pub trait TcReleaser {
|
pub trait TcReleaser {
|
||||||
fn release(&mut self, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
|
fn release(&mut self, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
|
||||||
}
|
}
|
||||||
@ -92,7 +94,7 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool {
|
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
match self
|
match self
|
||||||
.pus_11_handler
|
.pus_11_handler
|
||||||
.poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool)
|
.poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool)
|
||||||
@ -108,15 +110,13 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
|||||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||||
warn!("PUS11: Subservice {subservice} not implemented");
|
warn!("PUS11: Subservice {subservice} not implemented");
|
||||||
}
|
}
|
||||||
PusPacketHandlerResult::Empty => {
|
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
|
||||||
return true;
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
error!("PUS packet handling error: {error:?}")
|
error!("PUS packet handling error: {error:?}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
false
|
HandlingStatus::HandledOne
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,18 +35,29 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
|||||||
loop {
|
loop {
|
||||||
let mut nothing_to_do = true;
|
let mut nothing_to_do = true;
|
||||||
let mut is_srv_finished =
|
let mut is_srv_finished =
|
||||||
|tc_handling_done: bool, reply_handling_done: Option<HandlingStatus>| {
|
|_srv_id: u8,
|
||||||
if !tc_handling_done
|
tc_handling_done: HandlingStatus,
|
||||||
|
reply_handling_done: Option<HandlingStatus>| {
|
||||||
|
if tc_handling_done == HandlingStatus::HandledOne
|
||||||
|| (reply_handling_done.is_some()
|
|| (reply_handling_done.is_some()
|
||||||
&& reply_handling_done.unwrap() == HandlingStatus::Empty)
|
&& reply_handling_done.unwrap() == HandlingStatus::HandledOne)
|
||||||
{
|
{
|
||||||
nothing_to_do = false;
|
nothing_to_do = false;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
is_srv_finished(self.test_srv.poll_and_handle_next_packet(&time_stamp), None);
|
|
||||||
is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None);
|
|
||||||
is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
|
|
||||||
is_srv_finished(
|
is_srv_finished(
|
||||||
|
17,
|
||||||
|
self.test_srv.poll_and_handle_next_packet(&time_stamp),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
is_srv_finished(
|
||||||
|
11,
|
||||||
|
self.schedule_srv.poll_and_handle_next_tc(&time_stamp),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
is_srv_finished(5, self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
|
||||||
|
is_srv_finished(
|
||||||
|
8,
|
||||||
self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
|
self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
|
||||||
Some(
|
Some(
|
||||||
self.action_srv_wrapper
|
self.action_srv_wrapper
|
||||||
@ -54,10 +65,12 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
|||||||
),
|
),
|
||||||
);
|
);
|
||||||
is_srv_finished(
|
is_srv_finished(
|
||||||
|
3,
|
||||||
self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
|
self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
|
||||||
Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)),
|
Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)),
|
||||||
);
|
);
|
||||||
is_srv_finished(
|
is_srv_finished(
|
||||||
|
200,
|
||||||
self.mode_srv.poll_and_handle_next_tc(&time_stamp),
|
self.mode_srv.poll_and_handle_next_tc(&time_stamp),
|
||||||
Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)),
|
Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)),
|
||||||
);
|
);
|
||||||
|
@ -18,6 +18,8 @@ use satrs_example::config::components::PUS_TEST_SERVICE;
|
|||||||
use satrs_example::config::{tmtc_err, TEST_EVENT};
|
use satrs_example::config::{tmtc_err, TEST_EVENT};
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
|
|
||||||
|
use super::HandlingStatus;
|
||||||
|
|
||||||
pub fn create_test_service_static(
|
pub fn create_test_service_static(
|
||||||
tm_sender: TmInSharedPoolSender<mpsc::SyncSender<PusTmInPool>>,
|
tm_sender: TmInSharedPoolSender<mpsc::SyncSender<PusTmInPool>>,
|
||||||
tc_pool: SharedStaticMemoryPool,
|
tc_pool: SharedStaticMemoryPool,
|
||||||
@ -67,11 +69,11 @@ pub struct TestCustomServiceWrapper<
|
|||||||
impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
||||||
TestCustomServiceWrapper<TmSender, TcInMemConverter>
|
TestCustomServiceWrapper<TmSender, TcInMemConverter>
|
||||||
{
|
{
|
||||||
pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> bool {
|
pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
let res = self.handler.poll_and_handle_next_tc(time_stamp);
|
let res = self.handler.poll_and_handle_next_tc(time_stamp);
|
||||||
if res.is_err() {
|
if res.is_err() {
|
||||||
warn!("PUS17 handler failed with error {:?}", res.unwrap_err());
|
warn!("PUS17 handler failed with error {:?}", res.unwrap_err());
|
||||||
return true;
|
return HandlingStatus::HandledOne;
|
||||||
}
|
}
|
||||||
match res.unwrap() {
|
match res.unwrap() {
|
||||||
PusPacketHandlerResult::RequestHandled => {
|
PusPacketHandlerResult::RequestHandled => {
|
||||||
@ -135,10 +137,8 @@ impl<TmSender: EcssTmSenderCore, TcInMemConverter: EcssTcInMemConverter>
|
|||||||
.expect("Sending start failure verification failed");
|
.expect("Sending start failure verification failed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PusPacketHandlerResult::Empty => {
|
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
false
|
HandlingStatus::HandledOne
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,7 @@ use std::{
|
|||||||
|
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use satrs::{
|
use satrs::{
|
||||||
hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer},
|
hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer},
|
||||||
pus::ReceivesEcssPusTc,
|
pus::ReceivesEcssPusTc,
|
||||||
spacepackets::PacketId,
|
spacepackets::PacketId,
|
||||||
tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore},
|
tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore},
|
||||||
@ -13,6 +13,18 @@ use satrs::{
|
|||||||
|
|
||||||
use crate::ccsds::CcsdsReceiver;
|
use crate::ccsds::CcsdsReceiver;
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct ConnectionFinishedHandler {}
|
||||||
|
|
||||||
|
impl HandledConnectionHandler for ConnectionFinishedHandler {
|
||||||
|
fn handled_connection(&mut self, info: satrs::hal::std::tcp_server::HandledConnectionInfo) {
|
||||||
|
info!(
|
||||||
|
"Served {} TMs and {} TCs for client {:?}",
|
||||||
|
info.num_sent_tms, info.num_received_tcs, info.addr
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
pub struct SyncTcpTmSource {
|
pub struct SyncTcpTmSource {
|
||||||
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||||
@ -70,11 +82,12 @@ impl TmPacketSourceCore for SyncTcpTmSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub type TcpServerType<TcSource, MpscErrorType> = TcpSpacepacketsServer<
|
pub type TcpServerType<TcSource, MpscErrorType> = TcpSpacepacketsServer<
|
||||||
(),
|
|
||||||
CcsdsError<MpscErrorType>,
|
|
||||||
SyncTcpTmSource,
|
SyncTcpTmSource,
|
||||||
CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
|
CcsdsDistributor<CcsdsReceiver<TcSource, MpscErrorType>, MpscErrorType>,
|
||||||
HashSet<PacketId>,
|
HashSet<PacketId>,
|
||||||
|
ConnectionFinishedHandler,
|
||||||
|
(),
|
||||||
|
CcsdsError<MpscErrorType>,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
pub struct TcpTask<
|
pub struct TcpTask<
|
||||||
@ -109,6 +122,7 @@ impl<
|
|||||||
tm_source,
|
tm_source,
|
||||||
tc_receiver,
|
tc_receiver,
|
||||||
packet_id_lookup,
|
packet_id_lookup,
|
||||||
|
ConnectionFinishedHandler::default(),
|
||||||
None,
|
None,
|
||||||
)?,
|
)?,
|
||||||
})
|
})
|
||||||
@ -116,14 +130,9 @@ impl<
|
|||||||
|
|
||||||
pub fn periodic_operation(&mut self) {
|
pub fn periodic_operation(&mut self) {
|
||||||
loop {
|
loop {
|
||||||
let result = self.server.handle_next_connection();
|
let result = self.server.handle_next_connection(None);
|
||||||
match result {
|
match result {
|
||||||
Ok(conn_result) => {
|
Ok(_conn_result) => (),
|
||||||
info!(
|
|
||||||
"Served {} TMs and {} TCs for client {:?}",
|
|
||||||
conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("TCP server error: {e:?}");
|
warn!("TCP server error: {e:?}");
|
||||||
}
|
}
|
||||||
|
@ -22,9 +22,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
|
|||||||
parameter type. This message also contains the sender ID which can be useful for debugging
|
parameter type. This message also contains the sender ID which can be useful for debugging
|
||||||
or application layer / FDIR logic.
|
or application layer / FDIR logic.
|
||||||
- Stop signal handling for the TCP servers.
|
- Stop signal handling for the TCP servers.
|
||||||
|
- TCP server now uses `mio` crate to allow non-blocking operation. The server can now handle
|
||||||
|
multiple connections at once, and the context information about handled transfers is
|
||||||
|
passed via a callback which is inserted as a generic as well.
|
||||||
|
|
||||||
## Changed
|
## Changed
|
||||||
|
|
||||||
|
- TCP server generics order. The error generics come last now.
|
||||||
- `encoding::ccsds::PacketIdValidator` renamed to `ValidatorU16Id`, which lives in the crate root.
|
- `encoding::ccsds::PacketIdValidator` renamed to `ValidatorU16Id`, which lives in the crate root.
|
||||||
It can be used for both CCSDS packet ID and CCSDS APID validation.
|
It can be used for both CCSDS packet ID and CCSDS APID validation.
|
||||||
- `EventManager::try_event_handling` not expects a mutable error handling closure instead of
|
- `EventManager::try_event_handling` not expects a mutable error handling closure instead of
|
||||||
|
@ -70,6 +70,11 @@ version = "0.5.4"
|
|||||||
features = ["all"]
|
features = ["all"]
|
||||||
optional = true
|
optional = true
|
||||||
|
|
||||||
|
[dependencies.mio]
|
||||||
|
version = "0.8"
|
||||||
|
features = ["os-poll", "net"]
|
||||||
|
optional = true
|
||||||
|
|
||||||
[dependencies.spacepackets]
|
[dependencies.spacepackets]
|
||||||
# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
|
# git = "https://egit.irs.uni-stuttgart.de/rust/spacepackets.git"
|
||||||
version = "0.11.0-rc.2"
|
version = "0.11.0-rc.2"
|
||||||
@ -104,7 +109,8 @@ std = [
|
|||||||
"spacepackets/std",
|
"spacepackets/std",
|
||||||
"num_enum/std",
|
"num_enum/std",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"socket2"
|
"socket2",
|
||||||
|
"mio"
|
||||||
]
|
]
|
||||||
alloc = [
|
alloc = [
|
||||||
"serde/alloc",
|
"serde/alloc",
|
||||||
|
@ -2,11 +2,11 @@ use alloc::sync::Arc;
|
|||||||
use alloc::vec;
|
use alloc::vec;
|
||||||
use cobs::encode;
|
use cobs::encode;
|
||||||
use core::sync::atomic::AtomicBool;
|
use core::sync::atomic::AtomicBool;
|
||||||
|
use core::time::Duration;
|
||||||
use delegate::delegate;
|
use delegate::delegate;
|
||||||
|
use mio::net::{TcpListener, TcpStream};
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::net::TcpListener;
|
|
||||||
use std::net::TcpStream;
|
|
||||||
use std::vec::Vec;
|
use std::vec::Vec;
|
||||||
|
|
||||||
use crate::encoding::parse_buffer_for_cobs_encoded_packets;
|
use crate::encoding::parse_buffer_for_cobs_encoded_packets;
|
||||||
@ -17,6 +17,9 @@ use crate::hal::std::tcp_server::{
|
|||||||
ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
|
ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use super::tcp_server::HandledConnectionHandler;
|
||||||
|
use super::tcp_server::HandledConnectionInfo;
|
||||||
|
|
||||||
/// Concrete [TcpTcParser] implementation for the [TcpTmtcInCobsServer].
|
/// Concrete [TcpTcParser] implementation for the [TcpTmtcInCobsServer].
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct CobsTcParser {}
|
pub struct CobsTcParser {}
|
||||||
@ -26,7 +29,7 @@ impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for CobsTcParser {
|
|||||||
&mut self,
|
&mut self,
|
||||||
tc_buffer: &mut [u8],
|
tc_buffer: &mut [u8],
|
||||||
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
||||||
conn_result: &mut ConnectionResult,
|
conn_result: &mut HandledConnectionInfo,
|
||||||
current_write_idx: usize,
|
current_write_idx: usize,
|
||||||
next_write_idx: &mut usize,
|
next_write_idx: &mut usize,
|
||||||
) -> Result<(), TcpTmtcError<TmError, TcError>> {
|
) -> Result<(), TcpTmtcError<TmError, TcError>> {
|
||||||
@ -60,7 +63,7 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
|
|||||||
&mut self,
|
&mut self,
|
||||||
tm_buffer: &mut [u8],
|
tm_buffer: &mut [u8],
|
||||||
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
||||||
conn_result: &mut ConnectionResult,
|
conn_result: &mut HandledConnectionInfo,
|
||||||
stream: &mut TcpStream,
|
stream: &mut TcpStream,
|
||||||
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
|
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
|
||||||
let mut tm_was_sent = false;
|
let mut tm_was_sent = false;
|
||||||
@ -112,21 +115,30 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
|
|||||||
/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/tcp_servers.rs)
|
/// The [TCP integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/tcp_servers.rs)
|
||||||
/// test also serves as the example application for this module.
|
/// test also serves as the example application for this module.
|
||||||
pub struct TcpTmtcInCobsServer<
|
pub struct TcpTmtcInCobsServer<
|
||||||
TmError,
|
|
||||||
TcError: 'static,
|
|
||||||
TmSource: TmPacketSource<Error = TmError>,
|
TmSource: TmPacketSource<Error = TmError>,
|
||||||
TcReceiver: ReceivesTc<Error = TcError>,
|
TcReceiver: ReceivesTc<Error = TcError>,
|
||||||
|
HandledConnection: HandledConnectionHandler,
|
||||||
|
TmError,
|
||||||
|
TcError: 'static,
|
||||||
> {
|
> {
|
||||||
generic_server:
|
pub generic_server: TcpTmtcGenericServer<
|
||||||
TcpTmtcGenericServer<TmError, TcError, TmSource, TcReceiver, CobsTmSender, CobsTcParser>,
|
TmSource,
|
||||||
|
TcReceiver,
|
||||||
|
CobsTmSender,
|
||||||
|
CobsTcParser,
|
||||||
|
HandledConnection,
|
||||||
|
TmError,
|
||||||
|
TcError,
|
||||||
|
>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<
|
||||||
TmError: 'static,
|
|
||||||
TcError: 'static,
|
|
||||||
TmSource: TmPacketSource<Error = TmError>,
|
TmSource: TmPacketSource<Error = TmError>,
|
||||||
TcReceiver: ReceivesTc<Error = TcError>,
|
TcReceiver: ReceivesTc<Error = TcError>,
|
||||||
> TcpTmtcInCobsServer<TmError, TcError, TmSource, TcReceiver>
|
HandledConnection: HandledConnectionHandler,
|
||||||
|
TmError: 'static,
|
||||||
|
TcError: 'static,
|
||||||
|
> TcpTmtcInCobsServer<TmSource, TcReceiver, HandledConnection, TmError, TcError>
|
||||||
{
|
{
|
||||||
/// Create a new TCP TMTC server which exchanges TMTC packets encoded with
|
/// Create a new TCP TMTC server which exchanges TMTC packets encoded with
|
||||||
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
|
/// [COBS protocol](https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing).
|
||||||
@ -142,6 +154,7 @@ impl<
|
|||||||
cfg: ServerConfig,
|
cfg: ServerConfig,
|
||||||
tm_source: TmSource,
|
tm_source: TmSource,
|
||||||
tc_receiver: TcReceiver,
|
tc_receiver: TcReceiver,
|
||||||
|
handled_connection: HandledConnection,
|
||||||
stop_signal: Option<Arc<AtomicBool>>,
|
stop_signal: Option<Arc<AtomicBool>>,
|
||||||
) -> Result<Self, std::io::Error> {
|
) -> Result<Self, std::io::Error> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
@ -151,6 +164,7 @@ impl<
|
|||||||
CobsTmSender::new(cfg.tm_buffer_size),
|
CobsTmSender::new(cfg.tm_buffer_size),
|
||||||
tm_source,
|
tm_source,
|
||||||
tc_receiver,
|
tc_receiver,
|
||||||
|
handled_connection,
|
||||||
stop_signal,
|
stop_signal,
|
||||||
)?,
|
)?,
|
||||||
})
|
})
|
||||||
@ -167,6 +181,7 @@ impl<
|
|||||||
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
|
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
|
||||||
pub fn handle_next_connection(
|
pub fn handle_next_connection(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
poll_duration: Option<Duration>,
|
||||||
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
|
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -181,15 +196,15 @@ mod tests {
|
|||||||
use std::{
|
use std::{
|
||||||
io::{Read, Write},
|
io::{Read, Write},
|
||||||
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
|
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
|
||||||
thread,
|
panic, thread,
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET},
|
encoding::tests::{INVERTED_PACKET, SIMPLE_PACKET},
|
||||||
hal::std::tcp_server::{
|
hal::std::tcp_server::{
|
||||||
tests::{SyncTcCacher, SyncTmSource},
|
tests::{ConnectionFinishedHandler, SyncTcCacher, SyncTmSource},
|
||||||
ServerConfig,
|
ConnectionResult, ServerConfig,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use alloc::sync::Arc;
|
use alloc::sync::Arc;
|
||||||
@ -218,11 +233,12 @@ mod tests {
|
|||||||
tc_receiver: SyncTcCacher,
|
tc_receiver: SyncTcCacher,
|
||||||
tm_source: SyncTmSource,
|
tm_source: SyncTmSource,
|
||||||
stop_signal: Option<Arc<AtomicBool>>,
|
stop_signal: Option<Arc<AtomicBool>>,
|
||||||
) -> TcpTmtcInCobsServer<(), (), SyncTmSource, SyncTcCacher> {
|
) -> TcpTmtcInCobsServer<SyncTmSource, SyncTcCacher, ConnectionFinishedHandler, (), ()> {
|
||||||
TcpTmtcInCobsServer::new(
|
TcpTmtcInCobsServer::new(
|
||||||
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
|
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
|
||||||
tm_source,
|
tm_source,
|
||||||
tc_receiver,
|
tc_receiver,
|
||||||
|
ConnectionFinishedHandler::default(),
|
||||||
stop_signal,
|
stop_signal,
|
||||||
)
|
)
|
||||||
.expect("TCP server generation failed")
|
.expect("TCP server generation failed")
|
||||||
@ -242,13 +258,20 @@ mod tests {
|
|||||||
let set_if_done = conn_handled.clone();
|
let set_if_done = conn_handled.clone();
|
||||||
// Call the connection handler in separate thread, does block.
|
// Call the connection handler in separate thread, does block.
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let result = tcp_server.handle_next_connection();
|
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(100)));
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
}
|
}
|
||||||
let conn_result = result.unwrap();
|
let result = result.unwrap();
|
||||||
assert_eq!(conn_result.num_received_tcs, 1);
|
assert_eq!(result, ConnectionResult::HandledConnections(1));
|
||||||
assert_eq!(conn_result.num_sent_tms, 0);
|
tcp_server
|
||||||
|
.generic_server
|
||||||
|
.finished_handler
|
||||||
|
.check_last_connection(0, 1);
|
||||||
|
tcp_server
|
||||||
|
.generic_server
|
||||||
|
.finished_handler
|
||||||
|
.check_no_connections_left();
|
||||||
set_if_done.store(true, Ordering::Relaxed);
|
set_if_done.store(true, Ordering::Relaxed);
|
||||||
});
|
});
|
||||||
// Send TC to server now.
|
// Send TC to server now.
|
||||||
@ -299,13 +322,20 @@ mod tests {
|
|||||||
let set_if_done = conn_handled.clone();
|
let set_if_done = conn_handled.clone();
|
||||||
// Call the connection handler in separate thread, does block.
|
// Call the connection handler in separate thread, does block.
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let result = tcp_server.handle_next_connection();
|
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(100)));
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
}
|
}
|
||||||
let conn_result = result.unwrap();
|
let result = result.unwrap();
|
||||||
assert_eq!(conn_result.num_received_tcs, 2, "Not enough TCs received");
|
assert_eq!(result, ConnectionResult::HandledConnections(1));
|
||||||
assert_eq!(conn_result.num_sent_tms, 2, "Not enough TMs received");
|
tcp_server
|
||||||
|
.generic_server
|
||||||
|
.finished_handler
|
||||||
|
.check_last_connection(2, 2);
|
||||||
|
tcp_server
|
||||||
|
.generic_server
|
||||||
|
.finished_handler
|
||||||
|
.check_no_connections_left();
|
||||||
set_if_done.store(true, Ordering::Relaxed);
|
set_if_done.store(true, Ordering::Relaxed);
|
||||||
});
|
});
|
||||||
// Send TC to server now.
|
// Send TC to server now.
|
||||||
@ -389,6 +419,31 @@ mod tests {
|
|||||||
drop(tc_queue);
|
drop(tc_queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_server_accept_timeout() {
|
||||||
|
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||||
|
let tc_receiver = SyncTcCacher::default();
|
||||||
|
let tm_source = SyncTmSource::default();
|
||||||
|
let mut tcp_server =
|
||||||
|
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source, None);
|
||||||
|
let start = Instant::now();
|
||||||
|
// Call the connection handler in separate thread, does block.
|
||||||
|
let thread_jh = thread::spawn(move || loop {
|
||||||
|
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(20)));
|
||||||
|
if result.is_err() {
|
||||||
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
|
}
|
||||||
|
let result = result.unwrap();
|
||||||
|
if result == ConnectionResult::AcceptTimeout {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if Instant::now() - start > Duration::from_millis(100) {
|
||||||
|
panic!("regular stop signal handling failed");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
thread_jh.join().expect("thread join failed");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_server_stop_signal() {
|
fn test_server_stop_signal() {
|
||||||
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||||
@ -401,17 +456,32 @@ mod tests {
|
|||||||
tm_source,
|
tm_source,
|
||||||
Some(stop_signal.clone()),
|
Some(stop_signal.clone()),
|
||||||
);
|
);
|
||||||
|
let dest_addr = tcp_server
|
||||||
|
.local_addr()
|
||||||
|
.expect("retrieving dest addr failed");
|
||||||
|
let stop_signal_copy = stop_signal.clone();
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
// Call the connection handler in separate thread, does block.
|
// Call the connection handler in separate thread, does block.
|
||||||
thread::spawn(move || loop {
|
let thread_jh = thread::spawn(move || loop {
|
||||||
let result = tcp_server.handle_next_connection();
|
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(20)));
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
}
|
}
|
||||||
if Instant::now() - start > Duration::from_millis(50) {
|
let result = result.unwrap();
|
||||||
|
if result == ConnectionResult::AcceptTimeout {
|
||||||
|
panic!("unexpected accept timeout");
|
||||||
|
}
|
||||||
|
if stop_signal_copy.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if Instant::now() - start > Duration::from_millis(100) {
|
||||||
panic!("regular stop signal handling failed");
|
panic!("regular stop signal handling failed");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
// We connect but do not do anything.
|
||||||
|
let _stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||||
stop_signal.store(true, Ordering::Relaxed);
|
stop_signal.store(true, Ordering::Relaxed);
|
||||||
|
// No need to drop the connection, the stop signal should take take of everything.
|
||||||
|
thread_jh.join().expect("thread join failed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,10 +4,13 @@ use alloc::vec;
|
|||||||
use alloc::vec::Vec;
|
use alloc::vec::Vec;
|
||||||
use core::sync::atomic::AtomicBool;
|
use core::sync::atomic::AtomicBool;
|
||||||
use core::time::Duration;
|
use core::time::Duration;
|
||||||
|
use mio::net::{TcpListener, TcpStream};
|
||||||
|
use mio::{Events, Interest, Poll, Token};
|
||||||
use socket2::{Domain, Socket, Type};
|
use socket2::{Domain, Socket, Type};
|
||||||
use std::io::Read;
|
use std::io::{self, Read};
|
||||||
use std::net::TcpListener;
|
use std::net::SocketAddr;
|
||||||
use std::net::{SocketAddr, TcpStream};
|
// use std::net::TcpListener;
|
||||||
|
// use std::net::{SocketAddr, TcpStream};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
use crate::tmtc::{ReceivesTc, TmPacketSource};
|
use crate::tmtc::{ReceivesTc, TmPacketSource};
|
||||||
@ -81,11 +84,35 @@ pub enum TcpTmtcError<TmError, TcError> {
|
|||||||
|
|
||||||
/// Result of one connection attempt. Contains the client address if a connection was established,
|
/// Result of one connection attempt. Contains the client address if a connection was established,
|
||||||
/// in addition to the number of telecommands and telemetry packets exchanged.
|
/// in addition to the number of telecommands and telemetry packets exchanged.
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub struct ConnectionResult {
|
pub enum ConnectionResult {
|
||||||
pub addr: Option<SocketAddr>,
|
AcceptTimeout,
|
||||||
|
HandledConnections(u32),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct HandledConnectionInfo {
|
||||||
|
pub addr: SocketAddr,
|
||||||
pub num_received_tcs: u32,
|
pub num_received_tcs: u32,
|
||||||
pub num_sent_tms: u32,
|
pub num_sent_tms: u32,
|
||||||
|
/// The generic TCP server can be stopped using an external signal. If this happened, this
|
||||||
|
/// boolean will be set to true.
|
||||||
|
pub stopped_by_signal: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HandledConnectionInfo {
|
||||||
|
pub fn new(addr: SocketAddr) -> Self {
|
||||||
|
Self {
|
||||||
|
addr,
|
||||||
|
num_received_tcs: 0,
|
||||||
|
num_sent_tms: 0,
|
||||||
|
stopped_by_signal: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait HandledConnectionHandler {
|
||||||
|
fn handled_connection(&mut self, info: HandledConnectionInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generic parser abstraction for an object which can parse for telecommands given a raw
|
/// Generic parser abstraction for an object which can parse for telecommands given a raw
|
||||||
@ -96,7 +123,7 @@ pub trait TcpTcParser<TmError, TcError> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
tc_buffer: &mut [u8],
|
tc_buffer: &mut [u8],
|
||||||
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
||||||
conn_result: &mut ConnectionResult,
|
conn_result: &mut HandledConnectionInfo,
|
||||||
current_write_idx: usize,
|
current_write_idx: usize,
|
||||||
next_write_idx: &mut usize,
|
next_write_idx: &mut usize,
|
||||||
) -> Result<(), TcpTmtcError<TmError, TcError>>;
|
) -> Result<(), TcpTmtcError<TmError, TcError>>;
|
||||||
@ -111,7 +138,7 @@ pub trait TcpTmSender<TmError, TcError> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
tm_buffer: &mut [u8],
|
tm_buffer: &mut [u8],
|
||||||
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
||||||
conn_result: &mut ConnectionResult,
|
conn_result: &mut HandledConnectionInfo,
|
||||||
stream: &mut TcpStream,
|
stream: &mut TcpStream,
|
||||||
) -> Result<bool, TcpTmtcError<TmError, TcError>>;
|
) -> Result<bool, TcpTmtcError<TmError, TcError>>;
|
||||||
}
|
}
|
||||||
@ -134,32 +161,46 @@ pub trait TcpTmSender<TmError, TcError> {
|
|||||||
///
|
///
|
||||||
/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol.
|
/// 1. [TcpTmtcInCobsServer] to exchange TMTC wrapped inside the COBS framing protocol.
|
||||||
pub struct TcpTmtcGenericServer<
|
pub struct TcpTmtcGenericServer<
|
||||||
TmError,
|
|
||||||
TcError,
|
|
||||||
TmSource: TmPacketSource<Error = TmError>,
|
TmSource: TmPacketSource<Error = TmError>,
|
||||||
TcReceiver: ReceivesTc<Error = TcError>,
|
TcReceiver: ReceivesTc<Error = TcError>,
|
||||||
TmSender: TcpTmSender<TmError, TcError>,
|
TmSender: TcpTmSender<TmError, TcError>,
|
||||||
TcParser: TcpTcParser<TmError, TcError>,
|
TcParser: TcpTcParser<TmError, TcError>,
|
||||||
|
HandledConnection: HandledConnectionHandler,
|
||||||
|
TmError,
|
||||||
|
TcError,
|
||||||
> {
|
> {
|
||||||
|
pub finished_handler: HandledConnection,
|
||||||
pub(crate) listener: TcpListener,
|
pub(crate) listener: TcpListener,
|
||||||
pub(crate) inner_loop_delay: Duration,
|
pub(crate) inner_loop_delay: Duration,
|
||||||
pub(crate) tm_source: TmSource,
|
pub(crate) tm_source: TmSource,
|
||||||
pub(crate) tm_buffer: Vec<u8>,
|
pub(crate) tm_buffer: Vec<u8>,
|
||||||
pub(crate) tc_receiver: TcReceiver,
|
pub(crate) tc_receiver: TcReceiver,
|
||||||
pub(crate) tc_buffer: Vec<u8>,
|
pub(crate) tc_buffer: Vec<u8>,
|
||||||
stop_signal: Option<Arc<AtomicBool>>,
|
poll: Poll,
|
||||||
|
events: Events,
|
||||||
tc_handler: TcParser,
|
tc_handler: TcParser,
|
||||||
tm_handler: TmSender,
|
tm_handler: TmSender,
|
||||||
|
stop_signal: Option<Arc<AtomicBool>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<
|
||||||
TmError: 'static,
|
|
||||||
TcError: 'static,
|
|
||||||
TmSource: TmPacketSource<Error = TmError>,
|
TmSource: TmPacketSource<Error = TmError>,
|
||||||
TcReceiver: ReceivesTc<Error = TcError>,
|
TcReceiver: ReceivesTc<Error = TcError>,
|
||||||
TmSender: TcpTmSender<TmError, TcError>,
|
TmSender: TcpTmSender<TmError, TcError>,
|
||||||
TcParser: TcpTcParser<TmError, TcError>,
|
TcParser: TcpTcParser<TmError, TcError>,
|
||||||
> TcpTmtcGenericServer<TmError, TcError, TmSource, TcReceiver, TmSender, TcParser>
|
HandledConnection: HandledConnectionHandler,
|
||||||
|
TmError: 'static,
|
||||||
|
TcError: 'static,
|
||||||
|
>
|
||||||
|
TcpTmtcGenericServer<
|
||||||
|
TmSource,
|
||||||
|
TcReceiver,
|
||||||
|
TmSender,
|
||||||
|
TcParser,
|
||||||
|
HandledConnection,
|
||||||
|
TmError,
|
||||||
|
TcError,
|
||||||
|
>
|
||||||
{
|
{
|
||||||
/// Create a new generic TMTC server instance.
|
/// Create a new generic TMTC server instance.
|
||||||
///
|
///
|
||||||
@ -173,32 +214,52 @@ impl<
|
|||||||
/// then sent back to the client.
|
/// then sent back to the client.
|
||||||
/// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded
|
/// * `tc_receiver` - Any received telecommand which was decoded successfully will be forwarded
|
||||||
/// to this TC receiver.
|
/// to this TC receiver.
|
||||||
|
/// * `stop_signal` - Can be used to stop the server even if a connection is ongoing.
|
||||||
pub fn new(
|
pub fn new(
|
||||||
cfg: ServerConfig,
|
cfg: ServerConfig,
|
||||||
tc_parser: TcParser,
|
tc_parser: TcParser,
|
||||||
tm_sender: TmSender,
|
tm_sender: TmSender,
|
||||||
tm_source: TmSource,
|
tm_source: TmSource,
|
||||||
tc_receiver: TcReceiver,
|
tc_receiver: TcReceiver,
|
||||||
|
finished_handler: HandledConnection,
|
||||||
stop_signal: Option<Arc<AtomicBool>>,
|
stop_signal: Option<Arc<AtomicBool>>,
|
||||||
) -> Result<Self, std::io::Error> {
|
) -> Result<Self, std::io::Error> {
|
||||||
// Create a TCP listener bound to two addresses.
|
// Create a TCP listener bound to two addresses.
|
||||||
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
|
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
|
||||||
|
|
||||||
socket.set_reuse_address(cfg.reuse_addr)?;
|
socket.set_reuse_address(cfg.reuse_addr)?;
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
socket.set_reuse_port(cfg.reuse_port)?;
|
socket.set_reuse_port(cfg.reuse_port)?;
|
||||||
|
// MIO does not do this for us. We want the accept calls to be non-blocking.
|
||||||
|
socket.set_nonblocking(true)?;
|
||||||
let addr = (cfg.addr).into();
|
let addr = (cfg.addr).into();
|
||||||
socket.bind(&addr)?;
|
socket.bind(&addr)?;
|
||||||
socket.listen(128)?;
|
socket.listen(128)?;
|
||||||
|
|
||||||
|
// Create a poll instance.
|
||||||
|
let poll = Poll::new()?;
|
||||||
|
// Create storage for events.
|
||||||
|
let events = Events::with_capacity(10);
|
||||||
|
let listener: std::net::TcpListener = socket.into();
|
||||||
|
let mut mio_listener = TcpListener::from_std(listener);
|
||||||
|
|
||||||
|
// Start listening for incoming connections.
|
||||||
|
poll.registry()
|
||||||
|
.register(&mut mio_listener, Token(0), Interest::READABLE)?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
tc_handler: tc_parser,
|
tc_handler: tc_parser,
|
||||||
tm_handler: tm_sender,
|
tm_handler: tm_sender,
|
||||||
listener: socket.into(),
|
poll,
|
||||||
|
events,
|
||||||
|
listener: mio_listener,
|
||||||
inner_loop_delay: cfg.inner_loop_delay,
|
inner_loop_delay: cfg.inner_loop_delay,
|
||||||
tm_source,
|
tm_source,
|
||||||
tm_buffer: vec![0; cfg.tm_buffer_size],
|
tm_buffer: vec![0; cfg.tm_buffer_size],
|
||||||
tc_receiver,
|
tc_receiver,
|
||||||
tc_buffer: vec![0; cfg.tc_buffer_size],
|
tc_buffer: vec![0; cfg.tc_buffer_size],
|
||||||
stop_signal,
|
stop_signal,
|
||||||
|
finished_handler,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -228,13 +289,50 @@ impl<
|
|||||||
/// client does not send any telecommands and no telemetry needs to be sent back to the client.
|
/// client does not send any telecommands and no telemetry needs to be sent back to the client.
|
||||||
pub fn handle_next_connection(
|
pub fn handle_next_connection(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
poll_timeout: Option<Duration>,
|
||||||
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> {
|
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> {
|
||||||
let mut connection_result = ConnectionResult::default();
|
let mut handled_connections = 0;
|
||||||
|
// Poll Mio for events.
|
||||||
|
self.poll.poll(&mut self.events, poll_timeout)?;
|
||||||
|
let mut acceptable_connection = false;
|
||||||
|
// Process each event.
|
||||||
|
for event in self.events.iter() {
|
||||||
|
if event.token() == Token(0) {
|
||||||
|
acceptable_connection = true;
|
||||||
|
} else {
|
||||||
|
// Should never happen..
|
||||||
|
panic!("unexpected TCP event token");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// I'd love to do this in the loop above, but there are issues with multiple borrows.
|
||||||
|
if acceptable_connection {
|
||||||
|
// There might be mutliple connections available. Accept until all of them have
|
||||||
|
// been handled.
|
||||||
|
loop {
|
||||||
|
match self.listener.accept() {
|
||||||
|
Ok((stream, addr)) => {
|
||||||
|
self.handle_accepted_connection(stream, addr)?;
|
||||||
|
handled_connections += 1;
|
||||||
|
}
|
||||||
|
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break,
|
||||||
|
Err(err) => return Err(TcpTmtcError::Io(err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if handled_connections > 0 {
|
||||||
|
return Ok(ConnectionResult::HandledConnections(handled_connections));
|
||||||
|
}
|
||||||
|
Ok(ConnectionResult::AcceptTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_accepted_connection(
|
||||||
|
&mut self,
|
||||||
|
mut stream: TcpStream,
|
||||||
|
addr: SocketAddr,
|
||||||
|
) -> Result<(), TcpTmtcError<TmError, TcError>> {
|
||||||
let mut current_write_idx;
|
let mut current_write_idx;
|
||||||
let mut next_write_idx = 0;
|
let mut next_write_idx = 0;
|
||||||
let (mut stream, addr) = self.listener.accept()?;
|
let mut connection_result = HandledConnectionInfo::new(addr);
|
||||||
stream.set_nonblocking(true)?;
|
|
||||||
connection_result.addr = Some(addr);
|
|
||||||
current_write_idx = next_write_idx;
|
current_write_idx = next_write_idx;
|
||||||
loop {
|
loop {
|
||||||
let read_result = stream.read(&mut self.tc_buffer[current_write_idx..]);
|
let read_result = stream.read(&mut self.tc_buffer[current_write_idx..]);
|
||||||
@ -297,7 +395,9 @@ impl<
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
.load(std::sync::atomic::Ordering::Relaxed)
|
.load(std::sync::atomic::Ordering::Relaxed)
|
||||||
{
|
{
|
||||||
return Ok(connection_result);
|
connection_result.stopped_by_signal = true;
|
||||||
|
self.finished_handler.handled_connection(connection_result);
|
||||||
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -313,7 +413,8 @@ impl<
|
|||||||
&mut connection_result,
|
&mut connection_result,
|
||||||
&mut stream,
|
&mut stream,
|
||||||
)?;
|
)?;
|
||||||
Ok(connection_result)
|
self.finished_handler.handled_connection(connection_result);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -325,6 +426,8 @@ pub(crate) mod tests {
|
|||||||
|
|
||||||
use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore};
|
use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore};
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
pub(crate) struct SyncTcCacher {
|
pub(crate) struct SyncTcCacher {
|
||||||
pub(crate) tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
pub(crate) tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||||
@ -371,4 +474,30 @@ pub(crate) mod tests {
|
|||||||
Ok(0)
|
Ok(0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct ConnectionFinishedHandler {
|
||||||
|
connection_info: VecDeque<HandledConnectionInfo>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HandledConnectionHandler for ConnectionFinishedHandler {
|
||||||
|
fn handled_connection(&mut self, info: HandledConnectionInfo) {
|
||||||
|
self.connection_info.push_back(info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionFinishedHandler {
|
||||||
|
pub fn check_last_connection(&mut self, num_tms: u32, num_tcs: u32) {
|
||||||
|
let last_conn_result = self
|
||||||
|
.connection_info
|
||||||
|
.pop_back()
|
||||||
|
.expect("no connection info available");
|
||||||
|
assert_eq!(last_conn_result.num_received_tcs, num_tcs);
|
||||||
|
assert_eq!(last_conn_result.num_sent_tms, num_tms);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn check_no_connections_left(&self) {
|
||||||
|
assert!(self.connection_info.is_empty());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,8 @@
|
|||||||
use alloc::sync::Arc;
|
use alloc::sync::Arc;
|
||||||
use core::sync::atomic::AtomicBool;
|
use core::{sync::atomic::AtomicBool, time::Duration};
|
||||||
use delegate::delegate;
|
use delegate::delegate;
|
||||||
use std::{
|
use mio::net::{TcpListener, TcpStream};
|
||||||
io::Write,
|
use std::{io::Write, net::SocketAddr};
|
||||||
net::{SocketAddr, TcpListener, TcpStream},
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
encoding::parse_buffer_for_ccsds_space_packets,
|
encoding::parse_buffer_for_ccsds_space_packets,
|
||||||
@ -13,7 +11,8 @@ use crate::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use super::tcp_server::{
|
use super::tcp_server::{
|
||||||
ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
|
ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig, TcpTcParser,
|
||||||
|
TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Concrete [TcpTcParser] implementation for the [TcpSpacepacketsServer].
|
/// Concrete [TcpTcParser] implementation for the [TcpSpacepacketsServer].
|
||||||
@ -27,14 +26,14 @@ impl<PacketIdChecker: ValidatorU16Id> SpacepacketsTcParser<PacketIdChecker> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TmError, TcError: 'static, PacketIdChecker: ValidatorU16Id> TcpTcParser<TmError, TcError>
|
impl<PacketIdChecker: ValidatorU16Id, TmError, TcError: 'static> TcpTcParser<TmError, TcError>
|
||||||
for SpacepacketsTcParser<PacketIdChecker>
|
for SpacepacketsTcParser<PacketIdChecker>
|
||||||
{
|
{
|
||||||
fn handle_tc_parsing(
|
fn handle_tc_parsing(
|
||||||
&mut self,
|
&mut self,
|
||||||
tc_buffer: &mut [u8],
|
tc_buffer: &mut [u8],
|
||||||
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
|
||||||
conn_result: &mut ConnectionResult,
|
conn_result: &mut HandledConnectionInfo,
|
||||||
current_write_idx: usize,
|
current_write_idx: usize,
|
||||||
next_write_idx: &mut usize,
|
next_write_idx: &mut usize,
|
||||||
) -> Result<(), TcpTmtcError<TmError, TcError>> {
|
) -> Result<(), TcpTmtcError<TmError, TcError>> {
|
||||||
@ -59,7 +58,7 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
|
|||||||
&mut self,
|
&mut self,
|
||||||
tm_buffer: &mut [u8],
|
tm_buffer: &mut [u8],
|
||||||
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
|
||||||
conn_result: &mut ConnectionResult,
|
conn_result: &mut HandledConnectionInfo,
|
||||||
stream: &mut TcpStream,
|
stream: &mut TcpStream,
|
||||||
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
|
) -> Result<bool, TcpTmtcError<TmError, TcError>> {
|
||||||
let mut tm_was_sent = false;
|
let mut tm_was_sent = false;
|
||||||
@ -94,29 +93,40 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
|
|||||||
/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/tcp_servers.rs)
|
/// The [TCP server integration tests](https://egit.irs.uni-stuttgart.de/rust/sat-rs/src/branch/main/satrs/tests/tcp_servers.rs)
|
||||||
/// also serves as the example application for this module.
|
/// also serves as the example application for this module.
|
||||||
pub struct TcpSpacepacketsServer<
|
pub struct TcpSpacepacketsServer<
|
||||||
TmError,
|
|
||||||
TcError: 'static,
|
|
||||||
TmSource: TmPacketSource<Error = TmError>,
|
TmSource: TmPacketSource<Error = TmError>,
|
||||||
TcReceiver: ReceivesTc<Error = TcError>,
|
TcReceiver: ReceivesTc<Error = TcError>,
|
||||||
PacketIdChecker: ValidatorU16Id,
|
PacketIdChecker: ValidatorU16Id,
|
||||||
|
HandledConnection: HandledConnectionHandler,
|
||||||
|
TmError,
|
||||||
|
TcError: 'static,
|
||||||
> {
|
> {
|
||||||
generic_server: TcpTmtcGenericServer<
|
pub generic_server: TcpTmtcGenericServer<
|
||||||
TmError,
|
|
||||||
TcError,
|
|
||||||
TmSource,
|
TmSource,
|
||||||
TcReceiver,
|
TcReceiver,
|
||||||
SpacepacketsTmSender,
|
SpacepacketsTmSender,
|
||||||
SpacepacketsTcParser<PacketIdChecker>,
|
SpacepacketsTcParser<PacketIdChecker>,
|
||||||
|
HandledConnection,
|
||||||
|
TmError,
|
||||||
|
TcError,
|
||||||
>,
|
>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<
|
||||||
TmError: 'static,
|
|
||||||
TcError: 'static,
|
|
||||||
TmSource: TmPacketSource<Error = TmError>,
|
TmSource: TmPacketSource<Error = TmError>,
|
||||||
TcReceiver: ReceivesTc<Error = TcError>,
|
TcReceiver: ReceivesTc<Error = TcError>,
|
||||||
PacketIdChecker: ValidatorU16Id,
|
PacketIdChecker: ValidatorU16Id,
|
||||||
> TcpSpacepacketsServer<TmError, TcError, TmSource, TcReceiver, PacketIdChecker>
|
HandledConnection: HandledConnectionHandler,
|
||||||
|
TmError: 'static,
|
||||||
|
TcError: 'static,
|
||||||
|
>
|
||||||
|
TcpSpacepacketsServer<
|
||||||
|
TmSource,
|
||||||
|
TcReceiver,
|
||||||
|
PacketIdChecker,
|
||||||
|
HandledConnection,
|
||||||
|
TmError,
|
||||||
|
TcError,
|
||||||
|
>
|
||||||
{
|
{
|
||||||
///
|
///
|
||||||
/// ## Parameter
|
/// ## Parameter
|
||||||
@ -133,6 +143,7 @@ impl<
|
|||||||
tm_source: TmSource,
|
tm_source: TmSource,
|
||||||
tc_receiver: TcReceiver,
|
tc_receiver: TcReceiver,
|
||||||
packet_id_checker: PacketIdChecker,
|
packet_id_checker: PacketIdChecker,
|
||||||
|
handled_connection: HandledConnection,
|
||||||
stop_signal: Option<Arc<AtomicBool>>,
|
stop_signal: Option<Arc<AtomicBool>>,
|
||||||
) -> Result<Self, std::io::Error> {
|
) -> Result<Self, std::io::Error> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
@ -142,6 +153,7 @@ impl<
|
|||||||
SpacepacketsTmSender::default(),
|
SpacepacketsTmSender::default(),
|
||||||
tm_source,
|
tm_source,
|
||||||
tc_receiver,
|
tc_receiver,
|
||||||
|
handled_connection,
|
||||||
stop_signal,
|
stop_signal,
|
||||||
)?,
|
)?,
|
||||||
})
|
})
|
||||||
@ -158,6 +170,7 @@ impl<
|
|||||||
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
|
/// Delegation to the [TcpTmtcGenericServer::handle_next_connection] call.
|
||||||
pub fn handle_next_connection(
|
pub fn handle_next_connection(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
poll_timeout: Option<Duration>
|
||||||
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
|
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -185,8 +198,8 @@ mod tests {
|
|||||||
};
|
};
|
||||||
|
|
||||||
use crate::hal::std::tcp_server::{
|
use crate::hal::std::tcp_server::{
|
||||||
tests::{SyncTcCacher, SyncTmSource},
|
tests::{ConnectionFinishedHandler, SyncTcCacher, SyncTmSource},
|
||||||
ServerConfig,
|
ConnectionResult, ServerConfig,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::TcpSpacepacketsServer;
|
use super::TcpSpacepacketsServer;
|
||||||
@ -202,12 +215,20 @@ mod tests {
|
|||||||
tm_source: SyncTmSource,
|
tm_source: SyncTmSource,
|
||||||
packet_id_lookup: HashSet<PacketId>,
|
packet_id_lookup: HashSet<PacketId>,
|
||||||
stop_signal: Option<Arc<AtomicBool>>,
|
stop_signal: Option<Arc<AtomicBool>>,
|
||||||
) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher, HashSet<PacketId>> {
|
) -> TcpSpacepacketsServer<
|
||||||
|
SyncTmSource,
|
||||||
|
SyncTcCacher,
|
||||||
|
HashSet<PacketId>,
|
||||||
|
ConnectionFinishedHandler,
|
||||||
|
(),
|
||||||
|
(),
|
||||||
|
> {
|
||||||
TcpSpacepacketsServer::new(
|
TcpSpacepacketsServer::new(
|
||||||
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
|
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
|
||||||
tm_source,
|
tm_source,
|
||||||
tc_receiver,
|
tc_receiver,
|
||||||
packet_id_lookup,
|
packet_id_lookup,
|
||||||
|
ConnectionFinishedHandler::default(),
|
||||||
stop_signal,
|
stop_signal,
|
||||||
)
|
)
|
||||||
.expect("TCP server generation failed")
|
.expect("TCP server generation failed")
|
||||||
@ -234,13 +255,20 @@ mod tests {
|
|||||||
let set_if_done = conn_handled.clone();
|
let set_if_done = conn_handled.clone();
|
||||||
// Call the connection handler in separate thread, does block.
|
// Call the connection handler in separate thread, does block.
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let result = tcp_server.handle_next_connection();
|
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(100)));
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
}
|
}
|
||||||
let conn_result = result.unwrap();
|
let conn_result = result.unwrap();
|
||||||
assert_eq!(conn_result.num_received_tcs, 1);
|
matches!(conn_result, ConnectionResult::HandledConnections(1));
|
||||||
assert_eq!(conn_result.num_sent_tms, 0);
|
tcp_server
|
||||||
|
.generic_server
|
||||||
|
.finished_handler
|
||||||
|
.check_last_connection(0, 1);
|
||||||
|
tcp_server
|
||||||
|
.generic_server
|
||||||
|
.finished_handler
|
||||||
|
.check_no_connections_left();
|
||||||
set_if_done.store(true, Ordering::Relaxed);
|
set_if_done.store(true, Ordering::Relaxed);
|
||||||
});
|
});
|
||||||
let ping_tc =
|
let ping_tc =
|
||||||
@ -305,16 +333,20 @@ mod tests {
|
|||||||
|
|
||||||
// Call the connection handler in separate thread, does block.
|
// Call the connection handler in separate thread, does block.
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let result = tcp_server.handle_next_connection();
|
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(100)));
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
}
|
}
|
||||||
let conn_result = result.unwrap();
|
let conn_result = result.unwrap();
|
||||||
assert_eq!(
|
matches!(conn_result, ConnectionResult::HandledConnections(1));
|
||||||
conn_result.num_received_tcs, 2,
|
tcp_server
|
||||||
"wrong number of received TCs"
|
.generic_server
|
||||||
);
|
.finished_handler
|
||||||
assert_eq!(conn_result.num_sent_tms, 2, "wrong number of sent TMs");
|
.check_last_connection(2, 2);
|
||||||
|
tcp_server
|
||||||
|
.generic_server
|
||||||
|
.finished_handler
|
||||||
|
.check_no_connections_left();
|
||||||
set_if_done.store(true, Ordering::Relaxed);
|
set_if_done.store(true, Ordering::Relaxed);
|
||||||
});
|
});
|
||||||
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||||
|
@ -24,7 +24,10 @@ use std::{
|
|||||||
use hashbrown::HashSet;
|
use hashbrown::HashSet;
|
||||||
use satrs::{
|
use satrs::{
|
||||||
encoding::cobs::encode_packet_with_cobs,
|
encoding::cobs::encode_packet_with_cobs,
|
||||||
hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer, TcpTmtcInCobsServer},
|
hal::std::tcp_server::{
|
||||||
|
ConnectionResult, HandledConnectionHandler, HandledConnectionInfo, ServerConfig,
|
||||||
|
TcpSpacepacketsServer, TcpTmtcInCobsServer,
|
||||||
|
},
|
||||||
tmtc::{ReceivesTcCore, TmPacketSourceCore},
|
tmtc::{ReceivesTcCore, TmPacketSourceCore},
|
||||||
};
|
};
|
||||||
use spacepackets::{
|
use spacepackets::{
|
||||||
@ -33,10 +36,36 @@ use spacepackets::{
|
|||||||
};
|
};
|
||||||
use std::{collections::VecDeque, sync::Arc, vec::Vec};
|
use std::{collections::VecDeque, sync::Arc, vec::Vec};
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct ConnectionFinishedHandler {
|
||||||
|
connection_info: VecDeque<HandledConnectionInfo>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HandledConnectionHandler for ConnectionFinishedHandler {
|
||||||
|
fn handled_connection(&mut self, info: HandledConnectionInfo) {
|
||||||
|
self.connection_info.push_back(info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionFinishedHandler {
|
||||||
|
pub fn check_last_connection(&mut self, num_tms: u32, num_tcs: u32) {
|
||||||
|
let last_conn_result = self
|
||||||
|
.connection_info
|
||||||
|
.pop_back()
|
||||||
|
.expect("no connection info available");
|
||||||
|
assert_eq!(last_conn_result.num_received_tcs, num_tcs);
|
||||||
|
assert_eq!(last_conn_result.num_sent_tms, num_tms);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn check_no_connections_left(&self) {
|
||||||
|
assert!(self.connection_info.is_empty());
|
||||||
|
}
|
||||||
|
}
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
struct SyncTcCacher {
|
struct SyncTcCacher {
|
||||||
tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ReceivesTcCore for SyncTcCacher {
|
impl ReceivesTcCore for SyncTcCacher {
|
||||||
type Error = ();
|
type Error = ();
|
||||||
|
|
||||||
@ -96,6 +125,7 @@ fn test_cobs_server() {
|
|||||||
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
|
ServerConfig::new(AUTO_PORT_ADDR, Duration::from_millis(2), 1024, 1024),
|
||||||
tm_source,
|
tm_source,
|
||||||
tc_receiver.clone(),
|
tc_receiver.clone(),
|
||||||
|
ConnectionFinishedHandler::default(),
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
.expect("TCP server generation failed");
|
.expect("TCP server generation failed");
|
||||||
@ -107,13 +137,20 @@ fn test_cobs_server() {
|
|||||||
|
|
||||||
// Call the connection handler in separate thread, does block.
|
// Call the connection handler in separate thread, does block.
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let result = tcp_server.handle_next_connection();
|
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(400)));
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
}
|
}
|
||||||
let conn_result = result.unwrap();
|
let conn_result = result.unwrap();
|
||||||
assert_eq!(conn_result.num_received_tcs, 1, "No TC received");
|
assert_eq!(conn_result, ConnectionResult::HandledConnections(1));
|
||||||
assert_eq!(conn_result.num_sent_tms, 1, "No TM received");
|
tcp_server
|
||||||
|
.generic_server
|
||||||
|
.finished_handler
|
||||||
|
.check_last_connection(1, 1);
|
||||||
|
tcp_server
|
||||||
|
.generic_server
|
||||||
|
.finished_handler
|
||||||
|
.check_no_connections_left();
|
||||||
// Signal the main thread we are done.
|
// Signal the main thread we are done.
|
||||||
set_if_done.store(true, Ordering::Relaxed);
|
set_if_done.store(true, Ordering::Relaxed);
|
||||||
});
|
});
|
||||||
@ -180,6 +217,7 @@ fn test_ccsds_server() {
|
|||||||
tm_source,
|
tm_source,
|
||||||
tc_receiver.clone(),
|
tc_receiver.clone(),
|
||||||
packet_id_lookup,
|
packet_id_lookup,
|
||||||
|
ConnectionFinishedHandler::default(),
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
.expect("TCP server generation failed");
|
.expect("TCP server generation failed");
|
||||||
@ -190,13 +228,20 @@ fn test_ccsds_server() {
|
|||||||
let set_if_done = conn_handled.clone();
|
let set_if_done = conn_handled.clone();
|
||||||
// Call the connection handler in separate thread, does block.
|
// Call the connection handler in separate thread, does block.
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let result = tcp_server.handle_next_connection();
|
let result = tcp_server.handle_next_connection(Some(Duration::from_millis(500)));
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
}
|
}
|
||||||
let conn_result = result.unwrap();
|
let conn_result = result.unwrap();
|
||||||
assert_eq!(conn_result.num_received_tcs, 1);
|
assert_eq!(conn_result, ConnectionResult::HandledConnections(1));
|
||||||
assert_eq!(conn_result.num_sent_tms, 1);
|
tcp_server
|
||||||
|
.generic_server
|
||||||
|
.finished_handler
|
||||||
|
.check_last_connection(1, 1);
|
||||||
|
tcp_server
|
||||||
|
.generic_server
|
||||||
|
.finished_handler
|
||||||
|
.check_no_connections_left();
|
||||||
set_if_done.store(true, Ordering::Relaxed);
|
set_if_done.store(true, Ordering::Relaxed);
|
||||||
});
|
});
|
||||||
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||||
|
Loading…
Reference in New Issue
Block a user