make node interactions generic

Signed-off-by: Cyrill Leutwiler <bigcyrill@hotmail.com>
This commit is contained in:
Cyrill Leutwiler
2025-03-31 09:50:55 +02:00
parent 3edd72850f
commit ea17166448
10 changed files with 112 additions and 111 deletions
+22 -19
View File
@@ -1,12 +1,13 @@
//! The alloy crate is convenient but requires a tokio runtime.
//! The alloy crate __requires__ a tokio runtime.
//! We contain any async rust right here.
use once_cell::sync::Lazy;
use std::pin::Pin;
use std::sync::Mutex;
use std::thread;
use tokio::runtime::Runtime;
use tokio::spawn;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinError;
use crate::trace::Trace;
@@ -15,15 +16,18 @@ use crate::transaction::Transaction;
pub(crate) static TO_TOKIO: Lazy<Mutex<TokioRuntime>> =
Lazy::new(|| Mutex::new(TokioRuntime::spawn()));
// Common interface for executing async node interactions from a non-async context.
/// Common interface for executing async node interactions from a non-async context.
#[allow(clippy::type_complexity)]
pub(crate) trait AsyncNodeInteraction: Send + 'static {
type Output: Send + 'static;
type Output: Send;
/// Any async calls the task needs to perform go here.
fn execute_async(self) -> impl std::future::Future<Output = Self::Output> + Send;
/// Returns the interactions output sender.
fn output_sender(&self) -> mpsc::Sender<Self::Output>;
//// Returns the task and the output sender.
fn split(
self,
) -> (
Pin<Box<dyn Future<Output = Self::Output> + Send>>,
oneshot::Sender<Self::Output>,
);
}
pub(crate) struct TokioRuntime {
@@ -58,18 +62,17 @@ impl TokioRuntime {
}
}
async fn interaction<T: AsyncNodeInteraction>(
mut receiver: mpsc::Receiver<T>,
) -> Result<(), JoinError> {
async fn interaction<T>(mut receiver: mpsc::Receiver<T>) -> Result<(), JoinError>
where
T: AsyncNodeInteraction,
{
while let Some(task) = receiver.recv().await {
spawn(async move {
let sender = task.output_sender();
let result = task.execute_async().await;
if let Err(error) = sender.send(result).await {
log::error!("failed to send task output: {error}");
}
})
.await?;
let (task, sender) = task.split();
sender
.send(task.await)
.unwrap_or_else(|_| panic!("failed to send task output"));
});
}
Ok(())
+24 -38
View File
@@ -1,57 +1,43 @@
//! Trace transactions in a sync context.
use alloy::primitives::TxHash;
use alloy::providers::ProviderBuilder;
use alloy::providers::ext::DebugApi;
use alloy::rpc::types::TransactionReceipt;
use alloy::rpc::types::trace::geth::{GethDebugTracingOptions, GethTrace};
use tokio::sync::mpsc;
use std::pin::Pin;
use alloy::rpc::types::trace::geth::GethTrace;
use tokio::sync::oneshot;
use crate::TO_TOKIO;
use crate::tokio_runtime::AsyncNodeInteraction;
pub type Task = Pin<Box<dyn Future<Output = anyhow::Result<GethTrace>> + Send>>;
pub(crate) struct Trace {
transaction_hash: TxHash,
options: GethDebugTracingOptions,
geth_trace_sender: mpsc::Sender<anyhow::Result<GethTrace>>,
connection_string: String,
sender: oneshot::Sender<anyhow::Result<GethTrace>>,
task: Task,
}
impl AsyncNodeInteraction for Trace {
type Output = anyhow::Result<GethTrace>;
async fn execute_async(self) -> Self::Output {
let provider = ProviderBuilder::new()
.connect(&self.connection_string)
.await?;
Ok(provider
.debug_trace_transaction(self.transaction_hash, self.options)
.await?)
}
fn output_sender(&self) -> mpsc::Sender<Self::Output> {
self.geth_trace_sender.clone()
fn split(
self,
) -> (
std::pin::Pin<Box<dyn Future<Output = Self::Output> + Send>>,
oneshot::Sender<Self::Output>,
) {
(self.task, self.sender)
}
}
/// Trace the transaction in [TransactionReceipt] against the `node`,
/// using the provided [GethDebugTracingOptions].
pub fn trace_transaction(
transaction_receipt: TransactionReceipt,
options: GethDebugTracingOptions,
connection_string: String,
) -> anyhow::Result<GethTrace> {
let trace_sender = TO_TOKIO.lock().unwrap().trace_sender.clone();
let (geth_trace_sender, mut geth_trace_receiver) = mpsc::channel(1);
/// Execute some [Task] that return a [GethTrace] result.
pub fn trace_transaction(task: Task) -> anyhow::Result<GethTrace> {
let task_sender = TO_TOKIO.lock().unwrap().trace_sender.clone();
let (sender, receiver) = oneshot::channel();
trace_sender.blocking_send(Trace {
transaction_hash: transaction_receipt.transaction_hash,
options,
geth_trace_sender,
connection_string,
})?;
task_sender
.blocking_send(Trace { task, sender })
.expect("we are not calling this from an async context");
geth_trace_receiver
receiver
.blocking_recv()
.unwrap_or_else(|| anyhow::bail!("no receipt received"))
.unwrap_or_else(|error| anyhow::bail!("no trace received: {error}"))
}
+25 -31
View File
@@ -1,52 +1,46 @@
//! Execute transactions in a sync context.
use alloy::providers::{Provider, ProviderBuilder};
use alloy::rpc::types::{TransactionReceipt, TransactionRequest};
use tokio::sync::mpsc;
use std::pin::Pin;
use alloy::rpc::types::TransactionReceipt;
use tokio::sync::oneshot;
use crate::TO_TOKIO;
use crate::tokio_runtime::AsyncNodeInteraction;
pub type Task = Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + Send>>;
pub(crate) struct Transaction {
transaction_request: TransactionRequest,
receipt_sender: mpsc::Sender<anyhow::Result<TransactionReceipt>>,
connection_string: String,
receipt_sender: oneshot::Sender<anyhow::Result<TransactionReceipt>>,
task: Task,
}
impl AsyncNodeInteraction for Transaction {
type Output = anyhow::Result<TransactionReceipt>;
async fn execute_async(self) -> Self::Output {
let provider = ProviderBuilder::new()
.connect(&self.connection_string)
.await?;
Ok(provider
.send_transaction(self.transaction_request)
.await?
.get_receipt()
.await?)
}
fn output_sender(&self) -> mpsc::Sender<Self::Output> {
self.receipt_sender.clone()
fn split(
self,
) -> (
Pin<Box<dyn Future<Output = Self::Output> + Send>>,
oneshot::Sender<Self::Output>,
) {
(self.task, self.receipt_sender)
}
}
/// Execute the [TransactionRequest] against the `node`.
pub fn execute_transaction(
transaction_request: TransactionRequest,
connection_string: String,
) -> anyhow::Result<TransactionReceipt> {
/// Execute some [Task] that returns a [TransactionReceipt].
pub fn execute_transaction(task: Task) -> anyhow::Result<TransactionReceipt> {
let request_sender = TO_TOKIO.lock().unwrap().transaction_sender.clone();
let (receipt_sender, mut receipt_receiver) = mpsc::channel(1);
let (receipt_sender, receipt_receiver) = oneshot::channel();
request_sender.blocking_send(Transaction {
transaction_request,
receipt_sender,
connection_string,
})?;
request_sender
.blocking_send(Transaction {
receipt_sender,
task,
})
.expect("we are not calling this from an async context");
receipt_receiver
.blocking_recv()
.unwrap_or_else(|| anyhow::bail!("no receipt received"))
.unwrap_or_else(|error| anyhow::bail!("no receipt received: {error}"))
}