Merge remote-tracking branch 'origin/main' into feature/better-input-parser

This commit is contained in:
Omar Abdulla
2025-07-16 15:31:33 +03:00
7 changed files with 221 additions and 55 deletions
@@ -51,35 +51,37 @@ impl BlockingExecutor {
where
R: Send + 'static,
{
// A static of the state associated with the async runtime. This is initialized on the first
// access of the state.
// Note: The blocking executor is a singleton and therefore we store its state in a static
// so that it's assigned only once. Additionally, when we set the state of the executor we
// spawn the thread where the async runtime runs.
static STATE: Lazy<ExecutorState> = Lazy::new(|| {
tracing::trace!("Initializing the BlockingExecutor state");
// Creating a multiple-producer-single-consumer channel which allows all of the other
// threads to communicate with this one async runtime thread.
// All communication with the tokio runtime thread happens over mspc channels where the
// producers here are the threads that want to run async tasks and the consumer here is
// the tokio runtime thread.
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<TaskMessage>();
// We spawn a new thread which will house the async runtime and will always be listening
// for new tasks coming in and executing them as they come in.
thread::spawn(move || {
// Creating the tokio runtime on this current thread.
let runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create the async runtime");
runtime.block_on(async move {
// Keep getting new task messages from all of the other threads.
while let Some(TaskMessage {
future: task,
response_tx: response_channel,
}) = rx.recv().await
{
// Spawn off each job so that the receive loop is not blocked.
tracing::trace!("Received a new future to execute");
tokio::spawn(async move {
// One of the things that the blocking executor does is that it allows
// us to catch panics if they occur. By wrapping the given future in an
// AssertUnwindSafe::catch_unwind we are able to catch all panic unwinds
// in the given future and convert them into errors.
let task = AssertUnwindSafe(task).catch_unwind();
let result = task.await;
let _ = response_channel.send(result);
});
@@ -87,31 +89,32 @@ impl BlockingExecutor {
})
});
// Creating the state of the async runtime.
ExecutorState { tx }
});
// Creating a one-shot channel for this task that will be used to send and receive the
// response of the task.
// We need to perform blocking synchronous communication between the current thread and the
// tokio runtime thread with the result of the async computation and the oneshot channels
// from tokio allows us to do that. The sender side of the channel will be given to the
// tokio runtime thread to send the result when the computation is completed and the receive
// side of the channel will be kept with this thread to await for the response of the async
// task to come back.
let (response_tx, response_rx) =
oneshot::channel::<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>();
// Converting the future from the shape that it is in into the shape that the runtime is
// expecting it to be in.
// The tokio runtime thread expects a Future<Output = Box<dyn Any + Send>> + Send to be
// sent to it to execute. However, this function has a typed Future<Output = R> + Send and
// therefore we need to change the type of the future to fit what the runtime thread expects
// in the task message. In doing this conversion, we lose some of the type information since
// we're converting R => dyn Any. However, we will perform down-casting on the result to
// convert it back into R.
let future = Box::pin(async move { Box::new(future.await) as Box<dyn Any + Send> });
// Sending the task to the runtime,
let task = TaskMessage {
future,
response_tx,
};
let task = TaskMessage::new(future, response_tx);
if let Err(error) = STATE.tx.send(task) {
tracing::error!(?error, "Failed to send the task to the blocking executor");
anyhow::bail!("Failed to send the task to the blocking executor: {error:?}")
}
// Await for the result of the execution to come back over the channel.
let result = match response_rx.blocking_recv() {
Ok(result) => result,
Err(error) => {
@@ -163,6 +166,18 @@ struct TaskMessage {
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
}
impl TaskMessage {
pub fn new(
future: Pin<Box<dyn Future<Output = Box<dyn Any + Send>> + Send>>,
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
) -> Self {
Self {
future,
response_tx,
}
}
}
#[cfg(test)]
mod test {
use super::*;