Make the ethereum node trait object compatible

This commit is contained in:
Omar Abdulla
2025-09-17 06:01:13 +03:00
parent 9fc74aeea0
commit 8a05f8e6e8
5 changed files with 231 additions and 190 deletions
+139 -119
View File
@@ -25,7 +25,7 @@ use alloy::{
fillers::{CachedNonceManager, ChainIdFiller, FillProvider, NonceFiller, TxFiller},
},
rpc::types::{
EIP1186AccountProofResponse, TransactionReceipt, TransactionRequest,
EIP1186AccountProofResponse, TransactionRequest,
trace::geth::{DiffMode, GethDebugTracingOptions, PreStateConfig, PreStateFrame},
},
};
@@ -296,152 +296,172 @@ impl EthereumNode for GethNode {
fields(geth_node_id = self.id, connection_string = self.connection_string),
err,
)]
async fn execute_transaction(
fn execute_transaction(
&self,
transaction: TransactionRequest,
) -> anyhow::Result<alloy::rpc::types::TransactionReceipt> {
let provider = self
.provider()
.await
.context("Failed to create provider for transaction submission")?;
) -> Pin<Box<dyn Future<Output = anyhow::Result<alloy::rpc::types::TransactionReceipt>> + '_>>
{
Box::pin(async move {
let provider = self
.provider()
.await
.context("Failed to create provider for transaction submission")?;
let pending_transaction = provider
let pending_transaction = provider
.send_transaction(transaction)
.await
.inspect_err(
|err| tracing::error!(%err, "Encountered an error when submitting the transaction"),
)
.context("Failed to submit transaction to geth node")?;
let transaction_hash = *pending_transaction.tx_hash();
let transaction_hash = *pending_transaction.tx_hash();
// The following is a fix for the "transaction indexing is in progress" error that we used
// to get. You can find more information on this in the following GH issue in geth
// https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on,
// before we can get the receipt of the transaction it needs to have been indexed by the
// node's indexer. Just because the transaction has been confirmed it doesn't mean that it
// has been indexed. When we call alloy's `get_receipt` it checks if the transaction was
// confirmed. If it has been, then it will call `eth_getTransactionReceipt` method which
// _might_ return the above error if the tx has not yet been indexed yet. So, we need to
// implement a retry mechanism for the receipt to keep retrying to get it until it
// eventually works, but we only do that if the error we get back is the "transaction
// indexing is in progress" error or if the receipt is None.
//
// Getting the transaction indexed and taking a receipt can take a long time especially when
// a lot of transactions are being submitted to the node. Thus, while initially we only
// allowed for 60 seconds of waiting with a 1 second delay in polling, we need to allow for
// a larger wait time. Therefore, in here we allow for 5 minutes of waiting with exponential
// backoff each time we attempt to get the receipt and find that it's not available.
let provider = Arc::new(provider);
poll(
Self::RECEIPT_POLLING_DURATION,
PollingWaitBehavior::Constant(Duration::from_millis(200)),
move || {
let provider = provider.clone();
async move {
match provider.get_transaction_receipt(transaction_hash).await {
Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)),
Ok(None) => Ok(ControlFlow::Continue(())),
Err(error) => {
let error_string = error.to_string();
match error_string.contains(Self::TRANSACTION_INDEXING_ERROR) {
true => Ok(ControlFlow::Continue(())),
false => Err(error.into()),
// The following is a fix for the "transaction indexing is in progress" error that we used
// to get. You can find more information on this in the following GH issue in geth
// https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on,
// before we can get the receipt of the transaction it needs to have been indexed by the
// node's indexer. Just because the transaction has been confirmed it doesn't mean that it
// has been indexed. When we call alloy's `get_receipt` it checks if the transaction was
// confirmed. If it has been, then it will call `eth_getTransactionReceipt` method which
// _might_ return the above error if the tx has not yet been indexed yet. So, we need to
// implement a retry mechanism for the receipt to keep retrying to get it until it
// eventually works, but we only do that if the error we get back is the "transaction
// indexing is in progress" error or if the receipt is None.
//
// Getting the transaction indexed and taking a receipt can take a long time especially when
// a lot of transactions are being submitted to the node. Thus, while initially we only
// allowed for 60 seconds of waiting with a 1 second delay in polling, we need to allow for
// a larger wait time. Therefore, in here we allow for 5 minutes of waiting with exponential
// backoff each time we attempt to get the receipt and find that it's not available.
let provider = Arc::new(provider);
poll(
Self::RECEIPT_POLLING_DURATION,
PollingWaitBehavior::Constant(Duration::from_millis(200)),
move || {
let provider = provider.clone();
async move {
match provider.get_transaction_receipt(transaction_hash).await {
Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)),
Ok(None) => Ok(ControlFlow::Continue(())),
Err(error) => {
let error_string = error.to_string();
match error_string.contains(Self::TRANSACTION_INDEXING_ERROR) {
true => Ok(ControlFlow::Continue(())),
false => Err(error.into()),
}
}
}
}
}
},
)
.instrument(tracing::info_span!(
"Awaiting transaction receipt",
?transaction_hash
))
.await
},
)
.instrument(tracing::info_span!(
"Awaiting transaction receipt",
?transaction_hash
))
.await
})
}
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
async fn trace_transaction(
fn trace_transaction(
&self,
transaction: &TransactionReceipt,
tx_hash: TxHash,
trace_options: GethDebugTracingOptions,
) -> anyhow::Result<alloy::rpc::types::trace::geth::GethTrace> {
let provider = Arc::new(
) -> Pin<Box<dyn Future<Output = anyhow::Result<alloy::rpc::types::trace::geth::GethTrace>> + '_>>
{
Box::pin(async move {
let provider = Arc::new(
self.provider()
.await
.context("Failed to create provider for tracing")?,
);
poll(
Self::TRACE_POLLING_DURATION,
PollingWaitBehavior::Constant(Duration::from_millis(200)),
move || {
let provider = provider.clone();
let trace_options = trace_options.clone();
async move {
match provider
.debug_trace_transaction(tx_hash, trace_options)
.await
{
Ok(trace) => Ok(ControlFlow::Break(trace)),
Err(error) => {
let error_string = error.to_string();
match error_string.contains(Self::TRANSACTION_TRACING_ERROR) {
true => Ok(ControlFlow::Continue(())),
false => Err(error.into()),
}
}
}
}
},
)
.await
})
}
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn state_diff(
&self,
tx_hash: TxHash,
) -> Pin<Box<dyn Future<Output = anyhow::Result<DiffMode>> + '_>> {
Box::pin(async move {
let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig {
diff_mode: Some(true),
disable_code: None,
disable_storage: None,
});
match self
.trace_transaction(tx_hash, trace_options)
.await
.context("Failed to trace transaction for prestate diff")?
.try_into_pre_state_frame()
.context("Failed to convert trace into pre-state frame")?
{
PreStateFrame::Diff(diff) => Ok(diff),
_ => anyhow::bail!("expected a diff mode trace"),
}
})
}
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn balance_of(
&self,
address: Address,
) -> Pin<Box<dyn Future<Output = anyhow::Result<U256>> + '_>> {
Box::pin(async move {
self.provider()
.await
.context("Failed to create provider for tracing")?,
);
poll(
Self::TRACE_POLLING_DURATION,
PollingWaitBehavior::Constant(Duration::from_millis(200)),
move || {
let provider = provider.clone();
let trace_options = trace_options.clone();
async move {
match provider
.debug_trace_transaction(transaction.transaction_hash, trace_options)
.await
{
Ok(trace) => Ok(ControlFlow::Break(trace)),
Err(error) => {
let error_string = error.to_string();
match error_string.contains(Self::TRANSACTION_TRACING_ERROR) {
true => Ok(ControlFlow::Continue(())),
false => Err(error.into()),
}
}
}
}
},
)
.await
.context("Failed to get the Geth provider")?
.get_balance(address)
.await
.map_err(Into::into)
})
}
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
async fn state_diff(&self, transaction: &TransactionReceipt) -> anyhow::Result<DiffMode> {
let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig {
diff_mode: Some(true),
disable_code: None,
disable_storage: None,
});
match self
.trace_transaction(transaction, trace_options)
.await
.context("Failed to trace transaction for prestate diff")?
.try_into_pre_state_frame()
.context("Failed to convert trace into pre-state frame")?
{
PreStateFrame::Diff(diff) => Ok(diff),
_ => anyhow::bail!("expected a diff mode trace"),
}
}
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
async fn balance_of(&self, address: Address) -> anyhow::Result<U256> {
self.provider()
.await
.context("Failed to get the Geth provider")?
.get_balance(address)
.await
.map_err(Into::into)
}
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
async fn latest_state_proof(
fn latest_state_proof(
&self,
address: Address,
keys: Vec<StorageKey>,
) -> anyhow::Result<EIP1186AccountProofResponse> {
self.provider()
.await
.context("Failed to get the Geth provider")?
.get_proof(address, keys)
.latest()
.await
.map_err(Into::into)
) -> Pin<Box<dyn Future<Output = anyhow::Result<EIP1186AccountProofResponse>> + '_>> {
Box::pin(async move {
self.provider()
.await
.context("Failed to get the Geth provider")?
.get_proof(address, keys)
.latest()
.await
.map_err(Into::into)
})
}
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn resolver(&self) -> impl Future<Output = anyhow::Result<Box<dyn ResolverApi + '_>>> {
// #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn resolver(
&self,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Box<dyn ResolverApi + '_>>> + '_>> {
Box::pin(async move {
let id = self.id;
let provider = self.provider().await?;
+72 -54
View File
@@ -412,77 +412,95 @@ impl KitchensinkNode {
}
impl EthereumNode for KitchensinkNode {
async fn execute_transaction(
fn execute_transaction(
&self,
transaction: alloy::rpc::types::TransactionRequest,
) -> anyhow::Result<TransactionReceipt> {
let receipt = self
.provider()
.await
.context("Failed to create provider for transaction submission")?
.send_transaction(transaction)
.await
.context("Failed to submit transaction to kitchensink proxy")?
.get_receipt()
.await
.context("Failed to fetch transaction receipt from kitchensink proxy")?;
Ok(receipt)
) -> Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + '_>> {
Box::pin(async move {
let receipt = self
.provider()
.await
.context("Failed to create provider for transaction submission")?
.send_transaction(transaction)
.await
.context("Failed to submit transaction to kitchensink proxy")?
.get_receipt()
.await
.context("Failed to fetch transaction receipt from kitchensink proxy")?;
Ok(receipt)
})
}
async fn trace_transaction(
fn trace_transaction(
&self,
transaction: &TransactionReceipt,
tx_hash: TxHash,
trace_options: GethDebugTracingOptions,
) -> anyhow::Result<alloy::rpc::types::trace::geth::GethTrace> {
let tx_hash = transaction.transaction_hash;
self.provider()
.await
.context("Failed to create provider for debug tracing")?
.debug_trace_transaction(tx_hash, trace_options)
.await
.context("Failed to obtain debug trace from kitchensink proxy")
) -> Pin<Box<dyn Future<Output = anyhow::Result<alloy::rpc::types::trace::geth::GethTrace>> + '_>>
{
Box::pin(async move {
self.provider()
.await
.context("Failed to create provider for debug tracing")?
.debug_trace_transaction(tx_hash, trace_options)
.await
.context("Failed to obtain debug trace from kitchensink proxy")
})
}
async fn state_diff(&self, transaction: &TransactionReceipt) -> anyhow::Result<DiffMode> {
let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig {
diff_mode: Some(true),
disable_code: None,
disable_storage: None,
});
match self
.trace_transaction(transaction, trace_options)
.await?
.try_into_pre_state_frame()?
{
PreStateFrame::Diff(diff) => Ok(diff),
_ => anyhow::bail!("expected a diff mode trace"),
}
fn state_diff(
&self,
tx_hash: TxHash,
) -> Pin<Box<dyn Future<Output = anyhow::Result<DiffMode>> + '_>> {
Box::pin(async move {
let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig {
diff_mode: Some(true),
disable_code: None,
disable_storage: None,
});
match self
.trace_transaction(tx_hash, trace_options)
.await?
.try_into_pre_state_frame()?
{
PreStateFrame::Diff(diff) => Ok(diff),
_ => anyhow::bail!("expected a diff mode trace"),
}
})
}
async fn balance_of(&self, address: Address) -> anyhow::Result<U256> {
self.provider()
.await
.context("Failed to get the Kitchensink provider")?
.get_balance(address)
.await
.map_err(Into::into)
fn balance_of(
&self,
address: Address,
) -> Pin<Box<dyn Future<Output = anyhow::Result<U256>> + '_>> {
Box::pin(async move {
self.provider()
.await
.context("Failed to get the Kitchensink provider")?
.get_balance(address)
.await
.map_err(Into::into)
})
}
async fn latest_state_proof(
fn latest_state_proof(
&self,
address: Address,
keys: Vec<StorageKey>,
) -> anyhow::Result<EIP1186AccountProofResponse> {
self.provider()
.await
.context("Failed to get the Kitchensink provider")?
.get_proof(address, keys)
.latest()
.await
.map_err(Into::into)
) -> Pin<Box<dyn Future<Output = anyhow::Result<EIP1186AccountProofResponse>> + '_>> {
Box::pin(async move {
self.provider()
.await
.context("Failed to get the Kitchensink provider")?
.get_proof(address, keys)
.latest()
.await
.map_err(Into::into)
})
}
fn resolver(&self) -> impl Future<Output = anyhow::Result<Box<dyn ResolverApi + '_>>> {
fn resolver(
&self,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Box<dyn ResolverApi + '_>>> + '_>> {
Box::pin(async move {
let id = self.id;
let provider = self.provider().await?;