mirror of
https://github.com/pezkuwichain/revive-differential-tests.git
synced 2026-06-10 20:51:01 +00:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9aa26a99d6 | |||
| 02f853699e | |||
| fde303f549 | |||
| e5a751f507 | |||
| d9d62b1038 | |||
| 71ae3b0f9a | |||
| 8cda6a9726 | |||
| a43d94ea7d | |||
| 6960298438 | |||
| 62cf57d39e | |||
| 3fc26eb03b | |||
| 268437b4d9 |
-12
@@ -78,15 +78,3 @@ features = [
|
|||||||
inherits = "release"
|
inherits = "release"
|
||||||
lto = true
|
lto = true
|
||||||
codegen-units = 1
|
codegen-units = 1
|
||||||
|
|
||||||
# We set the `panic` behavior to `unwind` in both the `release` and `dev` profiles since our async
|
|
||||||
# runtime attempts to catch panics and it can only catch panics if we compile our code with unwind
|
|
||||||
# as the panic behavior. For more information, please see the `catch_unwind` documentation where it
|
|
||||||
# mentions that panics can only be caught if they unwind and not if they abort:
|
|
||||||
# https://doc.rust-lang.org/std/panic/fn.catch_unwind.html#notes
|
|
||||||
|
|
||||||
[profile.release]
|
|
||||||
panic = "unwind"
|
|
||||||
|
|
||||||
[profile.dev]
|
|
||||||
panic = "unwind"
|
|
||||||
|
|||||||
@@ -51,35 +51,37 @@ impl BlockingExecutor {
|
|||||||
where
|
where
|
||||||
R: Send + 'static,
|
R: Send + 'static,
|
||||||
{
|
{
|
||||||
// A static of the state associated with the async runtime. This is initialized on the first
|
// Note: The blocking executor is a singleton and therefore we store its state in a static
|
||||||
// access of the state.
|
// so that it's assigned only once. Additionally, when we set the state of the executor we
|
||||||
|
// spawn the thread where the async runtime runs.
|
||||||
static STATE: Lazy<ExecutorState> = Lazy::new(|| {
|
static STATE: Lazy<ExecutorState> = Lazy::new(|| {
|
||||||
tracing::trace!("Initializing the BlockingExecutor state");
|
tracing::trace!("Initializing the BlockingExecutor state");
|
||||||
|
|
||||||
// Creating a multiple-producer-single-consumer channel which allows all of the other
|
// All communication with the tokio runtime thread happens over mspc channels where the
|
||||||
// threads to communicate with this one async runtime thread.
|
// producers here are the threads that want to run async tasks and the consumer here is
|
||||||
|
// the tokio runtime thread.
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<TaskMessage>();
|
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<TaskMessage>();
|
||||||
|
|
||||||
// We spawn a new thread which will house the async runtime and will always be listening
|
|
||||||
// for new tasks coming in and executing them as they come in.
|
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
// Creating the tokio runtime on this current thread.
|
|
||||||
let runtime = Builder::new_current_thread()
|
let runtime = Builder::new_current_thread()
|
||||||
.enable_all()
|
.enable_all()
|
||||||
.build()
|
.build()
|
||||||
.expect("Failed to create the async runtime");
|
.expect("Failed to create the async runtime");
|
||||||
|
|
||||||
runtime.block_on(async move {
|
runtime.block_on(async move {
|
||||||
// Keep getting new task messages from all of the other threads.
|
|
||||||
while let Some(TaskMessage {
|
while let Some(TaskMessage {
|
||||||
future: task,
|
future: task,
|
||||||
response_tx: response_channel,
|
response_tx: response_channel,
|
||||||
}) = rx.recv().await
|
}) = rx.recv().await
|
||||||
{
|
{
|
||||||
// Spawn off each job so that the receive loop is not blocked.
|
|
||||||
tracing::trace!("Received a new future to execute");
|
tracing::trace!("Received a new future to execute");
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
// One of the things that the blocking executor does is that it allows
|
||||||
|
// us to catch panics if they occur. By wrapping the given future in an
|
||||||
|
// AssertUnwindSafe::catch_unwind we are able to catch all panic unwinds
|
||||||
|
// in the given future and convert them into errors.
|
||||||
let task = AssertUnwindSafe(task).catch_unwind();
|
let task = AssertUnwindSafe(task).catch_unwind();
|
||||||
|
|
||||||
let result = task.await;
|
let result = task.await;
|
||||||
let _ = response_channel.send(result);
|
let _ = response_channel.send(result);
|
||||||
});
|
});
|
||||||
@@ -87,31 +89,32 @@ impl BlockingExecutor {
|
|||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
// Creating the state of the async runtime.
|
|
||||||
ExecutorState { tx }
|
ExecutorState { tx }
|
||||||
});
|
});
|
||||||
|
|
||||||
// Creating a one-shot channel for this task that will be used to send and receive the
|
// We need to perform blocking synchronous communication between the current thread and the
|
||||||
// response of the task.
|
// tokio runtime thread with the result of the async computation and the oneshot channels
|
||||||
|
// from tokio allows us to do that. The sender side of the channel will be given to the
|
||||||
|
// tokio runtime thread to send the result when the computation is completed and the receive
|
||||||
|
// side of the channel will be kept with this thread to await for the response of the async
|
||||||
|
// task to come back.
|
||||||
let (response_tx, response_rx) =
|
let (response_tx, response_rx) =
|
||||||
oneshot::channel::<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>();
|
oneshot::channel::<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>();
|
||||||
|
|
||||||
// Converting the future from the shape that it is in into the shape that the runtime is
|
// The tokio runtime thread expects a Future<Output = Box<dyn Any + Send>> + Send to be
|
||||||
// expecting it to be in.
|
// sent to it to execute. However, this function has a typed Future<Output = R> + Send and
|
||||||
|
// therefore we need to change the type of the future to fit what the runtime thread expects
|
||||||
|
// in the task message. In doing this conversion, we lose some of the type information since
|
||||||
|
// we're converting R => dyn Any. However, we will perform down-casting on the result to
|
||||||
|
// convert it back into R.
|
||||||
let future = Box::pin(async move { Box::new(future.await) as Box<dyn Any + Send> });
|
let future = Box::pin(async move { Box::new(future.await) as Box<dyn Any + Send> });
|
||||||
|
|
||||||
// Sending the task to the runtime,
|
let task = TaskMessage::new(future, response_tx);
|
||||||
let task = TaskMessage {
|
|
||||||
future,
|
|
||||||
response_tx,
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(error) = STATE.tx.send(task) {
|
if let Err(error) = STATE.tx.send(task) {
|
||||||
tracing::error!(?error, "Failed to send the task to the blocking executor");
|
tracing::error!(?error, "Failed to send the task to the blocking executor");
|
||||||
anyhow::bail!("Failed to send the task to the blocking executor: {error:?}")
|
anyhow::bail!("Failed to send the task to the blocking executor: {error:?}")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Await for the result of the execution to come back over the channel.
|
|
||||||
let result = match response_rx.blocking_recv() {
|
let result = match response_rx.blocking_recv() {
|
||||||
Ok(result) => result,
|
Ok(result) => result,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
@@ -141,6 +144,7 @@ impl BlockingExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents the state of the async runtime. This runtime is designed to be a singleton runtime
|
/// Represents the state of the async runtime. This runtime is designed to be a singleton runtime
|
||||||
/// which means that in the current running program there's just a single thread that has an async
|
/// which means that in the current running program there's just a single thread that has an async
|
||||||
/// runtime.
|
/// runtime.
|
||||||
@@ -163,6 +167,18 @@ struct TaskMessage {
|
|||||||
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
|
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl TaskMessage {
|
||||||
|
pub fn new(
|
||||||
|
future: Pin<Box<dyn Future<Output = Box<dyn Any + Send>> + Send>>,
|
||||||
|
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
future,
|
||||||
|
response_tx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
Reference in New Issue
Block a user