diff --git a/nexosim/build.rs b/nexosim/build.rs index 72ccc67..908f394 100644 --- a/nexosim/build.rs +++ b/nexosim/build.rs @@ -3,7 +3,7 @@ fn main() -> Result<(), Box> { tonic_build::configure() .build_client(false) .out_dir("src/grpc/codegen/") - .compile(&["simulation.proto"], &["src/grpc/api/"])?; + .compile_protos(&["simulation.proto"], &["src/grpc/api/"])?; Ok(()) } diff --git a/nexosim/src/executor.rs b/nexosim/src/executor.rs index 80456f8..2a7c99d 100644 --- a/nexosim/src/executor.rs +++ b/nexosim/src/executor.rs @@ -23,8 +23,8 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0); #[derive(Debug)] pub(crate) enum ExecutorError { - /// The simulation has deadlocked. - Deadlock, + /// Not all messages have been processed. + UnprocessedMessages(usize), /// The simulation has timed out. Timeout, /// The simulation has panicked. diff --git a/nexosim/src/executor/mt_executor.rs b/nexosim/src/executor/mt_executor.rs index bb9517c..017830e 100644 --- a/nexosim/src/executor/mt_executor.rs +++ b/nexosim/src/executor/mt_executor.rs @@ -250,9 +250,9 @@ impl Executor { if self.context.pool_manager.pool_is_idle() { let msg_count = self.context.msg_count.load(Ordering::Relaxed); if msg_count != 0 { - assert!(msg_count > 0); + let msg_count: usize = msg_count.try_into().unwrap(); - return Err(ExecutorError::Deadlock); + return Err(ExecutorError::UnprocessedMessages(msg_count)); } return Ok(()); diff --git a/nexosim/src/executor/st_executor.rs b/nexosim/src/executor/st_executor.rs index 9c1008e..6fd6bee 100644 --- a/nexosim/src/executor/st_executor.rs +++ b/nexosim/src/executor/st_executor.rs @@ -193,12 +193,12 @@ impl ExecutorInner { return Err(ExecutorError::Panic(model_id, payload)); } - // Check for deadlock. + // Check for unprocessed messages. self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash); if self.context.msg_count != 0 { - assert!(self.context.msg_count > 0); + let msg_count: usize = self.context.msg_count.try_into().unwrap(); - return Err(ExecutorError::Deadlock); + return Err(ExecutorError::UnprocessedMessages(msg_count)); } Ok(()) diff --git a/nexosim/src/grpc/api/simulation.proto b/nexosim/src/grpc/api/simulation.proto index 92358ce..8a070ba 100644 --- a/nexosim/src/grpc/api/simulation.proto +++ b/nexosim/src/grpc/api/simulation.proto @@ -12,11 +12,12 @@ enum ErrorCode { SIMULATION_NOT_STARTED = 1; SIMULATION_TERMINATED = 2; SIMULATION_DEADLOCK = 3; - SIMULATION_PANIC = 4; - SIMULATION_TIMEOUT = 5; - SIMULATION_OUT_OF_SYNC = 6; - SIMULATION_BAD_QUERY = 7; - SIMULATION_TIME_OUT_OF_RANGE = 8; + SIMULATION_MESSAGE_LOSS = 4; + SIMULATION_PANIC = 5; + SIMULATION_TIMEOUT = 6; + SIMULATION_OUT_OF_SYNC = 7; + SIMULATION_BAD_QUERY = 8; + SIMULATION_TIME_OUT_OF_RANGE = 9; MISSING_ARGUMENT = 20; INVALID_TIME = 30; INVALID_PERIOD = 31; diff --git a/nexosim/src/grpc/codegen/simulation.rs b/nexosim/src/grpc/codegen/simulation.rs index a1406fb..1f75c9b 100644 --- a/nexosim/src/grpc/codegen/simulation.rs +++ b/nexosim/src/grpc/codegen/simulation.rs @@ -338,11 +338,12 @@ pub enum ErrorCode { SimulationNotStarted = 1, SimulationTerminated = 2, SimulationDeadlock = 3, - SimulationPanic = 4, - SimulationTimeout = 5, - SimulationOutOfSync = 6, - SimulationBadQuery = 7, - SimulationTimeOutOfRange = 8, + SimulationMessageLoss = 4, + SimulationPanic = 5, + SimulationTimeout = 6, + SimulationOutOfSync = 7, + SimulationBadQuery = 8, + SimulationTimeOutOfRange = 9, MissingArgument = 20, InvalidTime = 30, InvalidPeriod = 31, @@ -359,23 +360,24 @@ impl ErrorCode { /// (if the ProtoBuf definition does not change) and safe for programmatic use. pub fn as_str_name(&self) -> &'static str { match self { - ErrorCode::InternalError => "INTERNAL_ERROR", - ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED", - ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED", - ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK", - ErrorCode::SimulationPanic => "SIMULATION_PANIC", - ErrorCode::SimulationTimeout => "SIMULATION_TIMEOUT", - ErrorCode::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC", - ErrorCode::SimulationBadQuery => "SIMULATION_BAD_QUERY", - ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE", - ErrorCode::MissingArgument => "MISSING_ARGUMENT", - ErrorCode::InvalidTime => "INVALID_TIME", - ErrorCode::InvalidPeriod => "INVALID_PERIOD", - ErrorCode::InvalidDeadline => "INVALID_DEADLINE", - ErrorCode::InvalidMessage => "INVALID_MESSAGE", - ErrorCode::InvalidKey => "INVALID_KEY", - ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND", - ErrorCode::SinkNotFound => "SINK_NOT_FOUND", + Self::InternalError => "INTERNAL_ERROR", + Self::SimulationNotStarted => "SIMULATION_NOT_STARTED", + Self::SimulationTerminated => "SIMULATION_TERMINATED", + Self::SimulationDeadlock => "SIMULATION_DEADLOCK", + Self::SimulationMessageLoss => "SIMULATION_MESSAGE_LOSS", + Self::SimulationPanic => "SIMULATION_PANIC", + Self::SimulationTimeout => "SIMULATION_TIMEOUT", + Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC", + Self::SimulationBadQuery => "SIMULATION_BAD_QUERY", + Self::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE", + Self::MissingArgument => "MISSING_ARGUMENT", + Self::InvalidTime => "INVALID_TIME", + Self::InvalidPeriod => "INVALID_PERIOD", + Self::InvalidDeadline => "INVALID_DEADLINE", + Self::InvalidMessage => "INVALID_MESSAGE", + Self::InvalidKey => "INVALID_KEY", + Self::SourceNotFound => "SOURCE_NOT_FOUND", + Self::SinkNotFound => "SINK_NOT_FOUND", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -385,6 +387,7 @@ impl ErrorCode { "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted), "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock), + "SIMULATION_MESSAGE_LOSS" => Some(Self::SimulationMessageLoss), "SIMULATION_PANIC" => Some(Self::SimulationPanic), "SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout), "SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync), @@ -404,7 +407,13 @@ impl ErrorCode { } /// Generated server implementations. pub mod simulation_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with SimulationServer. #[async_trait] @@ -1033,17 +1042,19 @@ pub mod simulation_server { } _ => { Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", tonic::Code::Unimplemented as i32) - .header( - http::header::CONTENT_TYPE, - tonic::metadata::GRPC_CONTENT_TYPE, - ) - .body(empty_body()) - .unwrap(), - ) + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) }) } } diff --git a/nexosim/src/grpc/services.rs b/nexosim/src/grpc/services.rs index 5bce9ff..858bdc1 100644 --- a/nexosim/src/grpc/services.rs +++ b/nexosim/src/grpc/services.rs @@ -34,6 +34,7 @@ fn simulation_not_started_error() -> Error { fn map_execution_error(error: ExecutionError) -> Error { let error_code = match error { ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock, + ExecutionError::MessageLoss(_) => ErrorCode::SimulationMessageLoss, ExecutionError::Panic { .. } => ErrorCode::SimulationPanic, ExecutionError::Timeout => ErrorCode::SimulationTimeout, ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync, diff --git a/nexosim/src/simulation.rs b/nexosim/src/simulation.rs index 67bd8ac..e8c0cc5 100644 --- a/nexosim/src/simulation.rs +++ b/nexosim/src/simulation.rs @@ -333,7 +333,7 @@ impl Simulation { } self.executor.run(self.timeout).map_err(|e| match e { - ExecutorError::Deadlock => { + ExecutorError::UnprocessedMessages(msg_count) => { self.is_terminated = true; let mut deadlock_info = Vec::new(); for (model, observer) in &self.observers { @@ -346,7 +346,11 @@ impl Simulation { } } - ExecutionError::Deadlock(deadlock_info) + if deadlock_info.is_empty() { + ExecutionError::MessageLoss(msg_count) + } else { + ExecutionError::Deadlock(deadlock_info) + } } ExecutorError::Timeout => { self.is_terminated = true; @@ -525,14 +529,22 @@ pub struct DeadlockInfo { /// An error returned upon simulation execution failure. #[derive(Debug)] pub enum ExecutionError { - /// The simulation has been terminated due to an earlier deadlock, model - /// panic, timeout or synchronization loss. + /// The simulation has been terminated due to an earlier deadlock, message + /// loss, model panic, timeout or synchronization loss. Terminated, /// The simulation has deadlocked due to the enlisted models. /// /// This is a fatal error: any subsequent attempt to run the simulation will /// return an [`ExecutionError::Terminated`] error. Deadlock(Vec), + /// One or more message were left unprocessed because the recipient's + /// mailbox was not migrated to the simulation. + /// + /// The payload indicates the number of lost messages. + /// + /// This is a fatal error: any subsequent attempt to run the simulation will + /// return an [`ExecutionError::Terminated`] error. + MessageLoss(usize), /// A panic was caught during execution. /// /// This is a fatal error: any subsequent attempt to run the simulation will @@ -604,6 +616,9 @@ impl fmt::Display for ExecutionError { Ok(()) } + Self::MessageLoss(count) => { + write!(f, "{} messages have been lost", count) + } Self::Panic{model, payload} => { let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() { s diff --git a/nexosim/tests/integration/main.rs b/nexosim/tests/integration/main.rs index 99d1658..c76016d 100644 --- a/nexosim/tests/integration/main.rs +++ b/nexosim/tests/integration/main.rs @@ -5,6 +5,7 @@ mod model_scheduling; #[cfg(not(miri))] mod simulation_clock_sync; mod simulation_deadlock; +mod simulation_message_loss; mod simulation_panic; mod simulation_scheduling; #[cfg(not(miri))] diff --git a/nexosim/tests/integration/simulation_message_loss.rs b/nexosim/tests/integration/simulation_message_loss.rs new file mode 100644 index 0000000..b47115f --- /dev/null +++ b/nexosim/tests/integration/simulation_message_loss.rs @@ -0,0 +1,99 @@ +//! Message loss detection. + +use nexosim::model::Model; +use nexosim::ports::{Output, Requestor}; +use nexosim::simulation::{ExecutionError, Mailbox, SimInit}; +use nexosim::time::MonotonicTime; + +const MT_NUM_THREADS: usize = 4; + +#[derive(Default)] +struct TestModel { + output: Output<()>, + requestor: Requestor<(), ()>, +} +impl TestModel { + async fn activate_output_twice(&mut self) { + self.output.send(()).await; + self.output.send(()).await; + } + async fn activate_requestor_twice(&mut self) { + let _ = self.requestor.send(()).await; + let _ = self.requestor.send(()).await; + } +} +impl Model for TestModel {} + +/// Loose an event. +fn event_loss(num_threads: usize) { + let mut model = TestModel::default(); + let mbox = Mailbox::new(); + let addr = mbox.address(); + let bad_mbox = Mailbox::new(); + + // Make two self-connections so that each outgoing message generates two + // incoming messages. + model + .output + .connect(TestModel::activate_output_twice, &bad_mbox); + + let t0 = MonotonicTime::EPOCH; + let mut simu = SimInit::with_num_threads(num_threads) + .add_model(model, mbox, "") + .init(t0) + .unwrap() + .0; + + match simu.process_event(TestModel::activate_output_twice, (), addr) { + Err(ExecutionError::MessageLoss(msg_count)) => { + assert_eq!(msg_count, 2); + } + _ => panic!("message loss not detected"), + } +} + +/// Loose an event. +fn request_loss(num_threads: usize) { + let mut model = TestModel::default(); + let mbox = Mailbox::new(); + let addr = mbox.address(); + let bad_mbox = Mailbox::new(); + + model + .requestor + .connect(TestModel::activate_requestor_twice, &bad_mbox); + + let t0 = MonotonicTime::EPOCH; + let mut simu = SimInit::with_num_threads(num_threads) + .add_model(model, mbox, "") + .init(t0) + .unwrap() + .0; + + match simu.process_event(TestModel::activate_requestor_twice, (), addr) { + Err(ExecutionError::MessageLoss(msg_count)) => { + assert_eq!(msg_count, 1); + } + _ => panic!("message loss not detected"), + } +} + +#[test] +fn event_loss_st() { + event_loss(1); +} + +#[test] +fn event_loss_mt() { + event_loss(MT_NUM_THREADS); +} + +#[test] +fn request_loss_st() { + request_loss(1); +} + +#[test] +fn request_loss_mt() { + request_loss(MT_NUM_THREADS); +}