merge main

This commit is contained in:
lkoester
2024-04-22 15:55:47 +02:00
19 changed files with 1424 additions and 206 deletions

View File

@ -8,7 +8,8 @@ use std::net::Ipv4Addr;
use std::path::{Path, PathBuf};
pub const STOP_FILE_NAME: &str = "stop-experiment";
pub const HOME_FOLDER_EXPERIMENT: &str = "/home/exp278";
pub const CONFIG_FILE_NAME: &str = "exp278.toml";
pub const HOME_FOLER_EXPERIMENT: &str = "/home/exp278";
pub const LOG_FOLDER: &str = "logs";
pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
@ -21,7 +22,8 @@ pub const VALID_PACKET_ID_LIST: &[PacketId] = &[PacketId::new_for_tc(true, EXPER
// TODO: Would be nice if this can be commanded as well..
/// Can be enabled to print all SPP packets received from the SPP server on port 4096.
pub const SPP_CLIENT_WIRETAPPING_RX: bool = false;
pub const SPP_CLIENT_WIRETAPPING_RX: bool = true;
pub const SPP_CLIENT_WIRETAPPING_TX: bool = true;
#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)]
@ -52,6 +54,80 @@ lazy_static! {
};
}
pub mod cfg_file {
use std::{
fs::File,
io::Read,
path::{Path, PathBuf},
};
use super::{CONFIG_FILE_NAME, HOME_PATH, TCP_SPP_SERVER_PORT};
pub const SPP_CLIENT_PORT_CFG_KEY: &str = "tcp_spp_server_port";
#[derive(Debug)]
pub struct AppCfg {
pub tcp_spp_server_port: u16,
}
impl Default for AppCfg {
fn default() -> Self {
Self {
tcp_spp_server_port: TCP_SPP_SERVER_PORT,
}
}
}
pub fn create_app_config() -> AppCfg {
let mut cfg_path = HOME_PATH.clone();
cfg_path.push(CONFIG_FILE_NAME);
let cfg_path_home = cfg_path.as_path();
let relevant_path = if Path::new(CONFIG_FILE_NAME).exists() {
Some(PathBuf::from(Path::new(CONFIG_FILE_NAME)))
} else if cfg_path_home.exists() {
Some(PathBuf::from(cfg_path_home))
} else {
None
};
let mut app_cfg = AppCfg::default();
if relevant_path.is_none() {
log::warn!("No config file found, using default values");
return app_cfg;
}
let relevant_path = relevant_path.unwrap();
match File::open(relevant_path.as_path()) {
Ok(mut file) => {
let mut toml_str = String::new();
match file.read_to_string(&mut toml_str) {
Ok(_size) => match toml_str.parse::<toml::Table>() {
Ok(table) => {
handle_config_file_table(table, &mut app_cfg);
}
Err(e) => log::error!("error parsing TOML config file: {e}"),
},
Err(e) => log::error!("error reading TOML config file: {e}"),
}
}
Err(e) => log::error!("error opening TOML config file: {e}"),
}
app_cfg
}
#[allow(clippy::collapsible_match)]
pub fn handle_config_file_table(table: toml::Table, app_cfg: &mut AppCfg) {
if let Some(value) = table.get(SPP_CLIENT_PORT_CFG_KEY) {
if let toml::Value::Integer(port) = value {
if *port < 0 {
log::warn!("invalid port value, is negative");
} else {
app_cfg.tcp_spp_server_port = *port as u16
}
}
}
}
}
pub mod tmtc_err {
use super::*;
use satrs::res_code::ResultU16;
@ -144,11 +220,14 @@ pub mod components {
}
pub mod tasks {
use std::time::Duration;
pub const FREQ_MS_UDP_TMTC: u64 = 200;
pub const FREQ_MS_EVENT_HANDLING: u64 = 400;
pub const FREQ_MS_AOCS: u64 = 500;
pub const FREQ_MS_PUS_STACK: u64 = 200;
pub const FREQ_MS_CTRL: u64 = 400;
pub const STOP_CHECK_FREQUENCY: u64 = 400;
pub const STOP_CHECK_FREQUENCY_MS: u64 = 400;
pub const STOP_CHECK_FREQUENCY: Duration = Duration::from_millis(STOP_CHECK_FREQUENCY_MS);
}

View File

@ -177,10 +177,20 @@ impl IMS100BatchHandler {
self.handle_hk_request(&msg.requestor_info, hk_request);
}
CompositeRequest::Action(action_request) => {
self.handle_action_request(&msg.requestor_info, action_request).unwrap();
if let Err(e) =
self.handle_action_request(&msg.requestor_info, action_request)
{
log::warn!("camera action request IO error: {e}");
}
}
},
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
log::warn!("composite request receiver disconnected");
break;
}
},
Err(_) => {}
}
}
}

