1
0
forked from ROMEO/nexosim

Merge pull request #68 from asynchronics/feature/detect_lost_messages

Detect lost messages
This commit is contained in:
Jauhien Piatlicki 2024-11-19 12:59:49 +01:00 committed by GitHub
commit 224aea59e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 179 additions and 51 deletions

View File

@ -3,7 +3,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_client(false)
.out_dir("src/grpc/codegen/")
.compile(&["simulation.proto"], &["src/grpc/api/"])?;
.compile_protos(&["simulation.proto"], &["src/grpc/api/"])?;
Ok(())
}

View File

@ -23,8 +23,8 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug)]
pub(crate) enum ExecutorError {
/// The simulation has deadlocked.
Deadlock,
/// Not all messages have been processed.
UnprocessedMessages(usize),
/// The simulation has timed out.
Timeout,
/// The simulation has panicked.

View File

@ -250,9 +250,9 @@ impl Executor {
if self.context.pool_manager.pool_is_idle() {
let msg_count = self.context.msg_count.load(Ordering::Relaxed);
if msg_count != 0 {
assert!(msg_count > 0);
let msg_count: usize = msg_count.try_into().unwrap();
return Err(ExecutorError::Deadlock);
return Err(ExecutorError::UnprocessedMessages(msg_count));
}
return Ok(());

View File

@ -193,12 +193,12 @@ impl ExecutorInner {
return Err(ExecutorError::Panic(model_id, payload));
}
// Check for deadlock.
// Check for unprocessed messages.
self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash);
if self.context.msg_count != 0 {
assert!(self.context.msg_count > 0);
let msg_count: usize = self.context.msg_count.try_into().unwrap();
return Err(ExecutorError::Deadlock);
return Err(ExecutorError::UnprocessedMessages(msg_count));
}
Ok(())

View File

@ -12,11 +12,12 @@ enum ErrorCode {
SIMULATION_NOT_STARTED = 1;
SIMULATION_TERMINATED = 2;
SIMULATION_DEADLOCK = 3;
SIMULATION_PANIC = 4;
SIMULATION_TIMEOUT = 5;
SIMULATION_OUT_OF_SYNC = 6;
SIMULATION_BAD_QUERY = 7;
SIMULATION_TIME_OUT_OF_RANGE = 8;
SIMULATION_MESSAGE_LOSS = 4;
SIMULATION_PANIC = 5;
SIMULATION_TIMEOUT = 6;
SIMULATION_OUT_OF_SYNC = 7;
SIMULATION_BAD_QUERY = 8;
SIMULATION_TIME_OUT_OF_RANGE = 9;
MISSING_ARGUMENT = 20;
INVALID_TIME = 30;
INVALID_PERIOD = 31;

View File

@ -338,11 +338,12 @@ pub enum ErrorCode {
SimulationNotStarted = 1,
SimulationTerminated = 2,
SimulationDeadlock = 3,
SimulationPanic = 4,
SimulationTimeout = 5,
SimulationOutOfSync = 6,
SimulationBadQuery = 7,
SimulationTimeOutOfRange = 8,
SimulationMessageLoss = 4,
SimulationPanic = 5,
SimulationTimeout = 6,
SimulationOutOfSync = 7,
SimulationBadQuery = 8,
SimulationTimeOutOfRange = 9,
MissingArgument = 20,
InvalidTime = 30,
InvalidPeriod = 31,
@ -359,23 +360,24 @@ impl ErrorCode {
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
ErrorCode::InternalError => "INTERNAL_ERROR",
ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED",
ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED",
ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK",
ErrorCode::SimulationPanic => "SIMULATION_PANIC",
ErrorCode::SimulationTimeout => "SIMULATION_TIMEOUT",
ErrorCode::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC",
ErrorCode::SimulationBadQuery => "SIMULATION_BAD_QUERY",
ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
ErrorCode::MissingArgument => "MISSING_ARGUMENT",
ErrorCode::InvalidTime => "INVALID_TIME",
ErrorCode::InvalidPeriod => "INVALID_PERIOD",
ErrorCode::InvalidDeadline => "INVALID_DEADLINE",
ErrorCode::InvalidMessage => "INVALID_MESSAGE",
ErrorCode::InvalidKey => "INVALID_KEY",
ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND",
ErrorCode::SinkNotFound => "SINK_NOT_FOUND",
Self::InternalError => "INTERNAL_ERROR",
Self::SimulationNotStarted => "SIMULATION_NOT_STARTED",
Self::SimulationTerminated => "SIMULATION_TERMINATED",
Self::SimulationDeadlock => "SIMULATION_DEADLOCK",
Self::SimulationMessageLoss => "SIMULATION_MESSAGE_LOSS",
Self::SimulationPanic => "SIMULATION_PANIC",
Self::SimulationTimeout => "SIMULATION_TIMEOUT",
Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC",
Self::SimulationBadQuery => "SIMULATION_BAD_QUERY",
Self::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
Self::MissingArgument => "MISSING_ARGUMENT",
Self::InvalidTime => "INVALID_TIME",
Self::InvalidPeriod => "INVALID_PERIOD",
Self::InvalidDeadline => "INVALID_DEADLINE",
Self::InvalidMessage => "INVALID_MESSAGE",
Self::InvalidKey => "INVALID_KEY",
Self::SourceNotFound => "SOURCE_NOT_FOUND",
Self::SinkNotFound => "SINK_NOT_FOUND",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
@ -385,6 +387,7 @@ impl ErrorCode {
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
"SIMULATION_MESSAGE_LOSS" => Some(Self::SimulationMessageLoss),
"SIMULATION_PANIC" => Some(Self::SimulationPanic),
"SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout),
"SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync),
@ -404,7 +407,13 @@ impl ErrorCode {
}
/// Generated server implementations.
pub mod simulation_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
#![allow(
unused_variables,
dead_code,
missing_docs,
clippy::wildcard_imports,
clippy::let_unit_value,
)]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with SimulationServer.
#[async_trait]
@ -1033,17 +1042,19 @@ pub mod simulation_server {
}
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", tonic::Code::Unimplemented as i32)
.header(
http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE,
)
.body(empty_body())
.unwrap(),
)
let mut response = http::Response::new(empty_body());
let headers = response.headers_mut();
headers
.insert(
tonic::Status::GRPC_STATUS,
(tonic::Code::Unimplemented as i32).into(),
);
headers
.insert(
http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE,
);
Ok(response)
})
}
}

