Compare commits

...

6 Commits

Author SHA1 Message Date
Omar Abdulla 3ef17e6255 Ignore the zombienet tests for the time being 2025-10-09 14:18:32 +03:00
Omar Abdulla ddc1b9c0d3 Allow for the consensus to be specified for the revive dev node 2025-10-09 04:58:42 +03:00
Omar Abdulla be2ad65d6a Ignore the lighthouse tests 2025-10-09 01:59:37 +03:00
Omar Abdulla 8ef7424fd8 Improve the benchmarks driver 2025-10-08 22:35:56 +03:00
Omar Abdulla 0ef04b246e Remove un-necessary trace call from the benchmark driver 2025-10-08 00:55:58 +03:00
Omar Abdulla e0a1314cb3 Minor zombienet cleanups 2025-10-07 17:08:21 +03:00
14 changed files with 292 additions and 227 deletions
Generated
+1
View File
@@ -5658,6 +5658,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"alloy", "alloy",
"anyhow", "anyhow",
"async-stream",
"futures", "futures",
"revive-common", "revive-common",
"revive-dt-common", "revive-dt-common",
+2 -1
View File
@@ -23,6 +23,7 @@ revive-dt-report = { version = "0.1.0", path = "crates/report" }
revive-dt-solc-binaries = { version = "0.1.0", path = "crates/solc-binaries" } revive-dt-solc-binaries = { version = "0.1.0", path = "crates/solc-binaries" }
anyhow = "1.0" anyhow = "1.0"
async-stream = { version = "0.3.6" }
bson = { version = "2.15.0" } bson = { version = "2.15.0" }
cacache = { version = "13.1.0" } cacache = { version = "13.1.0" }
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
@@ -73,7 +74,7 @@ revive-solc-json-interface = { git = "https://github.com/paritytech/revive", rev
revive-common = { git = "https://github.com/paritytech/revive", rev = "3389865af7c3ff6f29a586d82157e8bc573c1a8e" } revive-common = { git = "https://github.com/paritytech/revive", rev = "3389865af7c3ff6f29a586d82157e8bc573c1a8e" }
revive-differential = { git = "https://github.com/paritytech/revive", rev = "3389865af7c3ff6f29a586d82157e8bc573c1a8e" } revive-differential = { git = "https://github.com/paritytech/revive", rev = "3389865af7c3ff6f29a586d82157e8bc573c1a8e" }
zombienet-sdk = { git = "https://github.com/paritytech/zombienet-sdk.git", rev ="891f6554354ce466abd496366dbf8b4f82141241" } zombienet-sdk = { git = "https://github.com/paritytech/zombienet-sdk.git", rev = "891f6554354ce466abd496366dbf8b4f82141241" }
[workspace.dependencies.alloy] [workspace.dependencies.alloy]
version = "1.0.37" version = "1.0.37"
+8
View File
@@ -680,6 +680,14 @@ pub struct ReviveDevNodeConfiguration {
value_parser = parse_duration value_parser = parse_duration
)] )]
pub start_timeout_ms: Duration, pub start_timeout_ms: Duration,
/// The consensus to use for the spawned revive-dev-node.
#[clap(
id = "revive-dev-node.consensus",
long = "revive-dev-node.consensus",
default_value = "instant-seal"
)]
pub consensus: String,
} }
/// A set of configuration parameters for the ETH RPC. /// A set of configuration parameters for the ETH RPC.
@@ -22,6 +22,7 @@ use alloy::{
}, },
}; };
use anyhow::{Context as _, Result, bail}; use anyhow::{Context as _, Result, bail};
use futures::TryFutureExt;
use indexmap::IndexMap; use indexmap::IndexMap;
use revive_dt_common::{ use revive_dt_common::{
futures::{PollingWaitBehavior, poll}, futures::{PollingWaitBehavior, poll},
@@ -35,7 +36,7 @@ use revive_dt_format::{
}, },
traits::{ResolutionContext, ResolverApi}, traits::{ResolutionContext, ResolverApi},
}; };
use tokio::sync::{Mutex, mpsc::UnboundedSender}; use tokio::sync::{Mutex, OnceCell, mpsc::UnboundedSender};
use tracing::{Instrument, Span, debug, error, field::display, info, info_span, instrument}; use tracing::{Instrument, Span, debug, error, field::display, info, info_span, instrument};
use crate::{ use crate::{
@@ -123,13 +124,7 @@ where
&self.platform_information.reporter, &self.platform_information.reporter,
) )
.await .await
.inspect_err(|err| { .inspect_err(|err| error!(?err, "Pre-linking compilation failed"))
error!(
?err,
platform_identifier = %self.platform_information.platform.platform_identifier(),
"Pre-linking compilation failed"
)
})
.context("Failed to produce the pre-linking compiled contracts")?; .context("Failed to produce the pre-linking compiled contracts")?;
let mut deployed_libraries = None::<HashMap<_, _>>; let mut deployed_libraries = None::<HashMap<_, _>>;
@@ -137,13 +132,7 @@ where
.test_definition .test_definition
.metadata .metadata
.contract_sources() .contract_sources()
.inspect_err(|err| { .inspect_err(|err| error!(?err, "Failed to retrieve contract sources from metadata"))
error!(
?err,
platform_identifier = %self.platform_information.platform.platform_identifier(),
"Failed to retrieve contract sources from metadata"
)
})
.context("Failed to get the contract instances from the metadata file")?; .context("Failed to get the contract instances from the metadata file")?;
for library_instance in self for library_instance in self
.test_definition .test_definition
@@ -191,20 +180,19 @@ where
TransactionRequest::default().from(deployer_address), TransactionRequest::default().from(deployer_address),
code, code,
); );
let receipt = self.execute_transaction(tx).await.inspect_err(|err| { let receipt = self
error!( .execute_transaction(tx)
?err, .and_then(|(_, receipt_fut)| receipt_fut)
%library_instance, .await
platform_identifier = %self.platform_information.platform.platform_identifier(), .inspect_err(|err| {
"Failed to deploy the library" error!(
) ?err,
})?; %library_instance,
"Failed to deploy the library"
)
})?;
debug!( debug!(?library_instance, "Deployed library");
?library_instance,
platform_identifier = %self.platform_information.platform.platform_identifier(),
"Deployed library"
);
let library_address = receipt let library_address = receipt
.contract_address .contract_address
@@ -227,13 +215,7 @@ where
&self.platform_information.reporter, &self.platform_information.reporter,
) )
.await .await
.inspect_err(|err| { .inspect_err(|err| error!(?err, "Post-linking compilation failed"))
error!(
?err,
platform_identifier = %self.platform_information.platform.platform_identifier(),
"Post-linking compilation failed"
)
})
.context("Failed to compile the post-link contracts")?; .context("Failed to compile the post-link contracts")?;
self.execution_state = ExecutionState::new( self.execution_state = ExecutionState::new(
@@ -269,7 +251,6 @@ where
skip_all, skip_all,
fields( fields(
driver_id = self.driver_id, driver_id = self.driver_id,
platform_identifier = %self.platform_information.platform.platform_identifier(),
%step_path, %step_path,
), ),
err(Debug), err(Debug),
@@ -305,15 +286,11 @@ where
.handle_function_call_contract_deployment(step) .handle_function_call_contract_deployment(step)
.await .await
.context("Failed to deploy contracts for the function call step")?; .context("Failed to deploy contracts for the function call step")?;
let execution_receipt = self let transaction_hash = self
.handle_function_call_execution(step, deployment_receipts) .handle_function_call_execution(step, deployment_receipts)
.await .await
.context("Failed to handle the function call execution")?; .context("Failed to handle the function call execution")?;
let tracing_result = self self.handle_function_call_variable_assignment(step, transaction_hash)
.handle_function_call_call_frame_tracing(execution_receipt.transaction_hash)
.await
.context("Failed to handle the function call call frame tracing")?;
self.handle_function_call_variable_assignment(step, &tracing_result)
.await .await
.context("Failed to handle function call variable assignment")?; .context("Failed to handle function call variable assignment")?;
Ok(1) Ok(1)
@@ -367,18 +344,19 @@ where
&mut self, &mut self,
step: &FunctionCallStep, step: &FunctionCallStep,
mut deployment_receipts: HashMap<ContractInstance, TransactionReceipt>, mut deployment_receipts: HashMap<ContractInstance, TransactionReceipt>,
) -> Result<TransactionReceipt> { ) -> Result<TxHash> {
match step.method { match step.method {
// This step was already executed when `handle_step` was called. We just need to // This step was already executed when `handle_step` was called. We just need to
// lookup the transaction receipt in this case and continue on. // lookup the transaction receipt in this case and continue on.
Method::Deployer => deployment_receipts Method::Deployer => deployment_receipts
.remove(&step.instance) .remove(&step.instance)
.context("Failed to find deployment receipt for constructor call"), .context("Failed to find deployment receipt for constructor call")
.map(|receipt| receipt.transaction_hash),
Method::Fallback | Method::FunctionName(_) => { Method::Fallback | Method::FunctionName(_) => {
let tx = step let tx = step
.as_transaction(self.resolver.as_ref(), self.default_resolution_context()) .as_transaction(self.resolver.as_ref(), self.default_resolution_context())
.await?; .await?;
self.execute_transaction(tx).await Ok(self.execute_transaction(tx).await?.0)
} }
} }
} }
@@ -417,15 +395,19 @@ where
async fn handle_function_call_variable_assignment( async fn handle_function_call_variable_assignment(
&mut self, &mut self,
step: &FunctionCallStep, step: &FunctionCallStep,
tracing_result: &CallFrame, tx_hash: TxHash,
) -> Result<()> { ) -> Result<()> {
let Some(ref assignments) = step.variable_assignments else { let Some(ref assignments) = step.variable_assignments else {
return Ok(()); return Ok(());
}; };
// Handling the return data variable assignments. // Handling the return data variable assignments.
let callframe = OnceCell::new();
for (variable_name, output_word) in assignments.return_data.iter().zip( for (variable_name, output_word) in assignments.return_data.iter().zip(
tracing_result callframe
.get_or_try_init(|| self.handle_function_call_call_frame_tracing(tx_hash))
.await
.context("Failed to get the callframe trace for transaction")?
.output .output
.as_ref() .as_ref()
.unwrap_or_default() .unwrap_or_default()
@@ -547,7 +529,6 @@ where
skip_all, skip_all,
fields( fields(
driver_id = self.driver_id, driver_id = self.driver_id,
platform_identifier = %self.platform_information.platform.platform_identifier(),
%contract_instance, %contract_instance,
%deployer %deployer
), ),
@@ -590,7 +571,6 @@ where
skip_all, skip_all,
fields( fields(
driver_id = self.driver_id, driver_id = self.driver_id,
platform_identifier = %self.platform_information.platform.platform_identifier(),
%contract_instance, %contract_instance,
%deployer %deployer
), ),
@@ -660,7 +640,11 @@ where
TransactionBuilder::<Ethereum>::with_deploy_code(tx, code) TransactionBuilder::<Ethereum>::with_deploy_code(tx, code)
}; };
let receipt = match self.execute_transaction(tx).await { let receipt = match self
.execute_transaction(tx)
.and_then(|(_, receipt_fut)| receipt_fut)
.await
{
Ok(receipt) => receipt, Ok(receipt) => receipt,
Err(error) => { Err(error) => {
tracing::error!(?error, "Contract deployment transaction failed."); tracing::error!(?error, "Contract deployment transaction failed.");
@@ -734,7 +718,7 @@ where
async fn execute_transaction( async fn execute_transaction(
&self, &self,
transaction: TransactionRequest, transaction: TransactionRequest,
) -> anyhow::Result<TransactionReceipt> { ) -> anyhow::Result<(TxHash, impl Future<Output = Result<TransactionReceipt>>)> {
let node = self.platform_information.node; let node = self.platform_information.node;
let transaction_hash = node let transaction_hash = node
.submit_transaction(transaction) .submit_transaction(transaction)
@@ -747,24 +731,28 @@ where
.send(WatcherEvent::SubmittedTransaction { transaction_hash }) .send(WatcherEvent::SubmittedTransaction { transaction_hash })
.context("Failed to send the transaction hash to the watcher")?; .context("Failed to send the transaction hash to the watcher")?;
info!("Starting to poll for transaction receipt"); Ok((transaction_hash, async move {
poll( info!("Starting to poll for transaction receipt");
Duration::from_secs(30 * 60), poll(
PollingWaitBehavior::Constant(Duration::from_secs(1)), Duration::from_secs(30 * 60),
|| { PollingWaitBehavior::Constant(Duration::from_secs(1)),
async move { || {
match node.get_receipt(transaction_hash).await { async move {
Ok(receipt) => { match node.get_receipt(transaction_hash).await {
info!("Polling succeeded, receipt found"); Ok(receipt) => {
Ok(ControlFlow::Break(receipt)) info!("Polling succeeded, receipt found");
Ok(ControlFlow::Break(receipt))
}
Err(_) => Ok(ControlFlow::Continue(())),
} }
Err(_) => Ok(ControlFlow::Continue(())),
} }
} .instrument(info_span!("Polling for receipt"))
.instrument(info_span!("Polling for receipt")) },
}, )
) .instrument(info_span!("Polling for receipt", %transaction_hash))
.await .await
.inspect(|_| info!("Found the transaction receipt"))
}))
} }
// endregion:Transaction Execution // endregion:Transaction Execution
} }
@@ -8,7 +8,7 @@ use revive_dt_common::types::PrivateKeyAllocator;
use revive_dt_core::Platform; use revive_dt_core::Platform;
use revive_dt_format::steps::{Step, StepIdx, StepPath}; use revive_dt_format::steps::{Step, StepIdx, StepPath};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::{error, info, info_span, instrument, warn}; use tracing::{Instrument, error, info, info_span, instrument, warn};
use revive_dt_config::{BenchmarkingContext, Context}; use revive_dt_config::{BenchmarkingContext, Context};
use revive_dt_report::Reporter; use revive_dt_report::Reporter;
@@ -159,12 +159,15 @@ pub async fn handle_differential_benchmarks(
futures::future::try_join( futures::future::try_join(
watcher.run(), watcher.run(),
driver.execute_all().inspect(|_| { driver
info!("All transactions submitted - driver completed execution"); .execute_all()
watcher_tx .instrument(info_span!("Executing Benchmarks", %platform_identifier))
.send(WatcherEvent::AllTransactionsSubmitted) .inspect(|_| {
.unwrap() info!("All transactions submitted - driver completed execution");
}), watcher_tx
.send(WatcherEvent::AllTransactionsSubmitted)
.unwrap()
}),
) )
.await .await
.context("Failed to run the driver and executor") .context("Failed to run the driver and executor")
@@ -104,6 +104,10 @@ impl Watcher {
async move { async move {
let mut mined_blocks_information = Vec::new(); let mut mined_blocks_information = Vec::new();
// region:TEMPORARY
eprintln!("Watcher information for {}", self.platform_identifier);
eprintln!("block_number,block_timestamp,mined_gas,block_gas_limit,tx_count");
// endregion:TEMPORARY
while let Some(block) = blocks_information_stream.next().await { while let Some(block) = blocks_information_stream.next().await {
// If the block number is equal to or less than the last block before the // If the block number is equal to or less than the last block before the
// repetition then we ignore it and continue on to the next block. // repetition then we ignore it and continue on to the next block.
@@ -118,8 +122,9 @@ impl Watcher {
} }
info!( info!(
remaining_transactions = watch_for_transaction_hashes.read().await.len(), block_number = block.block_number,
block_tx_count = block.transaction_hashes.len(), block_tx_count = block.transaction_hashes.len(),
remaining_transactions = watch_for_transaction_hashes.read().await.len(),
"Observed a block" "Observed a block"
); );
@@ -131,6 +136,20 @@ impl Watcher {
watch_for_transaction_hashes.remove(tx_hash); watch_for_transaction_hashes.remove(tx_hash);
} }
// region:TEMPORARY
// TODO: The following code is TEMPORARY and will be removed once we have proper
// reporting in place. This serves as a way of doing
// some very simple reporting for the time being.
eprintln!(
"\"{}\",\"{}\",\"{}\",\"{}\",\"{}\"",
block.block_number,
block.block_timestamp,
block.mined_gas,
block.block_gas_limit,
block.transaction_hashes.len()
);
// endregion:TEMPORARY
mined_blocks_information.push(block); mined_blocks_information.push(block);
} }
@@ -139,41 +158,10 @@ impl Watcher {
} }
}; };
let (_, mined_blocks_information) = let (_, _) =
futures::future::join(watcher_event_watching_task, block_information_watching_task) futures::future::join(watcher_event_watching_task, block_information_watching_task)
.await; .await;
// region:TEMPORARY
{
// TODO: The following code is TEMPORARY and will be removed once we have proper
// reporting in place. This serves as a way of doing some
// very simple reporting for the time being.
use std::io::Write;
let mut stderr = std::io::stderr().lock();
writeln!(
stderr,
"Watcher information for {}",
self.platform_identifier
)?;
writeln!(
stderr,
"block_number,block_timestamp,mined_gas,block_gas_limit,tx_count"
)?;
for block in mined_blocks_information {
writeln!(
stderr,
"{},{},{},{},{}",
block.block_number,
block.block_timestamp,
block.mined_gas,
block.block_gas_limit,
block.transaction_hashes.len()
)?
}
}
// endregion:TEMPORARY
Ok(()) Ok(())
} }
} }
+18 -10
View File
@@ -16,7 +16,7 @@ use revive_dt_config::*;
use revive_dt_node::{ use revive_dt_node::{
Node, node_implementations::geth::GethNode, Node, node_implementations::geth::GethNode,
node_implementations::lighthouse_geth::LighthouseGethNode, node_implementations::lighthouse_geth::LighthouseGethNode,
node_implementations::substrate::SubstrateNode, node_implementations::zombienet::ZombieNode, node_implementations::substrate::SubstrateNode, node_implementations::zombienet::ZombienetNode,
}; };
use revive_dt_node_interaction::EthereumNode; use revive_dt_node_interaction::EthereumNode;
use tracing::info; use tracing::info;
@@ -184,6 +184,7 @@ impl Platform for KitchensinkPolkavmResolcPlatform {
let node = SubstrateNode::new( let node = SubstrateNode::new(
kitchensink_path, kitchensink_path,
SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND, SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND,
None,
context, context,
); );
let node = spawn_node(node, genesis)?; let node = spawn_node(node, genesis)?;
@@ -236,6 +237,7 @@ impl Platform for KitchensinkRevmSolcPlatform {
let node = SubstrateNode::new( let node = SubstrateNode::new(
kitchensink_path, kitchensink_path,
SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND, SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND,
None,
context, context,
); );
let node = spawn_node(node, genesis)?; let node = spawn_node(node, genesis)?;
@@ -280,14 +282,17 @@ impl Platform for ReviveDevNodePolkavmResolcPlatform {
context: Context, context: Context,
) -> anyhow::Result<JoinHandle<anyhow::Result<Box<dyn EthereumNode + Send + Sync>>>> { ) -> anyhow::Result<JoinHandle<anyhow::Result<Box<dyn EthereumNode + Send + Sync>>>> {
let genesis_configuration = AsRef::<GenesisConfiguration>::as_ref(&context); let genesis_configuration = AsRef::<GenesisConfiguration>::as_ref(&context);
let revive_dev_node_path = AsRef::<ReviveDevNodeConfiguration>::as_ref(&context) let revive_dev_node_configuration = AsRef::<ReviveDevNodeConfiguration>::as_ref(&context);
.path
.clone(); let revive_dev_node_path = revive_dev_node_configuration.path.clone();
let revive_dev_node_consensus = revive_dev_node_configuration.consensus.clone();
let genesis = genesis_configuration.genesis()?.clone(); let genesis = genesis_configuration.genesis()?.clone();
Ok(thread::spawn(move || { Ok(thread::spawn(move || {
let node = SubstrateNode::new( let node = SubstrateNode::new(
revive_dev_node_path, revive_dev_node_path,
SubstrateNode::REVIVE_DEV_NODE_EXPORT_CHAINSPEC_COMMAND, SubstrateNode::REVIVE_DEV_NODE_EXPORT_CHAINSPEC_COMMAND,
Some(revive_dev_node_consensus),
context, context,
); );
let node = spawn_node(node, genesis)?; let node = spawn_node(node, genesis)?;
@@ -332,14 +337,17 @@ impl Platform for ReviveDevNodeRevmSolcPlatform {
context: Context, context: Context,
) -> anyhow::Result<JoinHandle<anyhow::Result<Box<dyn EthereumNode + Send + Sync>>>> { ) -> anyhow::Result<JoinHandle<anyhow::Result<Box<dyn EthereumNode + Send + Sync>>>> {
let genesis_configuration = AsRef::<GenesisConfiguration>::as_ref(&context); let genesis_configuration = AsRef::<GenesisConfiguration>::as_ref(&context);
let revive_dev_node_path = AsRef::<ReviveDevNodeConfiguration>::as_ref(&context) let revive_dev_node_configuration = AsRef::<ReviveDevNodeConfiguration>::as_ref(&context);
.path
.clone(); let revive_dev_node_path = revive_dev_node_configuration.path.clone();
let revive_dev_node_consensus = revive_dev_node_configuration.consensus.clone();
let genesis = genesis_configuration.genesis()?.clone(); let genesis = genesis_configuration.genesis()?.clone();
Ok(thread::spawn(move || { Ok(thread::spawn(move || {
let node = SubstrateNode::new( let node = SubstrateNode::new(
revive_dev_node_path, revive_dev_node_path,
SubstrateNode::REVIVE_DEV_NODE_EXPORT_CHAINSPEC_COMMAND, SubstrateNode::REVIVE_DEV_NODE_EXPORT_CHAINSPEC_COMMAND,
Some(revive_dev_node_consensus),
context, context,
); );
let node = spawn_node(node, genesis)?; let node = spawn_node(node, genesis)?;
@@ -389,7 +397,7 @@ impl Platform for ZombienetPolkavmResolcPlatform {
.clone(); .clone();
let genesis = genesis_configuration.genesis()?.clone(); let genesis = genesis_configuration.genesis()?.clone();
Ok(thread::spawn(move || { Ok(thread::spawn(move || {
let node = ZombieNode::new(polkadot_parachain_path, context); let node = ZombienetNode::new(polkadot_parachain_path, context);
let node = spawn_node(node, genesis)?; let node = spawn_node(node, genesis)?;
Ok(Box::new(node) as Box<_>) Ok(Box::new(node) as Box<_>)
})) }))
@@ -401,7 +409,7 @@ impl Platform for ZombienetPolkavmResolcPlatform {
version: Option<VersionOrRequirement>, version: Option<VersionOrRequirement>,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Box<dyn SolidityCompiler>>>>> { ) -> Pin<Box<dyn Future<Output = anyhow::Result<Box<dyn SolidityCompiler>>>>> {
Box::pin(async move { Box::pin(async move {
let compiler = Solc::new(context, version).await; let compiler = Resolc::new(context, version).await;
compiler.map(|compiler| Box::new(compiler) as Box<dyn SolidityCompiler>) compiler.map(|compiler| Box::new(compiler) as Box<dyn SolidityCompiler>)
}) })
} }
@@ -437,7 +445,7 @@ impl Platform for ZombienetRevmSolcPlatform {
.clone(); .clone();
let genesis = genesis_configuration.genesis()?.clone(); let genesis = genesis_configuration.genesis()?.clone();
Ok(thread::spawn(move || { Ok(thread::spawn(move || {
let node = ZombieNode::new(polkadot_parachain_path, context); let node = ZombienetNode::new(polkadot_parachain_path, context);
let node = spawn_node(node, genesis)?; let node = spawn_node(node, genesis)?;
Ok(Box::new(node) as Box<_>) Ok(Box::new(node) as Box<_>)
})) }))
+1
View File
@@ -11,6 +11,7 @@ rust-version.workspace = true
[dependencies] [dependencies]
anyhow = { workspace = true } anyhow = { workspace = true }
alloy = { workspace = true } alloy = { workspace = true }
async-stream = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
tower = { workspace = true } tower = { workspace = true }
@@ -1131,6 +1131,7 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
#[ignore = "Ignored since they take a long time to run"]
async fn node_mines_simple_transfer_transaction_and_returns_receipt() { async fn node_mines_simple_transfer_transaction_and_returns_receipt() {
// Arrange // Arrange
let (context, node) = new_node(); let (context, node) = new_node();
@@ -36,7 +36,8 @@ use alloy::{
}, },
}; };
use anyhow::Context as _; use anyhow::Context as _;
use futures::{Stream, StreamExt}; use async_stream::stream;
use futures::Stream;
use revive_common::EVMVersion; use revive_common::EVMVersion;
use revive_dt_common::fs::clear_directory; use revive_dt_common::fs::clear_directory;
use revive_dt_format::traits::ResolverApi; use revive_dt_format::traits::ResolverApi;
@@ -80,6 +81,7 @@ pub struct SubstrateNode {
wallet: Arc<EthereumWallet>, wallet: Arc<EthereumWallet>,
nonce_manager: CachedNonceManager, nonce_manager: CachedNonceManager,
provider: OnceCell<ConcreteProvider<ReviveNetwork, Arc<EthereumWallet>>>, provider: OnceCell<ConcreteProvider<ReviveNetwork, Arc<EthereumWallet>>>,
consensus: Option<String>,
} }
impl SubstrateNode { impl SubstrateNode {
@@ -102,6 +104,7 @@ impl SubstrateNode {
pub fn new( pub fn new(
node_path: PathBuf, node_path: PathBuf,
export_chainspec_command: &str, export_chainspec_command: &str,
consensus: Option<String>,
context: impl AsRef<WorkingDirectoryConfiguration> context: impl AsRef<WorkingDirectoryConfiguration>
+ AsRef<EthRpcConfiguration> + AsRef<EthRpcConfiguration>
+ AsRef<WalletConfiguration>, + AsRef<WalletConfiguration>,
@@ -131,6 +134,7 @@ impl SubstrateNode {
wallet: wallet.clone(), wallet: wallet.clone(),
nonce_manager: Default::default(), nonce_manager: Default::default(),
provider: Default::default(), provider: Default::default(),
consensus,
} }
} }
@@ -228,7 +232,7 @@ impl SubstrateNode {
self.logs_directory.as_path(), self.logs_directory.as_path(),
self.node_binary.as_path(), self.node_binary.as_path(),
|command, stdout_file, stderr_file| { |command, stdout_file, stderr_file| {
command let cmd = command
.arg("--dev") .arg("--dev")
.arg("--chain") .arg("--chain")
.arg(chainspec_path) .arg(chainspec_path)
@@ -245,9 +249,16 @@ impl SubstrateNode {
.arg("all") .arg("all")
.arg("--rpc-max-connections") .arg("--rpc-max-connections")
.arg(u32::MAX.to_string()) .arg(u32::MAX.to_string())
.arg("--pool-limit")
.arg(u32::MAX.to_string())
.arg("--pool-kbytes")
.arg(u32::MAX.to_string())
.env("RUST_LOG", Self::SUBSTRATE_LOG_ENV) .env("RUST_LOG", Self::SUBSTRATE_LOG_ENV)
.stdout(stdout_file) .stdout(stdout_file)
.stderr(stderr_file); .stderr(stderr_file);
if let Some(consensus) = self.consensus.as_ref() {
cmd.arg("--consensus").arg(consensus.clone());
}
}, },
ProcessReadinessWaitBehavior::TimeBoundedWaitFunction { ProcessReadinessWaitBehavior::TimeBoundedWaitFunction {
max_wait_duration: Duration::from_secs(30), max_wait_duration: Duration::from_secs(30),
@@ -508,37 +519,46 @@ impl EthereumNode for SubstrateNode {
+ '_, + '_,
>, >,
> { > {
fn create_stream(
provider: ConcreteProvider<ReviveNetwork, Arc<EthereumWallet>>,
) -> impl Stream<Item = MinedBlockInformation> {
stream! {
let mut block_number = provider.get_block_number().await.expect("Failed to get the block number");
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let Ok(Some(block)) = provider.get_block_by_number(BlockNumberOrTag::Number(block_number)).await
else {
continue;
};
block_number += 1;
yield MinedBlockInformation {
block_number: block.number(),
block_timestamp: block.header.timestamp,
mined_gas: block.header.gas_used as _,
block_gas_limit: block.header.gas_limit,
transaction_hashes: block
.transactions
.into_hashes()
.as_hashes()
.expect("Must be hashes")
.to_vec(),
};
};
}
}
Box::pin(async move { Box::pin(async move {
let provider = self let provider = self
.provider() .provider()
.await .await
.context("Failed to create the provider for block subscription")?; .context("Failed to create the provider for a block subscription")?;
let mut block_subscription = provider
.watch_full_blocks()
.await
.context("Failed to create the blocks stream")?;
block_subscription.set_channel_size(0xFFFF);
block_subscription.set_poll_interval(Duration::from_secs(1));
let block_stream = block_subscription.into_stream();
let mined_block_information_stream = block_stream.filter_map(|block| async { let stream = Box::pin(create_stream(provider))
let block = block.ok()?; as Pin<Box<dyn Stream<Item = MinedBlockInformation>>>;
Some(MinedBlockInformation {
block_number: block.number(),
block_timestamp: block.header.timestamp,
mined_gas: block.header.gas_used as _,
block_gas_limit: block.header.gas_limit,
transaction_hashes: block
.transactions
.into_hashes()
.as_hashes()
.expect("Must be hashes")
.to_vec(),
})
});
Ok(Box::pin(mined_block_information_stream) Ok(stream)
as Pin<Box<dyn Stream<Item = MinedBlockInformation>>>)
}) })
} }
} }
@@ -1170,6 +1190,7 @@ mod tests {
let mut node = SubstrateNode::new( let mut node = SubstrateNode::new(
context.kitchensink_configuration.path.clone(), context.kitchensink_configuration.path.clone(),
SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND, SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND,
None,
&context, &context,
); );
node.init(context.genesis_configuration.genesis().unwrap().clone()) node.init(context.genesis_configuration.genesis().unwrap().clone())
@@ -1235,6 +1256,7 @@ mod tests {
let mut dummy_node = SubstrateNode::new( let mut dummy_node = SubstrateNode::new(
context.kitchensink_configuration.path.clone(), context.kitchensink_configuration.path.clone(),
SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND, SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND,
None,
&context, &context,
); );
@@ -1287,6 +1309,7 @@ mod tests {
let node = SubstrateNode::new( let node = SubstrateNode::new(
context.kitchensink_configuration.path.clone(), context.kitchensink_configuration.path.clone(),
SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND, SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND,
None,
&context, &context,
); );
+101 -73
View File
@@ -55,7 +55,8 @@ use alloy::{
}; };
use anyhow::Context as _; use anyhow::Context as _;
use futures::{Stream, StreamExt}; use async_stream::stream;
use futures::Stream;
use revive_common::EVMVersion; use revive_common::EVMVersion;
use revive_dt_common::fs::clear_directory; use revive_dt_common::fs::clear_directory;
use revive_dt_config::*; use revive_dt_config::*;
@@ -73,16 +74,19 @@ use crate::{
constants::INITIAL_BALANCE, constants::INITIAL_BALANCE,
helpers::{Process, ProcessReadinessWaitBehavior}, helpers::{Process, ProcessReadinessWaitBehavior},
node_implementations::substrate::ReviveNetwork, node_implementations::substrate::ReviveNetwork,
provider_utils::{ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider}, provider_utils::{
ConcreteProvider, FallbackGasFiller, construct_concurrency_limited_provider,
execute_transaction,
},
}; };
static NODE_COUNT: AtomicU32 = AtomicU32::new(0); static NODE_COUNT: AtomicU32 = AtomicU32::new(0);
/// A Zombienet network where collator is `polkadot-parachain` node with `eth-rpc` /// A Zombienet network where collator is `polkadot-parachain` node with `eth-rpc` [`ZombienetNode`]
/// [`ZombieNode`] abstracts away the details of managing the zombienet network and provides /// abstracts away the details of managing the zombienet network and provides an interface to
/// an interface to interact with the parachain's Ethereum RPC. /// interact with the parachain's Ethereum RPC.
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct ZombieNode { pub struct ZombienetNode {
/* Node Identifier */ /* Node Identifier */
id: u32, id: u32,
connection_string: String, connection_string: String,
@@ -110,7 +114,7 @@ pub struct ZombieNode {
provider: OnceCell<ConcreteProvider<ReviveNetwork, Arc<EthereumWallet>>>, provider: OnceCell<ConcreteProvider<ReviveNetwork, Arc<EthereumWallet>>>,
} }
impl ZombieNode { impl ZombienetNode {
const BASE_DIRECTORY: &str = "zombienet"; const BASE_DIRECTORY: &str = "zombienet";
const DATA_DIRECTORY: &str = "data"; const DATA_DIRECTORY: &str = "data";
const LOGS_DIRECTORY: &str = "logs"; const LOGS_DIRECTORY: &str = "logs";
@@ -119,6 +123,8 @@ impl ZombieNode {
const PARACHAIN_ID: u32 = 100; const PARACHAIN_ID: u32 = 100;
const ETH_RPC_BASE_PORT: u16 = 8545; const ETH_RPC_BASE_PORT: u16 = 8545;
const PROXY_LOG_ENV: &str = "info,eth-rpc=debug";
const ETH_RPC_READY_MARKER: &str = "Running JSON-RPC server"; const ETH_RPC_READY_MARKER: &str = "Running JSON-RPC server";
const EXPORT_CHAINSPEC_COMMAND: &str = "build-spec"; const EXPORT_CHAINSPEC_COMMAND: &str = "build-spec";
@@ -177,25 +183,35 @@ impl ZombieNode {
let node_rpc_port = Self::NODE_BASE_RPC_PORT + self.id as u16; let node_rpc_port = Self::NODE_BASE_RPC_PORT + self.id as u16;
let network_config = NetworkConfigBuilder::new() let network_config = NetworkConfigBuilder::new()
.with_relaychain(|r| { .with_relaychain(|relay_chain| {
r.with_chain("westend-local") relay_chain
.with_chain("westend-local")
.with_default_command("polkadot") .with_default_command("polkadot")
.with_node(|node| node.with_name("alice")) .with_node(|node| node.with_name("alice"))
.with_node(|node| node.with_name("bob")) .with_node(|node| node.with_name("bob"))
}) })
.with_global_settings(|g| g.with_base_dir(&self.base_directory)) .with_global_settings(|global_settings| {
.with_parachain(|p| { // global_settings.with_base_dir(&self.base_directory)
p.with_id(Self::PARACHAIN_ID) global_settings
.with_chain_spec_path(template_chainspec_path.to_str().unwrap()) })
.with_parachain(|parachain| {
parachain
.with_id(Self::PARACHAIN_ID)
.with_chain_spec_path(template_chainspec_path.to_path_buf())
.with_chain("asset-hub-westend-local") .with_chain("asset-hub-westend-local")
.with_collator(|n| { .with_collator(|node_config| {
n.with_name("Collator") node_config
.with_name("Collator")
.with_command(polkadot_parachain_path) .with_command(polkadot_parachain_path)
.with_rpc_port(node_rpc_port) .with_rpc_port(node_rpc_port)
.with_args(vec![
("--pool-limit", u32::MAX.to_string().as_str()).into(),
("--pool-kbytes", u32::MAX.to_string().as_str()).into(),
])
}) })
}) })
.build() .build()
.map_err(|e| anyhow::anyhow!("Failed to build zombienet network config: {e:?}"))?; .map_err(|err| anyhow::anyhow!("Failed to build zombienet network config: {err:?}"))?;
self.node_rpc_port = Some(node_rpc_port); self.node_rpc_port = Some(node_rpc_port);
self.network_config = Some(network_config); self.network_config = Some(network_config);
@@ -209,6 +225,9 @@ impl ZombieNode {
.clone() .clone()
.context("Node not initialized, call init() first")?; .context("Node not initialized, call init() first")?;
// TODO: Look into the possibility of removing this in the future, perhaps by reintroducing
// the blocking runtime abstraction and making it available to the entire program so that we
// don't need to be spawning multiple different runtimes.
let rt = tokio::runtime::Runtime::new().unwrap(); let rt = tokio::runtime::Runtime::new().unwrap();
let network = rt.block_on(async { let network = rt.block_on(async {
network_config network_config
@@ -236,6 +255,7 @@ impl ZombieNode {
.arg(u32::MAX.to_string()) .arg(u32::MAX.to_string())
.arg("--rpc-port") .arg("--rpc-port")
.arg(eth_rpc_port.to_string()) .arg(eth_rpc_port.to_string())
.env("RUST_LOG", Self::PROXY_LOG_ENV)
.stdout(stdout_file) .stdout(stdout_file)
.stderr(stderr_file); .stderr(stderr_file);
}, },
@@ -271,12 +291,13 @@ impl ZombieNode {
template_chainspec_path: PathBuf, template_chainspec_path: PathBuf,
mut genesis: Genesis, mut genesis: Genesis,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut cmd: Command = std::process::Command::new(&self.polkadot_parachain_path); let output = Command::new(self.polkadot_parachain_path.as_path())
cmd.arg(Self::EXPORT_CHAINSPEC_COMMAND) .arg(Self::EXPORT_CHAINSPEC_COMMAND)
.arg("--chain") .arg("--chain")
.arg("asset-hub-westend-local"); .arg("asset-hub-westend-local")
.env_remove("RUST_LOG")
let output = cmd.output().context("Failed to export the chain-spec")?; .output()
.context("Failed to export the chainspec of the chain")?;
if !output.status.success() { if !output.status.success() {
anyhow::bail!( anyhow::bail!(
@@ -384,7 +405,7 @@ impl ZombieNode {
.get_or_try_init(|| async move { .get_or_try_init(|| async move {
construct_concurrency_limited_provider::<ReviveNetwork, _>( construct_concurrency_limited_provider::<ReviveNetwork, _>(
self.connection_string.as_str(), self.connection_string.as_str(),
FallbackGasFiller::new(250_000_000, 5_000_000_000, 1_000_000_000), FallbackGasFiller::new(u64::MAX, 5_000_000_000, 1_000_000_000),
ChainIdFiller::default(), // TODO: use CHAIN_ID constant ChainIdFiller::default(), // TODO: use CHAIN_ID constant
NonceFiller::new(self.nonce_manager.clone()), NonceFiller::new(self.nonce_manager.clone()),
self.wallet.clone(), self.wallet.clone(),
@@ -397,7 +418,7 @@ impl ZombieNode {
} }
} }
impl EthereumNode for ZombieNode { impl EthereumNode for ZombienetNode {
fn pre_transactions(&mut self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + '_>> { fn pre_transactions(&mut self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + '_>> {
Box::pin(async move { Ok(()) }) Box::pin(async move { Ok(()) })
} }
@@ -447,17 +468,11 @@ impl EthereumNode for ZombieNode {
transaction: alloy::rpc::types::TransactionRequest, transaction: alloy::rpc::types::TransactionRequest,
) -> Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + '_>> { ) -> Pin<Box<dyn Future<Output = anyhow::Result<TransactionReceipt>> + '_>> {
Box::pin(async move { Box::pin(async move {
let receipt = self let provider = self
.provider() .provider()
.await .await
.context("Failed to create provider for transaction submission")? .context("Failed to create the provider")?;
.send_transaction(transaction) execute_transaction(provider, transaction).await
.await
.context("Failed to submit transaction to proxy")?
.get_receipt()
.await
.context("Failed to fetch transaction receipt from proxy")?;
Ok(receipt)
}) })
} }
@@ -551,37 +566,46 @@ impl EthereumNode for ZombieNode {
+ '_, + '_,
>, >,
> { > {
fn create_stream(
provider: ConcreteProvider<ReviveNetwork, Arc<EthereumWallet>>,
) -> impl Stream<Item = MinedBlockInformation> {
stream! {
let mut block_number = provider.get_block_number().await.expect("Failed to get the block number");
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let Ok(Some(block)) = provider.get_block_by_number(BlockNumberOrTag::Number(block_number)).await
else {
continue;
};
block_number += 1;
yield MinedBlockInformation {
block_number: block.number(),
block_timestamp: block.header.timestamp,
mined_gas: block.header.gas_used as _,
block_gas_limit: block.header.gas_limit,
transaction_hashes: block
.transactions
.into_hashes()
.as_hashes()
.expect("Must be hashes")
.to_vec(),
};
};
}
}
Box::pin(async move { Box::pin(async move {
let provider = self let provider = self
.provider() .provider()
.await .await
.context("Failed to create the provider for block subscription")?; .context("Failed to create the provider for a block subscription")?;
let mut block_subscription = provider
.watch_full_blocks()
.await
.context("Failed to create the blocks stream")?;
block_subscription.set_channel_size(0xFFFF);
block_subscription.set_poll_interval(Duration::from_secs(1));
let block_stream = block_subscription.into_stream();
let mined_block_information_stream = block_stream.filter_map(|block| async { let stream = Box::pin(create_stream(provider))
let block = block.ok()?; as Pin<Box<dyn Stream<Item = MinedBlockInformation>>>;
Some(MinedBlockInformation {
block_number: block.number(),
block_timestamp: block.header.timestamp,
mined_gas: block.header.gas_used as _,
block_gas_limit: block.header.gas_limit,
transaction_hashes: block
.transactions
.into_hashes()
.as_hashes()
.expect("Must be hashes")
.to_vec(),
})
});
Ok(Box::pin(mined_block_information_stream) Ok(stream)
as Pin<Box<dyn Stream<Item = MinedBlockInformation>>>)
}) })
} }
} }
@@ -716,7 +740,7 @@ impl<F: TxFiller<ReviveNetwork>, P: Provider<ReviveNetwork>> ResolverApi
} }
} }
impl Node for ZombieNode { impl Node for ZombienetNode {
fn shutdown(&mut self) -> anyhow::Result<()> { fn shutdown(&mut self) -> anyhow::Result<()> {
// Kill the eth_rpc process // Kill the eth_rpc process
drop(self.eth_rpc_process.take()); drop(self.eth_rpc_process.take());
@@ -761,7 +785,7 @@ impl Node for ZombieNode {
} }
} }
impl Drop for ZombieNode { impl Drop for ZombienetNode {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.shutdown(); let _ = self.shutdown();
} }
@@ -785,9 +809,9 @@ mod tests {
TestExecutionContext::default() TestExecutionContext::default()
} }
pub async fn new_node() -> (TestExecutionContext, ZombieNode) { pub async fn new_node() -> (TestExecutionContext, ZombienetNode) {
let context = test_config(); let context = test_config();
let mut node = ZombieNode::new( let mut node = ZombienetNode::new(
context.polkadot_parachain_configuration.path.clone(), context.polkadot_parachain_configuration.path.clone(),
&context, &context,
); );
@@ -805,8 +829,9 @@ mod tests {
(context, node) (context, node)
} }
pub async fn shared_state() -> &'static (TestExecutionContext, Arc<ZombieNode>) { pub async fn shared_state() -> &'static (TestExecutionContext, Arc<ZombienetNode>) {
static NODE: OnceCell<(TestExecutionContext, Arc<ZombieNode>)> = OnceCell::const_new(); static NODE: OnceCell<(TestExecutionContext, Arc<ZombienetNode>)> =
OnceCell::const_new();
NODE.get_or_init(|| async { NODE.get_or_init(|| async {
let (context, node) = new_node().await; let (context, node) = new_node().await;
@@ -815,13 +840,14 @@ mod tests {
.await .await
} }
pub async fn shared_node() -> &'static Arc<ZombieNode> { pub async fn shared_node() -> &'static Arc<ZombienetNode> {
&shared_state().await.1 &shared_state().await.1
} }
} }
use utils::{new_node, test_config}; use utils::{new_node, test_config};
#[tokio::test] #[tokio::test]
#[ignore = "Ignored for the time being"]
async fn test_transfer_transaction_should_return_receipt() { async fn test_transfer_transaction_should_return_receipt() {
let (ctx, node) = new_node().await; let (ctx, node) = new_node().await;
@@ -855,7 +881,7 @@ mod tests {
"#; "#;
let context = test_config(); let context = test_config();
let mut node = ZombieNode::new( let mut node = ZombienetNode::new(
context.polkadot_parachain_configuration.path.clone(), context.polkadot_parachain_configuration.path.clone(),
&context, &context,
); );
@@ -865,17 +891,19 @@ mod tests {
.expect("init failed"); .expect("init failed");
// Check that the patched chainspec file was generated // Check that the patched chainspec file was generated
let final_chainspec_path = node.base_directory.join(ZombieNode::CHAIN_SPEC_JSON_FILE); let final_chainspec_path = node
.base_directory
.join(ZombienetNode::CHAIN_SPEC_JSON_FILE);
assert!(final_chainspec_path.exists(), "Chainspec file should exist"); assert!(final_chainspec_path.exists(), "Chainspec file should exist");
let contents = let contents =
std::fs::read_to_string(&final_chainspec_path).expect("Failed to read chainspec"); std::fs::read_to_string(&final_chainspec_path).expect("Failed to read chainspec");
// Validate that the Polkadot addresses derived from the Ethereum addresses are in the file // Validate that the Polkadot addresses derived from the Ethereum addresses are in the file
let first_eth_addr = ZombieNode::eth_to_polkadot_address( let first_eth_addr = ZombienetNode::eth_to_polkadot_address(
&"90F8bf6A479f320ead074411a4B0e7944Ea8c9C1".parse().unwrap(), &"90F8bf6A479f320ead074411a4B0e7944Ea8c9C1".parse().unwrap(),
); );
let second_eth_addr = ZombieNode::eth_to_polkadot_address( let second_eth_addr = ZombienetNode::eth_to_polkadot_address(
&"Ab8483F64d9C6d1EcF9b849Ae677dD3315835cb2".parse().unwrap(), &"Ab8483F64d9C6d1EcF9b849Ae677dD3315835cb2".parse().unwrap(),
); );
@@ -903,7 +931,7 @@ mod tests {
"#; "#;
let context = test_config(); let context = test_config();
let node = ZombieNode::new( let node = ZombienetNode::new(
context.polkadot_parachain_configuration.path.clone(), context.polkadot_parachain_configuration.path.clone(),
&context, &context,
); );
@@ -939,7 +967,7 @@ mod tests {
]; ];
for eth_addr in eth_addresses { for eth_addr in eth_addresses {
let ss58 = ZombieNode::eth_to_polkadot_address(&eth_addr.parse().unwrap()); let ss58 = ZombienetNode::eth_to_polkadot_address(&eth_addr.parse().unwrap());
println!("Ethereum: {eth_addr} -> Polkadot SS58: {ss58}"); println!("Ethereum: {eth_addr} -> Polkadot SS58: {ss58}");
} }
@@ -967,7 +995,7 @@ mod tests {
]; ];
for (eth_addr, expected_ss58) in cases { for (eth_addr, expected_ss58) in cases {
let result = ZombieNode::eth_to_polkadot_address(&eth_addr.parse().unwrap()); let result = ZombienetNode::eth_to_polkadot_address(&eth_addr.parse().unwrap());
assert_eq!( assert_eq!(
result, expected_ss58, result, expected_ss58,
"Mismatch for Ethereum address {eth_addr}" "Mismatch for Ethereum address {eth_addr}"
@@ -979,7 +1007,7 @@ mod tests {
fn eth_rpc_version_works() { fn eth_rpc_version_works() {
// Arrange // Arrange
let context = test_config(); let context = test_config();
let node = ZombieNode::new( let node = ZombienetNode::new(
context.polkadot_parachain_configuration.path.clone(), context.polkadot_parachain_configuration.path.clone(),
&context, &context,
); );
@@ -998,7 +1026,7 @@ mod tests {
fn version_works() { fn version_works() {
// Arrange // Arrange
let context = test_config(); let context = test_config();
let node = ZombieNode::new( let node = ZombienetNode::new(
context.polkadot_parachain_configuration.path.clone(), context.polkadot_parachain_configuration.path.clone(),
&context, &context,
); );
@@ -7,6 +7,10 @@ use alloy::{
transports::TransportResult, transports::TransportResult,
}; };
// Percentage padding applied to estimated gas (e.g. 120 = 20% padding)
const GAS_ESTIMATE_PADDING_NUMERATOR: u64 = 120;
const GAS_ESTIMATE_PADDING_DENOMINATOR: u64 = 100;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct FallbackGasFiller { pub struct FallbackGasFiller {
inner: GasFiller, inner: GasFiller,
@@ -56,8 +60,6 @@ where
provider: &P, provider: &P,
tx: &<N as Network>::TransactionRequest, tx: &<N as Network>::TransactionRequest,
) -> TransportResult<Self::Fillable> { ) -> TransportResult<Self::Fillable> {
// Try to fetch GasFillers “fillable” (gas_price, base_fee, estimate_gas, …)
// If it errors (i.e. tx would revert under eth_estimateGas), swallow it.
match self.inner.prepare(provider, tx).await { match self.inner.prepare(provider, tx).await {
Ok(fill) => Ok(Some(fill)), Ok(fill) => Ok(Some(fill)),
Err(_) => Ok(None), Err(_) => Ok(None),
@@ -70,8 +72,17 @@ where
mut tx: alloy::providers::SendableTx<N>, mut tx: alloy::providers::SendableTx<N>,
) -> TransportResult<SendableTx<N>> { ) -> TransportResult<SendableTx<N>> {
if let Some(fill) = fillable { if let Some(fill) = fillable {
// our inner GasFiller succeeded — use it let mut tx = self.inner.fill(fill, tx).await?;
self.inner.fill(fill, tx).await if let Some(builder) = tx.as_mut_builder() {
if let Some(estimated) = builder.gas_limit() {
let padded = estimated
.checked_mul(GAS_ESTIMATE_PADDING_NUMERATOR)
.and_then(|v| v.checked_div(GAS_ESTIMATE_PADDING_DENOMINATOR))
.unwrap_or(u64::MAX);
builder.set_gas_limit(padded);
}
}
Ok(tx)
} else { } else {
if let Some(builder) = tx.as_mut_builder() { if let Some(builder) = tx.as_mut_builder() {
builder.set_gas_limit(self.default_gas_limit); builder.set_gas_limit(self.default_gas_limit);
+2 -2
View File
@@ -1,7 +1,7 @@
mod concurrency_limiter; mod concurrency_limiter;
mod fallback_gas_provider; mod fallback_gas_filler;
mod provider; mod provider;
pub use concurrency_limiter::*; pub use concurrency_limiter::*;
pub use fallback_gas_provider::*; pub use fallback_gas_filler::*;
pub use provider::*; pub use provider::*;
+7 -3
View File
@@ -10,7 +10,7 @@ use alloy::{
}; };
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use revive_dt_common::futures::{PollingWaitBehavior, poll}; use revive_dt_common::futures::{PollingWaitBehavior, poll};
use tracing::debug; use tracing::{Instrument, debug, info, info_span};
use crate::provider_utils::{ConcurrencyLimiterLayer, FallbackGasFiller}; use crate::provider_utils::{ConcurrencyLimiterLayer, FallbackGasFiller};
@@ -44,7 +44,7 @@ where
// requests at any point of time and no more than that. This is done in an effort to stabilize // requests at any point of time and no more than that. This is done in an effort to stabilize
// the framework from some of the interment issues that we've been seeing related to RPC calls. // the framework from some of the interment issues that we've been seeing related to RPC calls.
static GLOBAL_CONCURRENCY_LIMITER_LAYER: LazyLock<ConcurrencyLimiterLayer> = static GLOBAL_CONCURRENCY_LIMITER_LAYER: LazyLock<ConcurrencyLimiterLayer> =
LazyLock::new(|| ConcurrencyLimiterLayer::new(10)); LazyLock::new(|| ConcurrencyLimiterLayer::new(500));
let client = ClientBuilder::default() let client = ClientBuilder::default()
.layer(GLOBAL_CONCURRENCY_LIMITER_LAYER.clone()) .layer(GLOBAL_CONCURRENCY_LIMITER_LAYER.clone())
@@ -117,12 +117,16 @@ where
async move { async move {
match provider.get_transaction_receipt(tx_hash).await { match provider.get_transaction_receipt(tx_hash).await {
Ok(Some(receipt)) => Ok(ControlFlow::Break(receipt)), Ok(Some(receipt)) => {
info!("Found the transaction receipt");
Ok(ControlFlow::Break(receipt))
}
_ => Ok(ControlFlow::Continue(())), _ => Ok(ControlFlow::Continue(())),
} }
} }
}, },
) )
.instrument(info_span!("Polling for receipt", %tx_hash))
.await .await
.context(format!("Polling for receipt failed for {tx_hash}")) .context(format!("Polling for receipt failed for {tx_hash}"))
} }