1
0
forked from ROMEO/nexosim

Detect lost messages

Messages sent to a mailbox that wasn't added to the simulation are now
specifically detected. Earlier this would be wrongly reported as a
deadlock.
This commit is contained in:
Serge Barral 2024-11-18 18:56:23 +01:00
parent 0da9e6649d
commit b1a02bd07f
10 changed files with 179 additions and 51 deletions

View File

@ -3,7 +3,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure() tonic_build::configure()
.build_client(false) .build_client(false)
.out_dir("src/grpc/codegen/") .out_dir("src/grpc/codegen/")
.compile(&["simulation.proto"], &["src/grpc/api/"])?; .compile_protos(&["simulation.proto"], &["src/grpc/api/"])?;
Ok(()) Ok(())
} }

View File

@ -23,8 +23,8 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum ExecutorError { pub(crate) enum ExecutorError {
/// The simulation has deadlocked. /// Not all messages have been processed.
Deadlock, UnprocessedMessages(usize),
/// The simulation has timed out. /// The simulation has timed out.
Timeout, Timeout,
/// The simulation has panicked. /// The simulation has panicked.

View File

@ -250,9 +250,9 @@ impl Executor {
if self.context.pool_manager.pool_is_idle() { if self.context.pool_manager.pool_is_idle() {
let msg_count = self.context.msg_count.load(Ordering::Relaxed); let msg_count = self.context.msg_count.load(Ordering::Relaxed);
if msg_count != 0 { if msg_count != 0 {
assert!(msg_count > 0); let msg_count: usize = msg_count.try_into().unwrap();
return Err(ExecutorError::Deadlock); return Err(ExecutorError::UnprocessedMessages(msg_count));
} }
return Ok(()); return Ok(());

View File

@ -193,12 +193,12 @@ impl ExecutorInner {
return Err(ExecutorError::Panic(model_id, payload)); return Err(ExecutorError::Panic(model_id, payload));
} }
// Check for deadlock. // Check for unprocessed messages.
self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash); self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash);
if self.context.msg_count != 0 { if self.context.msg_count != 0 {
assert!(self.context.msg_count > 0); let msg_count: usize = self.context.msg_count.try_into().unwrap();
return Err(ExecutorError::Deadlock); return Err(ExecutorError::UnprocessedMessages(msg_count));
} }
Ok(()) Ok(())

View File

@ -12,11 +12,12 @@ enum ErrorCode {
SIMULATION_NOT_STARTED = 1; SIMULATION_NOT_STARTED = 1;
SIMULATION_TERMINATED = 2; SIMULATION_TERMINATED = 2;
SIMULATION_DEADLOCK = 3; SIMULATION_DEADLOCK = 3;
SIMULATION_PANIC = 4; SIMULATION_MESSAGE_LOSS = 4;
SIMULATION_TIMEOUT = 5; SIMULATION_PANIC = 5;
SIMULATION_OUT_OF_SYNC = 6; SIMULATION_TIMEOUT = 6;
SIMULATION_BAD_QUERY = 7; SIMULATION_OUT_OF_SYNC = 7;
SIMULATION_TIME_OUT_OF_RANGE = 8; SIMULATION_BAD_QUERY = 8;
SIMULATION_TIME_OUT_OF_RANGE = 9;
MISSING_ARGUMENT = 20; MISSING_ARGUMENT = 20;
INVALID_TIME = 30; INVALID_TIME = 30;
INVALID_PERIOD = 31; INVALID_PERIOD = 31;

View File

@ -338,11 +338,12 @@ pub enum ErrorCode {
SimulationNotStarted = 1, SimulationNotStarted = 1,
SimulationTerminated = 2, SimulationTerminated = 2,
SimulationDeadlock = 3, SimulationDeadlock = 3,
SimulationPanic = 4, SimulationMessageLoss = 4,
SimulationTimeout = 5, SimulationPanic = 5,
SimulationOutOfSync = 6, SimulationTimeout = 6,
SimulationBadQuery = 7, SimulationOutOfSync = 7,
SimulationTimeOutOfRange = 8, SimulationBadQuery = 8,
SimulationTimeOutOfRange = 9,
MissingArgument = 20, MissingArgument = 20,
InvalidTime = 30, InvalidTime = 30,
InvalidPeriod = 31, InvalidPeriod = 31,
@ -359,23 +360,24 @@ impl ErrorCode {
/// (if the ProtoBuf definition does not change) and safe for programmatic use. /// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str { pub fn as_str_name(&self) -> &'static str {
match self { match self {
ErrorCode::InternalError => "INTERNAL_ERROR", Self::InternalError => "INTERNAL_ERROR",
ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED", Self::SimulationNotStarted => "SIMULATION_NOT_STARTED",
ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED", Self::SimulationTerminated => "SIMULATION_TERMINATED",
ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK", Self::SimulationDeadlock => "SIMULATION_DEADLOCK",
ErrorCode::SimulationPanic => "SIMULATION_PANIC", Self::SimulationMessageLoss => "SIMULATION_MESSAGE_LOSS",
ErrorCode::SimulationTimeout => "SIMULATION_TIMEOUT", Self::SimulationPanic => "SIMULATION_PANIC",
ErrorCode::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC", Self::SimulationTimeout => "SIMULATION_TIMEOUT",
ErrorCode::SimulationBadQuery => "SIMULATION_BAD_QUERY", Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC",
ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE", Self::SimulationBadQuery => "SIMULATION_BAD_QUERY",
ErrorCode::MissingArgument => "MISSING_ARGUMENT", Self::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
ErrorCode::InvalidTime => "INVALID_TIME", Self::MissingArgument => "MISSING_ARGUMENT",
ErrorCode::InvalidPeriod => "INVALID_PERIOD", Self::InvalidTime => "INVALID_TIME",
ErrorCode::InvalidDeadline => "INVALID_DEADLINE", Self::InvalidPeriod => "INVALID_PERIOD",
ErrorCode::InvalidMessage => "INVALID_MESSAGE", Self::InvalidDeadline => "INVALID_DEADLINE",
ErrorCode::InvalidKey => "INVALID_KEY", Self::InvalidMessage => "INVALID_MESSAGE",
ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND", Self::InvalidKey => "INVALID_KEY",
ErrorCode::SinkNotFound => "SINK_NOT_FOUND", Self::SourceNotFound => "SOURCE_NOT_FOUND",
Self::SinkNotFound => "SINK_NOT_FOUND",
} }
} }
/// Creates an enum from field names used in the ProtoBuf definition. /// Creates an enum from field names used in the ProtoBuf definition.
@ -385,6 +387,7 @@ impl ErrorCode {
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted), "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock), "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
"SIMULATION_MESSAGE_LOSS" => Some(Self::SimulationMessageLoss),
"SIMULATION_PANIC" => Some(Self::SimulationPanic), "SIMULATION_PANIC" => Some(Self::SimulationPanic),
"SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout), "SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout),
"SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync), "SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync),
@ -404,7 +407,13 @@ impl ErrorCode {
} }
/// Generated server implementations. /// Generated server implementations.
pub mod simulation_server { pub mod simulation_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] #![allow(
unused_variables,
dead_code,
missing_docs,
clippy::wildcard_imports,
clippy::let_unit_value,
)]
use tonic::codegen::*; use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with SimulationServer. /// Generated trait containing gRPC methods that should be implemented for use with SimulationServer.
#[async_trait] #[async_trait]
@ -1033,17 +1042,19 @@ pub mod simulation_server {
} }
_ => { _ => {
Box::pin(async move { Box::pin(async move {
Ok( let mut response = http::Response::new(empty_body());
http::Response::builder() let headers = response.headers_mut();
.status(200) headers
.header("grpc-status", tonic::Code::Unimplemented as i32) .insert(
.header( tonic::Status::GRPC_STATUS,
(tonic::Code::Unimplemented as i32).into(),
);
headers
.insert(
http::header::CONTENT_TYPE, http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE, tonic::metadata::GRPC_CONTENT_TYPE,
) );
.body(empty_body()) Ok(response)
.unwrap(),
)
}) })
} }
} }