View File

@ -34,6 +34,7 @@ fn simulation_not_started_error() -> Error {
fn map_execution_error(error: ExecutionError) -> Error {
let error_code = match error {
ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock,
ExecutionError::MessageLoss(_) => ErrorCode::SimulationMessageLoss,
ExecutionError::Panic { .. } => ErrorCode::SimulationPanic,
ExecutionError::Timeout => ErrorCode::SimulationTimeout,
ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync,

View File

@ -333,7 +333,7 @@ impl Simulation {
}
self.executor.run(self.timeout).map_err(|e| match e {
ExecutorError::Deadlock => {
ExecutorError::UnprocessedMessages(msg_count) => {
self.is_terminated = true;
let mut deadlock_info = Vec::new();
for (model, observer) in &self.observers {
@ -346,7 +346,11 @@ impl Simulation {
}
}
ExecutionError::Deadlock(deadlock_info)
if deadlock_info.is_empty() {
ExecutionError::MessageLoss(msg_count)
} else {
ExecutionError::Deadlock(deadlock_info)
}
}
ExecutorError::Timeout => {
self.is_terminated = true;
@ -525,14 +529,22 @@ pub struct DeadlockInfo {
/// An error returned upon simulation execution failure.
#[derive(Debug)]
pub enum ExecutionError {
/// The simulation has been terminated due to an earlier deadlock, model
/// panic, timeout or synchronization loss.
/// The simulation has been terminated due to an earlier deadlock, message
/// loss, model panic, timeout or synchronization loss.
Terminated,
/// The simulation has deadlocked due to the enlisted models.
///
/// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error.
Deadlock(Vec<DeadlockInfo>),
/// One or more message were left unprocessed because the recipient's
/// mailbox was not migrated to the simulation.
///
/// The payload indicates the number of lost messages.
///
/// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error.
MessageLoss(usize),
/// A panic was caught during execution.
///
/// This is a fatal error: any subsequent attempt to run the simulation will
@ -604,6 +616,9 @@ impl fmt::Display for ExecutionError {
Ok(())
}
Self::MessageLoss(count) => {
write!(f, "{} messages have been lost", count)
}
Self::Panic{model, payload} => {
let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() {
s

View File

@ -5,6 +5,7 @@ mod model_scheduling;
#[cfg(not(miri))]
mod simulation_clock_sync;
mod simulation_deadlock;
mod simulation_message_loss;
mod simulation_panic;
mod simulation_scheduling;
#[cfg(not(miri))]

View File

@ -0,0 +1,99 @@
//! Message loss detection.
use nexosim::model::Model;
use nexosim::ports::{Output, Requestor};
use nexosim::simulation::{ExecutionError, Mailbox, SimInit};
use nexosim::time::MonotonicTime;
const MT_NUM_THREADS: usize = 4;
#[derive(Default)]
struct TestModel {
output: Output<()>,
requestor: Requestor<(), ()>,
}
impl TestModel {
async fn activate_output_twice(&mut self) {
self.output.send(()).await;
self.output.send(()).await;
}
async fn activate_requestor_twice(&mut self) {
let _ = self.requestor.send(()).await;
let _ = self.requestor.send(()).await;
}
}
impl Model for TestModel {}
/// Loose an event.
fn event_loss(num_threads: usize) {
let mut model = TestModel::default();
let mbox = Mailbox::new();
let addr = mbox.address();
let bad_mbox = Mailbox::new();
// Make two self-connections so that each outgoing message generates two
// incoming messages.
model
.output
.connect(TestModel::activate_output_twice, &bad_mbox);
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::with_num_threads(num_threads)
.add_model(model, mbox, "")
.init(t0)
.unwrap()
.0;
match simu.process_event(TestModel::activate_output_twice, (), addr) {
Err(ExecutionError::MessageLoss(msg_count)) => {
assert_eq!(msg_count, 2);
}
_ => panic!("message loss not detected"),
}
}
/// Loose an event.
fn request_loss(num_threads: usize) {
let mut model = TestModel::default();
let mbox = Mailbox::new();
let addr = mbox.address();
let bad_mbox = Mailbox::new();
model
.requestor
.connect(TestModel::activate_requestor_twice, &bad_mbox);
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::with_num_threads(num_threads)
.add_model(model, mbox, "")
.init(t0)
.unwrap()
.0;
match simu.process_event(TestModel::activate_requestor_twice, (), addr) {
Err(ExecutionError::MessageLoss(msg_count)) => {
assert_eq!(msg_count, 1);
}
_ => panic!("message loss not detected"),
}
}
#[test]
fn event_loss_st() {
event_loss(1);
}
#[test]
fn event_loss_mt() {
event_loss(MT_NUM_THREADS);
}
#[test]
fn request_loss_st() {
request_loss(1);
}
#[test]
fn request_loss_mt() {
request_loss(MT_NUM_THREADS);
}