Merge branch 'main' into refactor_zombinet_substrate

This commit is contained in:
Marios Christou
2025-10-09 17:36:43 +03:00
18 changed files with 456 additions and 288 deletions
@@ -27,7 +27,8 @@ use alloy::{
},
};
use anyhow::Context as _;
use futures::{Stream, StreamExt};
use async_stream::stream;
use futures::Stream;
use revive_common::EVMVersion;
use revive_dt_common::fs::clear_directory;
use revive_dt_config::*;
@@ -71,6 +72,7 @@ pub struct SubstrateNode {
wallet: Arc<EthereumWallet>,
nonce_manager: CachedNonceManager,
provider: OnceCell<ConcreteProvider<ReviveNetwork, Arc<EthereumWallet>>>,
consensus: Option<String>,
}
impl SubstrateNode {
@@ -92,6 +94,7 @@ impl SubstrateNode {
pub fn new(
node_path: PathBuf,
export_chainspec_command: &str,
consensus: Option<String>,
context: impl AsRef<WorkingDirectoryConfiguration>
+ AsRef<EthRpcConfiguration>
+ AsRef<WalletConfiguration>,
@@ -121,6 +124,7 @@ impl SubstrateNode {
wallet: wallet.clone(),
nonce_manager: Default::default(),
provider: Default::default(),
consensus,
}
}
@@ -162,7 +166,7 @@ impl SubstrateNode {
self.logs_directory.as_path(),
self.node_binary.as_path(),
|command, stdout_file, stderr_file| {
command
let cmd = command
.arg("--dev")
.arg("--chain")
.arg(chainspec_path)
@@ -179,9 +183,16 @@ impl SubstrateNode {
.arg("all")
.arg("--rpc-max-connections")
.arg(u32::MAX.to_string())
.arg("--pool-limit")
.arg(u32::MAX.to_string())
.arg("--pool-kbytes")
.arg(u32::MAX.to_string())
.env("RUST_LOG", Self::SUBSTRATE_LOG_ENV)
.stdout(stdout_file)
.stderr(stderr_file);
if let Some(consensus) = self.consensus.as_ref() {
cmd.arg("--consensus").arg(consensus.clone());
}
},
ProcessReadinessWaitBehavior::TimeBoundedWaitFunction {
max_wait_duration: Duration::from_secs(30),
@@ -392,37 +403,46 @@ impl EthereumNode for SubstrateNode {
+ '_,
>,
> {
fn create_stream(
provider: ConcreteProvider<ReviveNetwork, Arc<EthereumWallet>>,
) -> impl Stream<Item = MinedBlockInformation> {
stream! {
let mut block_number = provider.get_block_number().await.expect("Failed to get the block number");
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let Ok(Some(block)) = provider.get_block_by_number(BlockNumberOrTag::Number(block_number)).await
else {
continue;
};
block_number += 1;
yield MinedBlockInformation {
block_number: block.number(),
block_timestamp: block.header.timestamp,
mined_gas: block.header.gas_used as _,
block_gas_limit: block.header.gas_limit,
transaction_hashes: block
.transactions
.into_hashes()
.as_hashes()
.expect("Must be hashes")
.to_vec(),
};
};
}
}
Box::pin(async move {
let provider = self
.provider()
.await
.context("Failed to create the provider for block subscription")?;
let mut block_subscription = provider
.watch_full_blocks()
.await
.context("Failed to create the blocks stream")?;
block_subscription.set_channel_size(0xFFFF);
block_subscription.set_poll_interval(Duration::from_secs(1));
let block_stream = block_subscription.into_stream();
.context("Failed to create the provider for a block subscription")?;
let mined_block_information_stream = block_stream.filter_map(|block| async {
let block = block.ok()?;
Some(MinedBlockInformation {
block_number: block.number(),
block_timestamp: block.header.timestamp,
mined_gas: block.header.gas_used as _,
block_gas_limit: block.header.gas_limit,
transaction_hashes: block
.transactions
.into_hashes()
.as_hashes()
.expect("Must be hashes")
.to_vec(),
})
});
let stream = Box::pin(create_stream(provider))
as Pin<Box<dyn Stream<Item = MinedBlockInformation>>>;
Ok(Box::pin(mined_block_information_stream)
as Pin<Box<dyn Stream<Item = MinedBlockInformation>>>)
Ok(stream)
})
}
}
@@ -620,6 +640,7 @@ mod tests {
let mut node = SubstrateNode::new(
context.kitchensink_configuration.path.clone(),
SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND,
None,
&context,
);
node.init(context.genesis_configuration.genesis().unwrap().clone())
@@ -685,6 +706,7 @@ mod tests {
let mut dummy_node = SubstrateNode::new(
context.kitchensink_configuration.path.clone(),
SubstrateNode::KITCHENSINK_EXPORT_CHAINSPEC_COMMAND,
None,
&context,
);