mirror of
https://github.com/pezkuwichain/revive-differential-tests.git
synced 2026-04-22 13:47:55 +00:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9aa26a99d6 | |||
| 02f853699e | |||
| fde303f549 | |||
| e5a751f507 | |||
| d9d62b1038 | |||
| 71ae3b0f9a | |||
| 8cda6a9726 | |||
| a43d94ea7d | |||
| 6960298438 | |||
| 62cf57d39e | |||
| 3fc26eb03b | |||
| 268437b4d9 |
-12
@@ -78,15 +78,3 @@ features = [
|
||||
inherits = "release"
|
||||
lto = true
|
||||
codegen-units = 1
|
||||
|
||||
# We set the `panic` behavior to `unwind` in both the `release` and `dev` profiles since our async
|
||||
# runtime attempts to catch panics and it can only catch panics if we compile our code with unwind
|
||||
# as the panic behavior. For more information, please see the `catch_unwind` documentation where it
|
||||
# mentions that panics can only be caught if they unwind and not if they abort:
|
||||
# https://doc.rust-lang.org/std/panic/fn.catch_unwind.html#notes
|
||||
|
||||
[profile.release]
|
||||
panic = "unwind"
|
||||
|
||||
[profile.dev]
|
||||
panic = "unwind"
|
||||
|
||||
@@ -51,35 +51,37 @@ impl BlockingExecutor {
|
||||
where
|
||||
R: Send + 'static,
|
||||
{
|
||||
// A static of the state associated with the async runtime. This is initialized on the first
|
||||
// access of the state.
|
||||
// Note: The blocking executor is a singleton and therefore we store its state in a static
|
||||
// so that it's assigned only once. Additionally, when we set the state of the executor we
|
||||
// spawn the thread where the async runtime runs.
|
||||
static STATE: Lazy<ExecutorState> = Lazy::new(|| {
|
||||
tracing::trace!("Initializing the BlockingExecutor state");
|
||||
|
||||
// Creating a multiple-producer-single-consumer channel which allows all of the other
|
||||
// threads to communicate with this one async runtime thread.
|
||||
// All communication with the tokio runtime thread happens over mspc channels where the
|
||||
// producers here are the threads that want to run async tasks and the consumer here is
|
||||
// the tokio runtime thread.
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<TaskMessage>();
|
||||
|
||||
// We spawn a new thread which will house the async runtime and will always be listening
|
||||
// for new tasks coming in and executing them as they come in.
|
||||
thread::spawn(move || {
|
||||
// Creating the tokio runtime on this current thread.
|
||||
let runtime = Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create the async runtime");
|
||||
|
||||
runtime.block_on(async move {
|
||||
// Keep getting new task messages from all of the other threads.
|
||||
while let Some(TaskMessage {
|
||||
future: task,
|
||||
response_tx: response_channel,
|
||||
}) = rx.recv().await
|
||||
{
|
||||
// Spawn off each job so that the receive loop is not blocked.
|
||||
tracing::trace!("Received a new future to execute");
|
||||
tokio::spawn(async move {
|
||||
// One of the things that the blocking executor does is that it allows
|
||||
// us to catch panics if they occur. By wrapping the given future in an
|
||||
// AssertUnwindSafe::catch_unwind we are able to catch all panic unwinds
|
||||
// in the given future and convert them into errors.
|
||||
let task = AssertUnwindSafe(task).catch_unwind();
|
||||
|
||||
let result = task.await;
|
||||
let _ = response_channel.send(result);
|
||||
});
|
||||
@@ -87,31 +89,32 @@ impl BlockingExecutor {
|
||||
})
|
||||
});
|
||||
|
||||
// Creating the state of the async runtime.
|
||||
ExecutorState { tx }
|
||||
});
|
||||
|
||||
// Creating a one-shot channel for this task that will be used to send and receive the
|
||||
// response of the task.
|
||||
// We need to perform blocking synchronous communication between the current thread and the
|
||||
// tokio runtime thread with the result of the async computation and the oneshot channels
|
||||
// from tokio allows us to do that. The sender side of the channel will be given to the
|
||||
// tokio runtime thread to send the result when the computation is completed and the receive
|
||||
// side of the channel will be kept with this thread to await for the response of the async
|
||||
// task to come back.
|
||||
let (response_tx, response_rx) =
|
||||
oneshot::channel::<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>();
|
||||
|
||||
// Converting the future from the shape that it is in into the shape that the runtime is
|
||||
// expecting it to be in.
|
||||
// The tokio runtime thread expects a Future<Output = Box<dyn Any + Send>> + Send to be
|
||||
// sent to it to execute. However, this function has a typed Future<Output = R> + Send and
|
||||
// therefore we need to change the type of the future to fit what the runtime thread expects
|
||||
// in the task message. In doing this conversion, we lose some of the type information since
|
||||
// we're converting R => dyn Any. However, we will perform down-casting on the result to
|
||||
// convert it back into R.
|
||||
let future = Box::pin(async move { Box::new(future.await) as Box<dyn Any + Send> });
|
||||
|
||||
// Sending the task to the runtime,
|
||||
let task = TaskMessage {
|
||||
future,
|
||||
response_tx,
|
||||
};
|
||||
|
||||
let task = TaskMessage::new(future, response_tx);
|
||||
if let Err(error) = STATE.tx.send(task) {
|
||||
tracing::error!(?error, "Failed to send the task to the blocking executor");
|
||||
anyhow::bail!("Failed to send the task to the blocking executor: {error:?}")
|
||||
}
|
||||
|
||||
// Await for the result of the execution to come back over the channel.
|
||||
let result = match response_rx.blocking_recv() {
|
||||
Ok(result) => result,
|
||||
Err(error) => {
|
||||
@@ -141,6 +144,7 @@ impl BlockingExecutor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents the state of the async runtime. This runtime is designed to be a singleton runtime
|
||||
/// which means that in the current running program there's just a single thread that has an async
|
||||
/// runtime.
|
||||
@@ -163,6 +167,18 @@ struct TaskMessage {
|
||||
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
|
||||
}
|
||||
|
||||
impl TaskMessage {
|
||||
pub fn new(
|
||||
future: Pin<Box<dyn Future<Output = Box<dyn Any + Send>> + Send>>,
|
||||
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
future,
|
||||
response_tx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
Reference in New Issue
Block a user