Add a common crate (#75)

* Add a barebones common crate

* Refactor some code into the common crate

* Add a `ResolverApi` interface.

This commit adds a `ResolverApi` trait to the `format` crate that can be
implemented by any type that can act as a resolver. A resolver is able
to provide information on the chain state. This chain state could be
fresh or it could be cached (which is something that we will do in a
future PR).

This cleans up our crate graph so that `format` is not depending on the
node interactions crate for the `EthereumNode` trait.

* Cleanup the blocking executor
This commit is contained in:
Omar
2025-07-24 15:42:45 +03:00
committed by GitHub
parent b03ad3027e
commit 90fb89adc0
24 changed files with 169 additions and 141 deletions
@@ -0,0 +1,225 @@
//! The alloy crate __requires__ a tokio runtime.
//! We contain any async rust right here.
use std::{any::Any, panic::AssertUnwindSafe, pin::Pin, thread};
use futures::FutureExt;
use once_cell::sync::Lazy;
use tokio::{
runtime::Builder,
sync::{mpsc::UnboundedSender, oneshot},
};
use tracing::Instrument;
/// A blocking async executor.
///
/// This struct exposes the abstraction of a blocking async executor. It is a global and static
/// executor which means that it doesn't require for new instances of it to be created, it's a
/// singleton and can be accessed by any thread that wants to perform some async computation on the
/// blocking executor thread.
///
/// The API of the blocking executor is created in a way so that it's very natural, simple to use,
/// and unbounded to specific tasks or return types. The following is an example of using this
/// executor to drive an async computation:
///
/// ```rust
/// use revive_dt_common::concepts::*;
///
/// fn blocking_function() {
/// let result = BlockingExecutor::execute(async move {
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// 0xFFu8
/// })
/// .expect("Computation failed");
///
/// assert_eq!(result, 0xFF);
/// }
/// ```
///
/// Users get to pass in their async tasks without needing to worry about putting them in a [`Box`],
/// [`Pin`], needing to perform down-casting, or the internal channel mechanism used by the runtime.
/// To the user, it just looks like a function that converts some async code into sync code.
///
/// This struct also handled panics that occur in the passed futures and converts them into errors
/// that can be handled by the user. This is done to allow the executor to be robust.
///
/// Internally, the executor communicates with the tokio runtime thread through channels which carry
/// the [`TaskMessage`] and the results of the execution.
pub struct BlockingExecutor;
impl BlockingExecutor {
pub fn execute<R>(future: impl Future<Output = R> + Send + 'static) -> Result<R, anyhow::Error>
where
R: Send + 'static,
{
// Note: The blocking executor is a singleton and therefore we store its state in a static
// so that it's assigned only once. Additionally, when we set the state of the executor we
// spawn the thread where the async runtime runs.
static STATE: Lazy<ExecutorState> = Lazy::new(|| {
tracing::trace!("Initializing the BlockingExecutor state");
// All communication with the tokio runtime thread happens over mspc channels where the
// producers here are the threads that want to run async tasks and the consumer here is
// the tokio runtime thread.
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<TaskMessage>();
thread::spawn(move || {
tracing::info!(
thread_id = ?std::thread::current().id(),
"Starting async runtime thread"
);
let runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create the async runtime");
runtime.block_on(async move {
while let Some(TaskMessage {
future: task,
response_tx: response_channel,
}) = rx.recv().await
{
tracing::trace!("Received a new future to execute");
tokio::spawn(async move {
// One of the things that the blocking executor does is that it allows
// us to catch panics if they occur. By wrapping the given future in an
// AssertUnwindSafe::catch_unwind we are able to catch all panic unwinds
// in the given future and convert them into errors.
let task = AssertUnwindSafe(task).catch_unwind();
let result = task.await;
let _ = response_channel.send(result);
});
}
})
});
ExecutorState { tx }
});
// We need to perform blocking synchronous communication between the current thread and the
// tokio runtime thread with the result of the async computation and the oneshot channels
// from tokio allows us to do that. The sender side of the channel will be given to the
// tokio runtime thread to send the result when the computation is completed and the receive
// side of the channel will be kept with this thread to await for the response of the async
// task to come back.
let (response_tx, response_rx) =
oneshot::channel::<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>();
// The tokio runtime thread expects a Future<Output = Box<dyn Any + Send>> + Send to be
// sent to it to execute. However, this function has a typed Future<Output = R> + Send and
// therefore we need to change the type of the future to fit what the runtime thread expects
// in the task message. In doing this conversion, we lose some of the type information since
// we're converting R => dyn Any. However, we will perform down-casting on the result to
// convert it back into R.
let future = Box::pin(
async move { Box::new(future.await) as Box<dyn Any + Send> }.in_current_span(),
);
let task = TaskMessage::new(future, response_tx);
if let Err(error) = STATE.tx.send(task) {
tracing::error!(?error, "Failed to send the task to the blocking executor");
anyhow::bail!("Failed to send the task to the blocking executor: {error:?}")
}
let result = match response_rx.blocking_recv() {
Ok(result) => result,
Err(error) => {
tracing::error!(
?error,
"Failed to get the response from the blocking executor"
);
anyhow::bail!("Failed to get the response from the blocking executor: {error:?}")
}
};
let result = match result {
Ok(result) => result,
Err(error) => {
tracing::error!(?error, "An error occurred when running the async task");
anyhow::bail!("An error occurred when running the async task: {error:?}")
}
};
Ok(*result
.downcast::<R>()
.expect("An error occurred when downcasting into R. This is a bug"))
}
}
/// Represents the state of the async runtime. This runtime is designed to be a singleton runtime
/// which means that in the current running program there's just a single thread that has an async
/// runtime.
struct ExecutorState {
/// The sending side of the task messages channel. This is used by all of the other threads to
/// communicate with the async runtime thread.
tx: UnboundedSender<TaskMessage>,
}
/// Represents a message that contains an asynchronous task that's to be executed by the runtime
/// as well as a way for the runtime to report back on the result of the execution.
struct TaskMessage {
/// The task that's being requested to run. This is a future that returns an object that does
/// implement [`Any`] and [`Send`] to allow it to be sent between the requesting thread and the
/// async thread.
future: Pin<Box<dyn Future<Output = Box<dyn Any + Send>> + Send>>,
/// A one shot sender channel where the sender of the task is expecting to hear back on the
/// result of the task.
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
}
impl TaskMessage {
pub fn new(
future: Pin<Box<dyn Future<Output = Box<dyn Any + Send>> + Send>>,
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
) -> Self {
Self {
future,
response_tx,
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn simple_future_works() {
// Act
let result = BlockingExecutor::execute(async move {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
0xFFu8
})
.unwrap();
// Assert
assert_eq!(result, 0xFFu8);
}
#[test]
#[allow(unreachable_code, clippy::unreachable)]
fn panics_in_futures_are_caught() {
// Act
let result = BlockingExecutor::execute(async move {
panic!(
"If this panic causes, well, a panic, then this is an issue. If it's caught then all good!"
);
0xFFu8
});
// Assert
assert!(result.is_err());
// Act
let result = BlockingExecutor::execute(async move {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
0xFFu8
})
.unwrap();
// Assert
assert_eq!(result, 0xFFu8)
}
}
+3
View File
@@ -0,0 +1,3 @@
mod blocking_executor;
pub use blocking_executor::*;
@@ -0,0 +1,73 @@
use std::{borrow::Cow, collections::HashSet, path::PathBuf};
/// An iterator that finds files of a certain extension in the provided directory. You can think of
/// this a glob pattern similar to: `${path}/**/*.md`
pub struct FilesWithExtensionIterator {
/// The set of allowed extensions that that match the requirement and that should be returned
/// when found.
allowed_extensions: HashSet<Cow<'static, str>>,
/// The set of directories to visit next. This iterator does BFS and so these directories will
/// only be visited if we can't find any files in our state.
directories_to_search: Vec<PathBuf>,
/// The set of files matching the allowed extensions that were found. If there are entries in
/// this vector then they will be returned when the [`Iterator::next`] method is called. If not
/// then we visit one of the next directories to visit.
files_matching_allowed_extensions: Vec<PathBuf>,
}
impl FilesWithExtensionIterator {
pub fn new(root_directory: PathBuf) -> Self {
Self {
allowed_extensions: Default::default(),
directories_to_search: vec![root_directory],
files_matching_allowed_extensions: Default::default(),
}
}
pub fn with_allowed_extension(
mut self,
allowed_extension: impl Into<Cow<'static, str>>,
) -> Self {
self.allowed_extensions.insert(allowed_extension.into());
self
}
}
impl Iterator for FilesWithExtensionIterator {
type Item = PathBuf;
fn next(&mut self) -> Option<Self::Item> {
if let Some(file_path) = self.files_matching_allowed_extensions.pop() {
return Some(file_path);
};
let directory_to_search = self.directories_to_search.pop()?;
// Read all of the entries in the directory. If we failed to read this dir's entires then we
// elect to just ignore it and look in the next directory, we do that by calling the next
// method again on the iterator, which is an intentional decision that we made here instead
// of panicking.
let Ok(dir_entries) = std::fs::read_dir(directory_to_search) else {
return self.next();
};
for entry in dir_entries.flatten() {
let entry_path = entry.path();
if entry_path.is_dir() {
self.directories_to_search.push(entry_path)
} else if entry_path.is_file()
&& entry_path.extension().is_some_and(|ext| {
self.allowed_extensions
.iter()
.any(|allowed| ext.eq_ignore_ascii_case(allowed.as_ref()))
})
{
self.files_matching_allowed_extensions.push(entry_path)
}
}
self.next()
}
}
+3
View File
@@ -0,0 +1,3 @@
mod files_with_extension_iterator;
pub use files_with_extension_iterator::*;
+6
View File
@@ -0,0 +1,6 @@
//! This crate provides common concepts, functionality, types, macros, and more that other crates in
//! the workspace can benefit from.
pub mod concepts;
pub mod iterators;
pub mod macros;
@@ -0,0 +1,110 @@
/// Defines wrappers around types.
///
/// For example, the macro invocation seen below:
///
/// ```rust,ignore
/// define_wrapper_type!(CaseId => usize);
/// ```
///
/// Would define a wrapper type that looks like the following:
///
/// ```rust,ignore
/// pub struct CaseId(usize);
/// ```
///
/// And would also implement a number of methods on this type making it easier to use.
///
/// These wrapper types become very useful as they make the code a lot easier to read.
///
/// Take the following as an example:
///
/// ```rust,ignore
/// struct State {
/// contracts: HashMap<usize, HashMap<String, Vec<u8>>>
/// }
/// ```
///
/// In the above code it's hard to understand what the various types refer to or what to expect them
/// to contain.
///
/// With these wrapper types we're able to create code that's self-documenting in that the types
/// tell us what the code is referring to. The above code is transformed into
///
/// ```rust,ignore
/// struct State {
/// contracts: HashMap<CaseId, HashMap<ContractName, ContractByteCode>>
/// }
/// ```
///
/// Note that we follow the same syntax for defining wrapper structs but we do not permit the use of
/// generics.
#[macro_export]
macro_rules! define_wrapper_type {
(
$(#[$meta: meta])*
$vis:vis struct $ident: ident($ty: ty);
) => {
$(#[$meta])*
$vis struct $ident($ty);
impl $ident {
pub fn new(value: $ty) -> Self {
Self(value)
}
pub fn new_from<T: Into<$ty>>(value: T) -> Self {
Self(value.into())
}
pub fn into_inner(self) -> $ty {
self.0
}
pub fn as_inner(&self) -> &$ty {
&self.0
}
}
impl AsRef<$ty> for $ident {
fn as_ref(&self) -> &$ty {
&self.0
}
}
impl AsMut<$ty> for $ident {
fn as_mut(&mut self) -> &mut $ty {
&mut self.0
}
}
impl std::ops::Deref for $ident {
type Target = $ty;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for $ident {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl From<$ty> for $ident {
fn from(value: $ty) -> Self {
Self(value)
}
}
impl From<$ident> for $ty {
fn from(value: $ident) -> Self {
value.0
}
}
};
}
/// Technically not needed but this allows for the macro to be found in the `macros` module of the
/// crate in addition to being found in the root of the crate.
pub use define_wrapper_type;
+3
View File
@@ -0,0 +1,3 @@
mod define_wrapper_type;
pub use define_wrapper_type::*;