diff --git a/Cargo.lock b/Cargo.lock index 42b6164..eebc456 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4012,6 +4012,7 @@ version = "0.1.0" dependencies = [ "alloy", "anyhow", + "futures", "once_cell", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 5d1d6a7..9f3c875 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ alloy-primitives = "1.2.1" alloy-sol-types = "1.2.1" anyhow = "1.0" clap = { version = "4", features = ["derive"] } +futures = { version = "0.3.31" } hex = "0.4.3" reqwest = { version = "0.12.15", features = ["blocking", "json"] } once_cell = "1.21" diff --git a/crates/node-interaction/Cargo.toml b/crates/node-interaction/Cargo.toml index b044456..84ea315 100644 --- a/crates/node-interaction/Cargo.toml +++ b/crates/node-interaction/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true [dependencies] alloy = { workspace = true } anyhow = { workspace = true } +futures = { workspace = true } tracing = { workspace = true } once_cell = { workspace = true } tokio = { workspace = true } diff --git a/crates/node-interaction/src/blocking_executor.rs b/crates/node-interaction/src/blocking_executor.rs new file mode 100644 index 0000000..043dd19 --- /dev/null +++ b/crates/node-interaction/src/blocking_executor.rs @@ -0,0 +1,221 @@ +//! The alloy crate __requires__ a tokio runtime. +//! We contain any async rust right here. + +use std::{any::Any, panic::AssertUnwindSafe, pin::Pin, thread}; + +use futures::FutureExt; +use once_cell::sync::Lazy; +use tokio::{ + runtime::Builder, + sync::{mpsc::UnboundedSender, oneshot}, +}; + +/// A blocking async executor. +/// +/// This struct exposes the abstraction of a blocking async executor. It is a global and static +/// executor which means that it doesn't require for new instances of it to be created, it's a +/// singleton and can be accessed by any thread that wants to perform some async computation on the +/// blocking executor thread. +/// +/// The API of the blocking executor is created in a way so that it's very natural, simple to use, +/// and unbounded to specific tasks or return types. The following is an example of using this +/// executor to drive an async computation: +/// +/// ```rust +/// use revive_dt_node_interaction::*; +/// +/// fn blocking_function() { +/// let result = BlockingExecutor::execute(async move { +/// tokio::time::sleep(std::time::Duration::from_secs(1)).await; +/// 0xFFu8 +/// }) +/// .expect("Computation failed"); +/// +/// assert_eq!(result, 0xFF); +/// } +/// ``` +/// +/// Users get to pass in their async tasks without needing to worry about putting them in a [`Box`], +/// [`Pin`], needing to perform down-casting, or the internal channel mechanism used by the runtime. +/// To the user, it just looks like a function that converts some async code into sync code. +/// +/// This struct also handled panics that occur in the passed futures and converts them into errors +/// that can be handled by the user. This is done to allow the executor to be robust. +/// +/// Internally, the executor communicates with the tokio runtime thread through channels which carry +/// the [`TaskMessage`] and the results of the execution. +pub struct BlockingExecutor; + +impl BlockingExecutor { + pub fn execute(future: impl Future + Send + 'static) -> Result + where + R: Send + 'static, + { + // Note: The blocking executor is a singleton and therefore we store its state in a static + // so that it's assigned only once. Additionally, when we set the state of the executor we + // spawn the thread where the async runtime runs. + static STATE: Lazy = Lazy::new(|| { + tracing::trace!("Initializing the BlockingExecutor state"); + + // All communication with the tokio runtime thread happens over mspc channels where the + // producers here are the threads that want to run async tasks and the consumer here is + // the tokio runtime thread. + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); + + thread::spawn(move || { + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create the async runtime"); + + runtime.block_on(async move { + while let Some(TaskMessage { + future: task, + response_tx: response_channel, + }) = rx.recv().await + { + tracing::trace!("Received a new future to execute"); + tokio::spawn(async move { + // One of the things that the blocking executor does is that it allows + // us to catch panics if they occur. By wrapping the given future in an + // AssertUnwindSafe::catch_unwind we are able to catch all panic unwinds + // in the given future and convert them into errors. + let task = AssertUnwindSafe(task).catch_unwind(); + + let result = task.await; + let _ = response_channel.send(result); + }); + } + }) + }); + + ExecutorState { tx } + }); + + // We need to perform blocking synchronous communication between the current thread and the + // tokio runtime thread with the result of the async computation and the oneshot channels + // from tokio allows us to do that. The sender side of the channel will be given to the + // tokio runtime thread to send the result when the computation is completed and the receive + // side of the channel will be kept with this thread to await for the response of the async + // task to come back. + let (response_tx, response_rx) = + oneshot::channel::, Box>>(); + + // The tokio runtime thread expects a Future> + Send to be + // sent to it to execute. However, this function has a typed Future + Send and + // therefore we need to change the type of the future to fit what the runtime thread expects + // in the task message. In doing this conversion, we lose some of the type information since + // we're converting R => dyn Any. However, we will perform down-casting on the result to + // convert it back into R. + let future = Box::pin(async move { Box::new(future.await) as Box }); + + let task = TaskMessage::new(future, response_tx); + if let Err(error) = STATE.tx.send(task) { + tracing::error!(?error, "Failed to send the task to the blocking executor"); + anyhow::bail!("Failed to send the task to the blocking executor: {error:?}") + } + + let result = match response_rx.blocking_recv() { + Ok(result) => result, + Err(error) => { + tracing::error!( + ?error, + "Failed to get the response from the blocking executor" + ); + anyhow::bail!("Failed to get the response from the blocking executor: {error:?}") + } + }; + + match result.map(|result| { + *result + .downcast::() + .expect("Type mismatch in the downcast") + }) { + Ok(result) => Ok(result), + Err(error) => { + tracing::error!( + ?error, + "Failed to downcast the returned result into the expected type" + ); + anyhow::bail!( + "Failed to downcast the returned result into the expected type: {error:?}" + ) + } + } + } +} + +/// Represents the state of the async runtime. This runtime is designed to be a singleton runtime +/// which means that in the current running program there's just a single thread that has an async +/// runtime. +struct ExecutorState { + /// The sending side of the task messages channel. This is used by all of the other threads to + /// communicate with the async runtime thread. + tx: UnboundedSender, +} + +/// Represents a message that contains an asynchronous task that's to be executed by the runtime +/// as well as a way for the runtime to report back on the result of the execution. +struct TaskMessage { + /// The task that's being requested to run. This is a future that returns an object that does + /// implement [`Any`] and [`Send`] to allow it to be sent between the requesting thread and the + /// async thread. + future: Pin> + Send>>, + + /// A one shot sender channel where the sender of the task is expecting to hear back on the + /// result of the task. + response_tx: oneshot::Sender, Box>>, +} + +impl TaskMessage { + pub fn new( + future: Pin> + Send>>, + response_tx: oneshot::Sender, Box>>, + ) -> Self { + Self { + future, + response_tx, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn simple_future_works() { + // Act + let result = BlockingExecutor::execute(async move { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + 0xFFu8 + }) + .unwrap(); + + // Assert + assert_eq!(result, 0xFFu8); + } + + #[test] + #[allow(unreachable_code, clippy::unreachable)] + fn panics_in_futures_are_caught() { + // Act + let result = BlockingExecutor::execute(async move { + panic!("This is a panic!"); + 0xFFu8 + }); + + // Assert + assert!(result.is_err()); + + // Act + let result = BlockingExecutor::execute(async move { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + 0xFFu8 + }) + .unwrap(); + + // Assert + assert_eq!(result, 0xFFu8) + } +} diff --git a/crates/node-interaction/src/lib.rs b/crates/node-interaction/src/lib.rs index 2006d1b..2c2eef5 100644 --- a/crates/node-interaction/src/lib.rs +++ b/crates/node-interaction/src/lib.rs @@ -3,12 +3,9 @@ use alloy::primitives::Address; use alloy::rpc::types::trace::geth::{DiffMode, GethTrace}; use alloy::rpc::types::{TransactionReceipt, TransactionRequest}; -use tokio_runtime::TO_TOKIO; -pub mod nonce; -mod tokio_runtime; -pub mod trace; -pub mod transaction; +mod blocking_executor; +pub use blocking_executor::*; /// An interface for all interactions with Ethereum compatible nodes. pub trait EthereumNode { diff --git a/crates/node-interaction/src/nonce.rs b/crates/node-interaction/src/nonce.rs deleted file mode 100644 index 53b73c9..0000000 --- a/crates/node-interaction/src/nonce.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::pin::Pin; - -use alloy::{ - primitives::Address, - providers::{Provider, ProviderBuilder}, -}; -use tokio::sync::oneshot; - -use crate::{TO_TOKIO, tokio_runtime::AsyncNodeInteraction}; - -pub type Task = Pin> + Send>>; - -pub(crate) struct Nonce { - sender: oneshot::Sender>, - task: Task, -} - -impl AsyncNodeInteraction for Nonce { - type Output = anyhow::Result; - - fn split( - self, - ) -> ( - std::pin::Pin + Send>>, - oneshot::Sender, - ) { - (self.task, self.sender) - } -} - -/// This is like `trace_transaction`, just for nonces. -pub fn fetch_onchain_nonce( - connection: String, - wallet: alloy::network::EthereumWallet, - address: Address, -) -> anyhow::Result { - let sender = TO_TOKIO.lock().unwrap().nonce_sender.clone(); - - let (tx, rx) = oneshot::channel(); - let task: Task = Box::pin(async move { - let provider = ProviderBuilder::new() - .wallet(wallet) - .connect(&connection) - .await?; - let onchain = provider.get_transaction_count(address).await?; - Ok(onchain) - }); - - sender - .blocking_send(Nonce { task, sender: tx }) - .expect("not in async context"); - - rx.blocking_recv() - .unwrap_or_else(|err| anyhow::bail!("nonce fetch failed: {err}")) -} diff --git a/crates/node-interaction/src/tokio_runtime.rs b/crates/node-interaction/src/tokio_runtime.rs deleted file mode 100644 index 20a3ae3..0000000 --- a/crates/node-interaction/src/tokio_runtime.rs +++ /dev/null @@ -1,87 +0,0 @@ -//! The alloy crate __requires__ a tokio runtime. -//! We contain any async rust right here. - -use once_cell::sync::Lazy; -use std::pin::Pin; -use std::sync::Mutex; -use std::thread; -use tokio::runtime::Runtime; -use tokio::spawn; -use tokio::sync::{mpsc, oneshot}; -use tokio::task::JoinError; - -use crate::nonce::Nonce; -use crate::trace::Trace; -use crate::transaction::Transaction; - -pub(crate) static TO_TOKIO: Lazy> = - Lazy::new(|| Mutex::new(TokioRuntime::spawn())); - -/// Common interface for executing async node interactions from a non-async context. -#[allow(clippy::type_complexity)] -pub(crate) trait AsyncNodeInteraction: Send + 'static { - type Output: Send; - - //// Returns the task and the output sender. - fn split( - self, - ) -> ( - Pin + Send>>, - oneshot::Sender, - ); -} - -pub(crate) struct TokioRuntime { - pub(crate) transaction_sender: mpsc::Sender, - pub(crate) trace_sender: mpsc::Sender, - pub(crate) nonce_sender: mpsc::Sender, -} - -impl TokioRuntime { - fn spawn() -> Self { - let rt = Runtime::new().expect("should be able to create the tokio runtime"); - let (transaction_sender, transaction_receiver) = mpsc::channel::(1024); - let (trace_sender, trace_receiver) = mpsc::channel::(1024); - let (nonce_sender, nonce_receiver) = mpsc::channel::(1024); - - thread::spawn(move || { - rt.block_on(async move { - let transaction_task = spawn(interaction::(transaction_receiver)); - let trace_task = spawn(interaction::(trace_receiver)); - let nonce_task = spawn(interaction::(nonce_receiver)); - - if let Err(error) = transaction_task.await { - tracing::error!("tokio transaction task failed: {error}"); - } - if let Err(error) = trace_task.await { - tracing::error!("tokio trace transaction task failed: {error}"); - } - if let Err(error) = nonce_task.await { - tracing::error!("tokio nonce task failed: {error}"); - } - }); - }); - - Self { - transaction_sender, - trace_sender, - nonce_sender, - } - } -} - -async fn interaction(mut receiver: mpsc::Receiver) -> Result<(), JoinError> -where - T: AsyncNodeInteraction, -{ - while let Some(task) = receiver.recv().await { - spawn(async move { - let (task, sender) = task.split(); - sender - .send(task.await) - .unwrap_or_else(|_| panic!("failed to send task output")); - }); - } - - Ok(()) -} diff --git a/crates/node-interaction/src/trace.rs b/crates/node-interaction/src/trace.rs deleted file mode 100644 index 9255d00..0000000 --- a/crates/node-interaction/src/trace.rs +++ /dev/null @@ -1,43 +0,0 @@ -//! Trace transactions in a sync context. - -use std::pin::Pin; - -use alloy::rpc::types::trace::geth::GethTrace; -use tokio::sync::oneshot; - -use crate::TO_TOKIO; -use crate::tokio_runtime::AsyncNodeInteraction; - -pub type Task = Pin> + Send>>; - -pub(crate) struct Trace { - sender: oneshot::Sender>, - task: Task, -} - -impl AsyncNodeInteraction for Trace { - type Output = anyhow::Result; - - fn split( - self, - ) -> ( - std::pin::Pin + Send>>, - oneshot::Sender, - ) { - (self.task, self.sender) - } -} - -/// Execute some [Task] that return a [GethTrace] result. -pub fn trace_transaction(task: Task) -> anyhow::Result { - let task_sender = TO_TOKIO.lock().unwrap().trace_sender.clone(); - let (sender, receiver) = oneshot::channel(); - - task_sender - .blocking_send(Trace { task, sender }) - .expect("we are not calling this from an async context"); - - receiver - .blocking_recv() - .unwrap_or_else(|error| anyhow::bail!("no trace received: {error}")) -} diff --git a/crates/node-interaction/src/transaction.rs b/crates/node-interaction/src/transaction.rs deleted file mode 100644 index b5af221..0000000 --- a/crates/node-interaction/src/transaction.rs +++ /dev/null @@ -1,46 +0,0 @@ -//! Execute transactions in a sync context. - -use std::pin::Pin; - -use alloy::rpc::types::TransactionReceipt; -use tokio::sync::oneshot; - -use crate::TO_TOKIO; -use crate::tokio_runtime::AsyncNodeInteraction; - -pub type Task = Pin> + Send>>; - -pub(crate) struct Transaction { - receipt_sender: oneshot::Sender>, - task: Task, -} - -impl AsyncNodeInteraction for Transaction { - type Output = anyhow::Result; - - fn split( - self, - ) -> ( - Pin + Send>>, - oneshot::Sender, - ) { - (self.task, self.receipt_sender) - } -} - -/// Execute some [Task] that returns a [TransactionReceipt]. -pub fn execute_transaction(task: Task) -> anyhow::Result { - let request_sender = TO_TOKIO.lock().unwrap().transaction_sender.clone(); - let (receipt_sender, receipt_receiver) = oneshot::channel(); - - request_sender - .blocking_send(Transaction { - receipt_sender, - task, - }) - .expect("we are not calling this from an async context"); - - receipt_receiver - .blocking_recv() - .unwrap_or_else(|error| anyhow::bail!("no receipt received: {error}")) -} diff --git a/crates/node/src/geth.rs b/crates/node/src/geth.rs index 99893a0..57cb2db 100644 --- a/crates/node/src/geth.rs +++ b/crates/node/src/geth.rs @@ -23,10 +23,7 @@ use alloy::{ }, }; use revive_dt_config::Arguments; -use revive_dt_node_interaction::{ - EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction, - transaction::execute_transaction, -}; +use revive_dt_node_interaction::{BlockingExecutor, EthereumNode}; use tracing::Level; use crate::Node; @@ -205,7 +202,7 @@ impl EthereumNode for Instance { let connection_string = self.connection_string(); let wallet = self.wallet.clone(); - execute_transaction(Box::pin(async move { + BlockingExecutor::execute(async move { let outer_span = tracing::debug_span!("Submitting transaction", ?transaction,); let _outer_guard = outer_span.enter(); @@ -284,7 +281,7 @@ impl EthereumNode for Instance { } } } - })) + })? } #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] @@ -300,14 +297,14 @@ impl EthereumNode for Instance { }); let wallet = self.wallet.clone(); - trace_transaction(Box::pin(async move { + BlockingExecutor::execute(async move { Ok(ProviderBuilder::new() .wallet(wallet) .connect(&connection_string) .await? .debug_trace_transaction(transaction.transaction_hash, trace_options) .await?) - })) + })? } #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] @@ -329,7 +326,15 @@ impl EthereumNode for Instance { let connection_string = self.connection_string.clone(); let wallet = self.wallet.clone(); - let onchain_nonce = fetch_onchain_nonce(connection_string, wallet, address)?; + let onchain_nonce = BlockingExecutor::execute::>(async move { + ProviderBuilder::new() + .wallet(wallet) + .connect(&connection_string) + .await? + .get_transaction_count(address) + .await + .map_err(Into::into) + })??; let mut nonces = self.nonces.lock().unwrap(); let current = nonces.entry(address).or_insert(onchain_nonce); diff --git a/crates/node/src/kitchensink.rs b/crates/node/src/kitchensink.rs index c05f54c..768c1aa 100644 --- a/crates/node/src/kitchensink.rs +++ b/crates/node/src/kitchensink.rs @@ -27,10 +27,7 @@ use sp_runtime::AccountId32; use tracing::Level; use revive_dt_config::Arguments; -use revive_dt_node_interaction::{ - EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction, - transaction::execute_transaction, -}; +use revive_dt_node_interaction::{BlockingExecutor, EthereumNode}; use crate::Node; @@ -340,7 +337,7 @@ impl EthereumNode for KitchensinkNode { tracing::debug!("Submitting transaction: {transaction:#?}"); tracing::info!("Submitting tx to kitchensink"); - let receipt = execute_transaction(Box::pin(async move { + let receipt = BlockingExecutor::execute(async move { Ok(ProviderBuilder::new() .wallet(wallet) .connect(&url) @@ -349,7 +346,7 @@ impl EthereumNode for KitchensinkNode { .await? .get_receipt() .await?) - })); + })?; tracing::info!(?receipt, "Submitted tx to kitchensink"); receipt } @@ -368,14 +365,14 @@ impl EthereumNode for KitchensinkNode { let wallet = self.wallet.clone(); - trace_transaction(Box::pin(async move { + BlockingExecutor::execute(async move { Ok(ProviderBuilder::new() .wallet(wallet) .connect(&url) .await? .debug_trace_transaction(transaction.transaction_hash, trace_options) .await?) - })) + })? } #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] @@ -394,7 +391,15 @@ impl EthereumNode for KitchensinkNode { let url = self.rpc_url.clone(); let wallet = self.wallet.clone(); - let onchain_nonce = fetch_onchain_nonce(url, wallet, address)?; + let onchain_nonce = BlockingExecutor::execute::>(async move { + ProviderBuilder::new() + .wallet(wallet) + .connect(&url) + .await? + .get_transaction_count(address) + .await + .map_err(Into::into) + })??; let mut nonces = self.nonces.lock().unwrap(); let current = nonces.entry(address).or_insert(onchain_nonce);