1
0
forked from ROMEO/nexosim

Merge pull request #70 from asynchronics/feature/report_missing_recipient

Report an error if a message cannot be delivered
This commit is contained in:
Jauhien Piatlicki 2024-11-27 13:05:50 +01:00 committed by GitHub
commit 31be2b0c75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 425 additions and 325 deletions

View File

@ -95,52 +95,62 @@ jobs:
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4
- name: Run cargo miri example1 (single-threaded executor) - name: Run cargo miri espresso_machine (single-threaded executor)
run: cargo miri run --example espresso_machine run: cargo miri run --example espresso_machine
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1
- name: Run cargo miri example1 (multi-threaded executor) - name: Run cargo miri espresso_machine (multi-threaded executor)
run: cargo miri run --example espresso_machine run: cargo miri run --example espresso_machine
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4
- name: Run cargo miri example2 (single-threaded executor) - name: Run cargo miri power_supply (single-threaded executor)
run: cargo miri run --example power_supply run: cargo miri run --example power_supply
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1
- name: Run cargo miri example2 (multi-threaded executor) - name: Run cargo miri power_supply (multi-threaded executor)
run: cargo miri run --example power_supply run: cargo miri run --example power_supply
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4
- name: Run cargo miri example3 (single-threaded executor) - name: Run cargo miri stepper_motor (single-threaded executor)
run: cargo miri run --example stepper_motor run: cargo miri run --example stepper_motor
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1
- name: Run cargo miri example3 (multi-threaded executor) - name: Run cargo miri stepper_motor (multi-threaded executor)
run: cargo miri run --example stepper_motor run: cargo miri run --example stepper_motor
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4
- name: Run cargo miri example4 (single-threaded executor) - name: Run cargo miri assembly (single-threaded executor)
run: cargo miri run --example assembly run: cargo miri run --example assembly
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1
- name: Run cargo miri example4 (multi-threaded executor) - name: Run cargo miri assembly (multi-threaded executor)
run: cargo miri run --example assembly run: cargo miri run --example assembly
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4
- name: Run cargo miri example5 (single-threaded executor) - name: Run cargo miri uni_requestor (single-threaded executor)
run: cargo miri run --example uni_requestor
env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1
- name: Run cargo miri uni_requestor (multi-threaded executor)
run: cargo miri run --example uni_requestor
env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4
- name: Run cargo miri observables (single-threaded executor)
run: cargo miri run --example observables run: cargo miri run --example observables
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1
- name: Run cargo miri example5 (multi-threaded executor) - name: Run cargo miri observables (multi-threaded executor)
run: cargo miri run --example observables run: cargo miri run --example observables
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4

View File

@ -1,4 +1,4 @@
//! Example: sensor reading data from the environment model. //! Example: sensor reading data from environment model.
//! //!
//! This example demonstrates in particular: //! This example demonstrates in particular:
//! //!
@ -12,7 +12,7 @@
//! ┌─────────────┐ ┌──────────┐ //! ┌─────────────┐ ┌──────────┐
//! │ │ temperature │ │ overheat //! │ │ temperature │ │ overheat
//! Temperature ●─────────►│ Environment ├──────────────►│ Sensor ├──────────► //! Temperature ●─────────►│ Environment ├──────────────►│ Sensor ├──────────►
//! │ │ │ │ state //! │ │ │ │
//! └─────────────┘ └──────────┘ //! └─────────────┘ └──────────┘
//! ``` //! ```

View File

