diff --git a/Cargo.toml b/Cargo.toml index a61ca4f..2216f6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,3 @@ codegen-units = 1 [profile.release] panic = "unwind" - -[profile.dev] -panic = "unwind" diff --git a/crates/node-interaction/src/blocking_executor.rs b/crates/node-interaction/src/blocking_executor.rs index b354c45..043dd19 100644 --- a/crates/node-interaction/src/blocking_executor.rs +++ b/crates/node-interaction/src/blocking_executor.rs @@ -51,9 +51,15 @@ impl BlockingExecutor { where R: Send + 'static, { + // Note: The blocking executor is a singleton and therefore we store its state in a static + // so that it's assigned only once. Additionally, when we set the state of the executor we + // spawn the thread where the async runtime runs. static STATE: Lazy = Lazy::new(|| { tracing::trace!("Initializing the BlockingExecutor state"); + // All communication with the tokio runtime thread happens over mspc channels where the + // producers here are the threads that want to run async tasks and the consumer here is + // the tokio runtime thread. let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); thread::spawn(move || { @@ -70,7 +76,12 @@ impl BlockingExecutor { { tracing::trace!("Received a new future to execute"); tokio::spawn(async move { + // One of the things that the blocking executor does is that it allows + // us to catch panics if they occur. By wrapping the given future in an + // AssertUnwindSafe::catch_unwind we are able to catch all panic unwinds + // in the given future and convert them into errors. let task = AssertUnwindSafe(task).catch_unwind(); + let result = task.await; let _ = response_channel.send(result); }); @@ -81,20 +92,24 @@ impl BlockingExecutor { ExecutorState { tx } }); - // Creating a one-shot channel for this task that will be used to send and receive the - // response of the task. + // We need to perform blocking synchronous communication between the current thread and the + // tokio runtime thread with the result of the async computation and the oneshot channels + // from tokio allows us to do that. The sender side of the channel will be given to the + // tokio runtime thread to send the result when the computation is completed and the receive + // side of the channel will be kept with this thread to await for the response of the async + // task to come back. let (response_tx, response_rx) = oneshot::channel::, Box>>(); - // Converting the future from the shape that it is in into the shape that the runtime is - // expecting it to be in. + // The tokio runtime thread expects a Future> + Send to be + // sent to it to execute. However, this function has a typed Future + Send and + // therefore we need to change the type of the future to fit what the runtime thread expects + // in the task message. In doing this conversion, we lose some of the type information since + // we're converting R => dyn Any. However, we will perform down-casting on the result to + // convert it back into R. let future = Box::pin(async move { Box::new(future.await) as Box }); - let task = TaskMessage { - future, - response_tx, - }; - + let task = TaskMessage::new(future, response_tx); if let Err(error) = STATE.tx.send(task) { tracing::error!(?error, "Failed to send the task to the blocking executor"); anyhow::bail!("Failed to send the task to the blocking executor: {error:?}") @@ -152,6 +167,18 @@ struct TaskMessage { response_tx: oneshot::Sender, Box>>, } +impl TaskMessage { + pub fn new( + future: Pin> + Send>>, + response_tx: oneshot::Sender, Box>>, + ) -> Self { + Self { + future, + response_tx, + } + } +} + #[cfg(test)] mod test { use super::*;