the node interaction interface

Signed-off-by: xermicus <bigcyrill@hotmail.com>
This commit is contained in:
xermicus
2025-03-22 19:41:24 +01:00
parent 3b713ad2cb
commit f9a0542d49
12 changed files with 276 additions and 57 deletions
+19 -39
View File
@@ -1,45 +1,25 @@
//! Implements helpers for node interactions using transactions.
//!
//! The alloy crate is convenient but requires running in a tokio runtime.
//! We contain any async rust right here.
//! This crate implements all node interactions.
use once_cell::sync::Lazy;
use std::sync::Mutex;
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use transaction::Transaction;
use alloy::rpc::types::trace::geth::GethTrace;
use alloy::rpc::types::{TransactionReceipt, TransactionRequest};
use revive_dt_node::Node;
use tokio_runtime::TO_TOKIO;
mod tokio_runtime;
pub mod trace;
pub mod transaction;
pub(crate) static TO_TOKIO: Lazy<Mutex<TokioRuntime>> =
Lazy::new(|| Mutex::new(TokioRuntime::spawn()));
/// An interface for all node interactions.
pub trait NodeInteraction: Node {
/// Execute the [TransactionRequest] and return a [TransactionReceipt].
fn execute_transaction(
&self,
transaction_request: TransactionRequest,
) -> anyhow::Result<TransactionReceipt>;
pub struct TokioRuntime {
pub transaction_sender: mpsc::Sender<Transaction>,
}
impl TokioRuntime {
pub fn spawn() -> Self {
let rt = Runtime::new().expect("should be able to create the tokio runtime");
let (transaction_sender, mut transaction_receiver) = mpsc::channel::<Transaction>(1024);
thread::spawn(move || {
rt.block_on(async move {
while let Some(transaction) = transaction_receiver.recv().await {
tokio::task::spawn(async move {
let sender = transaction.receipt_sender.clone();
let result = transaction.execute().await;
if let Err(error) = sender.send(result).await {
log::error!("failed to send transaction receipt: {error}");
}
})
.await
.expect("should alaways be able to spawn the tokio tasks");
}
});
});
Self { transaction_sender }
}
/// Trace the transaction in the [TransactionReceipt] and return a [GethTrace].
fn trace_transaction(
&self,
transaction_receipt: TransactionReceipt,
) -> anyhow::Result<GethTrace>;
}
@@ -0,0 +1,76 @@
//! The alloy crate is convenient but requires a tokio runtime.
//! We contain any async rust right here.
use once_cell::sync::Lazy;
use std::sync::Mutex;
use std::thread;
use tokio::runtime::Runtime;
use tokio::spawn;
use tokio::sync::mpsc;
use tokio::task::JoinError;
use crate::trace::Trace;
use crate::transaction::Transaction;
pub(crate) static TO_TOKIO: Lazy<Mutex<TokioRuntime>> =
Lazy::new(|| Mutex::new(TokioRuntime::spawn()));
// Common interface for executing async node interactions from a non-async context.
pub(crate) trait AsyncNodeInteraction: Send + 'static {
type Output: Send + 'static;
/// Any async calls the task needs to perform go here.
fn execute_async(self) -> impl std::future::Future<Output = Self::Output> + Send;
/// Returns the interactions output sender.
fn output_sender(&self) -> mpsc::Sender<Self::Output>;
}
pub(crate) struct TokioRuntime {
pub(crate) transaction_sender: mpsc::Sender<Transaction>,
pub(crate) trace_sender: mpsc::Sender<Trace>,
}
impl TokioRuntime {
fn spawn() -> Self {
let rt = Runtime::new().expect("should be able to create the tokio runtime");
let (transaction_sender, transaction_receiver) = mpsc::channel::<Transaction>(1024);
let (trace_sender, trace_receiver) = mpsc::channel::<Trace>(1024);
thread::spawn(move || {
rt.block_on(async move {
let transaction_task = spawn(interaction::<Transaction>(transaction_receiver));
let trace_task = spawn(interaction::<Trace>(trace_receiver));
if let Err(error) = transaction_task.await {
log::error!("tokio transaction task failed: {error}");
}
if let Err(error) = trace_task.await {
log::error!("tokio trace transaction task failed: {error}");
}
});
});
Self {
transaction_sender,
trace_sender,
}
}
}
async fn interaction<T: AsyncNodeInteraction>(
mut receiver: mpsc::Receiver<T>,
) -> Result<(), JoinError> {
while let Some(task) = receiver.recv().await {
spawn(async move {
let sender = task.output_sender();
let result = task.execute_async().await;
if let Err(error) = sender.send(result).await {
log::error!("failed to send task output: {error}");
}
})
.await?;
}
Ok(())
}
+58
View File
@@ -0,0 +1,58 @@
//! Trace transactions in a sync context.
use alloy::primitives::TxHash;
use alloy::providers::ProviderBuilder;
use alloy::providers::ext::DebugApi;
use alloy::rpc::types::TransactionReceipt;
use alloy::rpc::types::trace::geth::{GethDebugTracingOptions, GethTrace};
use revive_dt_node::Node;
use tokio::sync::mpsc;
use crate::TO_TOKIO;
use crate::tokio_runtime::AsyncNodeInteraction;
pub(crate) struct Trace {
transaction_hash: TxHash,
options: GethDebugTracingOptions,
geth_trace_sender: mpsc::Sender<anyhow::Result<GethTrace>>,
connection_string: String,
}
impl AsyncNodeInteraction for Trace {
type Output = anyhow::Result<GethTrace>;
async fn execute_async(self) -> Self::Output {
let provider = ProviderBuilder::new()
.connect(&self.connection_string)
.await?;
Ok(provider
.debug_trace_transaction(self.transaction_hash, self.options)
.await?)
}
fn output_sender(&self) -> mpsc::Sender<Self::Output> {
self.geth_trace_sender.clone()
}
}
/// Trace the transaction in [TransactionReceipt] against the `node`,
/// using the provided [GethDebugTracingOptions].
pub fn trace_transaction<T: Node>(
transaction_receipt: TransactionReceipt,
options: GethDebugTracingOptions,
node: &T,
) -> anyhow::Result<GethTrace> {
let trace_sender = TO_TOKIO.lock().unwrap().trace_sender.clone();
let (geth_trace_sender, mut geth_trace_receiver) = mpsc::channel(1);
trace_sender.blocking_send(Trace {
transaction_hash: transaction_receipt.transaction_hash,
options,
geth_trace_sender,
connection_string: node.connection_string(),
})?;
geth_trace_receiver
.blocking_recv()
.unwrap_or_else(|| anyhow::bail!("no receipt received"))
}
+20 -9
View File
@@ -1,17 +1,23 @@
//! Execute transactions in a sync context.
use alloy::providers::{Provider, ProviderBuilder};
use alloy::rpc::types::{TransactionReceipt, TransactionRequest};
use revive_dt_node::Node;
use tokio::sync::mpsc;
use crate::TO_TOKIO;
use crate::tokio_runtime::AsyncNodeInteraction;
pub struct Transaction {
pub transaction_request: TransactionRequest,
pub receipt_sender: mpsc::Sender<anyhow::Result<TransactionReceipt>>,
pub connection_string: String,
pub(crate) struct Transaction {
transaction_request: TransactionRequest,
receipt_sender: mpsc::Sender<anyhow::Result<TransactionReceipt>>,
connection_string: String,
}
impl Transaction {
pub async fn execute(self) -> anyhow::Result<TransactionReceipt> {
impl AsyncNodeInteraction for Transaction {
type Output = anyhow::Result<TransactionReceipt>;
async fn execute_async(self) -> Self::Output {
let provider = ProviderBuilder::new()
.connect(&self.connection_string)
.await?;
@@ -21,11 +27,16 @@ impl Transaction {
.get_receipt()
.await?)
}
fn output_sender(&self) -> mpsc::Sender<Self::Output> {
self.receipt_sender.clone()
}
}
pub fn execute_transaction(
/// Execute the [TransactionRequest] against the `node`.
pub fn execute_transaction<T: Node>(
transaction_request: TransactionRequest,
connection_string: String,
node: &T,
) -> anyhow::Result<TransactionReceipt> {
let request_sender = TO_TOKIO.lock().unwrap().transaction_sender.clone();
let (receipt_sender, mut receipt_receiver) = mpsc::channel(1);
@@ -33,7 +44,7 @@ pub fn execute_transaction(
request_sender.blocking_send(Transaction {
transaction_request,
receipt_sender,
connection_string,
connection_string: node.connection_string(),
})?;
receipt_receiver