View File

@ -1,7 +1,6 @@
use std::{
collections::VecDeque,
sync::{atomic::AtomicBool, mpsc, Arc, Mutex},
time::Duration,
};
use log::{info, warn};
@ -113,9 +112,7 @@ impl TcpTask {
}
pub fn periodic_operation(&mut self) {
let result = self
.0
.handle_all_connections(Some(Duration::from_millis(STOP_CHECK_FREQUENCY)));
let result = self.0.handle_all_connections(Some(STOP_CHECK_FREQUENCY));
match result {
Ok(_conn_result) => (),
Err(e) => {

View File

@ -1,12 +1,13 @@
use std::io::{self, Read, Write};
use std::io::{self, Read};
use std::net::TcpStream as StdTcpStream;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::mpsc;
use std::time::Duration;
use mio::net::TcpStream;
use mio::net::TcpStream as MioTcpStream;
use mio::{Events, Interest, Poll, Token};
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
use ops_sat_rs::config::{SPP_CLIENT_WIRETAPPING_RX, TCP_SPP_SERVER_PORT};
use ops_sat_rs::config::{SPP_CLIENT_WIRETAPPING_RX, SPP_CLIENT_WIRETAPPING_TX};
use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets;
use satrs::queue::GenericSendError;
use satrs::spacepackets::PacketId;
@ -17,19 +18,22 @@ use thiserror::Error;
use super::{SimpleSpValidator, TcpComponent};
#[derive(Debug, Error)]
pub enum PacketForwardingError {
pub enum ClientError {
#[error("send error: {0}")]
Send(#[from] GenericSendError),
#[error("io error: {0}")]
Io(#[from] io::Error),
}
pub struct TcpSppClient {
#[derive(Debug)]
pub enum ClientResult {
Ok,
ConnectionLost,
}
#[allow(dead_code)]
pub struct TcpSppClientCommon {
id: ComponentId,
poll: Poll,
events: Events,
// Optional to allow periodic reconnection attempts on the TCP server.
client: Option<TcpStream>,
read_buf: [u8; 4096],
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
server_addr: SocketAddr,
@ -37,143 +41,13 @@ pub struct TcpSppClient {
validator: SimpleSpValidator,
}
impl TcpSppClient {
pub fn new(
id: ComponentId,
tc_source_tx: mpsc::Sender<PacketAsVec>,
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
valid_ids: &'static [PacketId],
) -> io::Result<Self> {
let mut poll = Poll::new()?;
let events = Events::with_capacity(128);
let server_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT);
let client = Self::attempt_connection(&mut poll, &server_addr);
if client.is_err() {
log::warn!(
"connection to TCP server {} failed: {}",
server_addr,
client.unwrap_err()
);
return Ok(Self {
id,
poll,
events,
client: None,
read_buf: [0; 4096],
server_addr,
tm_tcp_client_rx,
tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
});
}
Ok(Self {
id,
poll,
events,
client: Some(client.unwrap()),
read_buf: [0; 4096],
server_addr,
tm_tcp_client_rx,
tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
})
}
pub fn attempt_connection(poll: &mut Poll, server_addr: &SocketAddr) -> io::Result<TcpStream> {
let mut client = TcpStream::connect(*server_addr)?;
poll.registry().register(
&mut client,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)?;
Ok(client)
}
pub fn periodic_operation(&mut self) -> Result<(), PacketForwardingError> {
if self.client.is_some() {
return self.perform_regular_operation();
} else {
let client_result = Self::attempt_connection(&mut self.poll, &self.server_addr);
match client_result {
Ok(client) => {
self.client = Some(client);
self.perform_regular_operation()?;
}
Err(ref e) => {
log::warn!(
"connection to TCP server {} failed: {}",
self.server_addr,
e
);
}
}
}
Ok(())
}
pub fn perform_regular_operation(&mut self) -> Result<(), PacketForwardingError> {
self.poll.poll(
&mut self.events,
Some(Duration::from_millis(STOP_CHECK_FREQUENCY)),
)?;
let events: Vec<mio::event::Event> = self.events.iter().cloned().collect();
for event in events {
if event.token() == Token(0) {
if event.is_readable() {
self.read_from_server()?;
}
if event.is_writable() {
self.write_to_server()?;
}
}
}
Ok(())
}
pub fn read_from_server(&mut self) -> Result<(), PacketForwardingError> {
let client = self
.client
.as_mut()
.expect("TCP stream invalid when it should not be");
match client.read(&mut self.read_buf) {
Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()),
Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?,
Err(e) => return Err(e.into()),
}
Ok(())
}
pub fn write_to_server(&mut self) -> io::Result<()> {
let client = self
.client
.as_mut()
.expect("TCP stream invalid when it should not be");
loop {
match self.tm_tcp_client_rx.try_recv() {
Ok(tm) => {
client.write_all(&tm.packet)?;
}
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
log::error!("TM sender to TCP client has disconnected");
break;
}
},
}
}
Ok(())
}
pub fn handle_read_bytstream(
&mut self,
read_bytes: usize,
) -> Result<(), PacketForwardingError> {
#[allow(dead_code)]
impl TcpSppClientCommon {
pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> {
let mut dummy = 0;
if SPP_CLIENT_WIRETAPPING_RX {
log::debug!(
"received {} bytes on TCP client: {:x?}",
"SPP TCP RX {} bytes: {:x?}",
read_bytes,
&self.read_buf[..read_bytes]
);
@ -188,4 +62,604 @@ impl TcpSppClient {
)?;
Ok(())
}
pub fn write_to_server(&mut self, client: &mut impl io::Write) -> io::Result<()> {
loop {
match self.tm_tcp_client_rx.try_recv() {
Ok(tm) => {
if SPP_CLIENT_WIRETAPPING_TX {
log::debug!(
"SPP TCP TX {}: {:x?}",
tm.packet.len(),
tm.packet.as_slice()
);
}
client.write_all(&tm.packet)?;
}
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
println!("god fuckikng damn it");
log::error!("TM sender to TCP client has disconnected");
break;
}
},
}
}
Ok(())
}
}
pub struct TcpSppClientStd {
common: TcpSppClientCommon,
read_and_idle_delay: Duration,
// Optional to allow periodic reconnection attempts on the TCP server.
stream: Option<StdTcpStream>,
}
impl TcpSppClientStd {
pub fn new(
id: ComponentId,
tc_source_tx: mpsc::Sender<PacketAsVec>,
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
valid_ids: &'static [PacketId],
read_timeout: Duration,
port: u16,
) -> io::Result<Self> {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let mut client = Self {
common: TcpSppClientCommon {
id,
read_buf: [0; 4096],
tm_tcp_client_rx,
server_addr,
tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
},
read_and_idle_delay: read_timeout,
stream: None,
};
client.attempt_connect(true)?;
Ok(client)
}
pub fn attempt_connect(&mut self, log_error: bool) -> io::Result<bool> {
Ok(match StdTcpStream::connect(self.common.server_addr) {
Ok(stream) => {
stream.set_read_timeout(Some(self.read_and_idle_delay))?;
self.stream = Some(stream);
true
}
Err(e) => {
if log_error {
log::warn!("error connecting to server: {}", e);
}
false
}
})
}
#[allow(dead_code)]
pub fn connected(&self) -> bool {
self.stream.is_some()
}
pub fn operation(&mut self) -> Result<ClientResult, ClientError> {
if let Some(client) = &mut self.stream {
// Write TM first before blocking on the read call.
self.common.write_to_server(client)?;
match client.read(&mut self.common.read_buf) {
// Not sure if this can happen or this is actually an error condition..
Ok(0) => {
log::info!("server closed connection");
self.stream = None;
return Ok(ClientResult::ConnectionLost);
}
Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?,
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut
{
self.common.write_to_server(client)?;
return Ok(ClientResult::ConnectionLost);
}
log::warn!("server error: {e:?}");
if e.kind() == io::ErrorKind::ConnectionReset {
self.stream = None;
return Ok(ClientResult::ConnectionLost);
}
return Err(e.into());
}
}
} else {
if self.attempt_connect(false)? {
log::info!("reconnected to server succesfully");
return self.operation();
}
std::thread::sleep(self.read_and_idle_delay);
}
Ok(ClientResult::Ok)
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum ConnectionStatus {
Unknown,
Connected,
LostConnection,
TryingReconnect,
}
/// Currently not used, not behaving as expected..
#[allow(dead_code)]
pub struct TcpSppClientMio {
common: TcpSppClientCommon,
poll: Poll,
events: Events,
// Optional to allow periodic reconnection attempts on the TCP server.
client: Option<MioTcpStream>,
connection: ConnectionStatus,
}
#[allow(dead_code)]
impl TcpSppClientMio {
pub fn new(
id: ComponentId,
tc_source_tx: mpsc::Sender<PacketAsVec>,
tm_tcp_client_rx: mpsc::Receiver<PacketAsVec>,
valid_ids: &'static [PacketId],
port: u16,
) -> io::Result<Self> {
let poll = Poll::new()?;
let events = Events::with_capacity(128);
let mut client = Self {
common: TcpSppClientCommon {
id,
read_buf: [0; 4096],
tm_tcp_client_rx,
server_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
},
poll,
events,
client: None,
connection: ConnectionStatus::Unknown,
};
client.connect()?;
Ok(client)
}
pub fn connect(&mut self) -> io::Result<()> {
let mut client = MioTcpStream::connect(self.common.server_addr)?;
self.poll.registry().register(
&mut client,
Token(0),
Interest::READABLE | Interest::WRITABLE,
)?;
self.client = Some(client);
self.connection = ConnectionStatus::TryingReconnect;
Ok(())
}
pub fn operation(&mut self) -> Result<(), ClientError> {
match self.connection {
ConnectionStatus::TryingReconnect | ConnectionStatus::Unknown => {
self.check_conn_status()?
}
ConnectionStatus::Connected => {
self.check_conn_status()?;
self.poll
.poll(&mut self.events, Some(STOP_CHECK_FREQUENCY))?;
let events: Vec<mio::event::Event> = self.events.iter().cloned().collect();
for event in events {
if event.token() == Token(0) {
if event.is_readable() {
self.read_from_server()?;
}
// For some reason, we only get this once..
if event.is_writable() {
self.common.write_to_server(self.client.as_mut().unwrap())?;
}
}
}
return Ok(());
}
ConnectionStatus::LostConnection => self.connect()?,
};
std::thread::sleep(STOP_CHECK_FREQUENCY);
Ok(())
}
pub fn read_from_server(&mut self) -> Result<(), ClientError> {
match self
.client
.as_mut()
.unwrap()
.read(&mut self.common.read_buf)
{
Ok(0) => (),
Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?,
Err(e) => return Err(e.into()),
}
Ok(())
}
pub fn check_conn_status(&mut self) -> io::Result<()> {
match self.client.as_mut().unwrap().peer_addr() {
Ok(_) => {
if self.connection == ConnectionStatus::Unknown
|| self.connection == ConnectionStatus::TryingReconnect
{
self.connection = ConnectionStatus::Connected;
}
Ok(())
}
Err(e) => {
if e.kind() == io::ErrorKind::NotConnected {
log::warn!("lost connection, or do not have one");
self.connection = ConnectionStatus::LostConnection;
return Ok(());
}
Err(e)
}
}
}
}
#[cfg(test)]
mod tests {
use ops_sat_rs::config::EXPERIMENT_APID;
use satrs::spacepackets::{PacketSequenceCtrl, PacketType, SequenceFlags, SpHeader};
use std::{
io::Write,
net::{TcpListener, TcpStream},
sync::{atomic::AtomicBool, Arc},
thread,
time::Duration,
};
use super::*;
const VALID_IDS: &[PacketId] = &[PacketId::new_for_tc(true, EXPERIMENT_APID)];
const TEST_TC: SpHeader = SpHeader::new(
PacketId::new(PacketType::Tc, true, EXPERIMENT_APID),
PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0),
1,
);
const TEST_TM: SpHeader = SpHeader::new(
PacketId::new(PacketType::Tm, true, EXPERIMENT_APID),
PacketSequenceCtrl::new(SequenceFlags::Unsegmented, 0),
1,
);
fn init() {
let _ = env_logger::builder().is_test(true).try_init();
}
struct TcpServerTestbench {
tcp_server: TcpListener,
}
impl TcpServerTestbench {
fn new(port: u16) -> Self {
let tcp_server =
TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)).unwrap();
tcp_server
.set_nonblocking(true)
.expect("setting TCP server non-blocking failed");
Self { tcp_server }
}
fn local_addr(&self) -> SocketAddr {
self.tcp_server.local_addr().unwrap()
}
fn check_for_connections(&mut self, limit: u32) -> Result<TcpStream, ()> {
for _ in 0..limit {
match self.tcp_server.accept() {
Ok((stream, _)) => {
return Ok(stream);
}
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
thread::sleep(Duration::from_millis(10));
continue;
}
panic!("TCP server accept error: {:?}", e);
}
}
}
Err(())
}
fn try_reading_one_packet(
&mut self,
stream: &mut TcpStream,
limit: u32,
read_buf: &mut [u8],
) -> usize {
let mut read_data = 0;
for _ in 0..limit {
match stream.read(read_buf) {
Ok(0) => {}
Ok(len) => {
// assert_eq!(&tm_buf, &read_buf[0..len]);
// read_bufd_expected_data = true;
read_data = len;
break;
}
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
continue;
}
panic!("TCP server read error: {:?}", e);
}
}
if read_data > 0 {
break;
}
}
read_data
}
}
// This test just simplifies that the client properly connects to a server.
#[test]
fn basic_client_test() {
let (tc_source_tx, _tc_source_rx) = mpsc::channel();
let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new(0);
let local_addr = tcp_server.local_addr();
let jh0 = thread::spawn(move || {
tcp_server
.check_for_connections(3)
.expect("no client connection detected");
});
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_addr.port(),
)
.expect("creating TCP SPP client failed");
spp_client.operation().unwrap();
jh0.join().unwrap();
}
// This test verifies that TM is sent to the server properly.
#[test]
fn basic_client_tm_test() {
let (tc_source_tx, _tc_source_rx) = mpsc::channel();
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new(0);
let local_addr = tcp_server.local_addr();
let mut buf: [u8; 7] = [0; 7];
TEST_TM
.write_to_be_bytes(&mut buf)
.expect("writing TM failed");
let jh0 = thread::spawn(move || {
let mut read_buf: [u8; 64] = [0; 64];
let mut stream = tcp_server
.check_for_connections(3)
.expect("no client connection detected");
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting read timeout failed");
let read_bytes = tcp_server.try_reading_one_packet(&mut stream, 5, &mut read_buf);
if read_bytes == 0 {
panic!("did not receive expected data");
} else {
assert_eq!(&buf, &read_buf[0..read_bytes]);
}
});
tm_tcp_client_tx
.send(PacketAsVec::new(0, buf.to_vec()))
.unwrap();
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_addr.port(),
)
.expect("creating TCP SPP client failed");
spp_client.operation().unwrap();
jh0.join().unwrap();
}
// Test that the client can read telecommands from the server.
#[test]
fn basic_client_tc_test() {
let (tc_source_tx, tc_source_rx) = mpsc::channel();
let (_tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new(0);
let local_addr = tcp_server.local_addr();
let mut buf: [u8; 8] = [0; 8];
TEST_TC
.write_to_be_bytes(&mut buf)
.expect("writing TM failed");
let jh0 = thread::spawn(move || {
let mut stream = tcp_server
.check_for_connections(3)
.expect("no client connection detected");
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting read timeout failed");
stream.write_all(&buf).expect("writing TC failed");
});
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_addr.port(),
)
.expect("creating TCP SPP client failed");
assert!(spp_client.connected());
let mut received_packet = false;
(0..3).for_each(|_| {
spp_client.operation().unwrap();
if let Ok(packet) = tc_source_rx.try_recv() {
assert_eq!(packet.packet, buf.to_vec());
received_packet = true;
}
});
if !received_packet {
panic!("did not receive expected data");
}
jh0.join().unwrap();
}
// Test that the client can both read telecommands from the server and send back
// telemetry to the server.
#[test]
fn basic_client_tmtc_test() {
let (tc_source_tx, tc_source_rx) = mpsc::channel();
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new(0);
let local_addr = tcp_server.local_addr();
let mut tc_buf: [u8; 8] = [0; 8];
let mut tm_buf: [u8; 8] = [0; 8];
TEST_TC
.write_to_be_bytes(&mut tc_buf)
.expect("writing TM failed");
TEST_TM
.write_to_be_bytes(&mut tm_buf)
.expect("writing TM failed");
let jh0 = thread::spawn(move || {
let mut read_buf: [u8; 64] = [0; 64];
let mut stream = tcp_server
.check_for_connections(3)
.expect("no client connection detected");
stream
.set_read_timeout(Some(Duration::from_millis(10)))
.expect("setting read timeout failed");
stream.write_all(&tc_buf).expect("writing TC failed");
let read_bytes = tcp_server.try_reading_one_packet(&mut stream, 5, &mut read_buf);
if read_bytes == 0 {
panic!("did not receive expected data");
} else {
assert_eq!(&tm_buf, &read_buf[0..read_bytes]);
}
});
tm_tcp_client_tx
.send(PacketAsVec::new(0, tm_buf.to_vec()))
.unwrap();
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_addr.port(),
)
.expect("creating TCP SPP client failed");
assert!(spp_client.connected());
let mut received_packet = false;
(0..3).for_each(|_| {
spp_client.operation().unwrap();
if let Ok(packet) = tc_source_rx.try_recv() {
assert_eq!(packet.packet, tc_buf.to_vec());
received_packet = true;
}
});
if !received_packet {
panic!("did not receive expected data");
}
jh0.join().unwrap();
}
#[test]
fn test_broken_connection() {
init();
let (tc_source_tx, _tc_source_rx) = mpsc::channel();
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let mut tcp_server = TcpServerTestbench::new(0);
let local_port = tcp_server.local_addr().port();
let drop_signal = Arc::new(AtomicBool::new(false));
let drop_signal_0 = drop_signal.clone();
let mut tc_buf: [u8; 8] = [0; 8];
let mut tm_buf: [u8; 8] = [0; 8];
TEST_TC
.write_to_be_bytes(&mut tc_buf)
.expect("writing TM failed");
TEST_TM
.write_to_be_bytes(&mut tm_buf)
.expect("writing TM failed");
let mut jh0 = thread::spawn(move || {
tcp_server
.check_for_connections(3)
.expect("no client connection detected");
drop_signal_0.store(true, std::sync::atomic::Ordering::Relaxed);
});
let mut spp_client = TcpSppClientStd::new(
1,
tc_source_tx,
tm_tcp_client_rx,
VALID_IDS,
Duration::from_millis(30),
local_port,
)
.expect("creating TCP SPP client failed");
while !drop_signal.load(std::sync::atomic::Ordering::Relaxed) {
std::thread::sleep(Duration::from_millis(100));
}
tm_tcp_client_tx
.send(PacketAsVec::new(0, tm_buf.to_vec()))
.unwrap();
match spp_client.operation() {
Ok(ClientResult::ConnectionLost) => (),
Ok(ClientResult::Ok) => {
panic!("expected operation error");
}
Err(ClientError::Io(e)) => {
println!("io error: {:?}", e);
if e.kind() != io::ErrorKind::ConnectionReset
&& e.kind() != io::ErrorKind::ConnectionAborted
{
panic!("expected some disconnet error");
}
}
_ => {
panic!("unexpected error")
}
};
assert!(!spp_client.connected());
jh0.join().unwrap();
// spp_client.operation();
tcp_server = TcpServerTestbench::new(local_port);
tm_tcp_client_tx
.send(PacketAsVec::new(0, tm_buf.to_vec()))
.unwrap();
jh0 = thread::spawn(move || {
let mut stream = tcp_server
.check_for_connections(3)
.expect("no client connection detected");
let mut read_buf: [u8; 64] = [0; 64];
let read_bytes = tcp_server.try_reading_one_packet(&mut stream, 5, &mut read_buf);
if read_bytes == 0 {
panic!("did not receive expected data");
} else {
assert_eq!(&tm_buf, &read_buf[0..read_bytes]);
}
});
let result = spp_client.operation();
println!("{:?}", result);
assert!(!spp_client.connected());
assert!(result.is_ok());
jh0.join().unwrap();
}
}

