3 Commits

Author SHA1 Message Date
22ba6be780 small improvement
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good
2024-03-08 16:38:12 +01:00
2679815c28 added some more tests
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good
2024-03-08 16:36:37 +01:00
55df55a39c First version of asynchronix based mini simulator
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good
2024-03-07 17:19:16 +01:00
9 changed files with 245 additions and 794 deletions

3
.gitignore vendored
View File

@ -1,5 +1,4 @@
target/ /target
/Cargo.lock /Cargo.lock
/.idea/* /.idea/*

View File

@ -18,19 +18,15 @@ def generate_cov_report(open_report: bool, format: str, package: str):
out_path = "./target/debug/coverage" out_path = "./target/debug/coverage"
if format == "lcov": if format == "lcov":
out_path = "./target/debug/lcov.info" out_path = "./target/debug/lcov.info"
grcov_cmd = ( os.system(
f"grcov . -s . --binary-path ./target/debug/ -t {format} --branch --ignore-not-existing " f"grcov . -s . --binary-path ./target/debug/ -t {format} --branch --ignore-not-existing "
f"-o {out_path}" f"-o {out_path}"
) )
print(f"Running: {grcov_cmd}")
os.system(grcov_cmd)
if format == "lcov": if format == "lcov":
lcov_cmd = ( os.system(
"genhtml -o ./target/debug/coverage/ --show-details --highlight --ignore-errors source " "genhtml -o ./target/debug/coverage/ --show-details --highlight --ignore-errors source "
"--legend ./target/debug/lcov.info" "--legend ./target/debug/lcov.info"
) )
print(f"Running: {lcov_cmd}")
os.system(lcov_cmd)
if open_report: if open_report:
coverage_report_path = os.path.abspath("./target/debug/coverage/index.html") coverage_report_path = os.path.abspath("./target/debug/coverage/index.html")
webbrowser.open_new_tab(coverage_report_path) webbrowser.open_new_tab(coverage_report_path)
@ -47,7 +43,7 @@ def main():
parser.add_argument( parser.add_argument(
"-p", "-p",
"--package", "--package",
choices=["satrs", "satrs-minisim"], choices=["satrs"],
default="satrs", default="satrs",
help="Choose project to generate coverage for", help="Choose project to generate coverage for",
) )

View File

@ -9,7 +9,6 @@ edition = "2021"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
log = "0.4" log = "0.4"
thiserror = "1"
[dependencies.asynchronix] [dependencies.asynchronix]
version = "0.2.1" version = "0.2.1"

View File

@ -6,8 +6,8 @@ use asynchronix::{
}; };
use satrs::power::SwitchStateBinary; use satrs::power::SwitchStateBinary;
use satrs_minisim::{ use satrs_minisim::{
acs::{MgmReply, MgmSensorValues, MgtDipole, MgtHkSet, MgtReply, MGT_GEN_MAGNETIC_FIELD}, acs::{MgmSensorValues, MgtDipole, MgtReply, MGT_GEN_MAGNETIC_FIELD},
SimReply, SimTarget, SimDevice, SimReply,
}; };
use crate::time::current_millis; use crate::time::current_millis;
@ -53,15 +53,15 @@ impl MagnetometerModel {
} }
pub async fn send_sensor_values(&mut self, _: (), scheduler: &Scheduler<Self>) { pub async fn send_sensor_values(&mut self, _: (), scheduler: &Scheduler<Self>) {
let current_time = scheduler.time();
println!("current monotonic time: {:?}", current_time);
let value = self.calculate_current_mgm_tuple(current_millis(scheduler.time()));
let reply = SimReply {
device: SimDevice::Mgm,
reply: serde_json::to_string(&value).unwrap(),
};
self.reply_sender self.reply_sender
.send(SimReply::new( .send(reply)
SimTarget::Mgm,
MgmReply {
switch_state: self.switch_state,
sensor_values: self
.calculate_current_mgm_tuple(current_millis(scheduler.time())),
},
))
.expect("sending MGM sensor values failed"); .expect("sending MGM sensor values failed");
} }
@ -71,7 +71,7 @@ impl MagnetometerModel {
self.external_mag_field = Some(field); self.external_mag_field = Some(field);
} }
fn calculate_current_mgm_tuple(&self, time_ms: u64) -> MgmSensorValues { fn calculate_current_mgm_tuple(&mut self, time_ms: u64) -> MgmSensorValues {
if SwitchStateBinary::On == self.switch_state { if SwitchStateBinary::On == self.switch_state {
if let Some(ext_field) = self.external_mag_field { if let Some(ext_field) = self.external_mag_field {
return ext_field; return ext_field;
@ -140,21 +140,15 @@ impl MagnetorquerModel {
} }
pub async fn request_housekeeping_data(&mut self, _: (), scheduler: &Scheduler<Self>) { pub async fn request_housekeeping_data(&mut self, _: (), scheduler: &Scheduler<Self>) {
if self.switch_state != SwitchStateBinary::On {
return;
}
scheduler scheduler
.schedule_event(Duration::from_millis(15), Self::send_housekeeping_data, ()) .schedule_event(Duration::from_millis(15), Self::send_housekeeping_data, ())
.expect("requesting housekeeping data failed") .expect("requesting housekeeping data failed")
} }
pub fn send_housekeeping_data(&mut self) { pub fn send_housekeeping_data(&mut self) {
let mgt_reply = MgtReply::Hk(MgtHkSet { let mgt_reply = MgtReply::Hk(self.torque_dipole);
dipole: self.torque_dipole,
torquing: self.torquing,
});
self.reply_sender self.reply_sender
.send(SimReply::new(SimTarget::Mgt, mgt_reply)) .send(SimReply::new(SimDevice::Mgt, mgt_reply))
.unwrap(); .unwrap();
} }
@ -184,18 +178,20 @@ pub mod tests {
use satrs::power::SwitchStateBinary; use satrs::power::SwitchStateBinary;
use satrs_minisim::{ use satrs_minisim::{
acs::{MgmReply, MgmRequest, MgtDipole, MgtHkSet, MgtReply, MgtRequest}, acs::{MgmRequest, MgmSensorValues},
eps::PcduSwitch, SimDevice, SimRequest,
SimRequest, SimTarget,
}; };
use crate::{eps::tests::switch_device_on, test_helpers::SimTestbench}; use crate::{
eps::{self, PcduRequest},
test_helpers::SimTestbench,
};
#[test] #[test]
fn test_basic_mgm_request() { fn test_basic_mgm_request() {
let mut sim_testbench = SimTestbench::new(); let mut sim_testbench = SimTestbench::new();
let mgm_request = MgmRequest::RequestSensorData; let mgm_request = MgmRequest::RequestSensorData;
let request = SimRequest::new(SimTarget::Mgm, mgm_request); let request = SimRequest::new(SimDevice::Mgm, mgm_request);
sim_testbench sim_testbench
.send_request(request) .send_request(request)
.expect("sending MGM request failed"); .expect("sending MGM request failed");
@ -204,37 +200,47 @@ pub mod tests {
let sim_reply = sim_testbench.try_receive_next_reply(); let sim_reply = sim_testbench.try_receive_next_reply();
assert!(sim_reply.is_some()); assert!(sim_reply.is_some());
let sim_reply = sim_reply.unwrap(); let sim_reply = sim_reply.unwrap();
assert_eq!(sim_reply.target(), SimTarget::Mgm); assert_eq!(sim_reply.device, SimDevice::Mgm);
let reply: MgmReply = serde_json::from_str(sim_reply.reply()) let reply: MgmSensorValues = serde_json::from_str(&sim_reply.reply)
.expect("failed to deserialize MGM sensor values"); .expect("failed to deserialize MGM sensor values");
assert_eq!(reply.switch_state, SwitchStateBinary::Off); assert_eq!(reply.x, 0.0);
assert_eq!(reply.sensor_values.x, 0.0); assert_eq!(reply.y, 0.0);
assert_eq!(reply.sensor_values.y, 0.0); assert_eq!(reply.z, 0.0);
assert_eq!(reply.sensor_values.z, 0.0);
} }
#[test] #[test]
fn test_basic_mgm_request_switched_on() { fn test_basic_mgm_request_switched_on() {
let mut sim_testbench = SimTestbench::new(); let mut sim_testbench = SimTestbench::new();
switch_device_on(&mut sim_testbench, PcduSwitch::Mgm); let pcdu_request = PcduRequest::SwitchDevice {
switch: eps::PcduSwitch::Mgm,
state: SwitchStateBinary::On,
};
let mut request = SimRequest::new(SimDevice::Pcdu, pcdu_request);
sim_testbench
.send_request(request)
.expect("sending MGM switch request failed");
sim_testbench.handle_sim_requests();
sim_testbench.step();
let mut sim_reply_res = sim_testbench.try_receive_next_reply();
assert!(sim_reply_res.is_none());
let mgm_request = MgmRequest::RequestSensorData; let mgm_request = MgmRequest::RequestSensorData;
let mut request = SimRequest::new(SimTarget::Mgm, mgm_request); request = SimRequest::new(SimDevice::Mgm, mgm_request);
sim_testbench sim_testbench
.send_request(request) .send_request(request)
.expect("sending MGM request failed"); .expect("sending MGM request failed");
sim_testbench.handle_sim_requests(); sim_testbench.handle_sim_requests();
sim_testbench.step(); sim_testbench.step();
let mut sim_reply_res = sim_testbench.try_receive_next_reply(); sim_reply_res = sim_testbench.try_receive_next_reply();
assert!(sim_reply_res.is_some()); assert!(sim_reply_res.is_some());
let mut sim_reply = sim_reply_res.unwrap(); let mut sim_reply = sim_reply_res.unwrap();
assert_eq!(sim_reply.target(), SimTarget::Mgm); assert_eq!(sim_reply.device, SimDevice::Mgm);
let first_reply: MgmReply = serde_json::from_str(sim_reply.reply()) let first_reply: MgmSensorValues = serde_json::from_str(&sim_reply.reply)
.expect("failed to deserialize MGM sensor values"); .expect("failed to deserialize MGM sensor values");
let mgm_request = MgmRequest::RequestSensorData; let mgm_request = MgmRequest::RequestSensorData;
sim_testbench.step_by(Duration::from_millis(50)); sim_testbench.step_by(Duration::from_millis(50));
request = SimRequest::new(SimTarget::Mgm, mgm_request); request = SimRequest::new(SimDevice::Mgm, mgm_request);
sim_testbench sim_testbench
.send_request(request) .send_request(request)
.expect("sending MGM request failed"); .expect("sending MGM request failed");
@ -244,106 +250,12 @@ pub mod tests {
assert!(sim_reply_res.is_some()); assert!(sim_reply_res.is_some());
sim_reply = sim_reply_res.unwrap(); sim_reply = sim_reply_res.unwrap();
let second_reply: MgmReply = serde_json::from_str(sim_reply.reply()) let second_reply: MgmSensorValues = serde_json::from_str(&sim_reply.reply)
.expect("failed to deserialize MGM sensor values"); .expect("failed to deserialize MGM sensor values");
// Check that the values are changing. // Check that the values are changing.
assert!(first_reply != second_reply); assert!(first_reply != second_reply);
} }
#[test] #[test]
fn test_basic_mgt_request_is_off() { fn test_mgm_request_with_mgt_switched_on() {}
let mut sim_testbench = SimTestbench::new();
let mgt_request = MgtRequest::RequestHk;
let request = SimRequest::new(SimTarget::Mgt, mgt_request);
sim_testbench
.send_request(request)
.expect("sending MGM request failed");
sim_testbench.handle_sim_requests();
sim_testbench.step();
let sim_reply_res = sim_testbench.try_receive_next_reply();
assert!(sim_reply_res.is_none());
}
#[test]
fn test_basic_mgt_request_is_on() {
let mut sim_testbench = SimTestbench::new();
switch_device_on(&mut sim_testbench, PcduSwitch::Mgt);
let mgt_request = MgtRequest::RequestHk;
let request = SimRequest::new(SimTarget::Mgt, mgt_request);
sim_testbench
.send_request(request)
.expect("sending MGM request failed");
sim_testbench.handle_sim_requests();
sim_testbench.step();
let sim_reply_res = sim_testbench.try_receive_next_reply();
assert!(sim_reply_res.is_some());
let sim_reply = sim_reply_res.unwrap();
let mgt_reply: MgtReply = serde_json::from_str(sim_reply.reply())
.expect("failed to deserialize MGM sensor values");
match mgt_reply {
MgtReply::Hk(hk) => {
assert_eq!(hk.dipole, MgtDipole::default());
assert!(!hk.torquing);
}
_ => panic!("unexpected reply"),
}
}
fn check_mgt_hk(sim_testbench: &mut SimTestbench, expected_hk_set: MgtHkSet) {
let mgt_request = MgtRequest::RequestHk;
let request = SimRequest::new(SimTarget::Mgt, mgt_request);
sim_testbench
.send_request(request)
.expect("sending MGM request failed");
sim_testbench.handle_sim_requests();
sim_testbench.step();
let sim_reply_res = sim_testbench.try_receive_next_reply();
assert!(sim_reply_res.is_some());
let sim_reply = sim_reply_res.unwrap();
let mgt_reply: MgtReply = serde_json::from_str(sim_reply.reply())
.expect("failed to deserialize MGM sensor values");
match mgt_reply {
MgtReply::Hk(hk) => {
assert_eq!(hk, expected_hk_set);
}
_ => panic!("unexpected reply"),
}
}
#[test]
fn test_basic_mgt_request_is_on_and_torquing() {
let mut sim_testbench = SimTestbench::new();
switch_device_on(&mut sim_testbench, PcduSwitch::Mgt);
let commanded_dipole = MgtDipole {
x: -200,
y: 200,
z: 1000,
};
let mgt_request = MgtRequest::ApplyTorque {
duration: Duration::from_millis(100),
dipole: commanded_dipole,
};
let request = SimRequest::new(SimTarget::Mgt, mgt_request);
sim_testbench
.send_request(request)
.expect("sending MGM request failed");
sim_testbench.handle_sim_requests();
sim_testbench.step_by(Duration::from_millis(5));
check_mgt_hk(
&mut sim_testbench,
MgtHkSet {
dipole: commanded_dipole,
torquing: true,
},
);
sim_testbench.step_by(Duration::from_millis(100));
check_mgt_hk(
&mut sim_testbench,
MgtHkSet {
dipole: MgtDipole::default(),
torquing: false,
},
);
}
} }

View File

@ -6,20 +6,18 @@ use asynchronix::{
}; };
use satrs_minisim::{ use satrs_minisim::{
acs::{MgmRequest, MgtRequest}, acs::{MgmRequest, MgtRequest},
eps::PcduRequest, SimRequest,
RequestError, SimCtrlReply, SimCtrlRequest, SimReply, SimRequest, SimTarget,
}; };
use crate::{ use crate::{
acs::{MagnetometerModel, MagnetorquerModel}, acs::{MagnetometerModel, MagnetorquerModel},
eps::PcduModel, eps::{PcduModel, PcduRequest},
}; };
// The simulation controller processes requests and drives the simulation. // The simulation controller processes requests and drives the simulation.
pub struct SimController { pub struct SimController {
pub sys_clock: SystemClock, pub sys_clock: SystemClock,
pub request_receiver: mpsc::Receiver<SimRequest>, pub request_receiver: mpsc::Receiver<SimRequest>,
pub reply_sender: mpsc::Sender<SimReply>,
pub simulation: Simulation, pub simulation: Simulation,
pub mgm_addr: Address<MagnetometerModel>, pub mgm_addr: Address<MagnetometerModel>,
pub pcdu_addr: Address<PcduModel>, pub pcdu_addr: Address<PcduModel>,
@ -30,7 +28,6 @@ impl SimController {
pub fn new( pub fn new(
sys_clock: SystemClock, sys_clock: SystemClock,
request_receiver: mpsc::Receiver<SimRequest>, request_receiver: mpsc::Receiver<SimRequest>,
reply_sender: mpsc::Sender<SimReply>,
simulation: Simulation, simulation: Simulation,
mgm_addr: Address<MagnetometerModel>, mgm_addr: Address<MagnetometerModel>,
pcdu_addr: Address<PcduModel>, pcdu_addr: Address<PcduModel>,
@ -39,7 +36,6 @@ impl SimController {
Self { Self {
sys_clock, sys_clock,
request_receiver, request_receiver,
reply_sender,
simulation, simulation,
mgm_addr, mgm_addr,
pcdu_addr, pcdu_addr,
@ -66,16 +62,11 @@ impl SimController {
pub fn handle_sim_requests(&mut self) { pub fn handle_sim_requests(&mut self) {
loop { loop {
match self.request_receiver.try_recv() { match self.request_receiver.try_recv() {
Ok(request) => { Ok(request) => match request.device() {
if let Err(e) = match request.target() { satrs_minisim::SimDevice::Mgm => self.handle_mgm_request(request.request()),
SimTarget::SimCtrl => self.handle_ctrl_request(&request), satrs_minisim::SimDevice::Mgt => self.handle_mgt_request(request.request()),
SimTarget::Mgm => self.handle_mgm_request(&request), satrs_minisim::SimDevice::Pcdu => self.handle_pcdu_request(request.request()),
SimTarget::Mgt => self.handle_mgt_request(&request), },
SimTarget::Pcdu => self.handle_pcdu_request(&request),
} {
self.handle_invalid_request_with_valid_target(e, &request)
}
}
Err(e) => match e { Err(e) => match e {
mpsc::TryRecvError::Empty => break, mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => { mpsc::TryRecvError::Disconnected => {
@ -85,20 +76,13 @@ impl SimController {
} }
} }
} }
fn handle_mgm_request(&mut self, request: &str) {
fn handle_ctrl_request(&mut self, request: &SimRequest) -> serde_json::Result<()> { let mgm_request: serde_json::Result<MgmRequest> = serde_json::from_str(request);
let sim_ctrl_request: SimCtrlRequest = serde_json::from_str(request.request())?; if mgm_request.is_err() {
match sim_ctrl_request { log::warn!("received invalid MGM request: {}", mgm_request.unwrap_err());
SimCtrlRequest::Ping => { return;
self.reply_sender
.send(SimReply::new(SimTarget::SimCtrl, SimCtrlReply::Pong))
.expect("sending reply from sim controller failed");
}
} }
Ok(()) let mgm_request = mgm_request.unwrap();
}
fn handle_mgm_request(&mut self, request: &SimRequest) -> serde_json::Result<()> {
let mgm_request: MgmRequest = serde_json::from_str(request.request())?;
match mgm_request { match mgm_request {
MgmRequest::RequestSensorData => { MgmRequest::RequestSensorData => {
self.simulation.send_event( self.simulation.send_event(
@ -108,11 +92,18 @@ impl SimController {
); );
} }
} }
Ok(())
} }
fn handle_pcdu_request(&mut self, request: &SimRequest) -> serde_json::Result<()> { fn handle_pcdu_request(&mut self, request: &str) {
let pcdu_request: PcduRequest = serde_json::from_str(request.request())?; let pcdu_request: serde_json::Result<PcduRequest> = serde_json::from_str(request);
if pcdu_request.is_err() {
log::warn!(
"received invalid PCDU request: {}",
pcdu_request.unwrap_err()
);
return;
}
let pcdu_request = pcdu_request.unwrap();
match pcdu_request { match pcdu_request {
PcduRequest::RequestSwitchInfo => { PcduRequest::RequestSwitchInfo => {
self.simulation self.simulation
@ -126,11 +117,18 @@ impl SimController {
); );
} }
} }
Ok(())
} }
fn handle_mgt_request(&mut self, request: &SimRequest) -> serde_json::Result<()> { fn handle_mgt_request(&mut self, request: &str) {
let mgt_request: MgtRequest = serde_json::from_str(request.request())?; let mgt_request: serde_json::Result<MgtRequest> = serde_json::from_str(request);
if mgt_request.is_err() {
log::warn!(
"received invalid PCDU request: {}",
mgt_request.unwrap_err()
);
return;
}
let mgt_request = mgt_request.unwrap();
match mgt_request { match mgt_request {
MgtRequest::ApplyTorque { duration, dipole } => self.simulation.send_event( MgtRequest::ApplyTorque { duration, dipole } => self.simulation.send_event(
MagnetorquerModel::apply_torque, MagnetorquerModel::apply_torque,
@ -143,55 +141,5 @@ impl SimController {
&self.mgt_addr, &self.mgt_addr,
), ),
} }
Ok(())
}
fn handle_invalid_request_with_valid_target(
&self,
error: serde_json::Error,
request: &SimRequest,
) {
log::warn!(
"received invalid {:?} request: {:?}",
request.target(),
error
);
self.reply_sender
.send(SimReply::new(
SimTarget::SimCtrl,
SimCtrlReply::from(RequestError::TargetRequestMissmatch(request.clone())),
))
.expect("sending reply from sim controller failed");
}
}
#[cfg(test)]
mod tests {
use crate::test_helpers::SimTestbench;
use super::*;
#[test]
fn test_basic_ping() {
let mut sim_testbench = SimTestbench::new();
let sim_ctrl_request = SimCtrlRequest::Ping;
let request = SimRequest::new(SimTarget::SimCtrl, sim_ctrl_request);
sim_testbench
.send_request(request)
.expect("sending sim ctrl request failed");
sim_testbench.handle_sim_requests();
sim_testbench.step();
let sim_reply = sim_testbench.try_receive_next_reply();
assert!(sim_reply.is_some());
let sim_reply = sim_reply.unwrap();
assert_eq!(sim_reply.target(), SimTarget::SimCtrl);
let reply: SimCtrlReply = serde_json::from_str(sim_reply.reply())
.expect("failed to deserialize MGM sensor values");
assert_eq!(reply, SimCtrlReply::Pong);
}
#[test]
fn test_invalid_request() {
// TODO: Implement this test. Check for the expected reply.
} }
} }

View File

@ -5,13 +5,28 @@ use asynchronix::{
time::Scheduler, time::Scheduler,
}; };
use satrs::power::SwitchStateBinary; use satrs::power::SwitchStateBinary;
use satrs_minisim::{ use satrs_minisim::{SimDevice, SimReply};
eps::{PcduReply, PcduSwitch, SwitchMap}, use serde::{Deserialize, Serialize};
SimReply, SimTarget,
};
pub const SWITCH_INFO_DELAY_MS: u64 = 10; pub const SWITCH_INFO_DELAY_MS: u64 = 10;
pub type SwitchMap = HashMap<PcduSwitch, SwitchStateBinary>;
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum PcduSwitch {
Mgm = 0,
Mgt = 1,
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum PcduRequest {
SwitchDevice {
switch: PcduSwitch,
state: SwitchStateBinary,
},
RequestSwitchInfo,
}
pub struct PcduModel { pub struct PcduModel {
pub switcher_map: SwitchMap, pub switcher_map: SwitchMap,
pub mgm_switch: Output<SwitchStateBinary>, pub mgm_switch: Output<SwitchStateBinary>,
@ -44,8 +59,8 @@ impl PcduModel {
} }
pub fn send_switch_info(&mut self) { pub fn send_switch_info(&mut self) {
let switch_info = PcduReply::SwitchInfo(self.switcher_map.clone()); let switch_info = self.switcher_map.clone();
let reply = SimReply::new(SimTarget::Pcdu, switch_info); let reply = SimReply::new(SimDevice::Pcdu, switch_info);
self.reply_sender.send(reply).unwrap(); self.reply_sender.send(reply).unwrap();
} }
@ -72,49 +87,17 @@ impl PcduModel {
impl Model for PcduModel {} impl Model for PcduModel {}
#[cfg(test)] #[cfg(test)]
pub(crate) mod tests { pub mod tests {
use super::*; use super::*;
use std::time::Duration; use std::time::Duration;
use satrs_minisim::{eps::PcduRequest, SimRequest, SimTarget}; use satrs_minisim::{SimDevice, SimRequest};
use crate::test_helpers::SimTestbench; use crate::test_helpers::SimTestbench;
fn switch_device(
sim_testbench: &mut SimTestbench,
switch: PcduSwitch,
target: SwitchStateBinary,
) {
let pcdu_request = PcduRequest::SwitchDevice {
switch,
state: target,
};
let request = SimRequest::new(SimTarget::Pcdu, pcdu_request);
sim_testbench
.send_request(request)
.expect("sending MGM switch request failed");
sim_testbench.handle_sim_requests();
sim_testbench.step();
}
#[allow(dead_code)]
pub(crate) fn switch_device_off(sim_testbench: &mut SimTestbench, switch: PcduSwitch) {
switch_device(sim_testbench, switch, SwitchStateBinary::Off);
}
pub(crate) fn switch_device_on(sim_testbench: &mut SimTestbench, switch: PcduSwitch) {
switch_device(sim_testbench, switch, SwitchStateBinary::On);
}
pub(crate) fn get_all_off_switch_map() -> SwitchMap {
let mut switcher_map = SwitchMap::new();
switcher_map.insert(super::PcduSwitch::Mgm, super::SwitchStateBinary::Off);
switcher_map.insert(super::PcduSwitch::Mgt, super::SwitchStateBinary::Off);
switcher_map
}
fn check_switch_state(sim_testbench: &mut SimTestbench, expected_switch_map: &SwitchMap) { fn check_switch_state(sim_testbench: &mut SimTestbench, expected_switch_map: &SwitchMap) {
let pcdu_request = PcduRequest::RequestSwitchInfo; let pcdu_request = PcduRequest::RequestSwitchInfo;
let request = SimRequest::new(SimTarget::Pcdu, pcdu_request); let request = SimRequest::new(SimDevice::Pcdu, pcdu_request);
sim_testbench sim_testbench
.send_request(request) .send_request(request)
.expect("sending MGM request failed"); .expect("sending MGM request failed");
@ -123,21 +106,33 @@ pub(crate) mod tests {
let sim_reply = sim_testbench.try_receive_next_reply(); let sim_reply = sim_testbench.try_receive_next_reply();
assert!(sim_reply.is_some()); assert!(sim_reply.is_some());
let sim_reply = sim_reply.unwrap(); let sim_reply = sim_reply.unwrap();
assert_eq!(sim_reply.target(), SimTarget::Pcdu); assert_eq!(sim_reply.device, SimDevice::Pcdu);
let pcdu_reply: PcduReply = serde_json::from_str(&sim_reply.reply()) let switch_map: super::SwitchMap =
.expect("failed to deserialize PCDU switch info"); serde_json::from_str(&sim_reply.reply).expect("failed to deserialize PCDU switch info");
match pcdu_reply { assert_eq!(&switch_map, expected_switch_map);
PcduReply::SwitchInfo(switch_map) => {
assert_eq!(switch_map, *expected_switch_map);
}
}
} }
fn test_pcdu_switching_single_switch(switch: PcduSwitch, target: SwitchStateBinary) { fn get_all_off_switch_map() -> SwitchMap {
let mut switcher_map = SwitchMap::new();
switcher_map.insert(super::PcduSwitch::Mgm, super::SwitchStateBinary::Off);
switcher_map.insert(super::PcduSwitch::Mgt, super::SwitchStateBinary::Off);
switcher_map
}
fn test_pcdu_switching_single_switch(switch: PcduSwitch) {
let mut sim_testbench = SimTestbench::new(); let mut sim_testbench = SimTestbench::new();
switch_device(&mut sim_testbench, switch, target); let pcdu_request = PcduRequest::SwitchDevice {
switch,
state: SwitchStateBinary::On,
};
let request = SimRequest::new(SimDevice::Pcdu, pcdu_request);
sim_testbench
.send_request(request)
.expect("sending MGM request failed");
sim_testbench.handle_sim_requests();
sim_testbench.step();
let mut switcher_map = get_all_off_switch_map(); let mut switcher_map = get_all_off_switch_map();
*switcher_map.get_mut(&switch).unwrap() = target; *switcher_map.get_mut(&switch).unwrap() = SwitchStateBinary::On;
check_switch_state(&mut sim_testbench, &switcher_map); check_switch_state(&mut sim_testbench, &switcher_map);
} }
@ -145,7 +140,7 @@ pub(crate) mod tests {
fn test_pcdu_switcher_request() { fn test_pcdu_switcher_request() {
let mut sim_testbench = SimTestbench::new(); let mut sim_testbench = SimTestbench::new();
let pcdu_request = PcduRequest::RequestSwitchInfo; let pcdu_request = PcduRequest::RequestSwitchInfo;
let request = SimRequest::new(SimTarget::Pcdu, pcdu_request); let request = SimRequest::new(SimDevice::Pcdu, pcdu_request);
sim_testbench sim_testbench
.send_request(request) .send_request(request)
.expect("sending MGM request failed"); .expect("sending MGM request failed");
@ -159,29 +154,19 @@ pub(crate) mod tests {
let sim_reply = sim_testbench.try_receive_next_reply(); let sim_reply = sim_testbench.try_receive_next_reply();
assert!(sim_reply.is_some()); assert!(sim_reply.is_some());
let sim_reply = sim_reply.unwrap(); let sim_reply = sim_reply.unwrap();
assert_eq!(sim_reply.target(), SimTarget::Pcdu); assert_eq!(sim_reply.device, SimDevice::Pcdu);
let pcdu_reply: PcduReply = serde_json::from_str(&sim_reply.reply()) let switch_map: super::SwitchMap =
.expect("failed to deserialize PCDU switch info"); serde_json::from_str(&sim_reply.reply).expect("failed to deserialize PCDU switch info");
match pcdu_reply { assert_eq!(switch_map, get_all_off_switch_map());
PcduReply::SwitchInfo(switch_map) => {
assert_eq!(switch_map, get_all_off_switch_map());
}
}
} }
#[test] #[test]
fn test_pcdu_switching_mgm_on() { fn test_pcdu_switching_mgm() {
test_pcdu_switching_single_switch(PcduSwitch::Mgm, SwitchStateBinary::On); test_pcdu_switching_single_switch(PcduSwitch::Mgm);
} }
#[test] #[test]
fn test_pcdu_switching_mgt_on() { fn test_pcdu_switching_mgt() {
test_pcdu_switching_single_switch(PcduSwitch::Mgt, SwitchStateBinary::On); test_pcdu_switching_single_switch(PcduSwitch::Mgt);
}
#[test]
fn test_pcdu_switching_mgt_off() {
test_pcdu_switching_single_switch(PcduSwitch::Mgt, SwitchStateBinary::On);
test_pcdu_switching_single_switch(PcduSwitch::Mgt, SwitchStateBinary::Off);
} }
} }

View File

@ -1,29 +1,28 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SimTarget { pub enum SimDevice {
SimCtrl,
Mgm, Mgm,
Mgt, Mgt,
Pcdu, Pcdu,
} }
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimRequest { pub struct SimRequest {
target: SimTarget, device: SimDevice,
request: String, request: String,
} }
impl SimRequest { impl SimRequest {
pub fn new<T: Serialize>(device: SimTarget, request: T) -> Self { pub fn new<T: Serialize>(device: SimDevice, reply: T) -> Self {
Self { Self {
target: device, device,
request: serde_json::to_string(&request).unwrap(), request: serde_json::to_string(&reply).unwrap(),
} }
} }
pub fn target(&self) -> SimTarget { pub fn device(&self) -> SimDevice {
self.target self.device
} }
pub fn request(&self) -> &String { pub fn request(&self) -> &String {
@ -31,85 +30,28 @@ impl SimRequest {
} }
} }
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimReply { pub struct SimReply {
target: SimTarget, pub device: SimDevice,
reply: String, pub reply: String,
} }
impl SimReply { impl SimReply {
pub fn new<T: Serialize>(device: SimTarget, reply: T) -> Self { pub fn new<T: Serialize>(device: SimDevice, reply: T) -> Self {
Self { Self {
target: device, device,
reply: serde_json::to_string(&reply).unwrap(), reply: serde_json::to_string(&reply).unwrap(),
} }
} }
pub fn target(&self) -> SimTarget {
self.target
}
pub fn reply(&self) -> &String { pub fn reply(&self) -> &String {
&self.reply &self.reply
} }
} }
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SimCtrlRequest {
Ping,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RequestError {
TargetRequestMissmatch(SimRequest),
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SimCtrlReply {
Pong,
InvalidRequest(RequestError),
}
impl From<RequestError> for SimCtrlReply {
fn from(error: RequestError) -> Self {
SimCtrlReply::InvalidRequest(error)
}
}
pub mod eps {
use super::*;
use std::collections::HashMap;
use satrs::power::SwitchStateBinary;
pub type SwitchMap = HashMap<PcduSwitch, SwitchStateBinary>;
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum PcduSwitch {
Mgm = 0,
Mgt = 1,
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum PcduRequest {
SwitchDevice {
switch: PcduSwitch,
state: SwitchStateBinary,
},
RequestSwitchInfo,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PcduReply {
SwitchInfo(SwitchMap),
}
}
pub mod acs { pub mod acs {
use std::time::Duration; use std::time::Duration;
use satrs::power::SwitchStateBinary;
use super::*; use super::*;
#[derive(Debug, Copy, Clone, Serialize, Deserialize)] #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
@ -127,12 +69,6 @@ pub mod acs {
pub z: f32, pub z: f32,
} }
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
pub struct MgmReply {
pub switch_state: SwitchStateBinary,
pub sensor_values: MgmSensorValues,
}
pub const MGT_GEN_MAGNETIC_FIELD: MgmSensorValues = MgmSensorValues { pub const MGT_GEN_MAGNETIC_FIELD: MgmSensorValues = MgmSensorValues {
x: 0.03, x: 0.03,
y: -0.03, y: -0.03,
@ -140,18 +76,13 @@ pub mod acs {
}; };
// Simple model using i16 values. // Simple model using i16 values.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Default, Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
pub struct MgtDipole { pub struct MgtDipole {
pub x: i16, pub x: i16,
pub y: i16, pub y: i16,
pub z: i16, pub z: i16,
} }
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
pub enum MgtRequestType {
ApplyTorque,
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize)] #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum MgtRequest { pub enum MgtRequest {
ApplyTorque { ApplyTorque {
@ -161,86 +92,8 @@ pub mod acs {
RequestHk, RequestHk,
} }
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MgtHkSet {
pub dipole: MgtDipole,
pub torquing: bool,
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize)] #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum MgtReply { pub enum MgtReply {
Ack(MgtRequestType), Hk(MgtDipole),
Nak(MgtRequestType),
Hk(MgtHkSet),
}
}
pub mod udp {
use std::{
net::{SocketAddr, UdpSocket},
time::Duration,
};
use thiserror::Error;
use crate::{SimReply, SimRequest};
#[derive(Error, Debug)]
pub enum ReceptionError {
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Serde JSON error: {0}")]
SerdeJson(#[from] serde_json::Error),
}
pub struct SimUdpClient {
socket: UdpSocket,
pub reply_buf: [u8; 4096],
}
impl SimUdpClient {
pub fn new(
server_addr: &SocketAddr,
non_blocking: bool,
read_timeot_ms: Option<u64>,
) -> std::io::Result<Self> {
let socket = UdpSocket::bind("127.0.0.1:0")?;
socket.set_nonblocking(non_blocking)?;
socket
.connect(server_addr)
.expect("could not connect to server addr");
if let Some(read_timeout) = read_timeot_ms {
// Set a read timeout so the test does not hang on failures.
socket.set_read_timeout(Some(Duration::from_millis(read_timeout)))?;
}
Ok(Self {
socket,
reply_buf: [0; 4096],
})
}
pub fn set_nonblocking(&self, non_blocking: bool) -> std::io::Result<()> {
self.socket.set_nonblocking(non_blocking)
}
pub fn set_read_timeout(&self, read_timeout_ms: u64) -> std::io::Result<()> {
self.socket
.set_read_timeout(Some(Duration::from_millis(read_timeout_ms)))
}
pub fn send_request(&self, sim_request: &SimRequest) -> std::io::Result<usize> {
self.socket.send(
&serde_json::to_vec(sim_request).expect("conversion of request to vector failed"),
)
}
pub fn recv_raw(&mut self) -> std::io::Result<usize> {
self.socket.recv(&mut self.reply_buf)
}
pub fn recv_sim_reply(&mut self) -> Result<SimReply, ReceptionError> {
let read_len = self.recv_raw()?;
Ok(serde_json::from_slice(&self.reply_buf[0..read_len])?)
}
} }
} }

View File

@ -7,7 +7,7 @@ use satrs_minisim::{SimReply, SimRequest};
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use udp::SimUdpServer; use udp::{SharedSocketAddr, UdpTcServer, UdpTmClient};
mod acs; mod acs;
mod controller; mod controller;
@ -69,7 +69,6 @@ fn create_sim_controller(
SimController::new( SimController::new(
sys_clock, sys_clock,
request_receiver, request_receiver,
reply_sender,
simulation, simulation,
mgm_addr, mgm_addr,
pcdu_addr, pcdu_addr,
@ -78,6 +77,7 @@ fn create_sim_controller(
} }
fn main() { fn main() {
let shared_socket_addr = SharedSocketAddr::default();
let (request_sender, request_receiver) = mpsc::channel(); let (request_sender, request_receiver) = mpsc::channel();
let (reply_sender, reply_receiver) = mpsc::channel(); let (reply_sender, reply_receiver) = mpsc::channel();
let t0 = MonotonicTime::EPOCH; let t0 = MonotonicTime::EPOCH;
@ -89,15 +89,19 @@ fn main() {
sim_ctrl.run(t0, 1); sim_ctrl.run(t0, 1);
}); });
let mut udp_server = SimUdpServer::new(0, request_sender, reply_receiver, 200, None) let mut server = UdpTcServer::new(request_sender, shared_socket_addr.clone()).unwrap();
.expect("could not create UDP request server"); // This thread manages the simulator UDP TC server.
// This thread manages the simulator UDP server.
let udp_tc_thread = thread::spawn(move || { let udp_tc_thread = thread::spawn(move || {
udp_server.run(); server.run();
});
let mut client = UdpTmClient::new(reply_receiver, 200, shared_socket_addr);
// This thread manages the simulator UDP TM client.
let udp_tm_thread = thread::spawn(move || {
client.run();
}); });
sim_thread.join().expect("joining simulation thread failed"); sim_thread.join().expect("joining simulation thread failed");
udp_tc_thread udp_tc_thread.join().expect("joining UDP TC thread failed");
.join() udp_tm_thread.join().expect("joining UDP TM thread failed");
.expect("joining UDP server thread failed");
} }

View File

@ -1,111 +1,113 @@
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
io::ErrorKind,
net::{SocketAddr, UdpSocket}, net::{SocketAddr, UdpSocket},
sync::{atomic::AtomicBool, mpsc, Arc}, sync::{mpsc, Arc, Mutex},
time::Duration, time::Duration,
}; };
use satrs_minisim::{SimReply, SimRequest}; use satrs_minisim::{SimReply, SimRequest};
pub type SharedSocketAddr = Arc<Mutex<Option<SocketAddr>>>;
// A UDP server which handles all TC received by a client application. // A UDP server which handles all TC received by a client application.
pub struct SimUdpServer { pub struct UdpTcServer {
socket: UdpSocket, socket: UdpSocket,
request_sender: mpsc::Sender<SimRequest>, request_sender: mpsc::Sender<SimRequest>,
// shared_last_sender: SharedSocketAddr, shared_last_sender: SharedSocketAddr,
reply_receiver: mpsc::Receiver<SimReply>,
reply_queue: VecDeque<SimReply>,
max_num_replies: usize,
// Stop signal to stop the server. Required for unittests and useful to allow clean shutdown
// of the application.
stop_signal: Option<Arc<AtomicBool>>,
idle_sleep_period_ms: u64,
req_buf: [u8; 4096],
sender_addr: Option<SocketAddr>,
} }
impl SimUdpServer { impl UdpTcServer {
pub fn new( pub fn new(
local_port: u16,
request_sender: mpsc::Sender<SimRequest>, request_sender: mpsc::Sender<SimRequest>,
reply_receiver: mpsc::Receiver<SimReply>, shared_last_sender: SharedSocketAddr,
max_num_replies: usize,
stop_signal: Option<Arc<AtomicBool>>,
) -> std::io::Result<Self> { ) -> std::io::Result<Self> {
let socket = UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], local_port)))?; let socket = UdpSocket::bind("0.0.0.0:7303")?;
socket.set_nonblocking(true)?;
Ok(Self { Ok(Self {
socket, socket,
request_sender, request_sender,
reply_receiver, shared_last_sender,
reply_queue: VecDeque::new(),
max_num_replies,
stop_signal,
idle_sleep_period_ms: 3,
req_buf: [0; 4096],
sender_addr: None,
}) })
} }
#[allow(dead_code)]
pub fn server_addr(&self) -> std::io::Result<SocketAddr> {
self.socket.local_addr()
}
pub fn run(&mut self) { pub fn run(&mut self) {
let mut last_socket_addr = None;
loop { loop {
if let Some(stop_signal) = &self.stop_signal { // Buffer to store incoming data.
if stop_signal.load(std::sync::atomic::Ordering::Relaxed) { let mut buffer = [0u8; 4096];
break; // Block until data is received. `recv_from` returns the number of bytes read and the
} // sender's address.
} let (bytes_read, src) = self
let processed_requests = self.process_requests(); .socket
let processed_replies = self.process_replies(); .recv_from(&mut buffer)
let sent_replies = self.send_replies(); .expect("could not read from socket");
// Sleep for a bit if there is nothing to do to prevent burning CPU cycles. Delay
// should be kept short to ensure responsiveness of the system.
if !processed_requests && !processed_replies && !sent_replies {
std::thread::sleep(Duration::from_millis(self.idle_sleep_period_ms));
}
}
}
fn process_requests(&mut self) -> bool {
let mut processed_requests = false;
loop {
// Blocks for a certain amount of time until data is received to allow doing periodic
// work like checking the stop signal.
let (bytes_read, src) = match self.socket.recv_from(&mut self.req_buf) {
Ok((bytes_read, src)) => (bytes_read, src),
Err(e) if e.kind() == ErrorKind::WouldBlock => {
// Continue to perform regular checks like the stop signal.
break;
}
Err(e) => {
// Handle unexpected errors (e.g., socket closed) here.
log::error!("unexpected request server error: {e}");
break;
}
};
self.sender_addr = Some(src);
// Convert the buffer into a string slice and print the message. // Convert the buffer into a string slice and print the message.
let req_string = std::str::from_utf8(&self.req_buf[..bytes_read]) let req_string = std::str::from_utf8(&buffer[..bytes_read])
.expect("Could not write buffer as string"); .expect("Could not write buffer as string");
log::info!("Received request from {}: {}", src, req_string); println!("Received from {}: {}", src, req_string);
let sim_req: serde_json::Result<SimRequest> = serde_json::from_str(req_string); let sim_req: serde_json::Result<SimRequest> = serde_json::from_str(req_string);
if sim_req.is_err() { if sim_req.is_err() {
log::warn!( log::warn!(
"received UDP request with invalid format: {}", "received UDP request with invalid format: {}",
sim_req.unwrap_err() sim_req.unwrap_err()
); );
return processed_requests; continue;
} }
self.request_sender.send(sim_req.unwrap()).unwrap(); self.request_sender.send(sim_req.unwrap()).unwrap();
processed_requests = true; // Only set last sender if it has changed.
if last_socket_addr.is_some() && src != last_socket_addr.unwrap() {
self.shared_last_sender.lock().unwrap().replace(src);
}
last_socket_addr = Some(src);
}
}
}
// A helper object which sends back all replies to the UDP client.
//
// This helper is scheduled separately to minimize the delay between the requests and replies.
pub struct UdpTmClient {
reply_receiver: mpsc::Receiver<SimReply>,
reply_queue: VecDeque<SimReply>,
max_num_replies: usize,
socket: UdpSocket,
last_sender: SharedSocketAddr,
}
impl UdpTmClient {
pub fn new(
reply_receiver: mpsc::Receiver<SimReply>,
max_num_replies: usize,
last_sender: SharedSocketAddr,
) -> Self {
let socket =
UdpSocket::bind("127.0.0.1:0").expect("creating UDP client for TM sender failed");
Self {
reply_receiver,
reply_queue: VecDeque::new(),
max_num_replies,
socket,
last_sender,
}
}
pub fn run(&mut self) {
loop {
let processed_replies = self.process_replies();
let last_sender_lock = self
.last_sender
.lock()
.expect("locking last UDP sender failed");
let last_sender = *last_sender_lock;
drop(last_sender_lock);
let mut sent_replies = false;
if let Some(last_sender) = last_sender {
sent_replies = self.send_replies(last_sender);
}
if !processed_replies && !sent_replies {
std::thread::sleep(Duration::from_millis(20));
}
} }
processed_requests
} }
fn process_replies(&mut self) -> bool { fn process_replies(&mut self) -> bool {
@ -129,19 +131,18 @@ impl SimUdpServer {
} }
} }
fn send_replies(&mut self) -> bool { fn send_replies(&mut self, last_sender: SocketAddr) -> bool {
if self.sender_addr.is_none() {
return false;
}
let mut sent_replies = false; let mut sent_replies = false;
self.socket
.connect(last_sender)
.expect("connecting to last sender failed");
while !self.reply_queue.is_empty() { while !self.reply_queue.is_empty() {
let next_reply_to_send = self.reply_queue.pop_front().unwrap(); let next_reply_to_send = self.reply_queue.pop_front().unwrap();
self.socket self.socket
.send_to( .send(
serde_json::to_string(&next_reply_to_send) serde_json::to_string(&next_reply_to_send)
.unwrap() .unwrap()
.as_bytes(), .as_bytes(),
self.sender_addr.unwrap(),
) )
.expect("sending reply failed"); .expect("sending reply failed");
sent_replies = true; sent_replies = true;
@ -152,252 +153,6 @@ impl SimUdpServer {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{
io::ErrorKind,
sync::{
atomic::{AtomicBool, Ordering},
mpsc, Arc,
},
time::Duration,
};
use satrs_minisim::{
eps::{PcduReply, PcduRequest},
udp::{ReceptionError, SimUdpClient},
SimCtrlReply, SimCtrlRequest, SimReply, SimRequest, SimTarget,
};
use crate::eps::tests::get_all_off_switch_map;
use delegate::delegate;
use super::SimUdpServer;
// Wait time to ensure even possibly laggy systems like CI servers can run the tests.
const SERVER_WAIT_TIME_MS: u64 = 50;
struct UdpTestbench {
client: SimUdpClient,
stop_signal: Arc<AtomicBool>,
request_receiver: mpsc::Receiver<SimRequest>,
reply_sender: mpsc::Sender<SimReply>,
}
impl UdpTestbench {
pub fn new(
client_non_blocking: bool,
client_read_timeout_ms: Option<u64>,
max_num_replies: usize,
) -> std::io::Result<(Self, SimUdpServer)> {
let (request_sender, request_receiver) = mpsc::channel();
let (reply_sender, reply_receiver) = mpsc::channel();
let stop_signal = Arc::new(AtomicBool::new(false));
let server = SimUdpServer::new(
0,
request_sender,
reply_receiver,
max_num_replies,
Some(stop_signal.clone()),
)?;
let server_addr = server.server_addr()?;
Ok((
Self {
client: SimUdpClient::new(
&server_addr,
client_non_blocking,
client_read_timeout_ms,
)?,
stop_signal,
request_receiver,
reply_sender,
},
server,
))
}
pub fn try_recv_request(&self) -> Result<SimRequest, mpsc::TryRecvError> {
self.request_receiver.try_recv()
}
pub fn stop(&self) {
self.stop_signal.store(true, Ordering::Relaxed);
}
pub fn send_reply(&self, sim_reply: &SimReply) {
self.reply_sender
.send(sim_reply.clone())
.expect("sending sim reply failed");
}
delegate! {
to self.client {
pub fn send_request(&self, sim_request: &SimRequest) -> std::io::Result<usize>;
pub fn recv_sim_reply(&mut self) -> Result<SimReply, ReceptionError>;
}
}
pub fn check_no_sim_reply_available(&mut self) {
if let Err(ReceptionError::Io(ref io_error)) = self.recv_sim_reply() {
if io_error.kind() == ErrorKind::WouldBlock {
// Continue to perform regular checks like the stop signal.
return;
} else {
// Handle unexpected errors (e.g., socket closed) here.
panic!("unexpected request server error: {io_error}");
}
}
panic!("unexpected reply available");
}
pub fn check_next_sim_reply(&mut self, expected_reply: &SimReply) {
match self.recv_sim_reply() {
Ok(received_sim_reply) => assert_eq!(expected_reply, &received_sim_reply),
Err(e) => match e {
ReceptionError::Io(ref io_error) => {
if io_error.kind() == ErrorKind::WouldBlock {
// Continue to perform regular checks like the stop signal.
panic!("no simulation reply received");
} else {
// Handle unexpected errors (e.g., socket closed) here.
panic!("unexpected request server error: {e}");
}
}
ReceptionError::SerdeJson(json_error) => {
panic!("unexpected JSON error: {json_error}");
}
},
}
}
}
#[test] #[test]
fn test_basic_udp_request_reception() { fn test_basic_udp_tc_reception() {}
let (udp_testbench, mut udp_server) =
UdpTestbench::new(true, Some(SERVER_WAIT_TIME_MS), 10)
.expect("could not create testbench");
let server_thread = std::thread::spawn(move || udp_server.run());
let sim_request = SimRequest::new(SimTarget::Pcdu, PcduRequest::RequestSwitchInfo);
udp_testbench
.send_request(&sim_request)
.expect("sending request failed");
std::thread::sleep(Duration::from_millis(SERVER_WAIT_TIME_MS));
// Check that the sim request has arrives and was forwarded.
let received_sim_request = udp_testbench
.try_recv_request()
.expect("did not receive request");
assert_eq!(sim_request, received_sim_request);
// Stop the server.
udp_testbench.stop();
server_thread.join().unwrap();
}
#[test]
fn test_udp_reply_server() {
let (mut udp_testbench, mut udp_server) =
UdpTestbench::new(false, Some(SERVER_WAIT_TIME_MS), 10)
.expect("could not create testbench");
let server_thread = std::thread::spawn(move || udp_server.run());
udp_testbench
.send_request(&SimRequest::new(SimTarget::SimCtrl, SimCtrlRequest::Ping))
.expect("sending request failed");
let sim_reply = SimReply::new(
SimTarget::Pcdu,
PcduReply::SwitchInfo(get_all_off_switch_map()),
);
udp_testbench.send_reply(&sim_reply);
udp_testbench.check_next_sim_reply(&sim_reply);
// Stop the server.
udp_testbench.stop();
server_thread.join().unwrap();
}
#[test]
fn test_udp_req_server_and_reply_sender() {
let (mut udp_testbench, mut udp_server) =
UdpTestbench::new(false, Some(SERVER_WAIT_TIME_MS), 10)
.expect("could not create testbench");
let server_thread = std::thread::spawn(move || udp_server.run());
// Send a ping so that the server knows the address of the client.
// Do not check that the request arrives on the receiver side, is done by other test.
udp_testbench
.send_request(&SimRequest::new(SimTarget::SimCtrl, SimCtrlRequest::Ping))
.expect("sending request failed");
// Send a reply to the server, ensure it gets forwarded to the client.
let sim_reply = SimReply::new(
SimTarget::Pcdu,
PcduReply::SwitchInfo(get_all_off_switch_map()),
);
udp_testbench.send_reply(&sim_reply);
std::thread::sleep(Duration::from_millis(SERVER_WAIT_TIME_MS));
// Now we check that the reply server can send back replies to the client.
udp_testbench.check_next_sim_reply(&sim_reply);
udp_testbench.stop();
server_thread.join().unwrap();
}
#[test]
fn test_udp_replies_client_unconnected() {
let (mut udp_testbench, mut udp_server) =
UdpTestbench::new(true, None, 10).expect("could not create testbench");
let server_thread = std::thread::spawn(move || udp_server.run());
// Send a reply to the server. The client is not connected, so it won't get forwarded.
let sim_reply = SimReply::new(
SimTarget::Pcdu,
PcduReply::SwitchInfo(get_all_off_switch_map()),
);
udp_testbench.send_reply(&sim_reply);
std::thread::sleep(Duration::from_millis(10));
udp_testbench.check_no_sim_reply_available();
// Connect by sending a ping.
udp_testbench
.send_request(&SimRequest::new(SimTarget::SimCtrl, SimCtrlRequest::Ping))
.expect("sending request failed");
std::thread::sleep(Duration::from_millis(SERVER_WAIT_TIME_MS));
udp_testbench.check_next_sim_reply(&sim_reply);
// Now we check that the reply server can sent back replies to the client.
udp_testbench.stop();
server_thread.join().unwrap();
}
#[test]
fn test_udp_reply_server_old_replies_overwritten() {
let (mut udp_testbench, mut udp_server) =
UdpTestbench::new(true, None, 3).expect("could not create testbench");
let server_thread = std::thread::spawn(move || udp_server.run());
// The server only caches up to 3 replies.
let sim_reply = SimReply::new(SimTarget::SimCtrl, SimCtrlReply::Pong);
for _ in 0..4 {
udp_testbench.send_reply(&sim_reply);
}
std::thread::sleep(Duration::from_millis(20));
udp_testbench.check_no_sim_reply_available();
// Connect by sending a ping.
udp_testbench
.send_request(&SimRequest::new(SimTarget::SimCtrl, SimCtrlRequest::Ping))
.expect("sending request failed");
std::thread::sleep(Duration::from_millis(SERVER_WAIT_TIME_MS));
for _ in 0..3 {
udp_testbench.check_next_sim_reply(&sim_reply);
}
udp_testbench.check_no_sim_reply_available();
udp_testbench.stop();
server_thread.join().unwrap();
}
} }