Compare commits

...

14 Commits

Author SHA1 Message Date
Omar Abdulla 9aa26a99d6 Update the release profile 2025-07-15 14:06:53 +03:00
Omar Abdulla 02f853699e Improve the comments 2025-07-15 13:43:33 +03:00
Omar fde303f549 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:23:56 +03:00
Omar e5a751f507 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:23:50 +03:00
Omar d9d62b1038 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:23:15 +03:00
Omar 71ae3b0f9a Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:23:03 +03:00
Omar 8cda6a9726 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:54 +03:00
Omar a43d94ea7d Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:45 +03:00
Omar 6960298438 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:36 +03:00
Omar 62cf57d39e Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:23 +03:00
Omar 3fc26eb03b Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:15 +03:00
Omar 268437b4d9 Update crates/node-interaction/src/blocking_executor.rs
Co-authored-by: xermicus <cyrill@parity.io>
2025-07-15 13:22:03 +03:00
Omar Abdulla 27a0a0de0b Fix doc test 2025-07-14 21:33:57 +03:00
Omar Abdulla 331705134a Update the async runtime with syntactic sugar. 2025-07-14 21:13:58 +03:00
11 changed files with 254 additions and 254 deletions
Generated
+1
View File
@@ -4012,6 +4012,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"alloy", "alloy",
"anyhow", "anyhow",
"futures",
"once_cell", "once_cell",
"tokio", "tokio",
"tracing", "tracing",
+1
View File
@@ -25,6 +25,7 @@ alloy-primitives = "1.2.1"
alloy-sol-types = "1.2.1" alloy-sol-types = "1.2.1"
anyhow = "1.0" anyhow = "1.0"
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
futures = { version = "0.3.31" }
hex = "0.4.3" hex = "0.4.3"
reqwest = { version = "0.12.15", features = ["blocking", "json"] } reqwest = { version = "0.12.15", features = ["blocking", "json"] }
once_cell = "1.21" once_cell = "1.21"
+1
View File
@@ -11,6 +11,7 @@ rust-version.workspace = true
[dependencies] [dependencies]
alloy = { workspace = true } alloy = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
once_cell = { workspace = true } once_cell = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
@@ -0,0 +1,221 @@
//! The alloy crate __requires__ a tokio runtime.
//! We contain any async rust right here.
use std::{any::Any, panic::AssertUnwindSafe, pin::Pin, thread};
use futures::FutureExt;
use once_cell::sync::Lazy;
use tokio::{
runtime::Builder,
sync::{mpsc::UnboundedSender, oneshot},
};
/// A blocking async executor.
///
/// This struct exposes the abstraction of a blocking async executor. It is a global and static
/// executor which means that it doesn't require for new instances of it to be created, it's a
/// singleton and can be accessed by any thread that wants to perform some async computation on the
/// blocking executor thread.
///
/// The API of the blocking executor is created in a way so that it's very natural, simple to use,
/// and unbounded to specific tasks or return types. The following is an example of using this
/// executor to drive an async computation:
///
/// ```rust
/// use revive_dt_node_interaction::*;
///
/// fn blocking_function() {
/// let result = BlockingExecutor::execute(async move {
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// 0xFFu8
/// })
/// .expect("Computation failed");
///
/// assert_eq!(result, 0xFF);
/// }
/// ```
///
/// Users get to pass in their async tasks without needing to worry about putting them in a [`Box`],
/// [`Pin`], needing to perform down-casting, or the internal channel mechanism used by the runtime.
/// To the user, it just looks like a function that converts some async code into sync code.
///
/// This struct also handled panics that occur in the passed futures and converts them into errors
/// that can be handled by the user. This is done to allow the executor to be robust.
///
/// Internally, the executor communicates with the tokio runtime thread through channels which carry
/// the [`TaskMessage`] and the results of the execution.
pub struct BlockingExecutor;
impl BlockingExecutor {
pub fn execute<R>(future: impl Future<Output = R> + Send + 'static) -> Result<R, anyhow::Error>
where
R: Send + 'static,
{
// Note: The blocking executor is a singleton and therefore we store its state in a static
// so that it's assigned only once. Additionally, when we set the state of the executor we
// spawn the thread where the async runtime runs.
static STATE: Lazy<ExecutorState> = Lazy::new(|| {
tracing::trace!("Initializing the BlockingExecutor state");
// All communication with the tokio runtime thread happens over mspc channels where the
// producers here are the threads that want to run async tasks and the consumer here is
// the tokio runtime thread.
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<TaskMessage>();
thread::spawn(move || {
let runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create the async runtime");
runtime.block_on(async move {
while let Some(TaskMessage {
future: task,
response_tx: response_channel,
}) = rx.recv().await
{
tracing::trace!("Received a new future to execute");
tokio::spawn(async move {
// One of the things that the blocking executor does is that it allows
// us to catch panics if they occur. By wrapping the given future in an
// AssertUnwindSafe::catch_unwind we are able to catch all panic unwinds
// in the given future and convert them into errors.
let task = AssertUnwindSafe(task).catch_unwind();
let result = task.await;
let _ = response_channel.send(result);
});
}
})
});
ExecutorState { tx }
});
// We need to perform blocking synchronous communication between the current thread and the
// tokio runtime thread with the result of the async computation and the oneshot channels
// from tokio allows us to do that. The sender side of the channel will be given to the
// tokio runtime thread to send the result when the computation is completed and the receive
// side of the channel will be kept with this thread to await for the response of the async
// task to come back.
let (response_tx, response_rx) =
oneshot::channel::<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>();
// The tokio runtime thread expects a Future<Output = Box<dyn Any + Send>> + Send to be
// sent to it to execute. However, this function has a typed Future<Output = R> + Send and
// therefore we need to change the type of the future to fit what the runtime thread expects
// in the task message. In doing this conversion, we lose some of the type information since
// we're converting R => dyn Any. However, we will perform down-casting on the result to
// convert it back into R.
let future = Box::pin(async move { Box::new(future.await) as Box<dyn Any + Send> });
let task = TaskMessage::new(future, response_tx);
if let Err(error) = STATE.tx.send(task) {
tracing::error!(?error, "Failed to send the task to the blocking executor");
anyhow::bail!("Failed to send the task to the blocking executor: {error:?}")
}
let result = match response_rx.blocking_recv() {
Ok(result) => result,
Err(error) => {
tracing::error!(
?error,
"Failed to get the response from the blocking executor"
);
anyhow::bail!("Failed to get the response from the blocking executor: {error:?}")
}
};
match result.map(|result| {
*result
.downcast::<R>()
.expect("Type mismatch in the downcast")
}) {
Ok(result) => Ok(result),
Err(error) => {
tracing::error!(
?error,
"Failed to downcast the returned result into the expected type"
);
anyhow::bail!(
"Failed to downcast the returned result into the expected type: {error:?}"
)
}
}
}
}
/// Represents the state of the async runtime. This runtime is designed to be a singleton runtime
/// which means that in the current running program there's just a single thread that has an async
/// runtime.
struct ExecutorState {
/// The sending side of the task messages channel. This is used by all of the other threads to
/// communicate with the async runtime thread.
tx: UnboundedSender<TaskMessage>,
}
/// Represents a message that contains an asynchronous task that's to be executed by the runtime
/// as well as a way for the runtime to report back on the result of the execution.
struct TaskMessage {
/// The task that's being requested to run. This is a future that returns an object that does
/// implement [`Any`] and [`Send`] to allow it to be sent between the requesting thread and the
/// async thread.
future: Pin<Box<dyn Future<Output = Box<dyn Any + Send>> + Send>>,
/// A one shot sender channel where the sender of the task is expecting to hear back on the
/// result of the task.
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
}
impl TaskMessage {
pub fn new(
future: Pin<Box<dyn Future<Output = Box<dyn Any + Send>> + Send>>,
response_tx: oneshot::Sender<Result<Box<dyn Any + Send>, Box<dyn Any + Send>>>,
) -> Self {
Self {
future,
response_tx,
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn simple_future_works() {
// Act
let result = BlockingExecutor::execute(async move {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
0xFFu8
})
.unwrap();
// Assert
assert_eq!(result, 0xFFu8);
}
#[test]
#[allow(unreachable_code, clippy::unreachable)]
fn panics_in_futures_are_caught() {
// Act
let result = BlockingExecutor::execute(async move {
panic!("This is a panic!");
0xFFu8
});
// Assert
assert!(result.is_err());
// Act
let result = BlockingExecutor::execute(async move {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
0xFFu8
})
.unwrap();
// Assert
assert_eq!(result, 0xFFu8)
}
}
+2 -5
View File
@@ -3,12 +3,9 @@
use alloy::primitives::Address; use alloy::primitives::Address;
use alloy::rpc::types::trace::geth::{DiffMode, GethTrace}; use alloy::rpc::types::trace::geth::{DiffMode, GethTrace};
use alloy::rpc::types::{TransactionReceipt, TransactionRequest}; use alloy::rpc::types::{TransactionReceipt, TransactionRequest};
use tokio_runtime::TO_TOKIO;
pub mod nonce; mod blocking_executor;
mod tokio_runtime; pub use blocking_executor::*;
pub mod trace;
pub mod transaction;
/// An interface for all interactions with Ethereum compatible nodes. /// An interface for all interactions with Ethereum compatible nodes.
pub trait EthereumNode { pub trait EthereumNode {
-55
View File
@@ -1,55 +0,0 @@
use std::pin::Pin;
use alloy::{
primitives::Address,
providers::{Provider, ProviderBuilder},
};
use tokio::sync::oneshot;
use crate::{TO_TOKIO, tokio_runtime::AsyncNodeInteraction};
pub type Task = Pin<Box<dyn Future<Output = anyhow::Result<u64>> + Send>>;
pub(crate) struct Nonce {
sender: oneshot::Sender<anyhow::Result<u64>>,
task: Task,
}
impl AsyncNodeInteraction for Nonce {
type Output = anyhow::Result<u64>;
fn split(
self,
) -> (
std::pin::Pin<Box<dyn Future<Output = Self::Output> + Send>>,
oneshot::Sender<Self::Output>,
) {
(self.task, self.sender)
}
}
/// This is like `trace_transaction`, just for nonces.
pub fn fetch_onchain_nonce(
connection: String,
wallet: alloy::network::EthereumWallet,
address: Address,
) -> anyhow::Result<u64> {
let sender = TO_TOKIO.lock().unwrap().nonce_sender.clone();
let (tx, rx) = oneshot::channel();
let task: Task = Box::pin(async move {
let provider = ProviderBuilder::new()
.wallet(wallet)
.connect(&connection)
.await?;
let onchain = provider.get_transaction_count(address).await?;
Ok(onchain)
});
sender
.blocking_send(Nonce { task, sender: tx })
.expect("not in async context");
rx.blocking_recv()
.unwrap_or_else(|err| anyhow::bail!("nonce fetch failed: {err}"))
}
@@ -1,87 +0,0 @@
//! The alloy crate __requires__ a tokio runtime.
//! We contain any async rust right here.
use once_cell::sync::Lazy;
use std::pin::Pin;
use std::sync::Mutex;
use std::thread;
use tokio::runtime::Runtime;
use tokio::spawn;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinError;
use crate::nonce::Nonce;
use crate::trace::Trace;
use crate::transaction::Transaction;
pub(crate) static TO_TOKIO: Lazy<Mutex<TokioRuntime>> =
Lazy::new(|| Mutex::new(TokioRuntime::spawn()));
/// Common interface for executing async node interactions from a non-async context.
#[allow(clippy::type_complexity)]
pub(crate) trait AsyncNodeInteraction: Send + 'static {
type Output: Send;
//// Returns the task and the output sender.
fn split(
self,
) -> (
Pin<Box<dyn Future<Output = Self::Output> + Send>>,
oneshot::Sender<Self::Output>,
);
}
pub(crate) struct TokioRuntime {
pub(crate) transaction_sender: mpsc::Sender<Transaction>,
pub(crate) trace_sender: mpsc::Sender<Trace>,
pub(crate) nonce_sender: mpsc::Sender<Nonce>,
}
impl TokioRuntime {
fn spawn() -> Self {
let rt = Runtime::new().expect("should be able to create the tokio runtime");
let (transaction_sender, transaction_receiver) = mpsc::channel::<Transaction>(1024);
let (trace_sender, trace_receiver) = mpsc::channel::<Trace>(1024);
let (nonce_sender, nonce_receiver) = mpsc::channel::<Nonce>(1024);
thread::spawn(move || {
rt.block_on(async move {
let transaction_task = spawn(interaction::<Transaction>(transaction_receiver));
let trace_task = spawn(interaction::<Trace>(trace_receiver));
let nonce_task = spawn(interaction::<Nonce>(nonce_receiver));
if let Err(error) = transaction_task.await {
tracing::error!("tokio transaction task failed: {error}");
}
if let Err(error) = trace_task.await {
tracing::error!("tokio trace transaction task failed: {error}");
}
if let Err(error) = nonce_task.await {
tracing::error!("tokio nonce task failed: {error}");
}
});
});
Self {
transaction_sender,
trace_sender,
nonce_sender,
}
}
}
async fn interaction<T>(mut receiver: mpsc::Receiver<T>) -> Result<(), JoinError>
where
T: AsyncNodeInteraction,
{
while let Some(task) = receiver.recv().await {
spawn(async move {
let (task, sender) = task.split();
sender
.send(task.await)
.unwrap_or_else(|_| panic!("failed to send task output"));
});
}
Ok(())
}
-43
View File
@@ -1,43 +0,0 @@
//! Trace transactions in a sync context.
use std::pin::Pin;
use alloy::rpc::types::trace::geth::GethTrace;
use tokio::sync::oneshot;
use crate::TO_TOKIO;
use crate::tokio_runtime::AsyncNodeInteraction;
pub type Task = Pin<Box<dyn Future<Output = anyhow::Result<GethTrace>> + Send>>;
pub(crate) struct Trace {
sender: oneshot::Sender<anyhow::Result<GethTrace>>,
task: Task,
}
impl AsyncNodeInteraction for Trace {
type Output = anyhow::Result<GethTrace>;
fn split(
self,
) -> (
std::pin::Pin<Box<dyn Future<Output = Self::Output> + Send>>,
oneshot::Sender<Self::Output>,
) {
(self.task, self.sender)
}
}
/// Execute some [Task] that return a [GethTrace] result.
pub fn trace_transaction(task: Task) -> anyhow::Result<GethTrace> {
let task_sender = TO_TOKIO.lock().unwrap().trace_sender.clone();
let (sender, receiver) = oneshot::channel();
task_sender
.blocking_send(Trace { task, sender })
.expect("we are not calling this from an async context");
receiver
.blocking_recv()
.unwrap_or_else(|error| anyhow::bail!("no trace received: {error}"))
}
@@ -1,46 +0,0 @@
//! Execute transactions in a sync context.
use std::pin::Pin;
use alloy::rpc::types::TransactionReceipt;
use tokio::sync::oneshot;
use crate::TO_TOKIO;
use crate::tokio_runtime::AsyncNodeInteraction;
pub type Task = Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + Send>>;
pub(crate) struct Transaction {
receipt_sender: oneshot::Sender<anyhow::Result<TransactionReceipt>>,
task: Task,
}
impl AsyncNodeInteraction for Transaction {
type Output = anyhow::Result<TransactionReceipt>;
fn split(
self,
) -> (
Pin<Box<dyn Future<Output = Self::Output> + Send>>,
oneshot::Sender<Self::Output>,
) {
(self.task, self.receipt_sender)
}
}
/// Execute some [Task] that returns a [TransactionReceipt].
pub fn execute_transaction(task: Task) -> anyhow::Result<TransactionReceipt> {
let request_sender = TO_TOKIO.lock().unwrap().transaction_sender.clone();
let (receipt_sender, receipt_receiver) = oneshot::channel();
request_sender
.blocking_send(Transaction {
receipt_sender,
task,
})
.expect("we are not calling this from an async context");
receipt_receiver
.blocking_recv()
.unwrap_or_else(|error| anyhow::bail!("no receipt received: {error}"))
}
+14 -9
View File
@@ -23,10 +23,7 @@ use alloy::{
}, },
}; };
use revive_dt_config::Arguments; use revive_dt_config::Arguments;
use revive_dt_node_interaction::{ use revive_dt_node_interaction::{BlockingExecutor, EthereumNode};
EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction,
transaction::execute_transaction,
};
use tracing::Level; use tracing::Level;
use crate::Node; use crate::Node;
@@ -205,7 +202,7 @@ impl EthereumNode for Instance {
let connection_string = self.connection_string(); let connection_string = self.connection_string();
let wallet = self.wallet.clone(); let wallet = self.wallet.clone();
execute_transaction(Box::pin(async move { BlockingExecutor::execute(async move {
let outer_span = tracing::debug_span!("Submitting transaction", ?transaction,); let outer_span = tracing::debug_span!("Submitting transaction", ?transaction,);
let _outer_guard = outer_span.enter(); let _outer_guard = outer_span.enter();
@@ -284,7 +281,7 @@ impl EthereumNode for Instance {
} }
} }
} }
})) })?
} }
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))] #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
@@ -300,14 +297,14 @@ impl EthereumNode for Instance {
}); });
let wallet = self.wallet.clone(); let wallet = self.wallet.clone();
trace_transaction(Box::pin(async move { BlockingExecutor::execute(async move {
Ok(ProviderBuilder::new() Ok(ProviderBuilder::new()
.wallet(wallet) .wallet(wallet)
.connect(&connection_string) .connect(&connection_string)
.await? .await?
.debug_trace_transaction(transaction.transaction_hash, trace_options) .debug_trace_transaction(transaction.transaction_hash, trace_options)
.await?) .await?)
})) })?
} }
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))] #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
@@ -329,7 +326,15 @@ impl EthereumNode for Instance {
let connection_string = self.connection_string.clone(); let connection_string = self.connection_string.clone();
let wallet = self.wallet.clone(); let wallet = self.wallet.clone();
let onchain_nonce = fetch_onchain_nonce(connection_string, wallet, address)?; let onchain_nonce = BlockingExecutor::execute::<anyhow::Result<_>>(async move {
ProviderBuilder::new()
.wallet(wallet)
.connect(&connection_string)
.await?
.get_transaction_count(address)
.await
.map_err(Into::into)
})??;
let mut nonces = self.nonces.lock().unwrap(); let mut nonces = self.nonces.lock().unwrap();
let current = nonces.entry(address).or_insert(onchain_nonce); let current = nonces.entry(address).or_insert(onchain_nonce);
+14 -9
View File
@@ -27,10 +27,7 @@ use sp_runtime::AccountId32;
use tracing::Level; use tracing::Level;
use revive_dt_config::Arguments; use revive_dt_config::Arguments;
use revive_dt_node_interaction::{ use revive_dt_node_interaction::{BlockingExecutor, EthereumNode};
EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction,
transaction::execute_transaction,
};
use crate::Node; use crate::Node;
@@ -340,7 +337,7 @@ impl EthereumNode for KitchensinkNode {
tracing::debug!("Submitting transaction: {transaction:#?}"); tracing::debug!("Submitting transaction: {transaction:#?}");
tracing::info!("Submitting tx to kitchensink"); tracing::info!("Submitting tx to kitchensink");
let receipt = execute_transaction(Box::pin(async move { let receipt = BlockingExecutor::execute(async move {
Ok(ProviderBuilder::new() Ok(ProviderBuilder::new()
.wallet(wallet) .wallet(wallet)
.connect(&url) .connect(&url)
@@ -349,7 +346,7 @@ impl EthereumNode for KitchensinkNode {
.await? .await?
.get_receipt() .get_receipt()
.await?) .await?)
})); })?;
tracing::info!(?receipt, "Submitted tx to kitchensink"); tracing::info!(?receipt, "Submitted tx to kitchensink");
receipt receipt
} }
@@ -368,14 +365,14 @@ impl EthereumNode for KitchensinkNode {
let wallet = self.wallet.clone(); let wallet = self.wallet.clone();
trace_transaction(Box::pin(async move { BlockingExecutor::execute(async move {
Ok(ProviderBuilder::new() Ok(ProviderBuilder::new()
.wallet(wallet) .wallet(wallet)
.connect(&url) .connect(&url)
.await? .await?
.debug_trace_transaction(transaction.transaction_hash, trace_options) .debug_trace_transaction(transaction.transaction_hash, trace_options)
.await?) .await?)
})) })?
} }
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
@@ -394,7 +391,15 @@ impl EthereumNode for KitchensinkNode {
let url = self.rpc_url.clone(); let url = self.rpc_url.clone();
let wallet = self.wallet.clone(); let wallet = self.wallet.clone();
let onchain_nonce = fetch_onchain_nonce(url, wallet, address)?; let onchain_nonce = BlockingExecutor::execute::<anyhow::Result<_>>(async move {
ProviderBuilder::new()
.wallet(wallet)
.connect(&url)
.await?
.get_transaction_count(address)
.await
.map_err(Into::into)
})??;
let mut nonces = self.nonces.lock().unwrap(); let mut nonces = self.nonces.lock().unwrap();
let current = nonces.entry(address).or_insert(onchain_nonce); let current = nonces.entry(address).or_insert(onchain_nonce);