Compare commits

...

12 Commits

Author SHA1 Message Date
Omar Abdulla 9aa26a99d6 Update the release profile 2025-07-15 14:06:53 +03:00
Omar Abdulla 02f853699e Improve the comments 2025-07-15 13:43:33 +03:00
Omar fde303f549 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:23:56 +03:00
Omar e5a751f507 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:23:50 +03:00
Omar d9d62b1038 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:23:15 +03:00
Omar 71ae3b0f9a Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:23:03 +03:00
Omar 8cda6a9726 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:54 +03:00
Omar a43d94ea7d Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:45 +03:00
Omar 6960298438 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:36 +03:00
Omar 62cf57d39e Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:23 +03:00
Omar 3fc26eb03b Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:15 +03:00
Omar 268437b4d9 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:03 +03:00
2 changed files with 37 additions and 33 deletions
-12
View File
@@ -78,15 +78,3 @@ features = [
inherits = "release"
lto = true
codegen-units = 1
# We set the `panic` behavior to `unwind` in both the `release` and `dev` profiles since our async
# runtime attempts to catch panics and it can only catch panics if we compile our code with unwind
# as the panic behavior. For more information, please see the `catch_unwind` documentation where it
# mentions that panics can only be caught if they unwind and not if they abort:
# https://doc.rust-lang.org/std/panic/fn.catch_unwind.html#notes
[profile.release]
panic = "unwind"
[profile.dev]
panic = "unwind"
@@ -51,35 +51,37 @@ impl BlockingExecutor {
where
R: Send + 'static,
{
// A static of the state associated with the async runtime. This is initialized on the first
// access of the state.
// Note: The blocking executor is a singleton and therefore we store its state in a static
// so that it's assigned only once. Additionally, when we set the state of the executor we
// spawn the thread where the async runtime runs.
static STATE: Lazy<ExecutorState> = Lazy::new(|| {
tracing::trace!("Initializing the BlockingExecutor state");
// Creating a multiple-producer-single-consumer channel which allows all of the other
// threads to communicate with this one async runtime thread.
// All communication with the tokio runtime thread happens over mspc channels where the
// producers here are the threads that want to run async tasks and the consumer here is
// the tokio runtime thread.
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<TaskMessage>();
// We spawn a new thread which will house the async runtime and will always be listening
// for new tasks coming in and executing them as they come in.
thread::spawn(move || {
// Creating the tokio runtime on this current thread.
let runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create the async runtime");
runtime.block_on(async move {
// Keep getting new task messages from all of the other threads.
while let Some(TaskMessage {
future: task,
response_tx: response_channel,
}) = rx.recv().await
{
// Spawn off each job so that the receive loop is not blocked.
tracing::trace!("Received a new future to execute");
tokio::spawn(async move {
// One of the things that the blocking executor does is that it allows
// us to catch panics if they occur. By wrapping the given future in an
// AssertUnwindSafe::catch_unwind we are able to catch all panic unwinds
// in the given future and convert them into errors.
let task = AssertUnwindSafe(task).catch_unwind();
let result = task.await;
let _ = response_channel.send(result);
});
@@ -87,31 +89,32 @@ impl BlockingExecutor {
})
});
// Creating the state of the async runtime.
ExecutorState { tx }
});
// Creating a one-shot channel for this task that will be used to send and receive the
// response of the task.
// We need to perform blocking synchronous communication between the current thread and the
// tokio runtime thread with the result of the async computation and the oneshot channels
// from tokio allows us to do that. The sender side of the channel will be given to the
// tokio runtime thread to send the result when the computation is completed and the receive
// side of the channel will be kept with this thread to await for the response of the async
// task to come back.
let (response_tx, response_rx) =
oneshot::channel::<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>();
// Converting the future from the shape that it is in into the shape that the runtime is
// expecting it to be in.
// The tokio runtime thread expects a Future<Output = Box<dyn Any + Send>> + Send to be
// sent to it to execute. However, this function has a typed Future<Output = R> + Send and
// therefore we need to change the type of the future to fit what the runtime thread expects
// in the task message. In doing this conversion, we lose some of the type information since
// we're converting R => dyn Any. However, we will perform down-casting on the result to
// convert it back into R.
let future = Box::pin(async move { Box::new(future.await) as Box<dyn Any + Send> });
// Sending the task to the runtime,
let task = TaskMessage {
future,
response_tx,
};
let task = TaskMessage::new(future, response_tx);
if let Err(error) = STATE.tx.send(task) {
tracing::error!(?error, "Failed to send the task to the blocking executor");
anyhow::bail!("Failed to send the task to the blocking executor: {error:?}")
}
// Await for the result of the execution to come back over the channel.
let result = match response_rx.blocking_recv() {
Ok(result) => result,
Err(error) => {
@@ -141,6 +144,7 @@ impl BlockingExecutor {
}
}
}
/// Represents the state of the async runtime. This runtime is designed to be a singleton runtime
/// which means that in the current running program there's just a single thread that has an async
/// runtime.
@@ -163,6 +167,18 @@ struct TaskMessage {
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
}
impl TaskMessage {
pub fn new(
future: Pin<Box<dyn Future<Output = Box<dyn Any + Send>> + Send>>,
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
) -> Self {
Self {
future,
response_tx,
}
}
}
#[cfg(test)]
mod test {
use super::*;