Remove outdated polling function

This commit is contained in:
Omar Abdulla
2026-01-13 22:23:14 +03:00
parent 0ad80b981f
commit c60b6af50a
5 changed files with 13 additions and 150 deletions
-3
View File
@@ -1,3 +0,0 @@
mod poll;
pub use poll::*;
-72
View File
@@ -1,72 +0,0 @@
use std::ops::ControlFlow;
use std::time::Duration;
use anyhow::{Context as _, Result, anyhow};
const EXPONENTIAL_BACKOFF_MAX_WAIT_DURATION: Duration = Duration::from_secs(60);
/// A function that polls for a fallible future for some period of time and errors if it fails to
/// get a result after polling.
///
/// Given a future that returns a [`Result<ControlFlow<O, ()>>`], this function calls the future
/// repeatedly (with some wait period) until the future returns a [`ControlFlow::Break`] or until it
/// returns an [`Err`] in which case the function stops polling and returns the error.
///
/// If the future keeps returning [`ControlFlow::Continue`] and fails to return a [`Break`] within
/// the permitted polling duration then this function returns an [`Err`]
///
/// [`Break`]: ControlFlow::Break
/// [`Continue`]: ControlFlow::Continue
pub async fn poll<F, O>(
polling_duration: Duration,
polling_wait_behavior: PollingWaitBehavior,
mut future: impl FnMut() -> F,
) -> Result<O>
where
F: Future<Output = Result<ControlFlow<O, ()>>>,
{
let mut retries = 0;
let mut total_wait_duration = Duration::ZERO;
let max_allowed_wait_duration = polling_duration;
loop {
if total_wait_duration >= max_allowed_wait_duration {
break Err(anyhow!(
"Polling failed after {} retries and a total of {:?} of wait time",
retries,
total_wait_duration
));
}
match future()
.await
.context("Polled future returned an error during polling loop")?
{
ControlFlow::Continue(()) => {
let next_wait_duration = match polling_wait_behavior {
PollingWaitBehavior::Constant(duration) => duration,
PollingWaitBehavior::ExponentialBackoff => {
Duration::from_secs(2u64.pow(retries))
.min(EXPONENTIAL_BACKOFF_MAX_WAIT_DURATION)
}
};
let next_wait_duration =
next_wait_duration.min(max_allowed_wait_duration - total_wait_duration);
total_wait_duration += next_wait_duration;
retries += 1;
tokio::time::sleep(next_wait_duration).await;
}
ControlFlow::Break(output) => {
break Ok(output);
}
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum PollingWaitBehavior {
Constant(Duration),
#[default]
ExponentialBackoff,
}
-1
View File
@@ -3,7 +3,6 @@
pub mod cached_fs;
pub mod fs;
pub mod futures;
pub mod iterators;
pub mod macros;
pub mod types;
+6 -36
View File
@@ -3,7 +3,6 @@
use std::{
fs::{File, create_dir_all, remove_dir_all},
io::Read,
ops::ControlFlow,
path::PathBuf,
pin::Pin,
process::{Command, Stdio},
@@ -37,10 +36,7 @@ use revive_common::EVMVersion;
use tokio::sync::OnceCell;
use tracing::{error, instrument};
use revive_dt_common::{
fs::clear_directory,
futures::{PollingWaitBehavior, poll},
};
use revive_dt_common::fs::clear_directory;
use revive_dt_config::*;
use revive_dt_format::traits::ResolverApi;
use revive_dt_node_interaction::EthereumNode;
@@ -90,10 +86,6 @@ impl GethNode {
const READY_MARKER: &str = "IPC endpoint opened";
const ERROR_MARKER: &str = "Fatal:";
const TRANSACTION_TRACING_ERROR: &str = "historical state not available in path scheme yet";
const TRACE_POLLING_DURATION: Duration = Duration::from_secs(60);
pub fn new(
context: impl AsRef<WorkingDirectoryConfiguration>
+ AsRef<WalletConfiguration>
@@ -358,34 +350,12 @@ impl EthereumNode for GethNode {
trace_options: GethDebugTracingOptions,
) -> Pin<Box<dyn Future<Output = anyhow::Result<GethTrace>> + '_>> {
Box::pin(async move {
let provider = self
.provider()
self.provider()
.await
.context("Failed to create provider for tracing")?;
poll(
Self::TRACE_POLLING_DURATION,
PollingWaitBehavior::Constant(Duration::from_millis(200)),
move || {
let provider = provider.clone();
let trace_options = trace_options.clone();
async move {
match provider
.debug_trace_transaction(tx_hash, trace_options)
.await
{
Ok(trace) => Ok(ControlFlow::Break(trace)),
Err(error) => {
let error_string = error.to_string();
match error_string.contains(Self::TRANSACTION_TRACING_ERROR) {
true => Ok(ControlFlow::Continue(())),
false => Err(error.into()),
}
}
}
}
},
)
.await
.context("Failed to create provider for tracing")?
.debug_trace_transaction(tx_hash, trace_options)
.await
.context("Failed to get the transaction trace")
})
}
@@ -12,7 +12,6 @@ use std::{
collections::{BTreeMap, HashSet},
fs::{File, create_dir_all},
io::Read,
ops::ControlFlow,
path::PathBuf,
pin::Pin,
process::{Command, Stdio},
@@ -50,10 +49,7 @@ use serde_with::serde_as;
use tokio::sync::OnceCell;
use tracing::{info, instrument};
use revive_dt_common::{
fs::clear_directory,
futures::{PollingWaitBehavior, poll},
};
use revive_dt_common::fs::clear_directory;
use revive_dt_config::*;
use revive_dt_format::traits::ResolverApi;
use revive_dt_node_interaction::EthereumNode;
@@ -116,10 +112,6 @@ impl LighthouseGethNode {
const CONFIG_FILE_NAME: &str = "config.yaml";
const TRANSACTION_TRACING_ERROR: &str = "historical state not available in path scheme yet";
const TRACE_POLLING_DURATION: Duration = Duration::from_secs(60);
const VALIDATOR_MNEMONIC: &str = "giant issue aisle success illegal bike spike question tent bar rely arctic volcano long crawl hungry vocal artwork sniff fantasy very lucky have athlete";
pub fn new(
@@ -576,35 +568,12 @@ impl EthereumNode for LighthouseGethNode {
trace_options: GethDebugTracingOptions,
) -> Pin<Box<dyn Future<Output = anyhow::Result<GethTrace>> + '_>> {
Box::pin(async move {
let provider = Arc::new(
self.http_provider()
.await
.context("Failed to create provider for tracing")?,
);
poll(
Self::TRACE_POLLING_DURATION,
PollingWaitBehavior::Constant(Duration::from_millis(200)),
move || {
let provider = provider.clone();
let trace_options = trace_options.clone();
async move {
match provider
.debug_trace_transaction(tx_hash, trace_options)
.await
{
Ok(trace) => Ok(ControlFlow::Break(trace)),
Err(error) => {
let error_string = error.to_string();
match error_string.contains(Self::TRANSACTION_TRACING_ERROR) {
true => Ok(ControlFlow::Continue(())),
false => Err(error.into()),
}
}
}
}
},
)
.await
self.provider()
.await
.context("Failed to create provider for tracing")?
.debug_trace_transaction(tx_hash, trace_options)
.await
.context("Failed to get the transaction trace")
})
}