diff --git a/Cargo.lock b/Cargo.lock index 9d4bd32..f36afba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,8 +90,10 @@ checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-targets 0.52.4", ] @@ -184,6 +186,12 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -232,6 +240,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.153" @@ -297,12 +311,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] -name = "ops-sat-sw" +name = "ops-sat-rs" version = "0.1.0" dependencies = [ + "chrono", "fern", + "lazy_static", "log", "satrs", + "strum", ] [[package]] @@ -360,6 +377,12 @@ dependencies = [ "bitflags", ] +[[package]] +name = "rustversion" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47" + [[package]] name = "satrs" version = "0.2.0-rc.0" @@ -445,6 +468,28 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "strum" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.58", +] + [[package]] name = "syn" version = "1.0.109" diff --git a/Cargo.toml b/Cargo.toml index e1546eb..40a8bbe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,10 @@ edition = "2021" [dependencies] fern = "0.6" +chrono = "0.4" log = "0.4" +lazy_static = "1" +strum = { version = "0.26", features = ["derive"] } [dependencies.satrs] version = "0.2.0-rc.0" diff --git a/src/ccsds.rs b/src/ccsds.rs new file mode 100644 index 0000000..e1417f4 --- /dev/null +++ b/src/ccsds.rs @@ -0,0 +1,53 @@ +use satrs::pus::ReceivesEcssPusTc; +use satrs::spacepackets::{CcsdsPacket, SpHeader}; +use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; +use satrs::ValidatorU16Id; +use ops_sat_rs::config::components::Apid; +use ops_sat_rs::config::APID_VALIDATOR; + +#[derive(Clone)] +pub struct CcsdsReceiver< + TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone, + E, +> { + pub tc_source: TcSource, +} + +impl< + TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, + E: 'static, + > ValidatorU16Id for CcsdsReceiver +{ + fn validate(&self, apid: u16) -> bool { + APID_VALIDATOR.contains(&apid) + } +} + +impl< + TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, + E: 'static, + > CcsdsPacketHandler for CcsdsReceiver +{ + type Error = E; + + fn handle_packet_with_valid_apid( + &mut self, + sp_header: &SpHeader, + tc_raw: &[u8], + ) -> Result<(), Self::Error> { + if sp_header.apid() == Apid::Cfdp as u16 { + } else { + return self.tc_source.pass_ccsds(sp_header, tc_raw); + } + Ok(()) + } + + fn handle_packet_with_unknown_apid( + &mut self, + sp_header: &SpHeader, + _tc_raw: &[u8], + ) -> Result<(), Self::Error> { + log::warn!("unknown APID 0x{:x?} detected", sp_header.apid()); + Ok(()) + } +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..72a6ece --- /dev/null +++ b/src/config.rs @@ -0,0 +1,74 @@ +use lazy_static::lazy_static; +use satrs::spacepackets::{PacketId, PacketType}; +use std::{collections::HashSet, net::Ipv4Addr}; +use strum::IntoEnumIterator; + +pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; +pub const SERVER_PORT: u16 = 7301; + +lazy_static! { + pub static ref PACKET_ID_VALIDATOR: HashSet = { + let mut set = HashSet::new(); + for id in components::Apid::iter() { + set.insert(PacketId::new(PacketType::Tc, true, id as u16)); + } + set + }; + pub static ref APID_VALIDATOR: HashSet = { + let mut set = HashSet::new(); + for id in components::Apid::iter() { + set.insert(id as u16); + } + set + }; +} + +pub mod components { + use satrs::request::UniqueApidTargetId; + use strum::EnumIter; + + #[derive(Copy, Clone, PartialEq, Eq, EnumIter)] + pub enum Apid { + Sched = 1, + GenericPus = 2, + Cfdp = 4, + } + + // Component IDs for components with the PUS APID. + #[derive(Copy, Clone, PartialEq, Eq)] + pub enum PusId { + PusEventManagement = 0, + PusRouting = 1, + PusTest = 2, + PusAction = 3, + PusMode = 4, + PusHk = 5, + } + + #[derive(Copy, Clone, PartialEq, Eq)] + pub enum AcsId { + Mgm0 = 0, + } + + pub const PUS_ACTION_SERVICE: UniqueApidTargetId = + UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusAction as u32); + pub const PUS_EVENT_MANAGEMENT: UniqueApidTargetId = + UniqueApidTargetId::new(Apid::GenericPus as u16, 0); + pub const PUS_ROUTING_SERVICE: UniqueApidTargetId = + UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusRouting as u32); + pub const PUS_TEST_SERVICE: UniqueApidTargetId = + UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusTest as u32); + pub const PUS_MODE_SERVICE: UniqueApidTargetId = + UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusMode as u32); + pub const PUS_HK_SERVICE: UniqueApidTargetId = + UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusHk as u32); + pub const PUS_SCHED_SERVICE: UniqueApidTargetId = + UniqueApidTargetId::new(Apid::Sched as u16, 0); +} + +pub mod tasks { + pub const FREQ_MS_UDP_TMTC: u64 = 200; + pub const FREQ_MS_EVENT_HANDLING: u64 = 400; + pub const FREQ_MS_AOCS: u64 = 500; + pub const FREQ_MS_PUS_STACK: u64 = 200; +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..b957951 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,3 @@ +use std::net::Ipv4Addr; + +pub mod config; diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 0000000..5e7163c --- /dev/null +++ b/src/logger.rs @@ -0,0 +1,17 @@ +pub fn setup_logger() -> Result<(), fern::InitError> { + fern::Dispatch::new() + .format(|out, message, record| { + out.finish(format_args!( + "{}[{}][{}] {}", + chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"), + std::thread::current().name().expect("unnamed_thread"), + record.level(), + message + )) + }) + .level(log::LevelFilter::Debug) + .chain(std::io::stdout()) + .chain(fern::log_file("output.log")?) + .apply()?; + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 0a9c4c2..c53fbf4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,95 @@ -fn main() { - println!("OPSSAT Rust experiment OBSW"); +use std::{ + net::{IpAddr, SocketAddr}, + sync::mpsc, + thread, + time::Duration, +}; - loop { - std::thread::sleep(std::time::Duration::from_millis(500)); - } +use log::info; +use ops_sat_rs::config::{ + tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT, +}; +use satrs::{ + hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, + tmtc::CcsdsDistributor, +}; + +use crate::{ + ccsds::CcsdsReceiver, + logger::setup_logger, + tcp::{SyncTcpTmSource, TcpTask}, + tmtc::PusTcSourceProviderDynamic, + udp::{DynamicUdpTmHandler, UdpTmtcServer}, +}; + +mod ccsds; +mod logger; +mod tcp; +mod tmtc; +mod udp; + +fn main() { + setup_logger().expect("setting up logging with fern failed"); + println!("OPS-SAT Rust experiment OBSW"); + + let (tc_source_tx, tc_source_rx) = mpsc::channel(); + let (tm_server_tx, tm_server_rx) = mpsc::channel(); + + let tc_source = PusTcSourceProviderDynamic(tc_source_tx); + + let ccsds_receiver = CcsdsReceiver { tc_source }; + + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone()); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) + .expect("creating UDP TMTC server failed"); + + let mut udp_tmtc_server = UdpTmtcServer { + udp_tc_server, + tm_handler: DynamicUdpTmHandler { + tm_rx: tm_server_rx, + }, + }; + + let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver); + let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let sync_tm_tcp_source = SyncTcpTmSource::new(200); + let mut tcp_server = TcpTask::new( + tcp_server_cfg, + sync_tm_tcp_source.clone(), + tcp_ccsds_distributor, + PACKET_ID_VALIDATOR.clone(), + ) + .expect("tcp server creation failed"); + + info!("Starting TMTC and UDP task"); + let jh_udp_tmtc = thread::Builder::new() + .name("TMTC and UDP".to_string()) + .spawn(move || { + info!("Running UDP server on port {SERVER_PORT}"); + loop { + udp_tmtc_server.periodic_operation(); + // tmtc_task.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); + } + }) + .unwrap(); + + info!("Starting TCP task"); + let jh_tcp = thread::Builder::new() + .name("TCP".to_string()) + .spawn(move || { + info!("Running TCP server on port {SERVER_PORT}"); + loop { + tcp_server.periodic_operation(); + } + }) + .unwrap(); + + jh_udp_tmtc + .join() + .expect("Joining UDP TMTC server thread failed"); + jh_tcp + .join() + .expect("Joining TCP TMTC server thread failed"); } diff --git a/src/tcp.rs b/src/tcp.rs new file mode 100644 index 0000000..04bb136 --- /dev/null +++ b/src/tcp.rs @@ -0,0 +1,127 @@ +use std::{ + collections::{HashSet, VecDeque}, + sync::{Arc, Mutex}, +}; + +use log::{info, warn}; +use satrs::{ + hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, + pus::ReceivesEcssPusTc, + spacepackets::PacketId, + tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore}, +}; + +use crate::ccsds::CcsdsReceiver; + +#[derive(Default, Clone)] +pub struct SyncTcpTmSource { + tm_queue: Arc>>>, + max_packets_stored: usize, + pub silent_packet_overwrite: bool, +} + +impl SyncTcpTmSource { + pub fn new(max_packets_stored: usize) -> Self { + Self { + tm_queue: Arc::default(), + max_packets_stored, + silent_packet_overwrite: true, + } + } + + pub fn add_tm(&mut self, tm: &[u8]) { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec"); + if tm_queue.len() > self.max_packets_stored { + if !self.silent_packet_overwrite { + warn!("TPC TM source is full, deleting oldest packet"); + } + tm_queue.pop_front(); + } + tm_queue.push_back(tm.to_vec()); + } +} + +impl TmPacketSourceCore for SyncTcpTmSource { + type Error = (); + + fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { + let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed"); + if !tm_queue.is_empty() { + let next_vec = tm_queue.front().unwrap(); + if buffer.len() < next_vec.len() { + panic!( + "provided buffer too small, must be at least {} bytes", + next_vec.len() + ); + } + let next_vec = tm_queue.pop_front().unwrap(); + buffer[0..next_vec.len()].copy_from_slice(&next_vec); + if next_vec.len() > 9 { + let service = next_vec[7]; + let subservice = next_vec[8]; + info!("Sending PUS TM[{service},{subservice}]") + } else { + info!("Sending PUS TM"); + } + return Ok(next_vec.len()); + } + Ok(0) + } +} + +pub type TcpServerType = TcpSpacepacketsServer< + (), + CcsdsError, + SyncTcpTmSource, + CcsdsDistributor, MpscErrorType>, + HashSet, +>; + +pub struct TcpTask< + TcSource: ReceivesCcsdsTc + + ReceivesEcssPusTc + + Clone + + Send + + 'static, + MpscErrorType: 'static, +> { + server: TcpServerType, +} + +impl< + TcSource: ReceivesCcsdsTc + + ReceivesEcssPusTc + + Clone + + Send + + 'static, + MpscErrorType: 'static + core::fmt::Debug, + > TcpTask +{ + pub fn new( + cfg: ServerConfig, + tm_source: SyncTcpTmSource, + tc_receiver: CcsdsDistributor, MpscErrorType>, + packet_id_lookup: HashSet, + ) -> Result { + Ok(Self { + server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?, + }) + } + + pub fn periodic_operation(&mut self) { + loop { + let result = self.server.handle_next_connection(); + match result { + Ok(conn_result) => { + info!( + "Served {} TMs and {} TCs for client {:?}", + conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr + ); + } + Err(e) => { + warn!("TCP server error: {e:?}"); + } + } + } + } +} diff --git a/src/tmtc.rs b/src/tmtc.rs new file mode 100644 index 0000000..1857bb9 --- /dev/null +++ b/src/tmtc.rs @@ -0,0 +1,81 @@ +use satrs::{ + pus::ReceivesEcssPusTc, + spacepackets::{ecss::tc::PusTcReader, SpHeader}, + tmtc::ReceivesCcsdsTc, +}; +use std::sync::mpsc::{self, SendError, Sender, TryRecvError}; + +// Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules. +#[derive(Clone)] +pub struct PusTcSourceProviderDynamic(pub Sender>); + +impl ReceivesEcssPusTc for PusTcSourceProviderDynamic { + type Error = SendError>; + + fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { + self.0.send(pus_tc.raw_data().to_vec())?; + Ok(()) + } +} + +impl ReceivesCcsdsTc for PusTcSourceProviderDynamic { + type Error = mpsc::SendError>; + + fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.0.send(tc_raw.to_vec())?; + Ok(()) + } +} + +// TC source components where the heap is the backing memory of the received telecommands. +pub struct TcSourceTaskDynamic { + pub tc_receiver: mpsc::Receiver>, + // pus_receiver: PusReceiver, +} + +impl TcSourceTaskDynamic { + pub fn new( + tc_receiver: mpsc::Receiver>, + // pus_receiver: PusReceiver, + ) -> Self { + Self { + tc_receiver, + // pus_receiver, + } + } + + pub fn periodic_operation(&mut self) { + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> bool { + match self.tc_receiver.try_recv() { + Ok(tc) => match PusTcReader::new(&tc) { + Ok((pus_tc, _)) => { + /* + self.pus_receiver + .handle_tc_packet( + satrs::pus::TcInMemory::Vec(tc.clone()), + pus_tc.service(), + &pus_tc, + ) + .ok(); + */ + true + } + Err(e) => { + log::warn!("error creating PUS TC from raw data: {e}"); + log::warn!("raw data: {:x?}", tc); + true + } + }, + Err(e) => match e { + TryRecvError::Empty => false, + TryRecvError::Disconnected => { + log::warn!("tmtc thread: sender disconnected"); + false + } + }, + } + } +} diff --git a/src/udp.rs b/src/udp.rs new file mode 100644 index 0000000..b2e3b62 --- /dev/null +++ b/src/udp.rs @@ -0,0 +1,185 @@ +use std::net::{SocketAddr, UdpSocket}; +use std::sync::mpsc; + +use log::{info, warn}; +use satrs::pus::{PusTmAsVec, PusTmInPool}; +use satrs::{ + hal::std::udp_server::{ReceiveResult, UdpTcServer}, + pool::{PoolProviderWithGuards, SharedStaticMemoryPool}, + tmtc::CcsdsError, +}; + +pub trait UdpTmHandler { + fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr); +} + +pub struct DynamicUdpTmHandler { + pub tm_rx: mpsc::Receiver, +} + +impl UdpTmHandler for DynamicUdpTmHandler { + fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr) { + while let Ok(tm) = self.tm_rx.try_recv() { + if tm.packet.len() > 9 { + let service = tm.packet[7]; + let subservice = tm.packet[8]; + info!("Sending PUS TM[{service},{subservice}]") + } else { + info!("Sending PUS TM"); + } + let result = socket.send_to(&tm.packet, recv_addr); + if let Err(e) = result { + warn!("Sending TM with UDP socket failed: {e}") + } + } + } +} + +pub struct UdpTmtcServer { + pub udp_tc_server: UdpTcServer>, + pub tm_handler: TmHandler, +} + +impl + UdpTmtcServer +{ + pub fn periodic_operation(&mut self) { + while self.poll_tc_server() {} + if let Some(recv_addr) = self.udp_tc_server.last_sender() { + self.tm_handler + .send_tm_to_udp_client(&self.udp_tc_server.socket, &recv_addr); + } + } + + fn poll_tc_server(&mut self) -> bool { + match self.udp_tc_server.try_recv_tc() { + Ok(_) => true, + Err(e) => match e { + ReceiveResult::ReceiverError(e) => match e { + CcsdsError::ByteConversionError(e) => { + warn!("packet error: {e:?}"); + true + } + CcsdsError::CustomError(e) => { + warn!("mpsc custom error {e:?}"); + true + } + }, + ReceiveResult::IoError(e) => { + warn!("IO error {e}"); + false + } + ReceiveResult::NothingReceived => false, + }, + } + } +} + +#[cfg(test)] +mod tests { + use std::{ + collections::VecDeque, + net::IpAddr, + sync::{Arc, Mutex}, + }; + + use satrs::{ + spacepackets::{ + ecss::{tc::PusTcCreator, WritablePusPacket}, + SpHeader, + }, + tmtc::ReceivesTcCore, + }; + use satrs_example::config::{components, OBSW_SERVER_ADDR}; + + use super::*; + + #[derive(Default, Debug, Clone)] + pub struct TestReceiver { + tc_vec: Arc>>>, + } + + impl ReceivesTcCore for TestReceiver { + type Error = CcsdsError<()>; + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + self.tc_vec.lock().unwrap().push_back(tc_raw.to_vec()); + Ok(()) + } + } + + #[derive(Default, Debug, Clone)] + pub struct TestTmHandler { + addrs_to_send_to: Arc>>, + } + + impl UdpTmHandler for TestTmHandler { + fn send_tm_to_udp_client(&mut self, _socket: &UdpSocket, recv_addr: &SocketAddr) { + self.addrs_to_send_to.lock().unwrap().push_back(*recv_addr); + } + } + + #[test] + fn test_basic() { + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); + let test_receiver = TestReceiver::default(); + let tc_queue = test_receiver.tc_vec.clone(); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap(); + let tm_handler = TestTmHandler::default(); + let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); + let mut udp_dyn_server = UdpTmtcServer { + udp_tc_server, + tm_handler, + }; + udp_dyn_server.periodic_operation(); + assert!(tc_queue.lock().unwrap().is_empty()); + assert!(tm_handler_calls.lock().unwrap().is_empty()); + } + + #[test] + fn test_transactions() { + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); + let test_receiver = TestReceiver::default(); + let tc_queue = test_receiver.tc_vec.clone(); + let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap(); + let server_addr = udp_tc_server.socket.local_addr().unwrap(); + let tm_handler = TestTmHandler::default(); + let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); + let mut udp_dyn_server = UdpTmtcServer { + udp_tc_server, + tm_handler, + }; + let sph = SpHeader::new_for_unseg_tc(components::Apid::GenericPus as u16, 0, 0); + let ping_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true) + .to_vec() + .unwrap(); + let client = UdpSocket::bind("127.0.0.1:0").expect("Connecting to UDP server failed"); + let client_addr = client.local_addr().unwrap(); + client.connect(server_addr).unwrap(); + client.send(&ping_tc).unwrap(); + udp_dyn_server.periodic_operation(); + { + let mut tc_queue = tc_queue.lock().unwrap(); + assert!(!tc_queue.is_empty()); + let received_tc = tc_queue.pop_front().unwrap(); + assert_eq!(received_tc, ping_tc); + } + + { + let mut tm_handler_calls = tm_handler_calls.lock().unwrap(); + assert!(!tm_handler_calls.is_empty()); + assert_eq!(tm_handler_calls.len(), 1); + let received_addr = tm_handler_calls.pop_front().unwrap(); + assert_eq!(received_addr, client_addr); + } + udp_dyn_server.periodic_operation(); + assert!(tc_queue.lock().unwrap().is_empty()); + // Still tries to send to the same client. + { + let mut tm_handler_calls = tm_handler_calls.lock().unwrap(); + assert!(!tm_handler_calls.is_empty()); + assert_eq!(tm_handler_calls.len(), 1); + let received_addr = tm_handler_calls.pop_front().unwrap(); + assert_eq!(received_addr, client_addr); + } + } +}