View File

@ -8,8 +8,9 @@ use std::{
use log::info;
use ops_sat_rs::config::components::CAMERA_HANDLER;
use ops_sat_rs::config::{
cfg_file::create_app_config,
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK},
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
VALID_PACKET_ID_LIST,
};
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
@ -17,7 +18,7 @@ use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer};
use ops_sat_rs::TimeStampHelper;
use crate::handlers::camera::IMS100BatchHandler;
use crate::tmtc::tc_source::TcSourceTaskDynamic;
use crate::pus::{PusTcDistributor, PusTcMpscRouter};
use crate::tmtc::tm_sink::TmFunnelDynamic;
use crate::{controller::ExperimentController, pus::test::create_test_service};
use crate::{
@ -25,10 +26,7 @@ use crate::{
interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer},
logger::setup_logger,
};
use crate::{
interface::tcp_spp_client::TcpSppClient,
pus::{PusTcDistributor, PusTcMpscRouter},
};
use crate::{interface::tcp_spp_client::TcpSppClientStd, tmtc::tc_source::TcSourceTaskDynamic};
use crate::{
pus::{action::create_action_service, stack::PusStack},
requests::GenericRequestRouter,
@ -46,6 +44,9 @@ fn main() {
setup_logger().expect("setting up logging with fern failed");
println!("OPS-SAT Rust Experiment OBSW");
let app_cfg = create_app_config();
println!("App Configuration: {:?}", app_cfg);
let stop_signal = Arc::new(AtomicBool::new(false));
let (tc_source_tx, tc_source_rx) = mpsc::channel();
@ -172,11 +173,13 @@ fn main() {
stop_signal.clone(),
);
let mut tcp_spp_client = TcpSppClient::new(
let mut tcp_spp_client = TcpSppClientStd::new(
TCP_SPP_CLIENT.id(),
tc_source_tx,
tm_tcp_client_rx,
VALID_PACKET_ID_LIST,
STOP_CHECK_FREQUENCY,
app_cfg.tcp_spp_server_port,
)
.expect("creating TCP SPP client failed");
@ -245,7 +248,10 @@ fn main() {
.spawn(move || {
info!("Running TCP SPP client");
loop {
let _result = tcp_spp_client.periodic_operation();
let result = tcp_spp_client.operation();
if let Err(e) = result {
log::error!("TCP SPP client error: {}", e);
}
if tcp_client_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}

View File

@ -432,7 +432,8 @@ mod tests {
.send(EcssTcAndToken::new(
TcInMemory::Vec(PacketAsVec::new(
self.service.service_helper.id(),
tc.to_vec().unwrap().into(),
//tc.to_vec().unwrap().into(),
tc.to_vec().unwrap(),
)),
accepted_token,
))

View File

@ -42,7 +42,6 @@ pub struct TestCustomServiceWrapper {
EcssTcInVecConverter,
VerificationReporter,
>,
// pub test_srv_event_sender: mpsc::Sender<EventMessageU32>,
}
impl TestCustomServiceWrapper {

View File

@ -1,6 +1,6 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::{collections::HashMap, sync::mpsc, time::Duration};
use std::{collections::HashMap, sync::mpsc};
use log::info;
use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY;
@ -101,10 +101,7 @@ impl TmFunnelDynamic {
pub fn operation(&mut self) {
loop {
match self
.tm_funnel_rx
.recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY))
{
match self.tm_funnel_rx.recv_timeout(STOP_CHECK_FREQUENCY) {
Ok(mut tm) => {
// Read the TM, set sequence counter and message counter, and finally update
// the CRC.