View File

@ -34,6 +34,7 @@ fn simulation_not_started_error() -> Error {
fn map_execution_error(error: ExecutionError) -> Error { fn map_execution_error(error: ExecutionError) -> Error {
let error_code = match error { let error_code = match error {
ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock, ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock,
ExecutionError::MessageLoss(_) => ErrorCode::SimulationMessageLoss,
ExecutionError::Panic { .. } => ErrorCode::SimulationPanic, ExecutionError::Panic { .. } => ErrorCode::SimulationPanic,
ExecutionError::Timeout => ErrorCode::SimulationTimeout, ExecutionError::Timeout => ErrorCode::SimulationTimeout,
ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync, ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync,

View File

@ -333,7 +333,7 @@ impl Simulation {
} }
self.executor.run(self.timeout).map_err(|e| match e { self.executor.run(self.timeout).map_err(|e| match e {
ExecutorError::Deadlock => { ExecutorError::UnprocessedMessages(msg_count) => {
self.is_terminated = true; self.is_terminated = true;
let mut deadlock_info = Vec::new(); let mut deadlock_info = Vec::new();
for (model, observer) in &self.observers { for (model, observer) in &self.observers {
@ -346,8 +346,12 @@ impl Simulation {
} }
} }
if deadlock_info.is_empty() {
ExecutionError::MessageLoss(msg_count)
} else {
ExecutionError::Deadlock(deadlock_info) ExecutionError::Deadlock(deadlock_info)
} }
}
ExecutorError::Timeout => { ExecutorError::Timeout => {
self.is_terminated = true; self.is_terminated = true;
@ -525,14 +529,22 @@ pub struct DeadlockInfo {
/// An error returned upon simulation execution failure. /// An error returned upon simulation execution failure.
#[derive(Debug)] #[derive(Debug)]
pub enum ExecutionError { pub enum ExecutionError {
/// The simulation has been terminated due to an earlier deadlock, model /// The simulation has been terminated due to an earlier deadlock, message
/// panic, timeout or synchronization loss. /// loss, model panic, timeout or synchronization loss.
Terminated, Terminated,
/// The simulation has deadlocked due to the enlisted models. /// The simulation has deadlocked due to the enlisted models.
/// ///
/// This is a fatal error: any subsequent attempt to run the simulation will /// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error. /// return an [`ExecutionError::Terminated`] error.
Deadlock(Vec<DeadlockInfo>), Deadlock(Vec<DeadlockInfo>),
/// One or more message were left unprocessed because the recipient's
/// mailbox was not migrated to the simulation.
///
/// The payload indicates the number of lost messages.
///
/// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error.
MessageLoss(usize),
/// A panic was caught during execution. /// A panic was caught during execution.
/// ///
/// This is a fatal error: any subsequent attempt to run the simulation will /// This is a fatal error: any subsequent attempt to run the simulation will
@ -604,6 +616,9 @@ impl fmt::Display for ExecutionError {
Ok(()) Ok(())
} }
Self::MessageLoss(count) => {
write!(f, "{} messages have been lost", count)
}
Self::Panic{model, payload} => { Self::Panic{model, payload} => {
let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() { let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() {
s s

View File

@ -5,6 +5,7 @@ mod model_scheduling;
#[cfg(not(miri))] #[cfg(not(miri))]
mod simulation_clock_sync; mod simulation_clock_sync;
mod simulation_deadlock; mod simulation_deadlock;
mod simulation_message_loss;
mod simulation_panic; mod simulation_panic;
mod simulation_scheduling; mod simulation_scheduling;
#[cfg(not(miri))] #[cfg(not(miri))]

View File

@ -0,0 +1,99 @@
//! Message loss detection.
use nexosim::model::Model;
use nexosim::ports::{Output, Requestor};
use nexosim::simulation::{ExecutionError, Mailbox, SimInit};
use nexosim::time::MonotonicTime;
const MT_NUM_THREADS: usize = 4;
#[derive(Default)]
struct TestModel {
output: Output<()>,
requestor: Requestor<(), ()>,
}
impl TestModel {
async fn activate_output_twice(&mut self) {
self.output.send(()).await;
self.output.send(()).await;
}
async fn activate_requestor_twice(&mut self) {
let _ = self.requestor.send(()).await;
let _ = self.requestor.send(()).await;
}
}
impl Model for TestModel {}
/// Loose an event.
fn event_loss(num_threads: usize) {
let mut model = TestModel::default();
let mbox = Mailbox::new();
let addr = mbox.address();
let bad_mbox = Mailbox::new();
// Make two self-connections so that each outgoing message generates two
// incoming messages.
model
.output
.connect(TestModel::activate_output_twice, &bad_mbox);
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::with_num_threads(num_threads)
.add_model(model, mbox, "")
.init(t0)
.unwrap()
.0;
match simu.process_event(TestModel::activate_output_twice, (), addr) {
Err(ExecutionError::MessageLoss(msg_count)) => {
assert_eq!(msg_count, 2);
}
_ => panic!("message loss not detected"),
}
}
/// Loose an event.
fn request_loss(num_threads: usize) {
let mut model = TestModel::default();
let mbox = Mailbox::new();
let addr = mbox.address();
let bad_mbox = Mailbox::new();
model
.requestor
.connect(TestModel::activate_requestor_twice, &bad_mbox);
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::with_num_threads(num_threads)
.add_model(model, mbox, "")
.init(t0)
.unwrap()
.0;
match simu.process_event(TestModel::activate_requestor_twice, (), addr) {
Err(ExecutionError::MessageLoss(msg_count)) => {
assert_eq!(msg_count, 1);
}
_ => panic!("message loss not detected"),
}
}
#[test]
fn event_loss_st() {
event_loss(1);
}
#[test]
fn event_loss_mt() {
event_loss(MT_NUM_THREADS);
}
#[test]
fn request_loss_st() {
request_loss(1);
}
#[test]
fn request_loss_mt() {
request_loss(MT_NUM_THREADS);
}