@ -13,11 +13,12 @@ enum ErrorCode {
SIMULATION_TERMINATED = 2; SIMULATION_TERMINATED = 2;
SIMULATION_DEADLOCK = 3; SIMULATION_DEADLOCK = 3;
SIMULATION_MESSAGE_LOSS = 4; SIMULATION_MESSAGE_LOSS = 4;
SIMULATION_PANIC = 5; SIMULATION_NO_RECIPIENT = 5;
SIMULATION_TIMEOUT = 6; SIMULATION_PANIC = 6;
SIMULATION_OUT_OF_SYNC = 7; SIMULATION_TIMEOUT = 7;
SIMULATION_BAD_QUERY = 8; SIMULATION_OUT_OF_SYNC = 8;
SIMULATION_TIME_OUT_OF_RANGE = 9; SIMULATION_BAD_QUERY = 9;
SIMULATION_TIME_OUT_OF_RANGE = 10;
MISSING_ARGUMENT = 20; MISSING_ARGUMENT = 20;
INVALID_TIME = 30; INVALID_TIME = 30;
INVALID_PERIOD = 31; INVALID_PERIOD = 31;

View File

@ -299,7 +299,10 @@ pub mod close_sink_reply {
#[derive(Clone, PartialEq, ::prost::Message)] #[derive(Clone, PartialEq, ::prost::Message)]
pub struct AnyRequest { pub struct AnyRequest {
/// Expects exactly 1 variant. /// Expects exactly 1 variant.
#[prost(oneof = "any_request::Request", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11")] #[prost(
oneof = "any_request::Request",
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11"
)]
pub request: ::core::option::Option<any_request::Request>, pub request: ::core::option::Option<any_request::Request>,
} }
/// Nested message and enum types in `AnyRequest`. /// Nested message and enum types in `AnyRequest`.
@ -339,11 +342,12 @@ pub enum ErrorCode {
SimulationTerminated = 2, SimulationTerminated = 2,
SimulationDeadlock = 3, SimulationDeadlock = 3,
SimulationMessageLoss = 4, SimulationMessageLoss = 4,
SimulationPanic = 5, SimulationNoRecipient = 5,
SimulationTimeout = 6, SimulationPanic = 6,
SimulationOutOfSync = 7, SimulationTimeout = 7,
SimulationBadQuery = 8, SimulationOutOfSync = 8,
SimulationTimeOutOfRange = 9, SimulationBadQuery = 9,
SimulationTimeOutOfRange = 10,
MissingArgument = 20, MissingArgument = 20,
InvalidTime = 30, InvalidTime = 30,
InvalidPeriod = 31, InvalidPeriod = 31,
@ -365,6 +369,7 @@ impl ErrorCode {
Self::SimulationTerminated => "SIMULATION_TERMINATED", Self::SimulationTerminated => "SIMULATION_TERMINATED",
Self::SimulationDeadlock => "SIMULATION_DEADLOCK", Self::SimulationDeadlock => "SIMULATION_DEADLOCK",
Self::SimulationMessageLoss => "SIMULATION_MESSAGE_LOSS", Self::SimulationMessageLoss => "SIMULATION_MESSAGE_LOSS",
Self::SimulationNoRecipient => "SIMULATION_NO_RECIPIENT",
Self::SimulationPanic => "SIMULATION_PANIC", Self::SimulationPanic => "SIMULATION_PANIC",
Self::SimulationTimeout => "SIMULATION_TIMEOUT", Self::SimulationTimeout => "SIMULATION_TIMEOUT",
Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC", Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC",
@ -388,6 +393,7 @@ impl ErrorCode {
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock), "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
"SIMULATION_MESSAGE_LOSS" => Some(Self::SimulationMessageLoss), "SIMULATION_MESSAGE_LOSS" => Some(Self::SimulationMessageLoss),
"SIMULATION_NO_RECIPIENT" => Some(Self::SimulationNoRecipient),
"SIMULATION_PANIC" => Some(Self::SimulationPanic), "SIMULATION_PANIC" => Some(Self::SimulationPanic),
"SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout), "SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout),
"SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync), "SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync),
@ -412,7 +418,7 @@ pub mod simulation_server {
dead_code, dead_code,
missing_docs, missing_docs,
clippy::wildcard_imports, clippy::wildcard_imports,
clippy::let_unit_value, clippy::let_unit_value
)] )]
use tonic::codegen::*; use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with SimulationServer. /// Generated trait containing gRPC methods that should be implemented for use with SimulationServer.
@ -437,31 +443,19 @@ pub mod simulation_server {
async fn schedule_event( async fn schedule_event(
&self, &self,
request: tonic::Request<super::ScheduleEventRequest>, request: tonic::Request<super::ScheduleEventRequest>,
) -> std::result::Result< ) -> std::result::Result<tonic::Response<super::ScheduleEventReply>, tonic::Status>;
tonic::Response<super::ScheduleEventReply>,
tonic::Status,
>;
async fn cancel_event( async fn cancel_event(
&self, &self,
request: tonic::Request<super::CancelEventRequest>, request: tonic::Request<super::CancelEventRequest>,
) -> std::result::Result< ) -> std::result::Result<tonic::Response<super::CancelEventReply>, tonic::Status>;
tonic::Response<super::CancelEventReply>,
tonic::Status,
>;
async fn process_event( async fn process_event(
&self, &self,
request: tonic::Request<super::ProcessEventRequest>, request: tonic::Request<super::ProcessEventRequest>,
) -> std::result::Result< ) -> std::result::Result<tonic::Response<super::ProcessEventReply>, tonic::Status>;
tonic::Response<super::ProcessEventReply>,
tonic::Status,
>;
async fn process_query( async fn process_query(
&self, &self,
request: tonic::Request<super::ProcessQueryRequest>, request: tonic::Request<super::ProcessQueryRequest>,
) -> std::result::Result< ) -> std::result::Result<tonic::Response<super::ProcessQueryReply>, tonic::Status>;
tonic::Response<super::ProcessQueryReply>,
tonic::Status,
>;
async fn read_events( async fn read_events(
&self, &self,
request: tonic::Request<super::ReadEventsRequest>, request: tonic::Request<super::ReadEventsRequest>,
@ -496,10 +490,7 @@ pub mod simulation_server {
max_encoding_message_size: None, max_encoding_message_size: None,
} }
} }
pub fn with_interceptor<F>( pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where where
F: tonic::service::Interceptor, F: tonic::service::Interceptor,
{ {
@ -554,21 +545,15 @@ pub mod simulation_server {
"/simulation.Simulation/Init" => { "/simulation.Simulation/Init" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct InitSvc<T: Simulation>(pub Arc<T>); struct InitSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::InitRequest> impl<T: Simulation> tonic::server::UnaryService<super::InitRequest> for InitSvc<T> {
for InitSvc<T> {
type Response = super::InitReply; type Response = super::InitReply;
type Future = BoxFuture< type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::InitRequest>, request: tonic::Request<super::InitRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = async move { let fut = async move { <T as Simulation>::init(&inner, request).await };
<T as Simulation>::init(&inner, request).await
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -597,21 +582,15 @@ pub mod simulation_server {
"/simulation.Simulation/Time" => { "/simulation.Simulation/Time" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct TimeSvc<T: Simulation>(pub Arc<T>); struct TimeSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::TimeRequest> impl<T: Simulation> tonic::server::UnaryService<super::TimeRequest> for TimeSvc<T> {
for TimeSvc<T> {
type Response = super::TimeReply; type Response = super::TimeReply;
type Future = BoxFuture< type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::TimeRequest>, request: tonic::Request<super::TimeRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = async move { let fut = async move { <T as Simulation>::time(&inner, request).await };
<T as Simulation>::time(&inner, request).await
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -640,21 +619,15 @@ pub mod simulation_server {
"/simulation.Simulation/Step" => { "/simulation.Simulation/Step" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct StepSvc<T: Simulation>(pub Arc<T>); struct StepSvc<T: Simulation>(pub Arc<T>);
impl<T: Simulation> tonic::server::UnaryService<super::StepRequest> impl<T: Simulation> tonic::server::UnaryService<super::StepRequest> for StepSvc<T> {
for StepSvc<T> {
type Response = super::StepReply; type Response = super::StepReply;
type Future = BoxFuture< type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::StepRequest>, request: tonic::Request<super::StepRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = async move { let fut = async move { <T as Simulation>::step(&inner, request).await };
<T as Simulation>::step(&inner, request).await
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -683,23 +656,16 @@ pub mod simulation_server {
"/simulation.Simulation/StepUntil" => { "/simulation.Simulation/StepUntil" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct StepUntilSvc<T: Simulation>(pub Arc<T>); struct StepUntilSvc<T: Simulation>(pub Arc<T>);
impl< impl<T: Simulation> tonic::server::UnaryService<super::StepUntilRequest> for StepUntilSvc<T> {
T: Simulation,
> tonic::server::UnaryService<super::StepUntilRequest>
for StepUntilSvc<T> {
type Response = super::StepUntilReply; type Response = super::StepUntilReply;
type Future = BoxFuture< type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::StepUntilRequest>, request: tonic::Request<super::StepUntilRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = async move { let fut =
<T as Simulation>::step_until(&inner, request).await async move { <T as Simulation>::step_until(&inner, request).await };
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -728,15 +694,11 @@ pub mod simulation_server {
"/simulation.Simulation/ScheduleEvent" => { "/simulation.Simulation/ScheduleEvent" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct ScheduleEventSvc<T: Simulation>(pub Arc<T>); struct ScheduleEventSvc<T: Simulation>(pub Arc<T>);
impl< impl<T: Simulation> tonic::server::UnaryService<super::ScheduleEventRequest>
T: Simulation, for ScheduleEventSvc<T>
> tonic::server::UnaryService<super::ScheduleEventRequest> {
for ScheduleEventSvc<T> {
type Response = super::ScheduleEventReply; type Response = super::ScheduleEventReply;
type Future = BoxFuture< type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::ScheduleEventRequest>, request: tonic::Request<super::ScheduleEventRequest>,
@ -773,15 +735,9 @@ pub mod simulation_server {
"/simulation.Simulation/CancelEvent" => { "/simulation.Simulation/CancelEvent" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct CancelEventSvc<T: Simulation>(pub Arc<T>); struct CancelEventSvc<T: Simulation>(pub Arc<T>);
impl< impl<T: Simulation> tonic::server::UnaryService<super::CancelEventRequest> for CancelEventSvc<T> {
T: Simulation,
> tonic::server::UnaryService<super::CancelEventRequest>
for CancelEventSvc<T> {
type Response = super::CancelEventReply; type Response = super::CancelEventReply;
type Future = BoxFuture< type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::CancelEventRequest>, request: tonic::Request<super::CancelEventRequest>,
@ -818,15 +774,9 @@ pub mod simulation_server {
"/simulation.Simulation/ProcessEvent" => { "/simulation.Simulation/ProcessEvent" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct ProcessEventSvc<T: Simulation>(pub Arc<T>); struct ProcessEventSvc<T: Simulation>(pub Arc<T>);
impl< impl<T: Simulation> tonic::server::UnaryService<super::ProcessEventRequest> for ProcessEventSvc<T> {
T: Simulation,
> tonic::server::UnaryService<super::ProcessEventRequest>
for ProcessEventSvc<T> {
type Response = super::ProcessEventReply; type Response = super::ProcessEventReply;
type Future = BoxFuture< type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::ProcessEventRequest>, request: tonic::Request<super::ProcessEventRequest>,
@ -863,15 +813,9 @@ pub mod simulation_server {
"/simulation.Simulation/ProcessQuery" => { "/simulation.Simulation/ProcessQuery" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct ProcessQuerySvc<T: Simulation>(pub Arc<T>); struct ProcessQuerySvc<T: Simulation>(pub Arc<T>);
impl< impl<T: Simulation> tonic::server::UnaryService<super::ProcessQueryRequest> for ProcessQuerySvc<T> {
T: Simulation,
> tonic::server::UnaryService<super::ProcessQueryRequest>
for ProcessQuerySvc<T> {
type Response = super::ProcessQueryReply; type Response = super::ProcessQueryReply;
type Future = BoxFuture< type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::ProcessQueryRequest>, request: tonic::Request<super::ProcessQueryRequest>,
@ -908,15 +852,9 @@ pub mod simulation_server {
"/simulation.Simulation/ReadEvents" => { "/simulation.Simulation/ReadEvents" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct ReadEventsSvc<T: Simulation>(pub Arc<T>); struct ReadEventsSvc<T: Simulation>(pub Arc<T>);
impl< impl<T: Simulation> tonic::server::UnaryService<super::ReadEventsRequest> for ReadEventsSvc<T> {
T: Simulation,
> tonic::server::UnaryService<super::ReadEventsRequest>
for ReadEventsSvc<T> {
type Response = super::ReadEventsReply; type Response = super::ReadEventsReply;
type Future = BoxFuture< type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::ReadEventsRequest>, request: tonic::Request<super::ReadEventsRequest>,
@ -953,23 +891,16 @@ pub mod simulation_server {
"/simulation.Simulation/OpenSink" => { "/simulation.Simulation/OpenSink" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct OpenSinkSvc<T: Simulation>(pub Arc<T>); struct OpenSinkSvc<T: Simulation>(pub Arc<T>);
impl< impl<T: Simulation> tonic::server::UnaryService<super::OpenSinkRequest> for OpenSinkSvc<T> {
T: Simulation,
> tonic::server::UnaryService<super::OpenSinkRequest>
for OpenSinkSvc<T> {
type Response = super::OpenSinkReply; type Response = super::OpenSinkReply;
type Future = BoxFuture< type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::OpenSinkRequest>, request: tonic::Request<super::OpenSinkRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = async move { let fut =
<T as Simulation>::open_sink(&inner, request).await async move { <T as Simulation>::open_sink(&inner, request).await };
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -998,23 +929,16 @@ pub mod simulation_server {
"/simulation.Simulation/CloseSink" => { "/simulation.Simulation/CloseSink" => {
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
struct CloseSinkSvc<T: Simulation>(pub Arc<T>); struct CloseSinkSvc<T: Simulation>(pub Arc<T>);
impl< impl<T: Simulation> tonic::server::UnaryService<super::CloseSinkRequest> for CloseSinkSvc<T> {
T: Simulation,
> tonic::server::UnaryService<super::CloseSinkRequest>
for CloseSinkSvc<T> {
type Response = super::CloseSinkReply; type Response = super::CloseSinkReply;
type Future = BoxFuture< type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call( fn call(
&mut self, &mut self,
request: tonic::Request<super::CloseSinkRequest>, request: tonic::Request<super::CloseSinkRequest>,
) -> Self::Future { ) -> Self::Future {
let inner = Arc::clone(&self.0); let inner = Arc::clone(&self.0);
let fut = async move { let fut =
<T as Simulation>::close_sink(&inner, request).await async move { <T as Simulation>::close_sink(&inner, request).await };
};
Box::pin(fut) Box::pin(fut)
} }
} }
@ -1040,23 +964,19 @@ pub mod simulation_server {
}; };
Box::pin(fut) Box::pin(fut)
} }
_ => { _ => Box::pin(async move {
Box::pin(async move {
let mut response = http::Response::new(empty_body()); let mut response = http::Response::new(empty_body());
let headers = response.headers_mut(); let headers = response.headers_mut();
headers headers.insert(
.insert(
tonic::Status::GRPC_STATUS, tonic::Status::GRPC_STATUS,
(tonic::Code::Unimplemented as i32).into(), (tonic::Code::Unimplemented as i32).into(),
); );
headers headers.insert(
.insert(
http::header::CONTENT_TYPE, http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE, tonic::metadata::GRPC_CONTENT_TYPE,
); );
Ok(response) Ok(response)
}) }),
}
} }
} }
} }

View File

@ -35,6 +35,7 @@ fn map_execution_error(error: ExecutionError) -> Error {
let error_code = match error { let error_code = match error {
ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock, ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock,
ExecutionError::MessageLoss(_) => ErrorCode::SimulationMessageLoss, ExecutionError::MessageLoss(_) => ErrorCode::SimulationMessageLoss,
ExecutionError::NoRecipient { .. } => ErrorCode::SimulationNoRecipient,
ExecutionError::Panic { .. } => ErrorCode::SimulationPanic, ExecutionError::Panic { .. } => ErrorCode::SimulationPanic,
ExecutionError::Timeout => ErrorCode::SimulationTimeout, ExecutionError::Timeout => ErrorCode::SimulationTimeout,
ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync, ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync,

View File

@ -8,6 +8,7 @@ use crate::ports::EventSink;
use crate::ports::{InputFn, ReplierFn}; use crate::ports::{InputFn, ReplierFn};
use crate::simulation::Address; use crate::simulation::Address;
use crate::util::cached_rw_lock::CachedRwLock; use crate::util::cached_rw_lock::CachedRwLock;
use crate::util::unwrap_or_throw::UnwrapOrThrow;
use broadcaster::{EventBroadcaster, QueryBroadcaster}; use broadcaster::{EventBroadcaster, QueryBroadcaster};
use sender::{FilterMapReplierSender, Sender}; use sender::{FilterMapReplierSender, Sender};
@ -32,7 +33,7 @@ pub struct Output<T: Clone + Send + 'static> {
} }
impl<T: Clone + Send + 'static> Output<T> { impl<T: Clone + Send + 'static> Output<T> {
/// Creates a new, disconnected `Output` port. /// Creates a disconnected `Output` port.
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
} }
@ -146,7 +147,7 @@ impl<T: Clone + Send + 'static> Output<T> {
/// Broadcasts an event to all connected input ports. /// Broadcasts an event to all connected input ports.
pub async fn send(&mut self, arg: T) { pub async fn send(&mut self, arg: T) {
let broadcaster = self.broadcaster.write_scratchpad().unwrap(); let broadcaster = self.broadcaster.write_scratchpad().unwrap();
broadcaster.broadcast(arg).await.unwrap(); broadcaster.broadcast(arg).await.unwrap_or_throw();
} }
} }
@ -183,7 +184,7 @@ pub struct Requestor<T: Clone + Send + 'static, R: Send + 'static> {
} }
impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> { impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
/// Creates a new, disconnected `Requestor` port. /// Creates a disconnected `Requestor` port.
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
} }
@ -250,7 +251,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
/// closure plus, optionally, a context reference. /// closure plus, optionally, a context reference.
pub fn filter_map_connect<M, C, D, F, U, Q, S>( pub fn filter_map_connect<M, C, D, F, U, Q, S>(
&mut self, &mut self,
query_filer_map: C, query_filter_map: C,
reply_map: D, reply_map: D,
replier: F, replier: F,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
@ -264,7 +265,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
S: Send + 'static, S: Send + 'static,
{ {
let sender = Box::new(FilterMapReplierSender::new( let sender = Box::new(FilterMapReplierSender::new(
query_filer_map, query_filter_map,
reply_map, reply_map,
replier, replier,
address.into().0, address.into().0,
@ -279,7 +280,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
.unwrap() .unwrap()
.broadcast(arg) .broadcast(arg)
.await .await
.unwrap() .unwrap_or_throw()
} }
} }
@ -303,7 +304,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> fmt::Debug for Requestor<T, R
/// A requestor port with exactly one connection. /// A requestor port with exactly one connection.
/// ///
/// A `UniRequestor` port is connected to a unique replier port, i.e. to an /// A `UniRequestor` port is connected to a replier port, i.e. to an
/// asynchronous model method that returns a value. /// asynchronous model method that returns a value.
#[derive(Clone)] #[derive(Clone)]
pub struct UniRequestor<T: Clone + Send + 'static, R: Send + 'static> { pub struct UniRequestor<T: Clone + Send + 'static, R: Send + 'static> {
@ -311,7 +312,7 @@ pub struct UniRequestor<T: Clone + Send + 'static, R: Send + 'static> {
} }
impl<T: Clone + Send + 'static, R: Send + 'static> UniRequestor<T, R> { impl<T: Clone + Send + 'static, R: Send + 'static> UniRequestor<T, R> {
/// Creates a new `UniRequestor` port connected to a replier port of the model /// Creates a `UniRequestor` port connected to a replier port of the model
/// specified by the address. /// specified by the address.
/// ///
/// The replier port must be an asynchronous method of a model of type `M` /// The replier port must be an asynchronous method of a model of type `M`
@ -328,8 +329,8 @@ impl<T: Clone + Send + 'static, R: Send + 'static> UniRequestor<T, R> {
Self { sender } Self { sender }
} }
/// Creates a new `UniRequestor` port connected with auto-conversion to a /// Creates an auto-converting `UniRequestor` port connected to a replier
/// replier port of the model specified by the address. /// port of the model specified by the address.
/// ///
/// Queries and replies are mapped to other types using the closures /// Queries and replies are mapped to other types using the closures
/// provided in argument. /// provided in argument.
@ -363,8 +364,8 @@ impl<T: Clone + Send + 'static, R: Send + 'static> UniRequestor<T, R> {
Self { sender } Self { sender }
} }
/// Creates a new `UniRequestor` port connected with filtering and /// Creates an auto-converting, filtered `UniRequestor` port connected to a
/// auto-conversion to a replier port of the model specified by the address. /// replier port of the model specified by the address.
/// ///
/// Queries and replies are mapped to other types using the closures /// Queries and replies are mapped to other types using the closures
/// provided in argument, or ignored if the query closure returns `None`. /// provided in argument, or ignored if the query closure returns `None`.
@ -374,7 +375,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> UniRequestor<T, R> {
/// taking as argument a value of the type returned by the query mapping /// taking as argument a value of the type returned by the query mapping
/// closure plus, optionally, a context reference. /// closure plus, optionally, a context reference.
pub fn with_filter_map<M, C, D, F, U, Q, S>( pub fn with_filter_map<M, C, D, F, U, Q, S>(
query_filer_map: C, query_filter_map: C,
reply_map: D, reply_map: D,
replier: F, replier: F,
address: impl Into<Address<M>>, address: impl Into<Address<M>>,
@ -389,7 +390,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> UniRequestor<T, R> {
S: Send + 'static, S: Send + 'static,
{ {
let sender = Box::new(FilterMapReplierSender::new( let sender = Box::new(FilterMapReplierSender::new(
query_filer_map, query_filter_map,
reply_map, reply_map,
replier, replier,
address.into().0, address.into().0,
@ -401,7 +402,8 @@ impl<T: Clone + Send + 'static, R: Send + 'static> UniRequestor<T, R> {
/// Sends a query to the connected replier port. /// Sends a query to the connected replier port.
pub async fn send(&mut self, arg: T) -> Option<R> { pub async fn send(&mut self, arg: T) -> Option<R> {
if let Some(fut) = self.sender.send_owned(arg) { if let Some(fut) = self.sender.send_owned(arg) {
let output = fut.await.unwrap(); let output = fut.await.unwrap_or_throw();
Some(output) Some(output)
} else { } else {
None None

View File

@ -5,7 +5,8 @@ use std::task::{Context, Poll};
use diatomic_waker::WakeSink; use diatomic_waker::WakeSink;
use super::sender::{RecycledFuture, SendError, Sender}; use super::sender::{RecycledFuture, Sender};
use crate::channel::SendError;
use crate::util::task_set::TaskSet; use crate::util::task_set::TaskSet;
/// An object that can efficiently broadcast messages to several addresses. /// An object that can efficiently broadcast messages to several addresses.
@ -142,7 +143,7 @@ impl<T: Clone> EventBroadcaster<T> {
} }
/// Broadcasts an event to all addresses. /// Broadcasts an event to all addresses.
pub(super) async fn broadcast(&mut self, arg: T) -> Result<(), BroadcastError> { pub(super) async fn broadcast(&mut self, arg: T) -> Result<(), SendError> {
match self.inner.senders.as_mut_slice() { match self.inner.senders.as_mut_slice() {
// No sender. // No sender.
[] => Ok(()), [] => Ok(()),
@ -150,7 +151,7 @@ impl<T: Clone> EventBroadcaster<T> {
// One sender at most. // One sender at most.
[sender] => match sender.send_owned(arg) { [sender] => match sender.send_owned(arg) {
None => Ok(()), None => Ok(()),
Some(fut) => fut.await.map_err(|_| BroadcastError {}), Some(fut) => fut.await,
}, },
// Possibly multiple senders. // Possibly multiple senders.
@ -158,7 +159,7 @@ impl<T: Clone> EventBroadcaster<T> {
let (shared, mut futures) = self.inner.futures(arg); let (shared, mut futures) = self.inner.futures(arg);
match futures.as_mut_slice() { match futures.as_mut_slice() {
[] => Ok(()), [] => Ok(()),
[fut] => fut.await.map_err(|_| BroadcastError {}), [fut] => fut.await,
_ => BroadcastFuture::new(shared, futures).await, _ => BroadcastFuture::new(shared, futures).await,
} }
} }
@ -206,7 +207,7 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
pub(super) async fn broadcast( pub(super) async fn broadcast(
&mut self, &mut self,
arg: T, arg: T,
) -> Result<impl Iterator<Item = R> + '_, BroadcastError> { ) -> Result<impl Iterator<Item = R> + '_, SendError> {
let output_count = match self.inner.senders.as_mut_slice() { let output_count = match self.inner.senders.as_mut_slice() {
// No sender. // No sender.
[] => 0, [] => 0,
@ -214,7 +215,7 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
// One sender at most. // One sender at most.
[sender] => { [sender] => {
if let Some(fut) = sender.send_owned(arg) { if let Some(fut) = sender.send_owned(arg) {
let output = fut.await.map_err(|_| BroadcastError {})?; let output = fut.await?;
self.inner.shared.outputs[0] = Some(output); self.inner.shared.outputs[0] = Some(output);
1 1
@ -231,7 +232,7 @@ impl<T: Clone, R> QueryBroadcaster<T, R> {
match futures.as_mut_slice() { match futures.as_mut_slice() {
[] => {} [] => {}
[fut] => { [fut] => {
let output = fut.await.map_err(|_| BroadcastError {})?; let output = fut.await?;
shared.outputs[0] = Some(output); shared.outputs[0] = Some(output);
} }
_ => { _ => {
@ -361,7 +362,7 @@ impl<'a, R> Drop for BroadcastFuture<'a, R> {
} }
impl<'a, R> Future for BroadcastFuture<'a, R> { impl<'a, R> Future for BroadcastFuture<'a, R> {
type Output = Result<(), BroadcastError>; type Output = Result<(), SendError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self; let this = &mut *self;
@ -392,10 +393,10 @@ impl<'a, R> Future for BroadcastFuture<'a, R> {
*output = Some(o); *output = Some(o);
this.pending_futures_count -= 1; this.pending_futures_count -= 1;
} }
Poll::Ready(Err(_)) => { Poll::Ready(Err(SendError)) => {
this.state = FutureState::Completed; this.state = FutureState::Completed;
return Poll::Ready(Err(BroadcastError {})); return Poll::Ready(Err(SendError));
} }
Poll::Pending => {} Poll::Pending => {}
} }
@ -450,10 +451,10 @@ impl<'a, R> Future for BroadcastFuture<'a, R> {
*output = Some(o); *output = Some(o);
this.pending_futures_count -= 1; this.pending_futures_count -= 1;
} }
Poll::Ready(Err(_)) => { Poll::Ready(Err(SendError)) => {
this.state = FutureState::Completed; this.state = FutureState::Completed;
return Poll::Ready(Err(BroadcastError {})); return Poll::Ready(Err(SendError));
} }
Poll::Pending => {} Poll::Pending => {}
} }
@ -468,10 +469,6 @@ impl<'a, R> Future for BroadcastFuture<'a, R> {
} }
} }
/// Error returned when a message could not be delivered.
#[derive(Debug)]
pub(super) struct BroadcastError {}
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
enum FutureState { enum FutureState {
Uninit, Uninit,

View File

@ -1,5 +1,3 @@
use std::error::Error;
use std::fmt;
use std::future::Future; use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::mem::ManuallyDrop; use std::mem::ManuallyDrop;
@ -11,6 +9,7 @@ use dyn_clone::DynClone;
use recycle_box::{coerce_box, RecycleBox}; use recycle_box::{coerce_box, RecycleBox};
use crate::channel; use crate::channel;
use crate::channel::SendError;
use crate::model::Model; use crate::model::Model;
use crate::ports::{EventSinkWriter, InputFn, ReplierFn}; use crate::ports::{EventSinkWriter, InputFn, ReplierFn};
@ -75,9 +74,7 @@ where
coerce_box!(RecycleBox::recycle(recycle_box, fut)) coerce_box!(RecycleBox::recycle(recycle_box, fut))
}); });
Some(RecycledFuture::new(&mut self.fut_storage, async move { Some(RecycledFuture::new(&mut self.fut_storage, fut))
fut.await.map_err(|_| SendError {})
}))
} }
} }
@ -147,9 +144,7 @@ where
coerce_box!(RecycleBox::recycle(recycle_box, fut)) coerce_box!(RecycleBox::recycle(recycle_box, fut))
}); });
Some(RecycledFuture::new(&mut self.fut_storage, async move { Some(RecycledFuture::new(&mut self.fut_storage, fut))
fut.await.map_err(|_| SendError {})
}))
} }
} }
@ -221,9 +216,7 @@ where
coerce_box!(RecycleBox::recycle(recycle_box, fut)) coerce_box!(RecycleBox::recycle(recycle_box, fut))
}); });
RecycledFuture::new(&mut self.fut_storage, async move { RecycledFuture::new(&mut self.fut_storage, fut)
fut.await.map_err(|_| SendError {})
})
}) })
} }
} }
@ -474,12 +467,12 @@ where
Some(RecycledFuture::new(fut_storage, async move { Some(RecycledFuture::new(fut_storage, async move {
// Send the message. // Send the message.
send_fut.await.map_err(|_| SendError {})?; send_fut.await?;
// Wait until the message is processed and the reply is sent back. // Wait until the message is processed and the reply is sent back.
// If an error is received, it most likely means the mailbox was // If an error is received, it most likely means the mailbox was
// dropped before the message was processed. // dropped before the message was processed.
reply_receiver.recv().await.map_err(|_| SendError {}) reply_receiver.recv().await.map_err(|_| SendError)
})) }))
} }
} }
@ -574,7 +567,7 @@ where
Some(RecycledFuture::new(fut_storage, async move { Some(RecycledFuture::new(fut_storage, async move {
// Send the message. // Send the message.
send_fut.await.map_err(|_| SendError {})?; send_fut.await?;
// Wait until the message is processed and the reply is sent back. // Wait until the message is processed and the reply is sent back.
// If an error is received, it most likely means the mailbox was // If an error is received, it most likely means the mailbox was
@ -582,7 +575,7 @@ where
reply_receiver reply_receiver
.recv() .recv()
.await .await
.map_err(|_| SendError {}) .map_err(|_| SendError)
.map(reply_map) .map(reply_map)
})) }))
} }
@ -687,7 +680,7 @@ where
RecycledFuture::new(fut_storage, async move { RecycledFuture::new(fut_storage, async move {
// Send the message. // Send the message.
send_fut.await.map_err(|_| SendError {})?; send_fut.await?;
// Wait until the message is processed and the reply is sent back. // Wait until the message is processed and the reply is sent back.
// If an error is received, it most likely means the mailbox was // If an error is received, it most likely means the mailbox was
@ -695,7 +688,7 @@ where
reply_receiver reply_receiver
.recv() .recv()
.await .await
.map_err(|_| SendError {}) .map_err(|_| SendError)
.map(reply_map) .map(reply_map)
}) })
}) })
@ -723,18 +716,6 @@ where
} }
} }
/// Error returned when the mailbox was closed or dropped.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) struct SendError {}
impl fmt::Display for SendError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "sending message into a closed mailbox")
}
}
impl Error for SendError {}
pub(super) struct RecycledFuture<'a, T> { pub(super) struct RecycledFuture<'a, T> {
fut: ManuallyDrop<Pin<RecycleBox<dyn Future<Output = T> + Send + 'a>>>, fut: ManuallyDrop<Pin<RecycleBox<dyn Future<Output = T> + Send + 'a>>>,
lender_box: &'a mut Option<RecycleBox<()>>, lender_box: &'a mut Option<RecycleBox<()>>,

View File

@ -11,6 +11,7 @@ use crate::simulation::{
Action, ActionKey, Address, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, Action, ActionKey, Address, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction,
}; };
use crate::util::slot; use crate::util::slot;
use crate::util::unwrap_or_throw::UnwrapOrThrow;
use broadcaster::{EventBroadcaster, QueryBroadcaster, ReplyIterator}; use broadcaster::{EventBroadcaster, QueryBroadcaster, ReplyIterator};
use sender::{ use sender::{
@ -31,7 +32,7 @@ pub struct EventSource<T: Clone + Send + 'static> {
} }
impl<T: Clone + Send + 'static> EventSource<T> { impl<T: Clone + Send + 'static> EventSource<T> {
/// Creates a new, disconnected `EventSource` port. /// Creates a disconnected `EventSource` port.
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
} }
@ -106,7 +107,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
pub fn event(&mut self, arg: T) -> Action { pub fn event(&mut self, arg: T) -> Action {
let fut = self.broadcaster.lock().unwrap().broadcast(arg); let fut = self.broadcaster.lock().unwrap().broadcast(arg);
let fut = async { let fut = async {
fut.await.unwrap(); fut.await.unwrap_or_throw();
}; };
Action::new(OnceAction::new(fut)) Action::new(OnceAction::new(fut))
@ -123,12 +124,12 @@ impl<T: Clone + Send + 'static> EventSource<T> {
let action = Action::new(KeyedOnceAction::new( let action = Action::new(KeyedOnceAction::new(
// Cancellation is ignored once the action is already spawned on the // Cancellation is ignored once the action is already spawned on the
// executor. This means the action cannot be cancelled while the // executor. This means the action cannot be cancelled once the
// simulation is running, but since an event source is meant to be // simulation step targeted by the action is running, but since an
// used outside the simulator, this shouldn't be an issue in // event source is meant to be used outside the simulator, this
// practice. // shouldn't be an issue in practice.
|_| async { |_| async {
fut.await.unwrap(); fut.await.unwrap_or_throw();
}, },
action_key.clone(), action_key.clone(),
)); ));
@ -147,7 +148,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
Action::new(PeriodicAction::new( Action::new(PeriodicAction::new(
|| async move { || async move {
let fut = broadcaster.lock().unwrap().broadcast(arg); let fut = broadcaster.lock().unwrap().broadcast(arg);
fut.await.unwrap(); fut.await.unwrap_or_throw();
}, },
period, period,
)) ))
@ -171,7 +172,7 @@ impl<T: Clone + Send + 'static> EventSource<T> {
// practice. // practice.
|_| async move { |_| async move {
let fut = broadcaster.lock().unwrap().broadcast(arg); let fut = broadcaster.lock().unwrap().broadcast(arg);
fut.await.unwrap(); fut.await.unwrap_or_throw();
}, },
period, period,
action_key.clone(), action_key.clone(),
@ -211,7 +212,7 @@ pub struct QuerySource<T: Clone + Send + 'static, R: Send + 'static> {
} }
impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> { impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
/// Creates a new, disconnected `EventSource` port. /// Creates a disconnected `EventSource` port.
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
} }
@ -309,7 +310,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> QuerySource<T, R> {
let (writer, reader) = slot::slot(); let (writer, reader) = slot::slot();
let fut = self.broadcaster.lock().unwrap().broadcast(arg); let fut = self.broadcaster.lock().unwrap().broadcast(arg);
let fut = async move { let fut = async move {
let replies = fut.await.unwrap(); let replies = fut.await.unwrap_or_throw();
let _ = writer.write(replies); let _ = writer.write(replies);
}; };

View File

@ -10,6 +10,7 @@ use diatomic_waker::WakeSink;
use super::sender::{Sender, SenderFuture}; use super::sender::{Sender, SenderFuture};
use crate::channel::SendError;
use crate::util::task_set::TaskSet; use crate::util::task_set::TaskSet;
/// An object that can efficiently broadcast messages to several addresses. /// An object that can efficiently broadcast messages to several addresses.
@ -109,7 +110,7 @@ impl<T: Clone + Send> EventBroadcaster<T> {
pub(super) fn broadcast( pub(super) fn broadcast(
&mut self, &mut self,
arg: T, arg: T,
) -> impl Future<Output = Result<(), BroadcastError>> + Send { ) -> impl Future<Output = Result<(), SendError>> + Send {
enum Fut<F1, F2> { enum Fut<F1, F2> {
Empty, Empty,
Single(F1), Single(F1),
@ -130,13 +131,13 @@ impl<T: Clone + Send> EventBroadcaster<T> {
// No sender. // No sender.
Fut::Empty | Fut::Single(None) => Ok(()), Fut::Empty | Fut::Single(None) => Ok(()),
Fut::Single(Some(fut)) => fut.await.map_err(|_| BroadcastError {}), Fut::Single(Some(fut)) => fut.await,
Fut::Multiple(mut futures) => match futures.as_mut_slice() { Fut::Multiple(mut futures) => match futures.as_mut_slice() {
// No sender. // No sender.
[] => Ok(()), [] => Ok(()),
// One sender. // One sender.
[SenderFutureState::Pending(fut)] => fut.await.map_err(|_| BroadcastError {}), [SenderFutureState::Pending(fut)] => fut.await,
// Multiple senders. // Multiple senders.
_ => BroadcastFuture::new(futures).await.map(|_| ()), _ => BroadcastFuture::new(futures).await.map(|_| ()),
}, },
@ -185,7 +186,7 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
pub(super) fn broadcast( pub(super) fn broadcast(
&mut self, &mut self,
arg: T, arg: T,
) -> impl Future<Output = Result<ReplyIterator<R>, BroadcastError>> + Send { ) -> impl Future<Output = Result<ReplyIterator<R>, SendError>> + Send {
enum Fut<F1, F2> { enum Fut<F1, F2> {
Empty, Empty,
Single(F1), Single(F1),
@ -208,25 +209,19 @@ impl<T: Clone + Send, R: Send> QueryBroadcaster<T, R> {
Fut::Single(Some(fut)) => fut Fut::Single(Some(fut)) => fut
.await .await
.map(|reply| ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter())) .map(|reply| ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter())),
.map_err(|_| BroadcastError {}),
Fut::Multiple(mut futures) => match futures.as_mut_slice() { Fut::Multiple(mut futures) => match futures.as_mut_slice() {
// No sender. // No sender.
[] => Ok(ReplyIterator(Vec::new().into_iter())), [] => Ok(ReplyIterator(Vec::new().into_iter())),
// One sender. // One sender.
[SenderFutureState::Pending(fut)] => fut [SenderFutureState::Pending(fut)] => fut.await.map(|reply| {
.await
.map(|reply| {
ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter()) ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter())
}) }),
.map_err(|_| BroadcastError {}),
// Multiple senders. // Multiple senders.
_ => BroadcastFuture::new(futures) _ => BroadcastFuture::new(futures).await,
.await
.map_err(|_| BroadcastError {}),
}, },
} }
} }
@ -280,7 +275,7 @@ impl<R> BroadcastFuture<R> {
} }
impl<R> Future for BroadcastFuture<R> { impl<R> Future for BroadcastFuture<R> {
type Output = Result<ReplyIterator<R>, BroadcastError>; type Output = Result<ReplyIterator<R>, SendError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self; let this = &mut *self;
@ -300,10 +295,10 @@ impl<R> Future for BroadcastFuture<R> {
this.future_states[task_idx] = SenderFutureState::Ready(output); this.future_states[task_idx] = SenderFutureState::Ready(output);
this.pending_futures_count -= 1; this.pending_futures_count -= 1;
} }
Poll::Ready(Err(_)) => { Poll::Ready(Err(SendError)) => {
this.state = FutureState::Completed; this.state = FutureState::Completed;
return Poll::Ready(Err(BroadcastError {})); return Poll::Ready(Err(SendError));
} }
Poll::Pending => {} Poll::Pending => {}
} }
@ -353,10 +348,10 @@ impl<R> Future for BroadcastFuture<R> {
this.future_states[task_idx] = SenderFutureState::Ready(output); this.future_states[task_idx] = SenderFutureState::Ready(output);
this.pending_futures_count -= 1; this.pending_futures_count -= 1;
} }
Poll::Ready(Err(_)) => { Poll::Ready(Err(SendError)) => {
this.state = FutureState::Completed; this.state = FutureState::Completed;
return Poll::Ready(Err(BroadcastError {})); return Poll::Ready(Err(SendError));
} }
Poll::Pending => {} Poll::Pending => {}
} }
@ -373,10 +368,6 @@ impl<R> Future for BroadcastFuture<R> {
} }
} }
/// Error returned when a message could not be delivered.
#[derive(Debug)]
pub(super) struct BroadcastError {}
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
enum FutureState { enum FutureState {
Uninit, Uninit,
@ -749,7 +740,6 @@ mod tests {
use waker_fn::waker_fn; use waker_fn::waker_fn;
use super::super::sender::SendError;
use super::*; use super::*;
// An event that may be waken spuriously. // An event that may be waken spuriously.

View File

@ -1,5 +1,3 @@
use std::error::Error;
use std::fmt;
use std::future::Future; use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
@ -9,6 +7,7 @@ use futures_channel::oneshot;
use recycle_box::{coerce_box, RecycleBox}; use recycle_box::{coerce_box, RecycleBox};
use crate::channel; use crate::channel;
use crate::channel::SendError;
use crate::model::Model; use crate::model::Model;
use crate::ports::{InputFn, ReplierFn}; use crate::ports::{InputFn, ReplierFn};
@ -73,7 +72,6 @@ where
coerce_box!(RecycleBox::recycle(recycle_box, fut)) coerce_box!(RecycleBox::recycle(recycle_box, fut))
}) })
.await .await
.map_err(|_| SendError {})
})) }))
} }
} }
@ -129,7 +127,6 @@ where
coerce_box!(RecycleBox::recycle(recycle_box, fut)) coerce_box!(RecycleBox::recycle(recycle_box, fut))
}) })
.await .await
.map_err(|_| SendError {})
})) }))
} }
} }
@ -185,7 +182,6 @@ where
coerce_box!(RecycleBox::recycle(recycle_box, fut)) coerce_box!(RecycleBox::recycle(recycle_box, fut))
}) })
.await .await
.map_err(|_| SendError {})
}) as SenderFuture<()> }) as SenderFuture<()>
}) })
} }
@ -243,10 +239,9 @@ where
coerce_box!(RecycleBox::recycle(recycle_box, fut)) coerce_box!(RecycleBox::recycle(recycle_box, fut))
}) })
.await .await?;
.map_err(|_| SendError {})?;
reply_receiver.await.map_err(|_| SendError {}) reply_receiver.await.map_err(|_| SendError)
})) }))
} }
} }
@ -314,13 +309,9 @@ where
coerce_box!(RecycleBox::recycle(recycle_box, fut)) coerce_box!(RecycleBox::recycle(recycle_box, fut))
}) })
.await .await?;
.map_err(|_| SendError {})?;
reply_receiver reply_receiver.await.map_err(|_| SendError).map(&*reply_map)
.await
.map_err(|_| SendError {})
.map(&*reply_map)
})) }))
} }
} }
@ -393,26 +384,10 @@ where
coerce_box!(RecycleBox::recycle(recycle_box, fut)) coerce_box!(RecycleBox::recycle(recycle_box, fut))
}) })
.await .await?;
.map_err(|_| SendError {})?;
reply_receiver reply_receiver.await.map_err(|_| SendError).map(&*reply_map)
.await
.map_err(|_| SendError {})
.map(&*reply_map)
}) as SenderFuture<R> }) as SenderFuture<R>
}) })
} }
} }
/// Error returned when the mailbox was closed or dropped.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) struct SendError {}
impl fmt::Display for SendError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "sending message into a closed mailbox")
}
}
impl Error for SendError {}

View File

@ -80,8 +80,6 @@ mod mailbox;
mod scheduler; mod scheduler;
mod sim_init; mod sim_init;
use scheduler::SchedulerQueue;
pub(crate) use scheduler::{ pub(crate) use scheduler::{
GlobalScheduler, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, GlobalScheduler, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction,
}; };
@ -90,7 +88,7 @@ pub use mailbox::{Address, Mailbox};
pub use scheduler::{Action, ActionKey, AutoActionKey, Scheduler, SchedulingError}; pub use scheduler::{Action, ActionKey, AutoActionKey, Scheduler, SchedulingError};
pub use sim_init::SimInit; pub use sim_init::SimInit;
use std::any::Any; use std::any::{Any, TypeId};
use std::cell::Cell; use std::cell::Cell;
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
@ -104,7 +102,9 @@ use std::{panic, task};
use pin_project::pin_project; use pin_project::pin_project;
use recycle_box::{coerce_box, RecycleBox}; use recycle_box::{coerce_box, RecycleBox};
use crate::channel::ChannelObserver; use scheduler::SchedulerQueue;
use crate::channel::{ChannelObserver, SendError};
use crate::executor::{Executor, ExecutorError, Signal}; use crate::executor::{Executor, ExecutorError, Signal};
use crate::model::{BuildContext, Context, Model, ProtoModel}; use crate::model::{BuildContext, Context, Model, ProtoModel};
use crate::ports::{InputFn, ReplierFn}; use crate::ports::{InputFn, ReplierFn};
@ -332,9 +332,11 @@ impl Simulation {
return Err(ExecutionError::Terminated); return Err(ExecutionError::Terminated);
} }
self.executor.run(self.timeout).map_err(|e| match e { self.executor.run(self.timeout).map_err(|e| {
ExecutorError::UnprocessedMessages(msg_count) => {
self.is_terminated = true; self.is_terminated = true;
match e {
ExecutorError::UnprocessedMessages(msg_count) => {
let mut deadlock_info = Vec::new(); let mut deadlock_info = Vec::new();
for (model, observer) in &self.observers { for (model, observer) in &self.observers {
let mailbox_size = observer.len(); let mailbox_size = observer.len();
@ -352,22 +354,24 @@ impl Simulation {
ExecutionError::Deadlock(deadlock_info) ExecutionError::Deadlock(deadlock_info)
} }
} }
ExecutorError::Timeout => { ExecutorError::Timeout => ExecutionError::Timeout,
self.is_terminated = true;
ExecutionError::Timeout
}
ExecutorError::Panic(model_id, payload) => { ExecutorError::Panic(model_id, payload) => {
self.is_terminated = true; let model = model_id
.get()
.map(|id| self.model_names.get(id).unwrap().clone());
// Filter out panics originating from a `SendError`.
if (*payload).type_id() == TypeId::of::<SendError>() {
return ExecutionError::NoRecipient { model };
}
if let Some(model) = model {
return ExecutionError::Panic { model, payload };
}
let model = match model_id.get() {
// The panic was emitted by a model.
Some(id) => self.model_names.get(id).unwrap().clone(),
// The panic is due to an internal issue. // The panic is due to an internal issue.
None => panic::resume_unwind(payload), panic::resume_unwind(payload);
}; }
ExecutionError::Panic { model, payload }
} }
}) })
} }
@ -530,7 +534,7 @@ pub struct DeadlockInfo {
#[derive(Debug)] #[derive(Debug)]
pub enum ExecutionError { pub enum ExecutionError {
/// The simulation has been terminated due to an earlier deadlock, message /// The simulation has been terminated due to an earlier deadlock, message
/// loss, model panic, timeout or synchronization loss. /// loss, missing recipient, model panic, timeout or synchronization loss.
Terminated, Terminated,
/// The simulation has deadlocked due to the enlisted models. /// The simulation has deadlocked due to the enlisted models.
/// ///
@ -545,6 +549,22 @@ pub enum ExecutionError {
/// This is a fatal error: any subsequent attempt to run the simulation will /// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error. /// return an [`ExecutionError::Terminated`] error.
MessageLoss(usize), MessageLoss(usize),
/// The recipient of a message does not exists.
///
/// This indicates that the mailbox of the recipient of a message was not
/// migrated to the simulation and was no longer alive when the message was
/// sent.
///
/// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error.
NoRecipient {
/// The fully qualified name of the model that attempted to send a
/// message, or `None` if the message was sent from the scheduler.
///
/// The fully qualified name is made of the unqualified model name, if
/// relevant prepended by the dot-separated names of all parent models.
model: Option<String>,
},
/// A panic was caught during execution. /// A panic was caught during execution.
/// ///
/// This is a fatal error: any subsequent attempt to run the simulation will /// This is a fatal error: any subsequent attempt to run the simulation will
@ -619,6 +639,15 @@ impl fmt::Display for ExecutionError {
Self::MessageLoss(count) => { Self::MessageLoss(count) => {
write!(f, "{} messages have been lost", count) write!(f, "{} messages have been lost", count)
} }
Self::NoRecipient{model} => {
match model {
Some(model) => write!(f,
"an attempt by model '{}' to send a message failed because the recipient's mailbox is no longer alive",
model
),
None => f.write_str("an attempt by the scheduler to send a message failed because the recipient's mailbox is no longer alive"),
}
}
Self::Panic{model, payload} => { Self::Panic{model, payload} => {
let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() { let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() {
s s

View File

@ -7,3 +7,4 @@ pub(crate) mod seq_futures;
pub(crate) mod slot; pub(crate) mod slot;
pub(crate) mod sync_cell; pub(crate) mod sync_cell;
pub(crate) mod task_set; pub(crate) mod task_set;
pub(crate) mod unwrap_or_throw;

View File

@ -0,0 +1,22 @@
//! C++-style exceptions!
use std::any::Any;
use std::panic;
/// An extension trait that allows sending the error itself as a panic payload
/// when unwrapping fails.
pub(crate) trait UnwrapOrThrow<T> {
fn unwrap_or_throw(self) -> T;
}
impl<T, E> UnwrapOrThrow<T> for Result<T, E>
where
E: 'static + Any + Send,
{
fn unwrap_or_throw(self) -> T {
match self {
Ok(v) => v,
Err(e) => panic::panic_any(e),
}
}
}

View File

@ -6,6 +6,7 @@ mod model_scheduling;
mod simulation_clock_sync; mod simulation_clock_sync;
mod simulation_deadlock; mod simulation_deadlock;
mod simulation_message_loss; mod simulation_message_loss;
mod simulation_no_recipient;
mod simulation_panic; mod simulation_panic;
mod simulation_scheduling; mod simulation_scheduling;
#[cfg(not(miri))] #[cfg(not(miri))]

View File

@ -0,0 +1,169 @@
//! Missing recipient detection.
use std::time::Duration;
use nexosim::model::Model;
use nexosim::ports::{EventSource, Output, QuerySource, Requestor};
use nexosim::simulation::{ExecutionError, Mailbox, SimInit};
use nexosim::time::MonotonicTime;
const MT_NUM_THREADS: usize = 4;
#[derive(Default)]
struct TestModel {
output: Output<()>,
requestor: Requestor<(), ()>,
}
impl TestModel {
async fn activate_output(&mut self) {
self.output.send(()).await;
}
async fn activate_requestor(&mut self) {
let _ = self.requestor.send(()).await;
}
}
impl Model for TestModel {}
/// Send an event from a model to a dead input.
fn no_input_from_model(num_threads: usize) {
const MODEL_NAME: &str = "testmodel";
let mut model = TestModel::default();
let mbox = Mailbox::new();
let addr = mbox.address();
let bad_mbox = Mailbox::new();
model.output.connect(TestModel::activate_output, &bad_mbox);
drop(bad_mbox);
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::with_num_threads(num_threads)
.add_model(model, mbox, MODEL_NAME)
.init(t0)
.unwrap()
.0;
match simu.process_event(TestModel::activate_output, (), addr) {
Err(ExecutionError::NoRecipient { model }) => {
assert_eq!(model, Some(String::from(MODEL_NAME)));
}
_ => panic!("missing recipient not detected"),
}
}
/// Send an event from a model to a dead replier.
fn no_replier_from_model(num_threads: usize) {
const MODEL_NAME: &str = "testmodel";
let mut model = TestModel::default();
let mbox = Mailbox::new();
let addr = mbox.address();
let bad_mbox = Mailbox::new();
model
.requestor
.connect(TestModel::activate_requestor, &bad_mbox);
drop(bad_mbox);
let t0 = MonotonicTime::EPOCH;
let mut simu = SimInit::with_num_threads(num_threads)
.add_model(model, mbox, MODEL_NAME)
.init(t0)
.unwrap()
.0;
match simu.process_event(TestModel::activate_requestor, (), addr) {
Err(ExecutionError::NoRecipient { model }) => {
assert_eq!(model, Some(String::from(MODEL_NAME)));
}
_ => panic!("missing recipient not detected"),
}
}
/// Send an event from the scheduler to a dead input.
fn no_input_from_scheduler(num_threads: usize) {
let bad_mbox = Mailbox::new();
let mut src = EventSource::new();
src.connect(TestModel::activate_output, &bad_mbox);
let event = src.event(());
drop(bad_mbox);
let t0 = MonotonicTime::EPOCH;
let (mut simu, scheduler) = SimInit::with_num_threads(num_threads).init(t0).unwrap();
scheduler.schedule(Duration::from_secs(1), event).unwrap();
match simu.step() {
Err(ExecutionError::NoRecipient { model }) => {
assert_eq!(model, None);
}
_ => panic!("missing recipient not detected"),
}
}
/// Send a query from the scheduler to a dead input.
fn no_replier_from_scheduler(num_threads: usize) {
let bad_mbox = Mailbox::new();
let mut src = QuerySource::new();
src.connect(TestModel::activate_requestor, &bad_mbox);
let query = src.query(()).0;
drop(bad_mbox);
let t0 = MonotonicTime::EPOCH;
let (mut simu, scheduler) = SimInit::with_num_threads(num_threads).init(t0).unwrap();
scheduler.schedule(Duration::from_secs(1), query).unwrap();
match simu.step() {
Err(ExecutionError::NoRecipient { model }) => {
assert_eq!(model, None);
}
_ => panic!("missing recipient not detected"),
}
}
#[test]
fn no_input_from_model_st() {
no_input_from_model(1);
}
#[test]
fn no_input_from_model_mt() {
no_input_from_model(MT_NUM_THREADS);
}
#[test]
fn no_replier_from_model_st() {
no_replier_from_model(1);
}
#[test]
fn no_replier_from_model_mt() {
no_replier_from_model(MT_NUM_THREADS);
}
#[test]
fn no_input_from_scheduler_st() {
no_input_from_scheduler(1);
}
#[test]
fn no_input_from_scheduler_mt() {
no_input_from_scheduler(MT_NUM_THREADS);
}
#[test]
fn no_replier_from_scheduler_st() {
no_replier_from_scheduler(1);
}
#[test]
fn no_replier_from_scheduler_mt() {
no_replier_from_scheduler(MT_NUM_THREADS);
}