Fix concurrency issues (#142)

* Fix the OS FD error

* Cache the compiler versions

* Allow for auto display impl in declare wrapper type macro

* Better logging and fix concurrency issues

* Fix tests

* Format

* Make the code even more concurrent
This commit is contained in:
Omar
2025-08-19 09:47:36 +03:00
committed by GitHub
parent c58551803d
commit 76d6a154c1
33 changed files with 773 additions and 720 deletions
+90 -142
View File
@@ -16,26 +16,24 @@ use alloy::rpc::types::trace::geth::{
};
use alloy::{
primitives::Address,
rpc::types::{
TransactionRequest,
trace::geth::{AccountState, DiffMode},
},
rpc::types::{TransactionRequest, trace::geth::DiffMode},
};
use anyhow::Context;
use futures::TryStreamExt;
use indexmap::IndexMap;
use revive_dt_format::traits::{ResolutionContext, ResolverApi};
use semver::Version;
use revive_dt_format::case::{Case, CaseIdx};
use revive_dt_format::case::Case;
use revive_dt_format::input::{
BalanceAssertion, Calldata, EtherValue, Expected, ExpectedOutput, Input, Method,
BalanceAssertion, Calldata, EtherValue, Expected, ExpectedOutput, Input, Method, StepIdx,
StorageEmptyAssertion,
};
use revive_dt_format::metadata::{ContractIdent, ContractInstance, ContractPathAndIdent};
use revive_dt_format::{input::Step, metadata::Metadata};
use revive_dt_node::Node;
use revive_dt_node_interaction::EthereumNode;
use tracing::Instrument;
use tokio::try_join;
use tracing::{Instrument, info, info_span, instrument};
use crate::Platform;
@@ -77,38 +75,38 @@ where
pub async fn handle_step(
&mut self,
metadata: &Metadata,
case_idx: CaseIdx,
step: &Step,
node: &T::Blockchain,
) -> anyhow::Result<StepOutput> {
match step {
Step::FunctionCall(input) => {
let (receipt, geth_trace, diff_mode) =
self.handle_input(metadata, case_idx, input, node).await?;
self.handle_input(metadata, input, node).await?;
Ok(StepOutput::FunctionCall(receipt, geth_trace, diff_mode))
}
Step::BalanceAssertion(balance_assertion) => {
self.handle_balance_assertion(metadata, case_idx, balance_assertion, node)
self.handle_balance_assertion(metadata, balance_assertion, node)
.await?;
Ok(StepOutput::BalanceAssertion)
}
Step::StorageEmptyAssertion(storage_empty) => {
self.handle_storage_empty(metadata, case_idx, storage_empty, node)
self.handle_storage_empty(metadata, storage_empty, node)
.await?;
Ok(StepOutput::StorageEmptyAssertion)
}
}
.inspect(|_| info!("Step Succeeded"))
}
#[instrument(level = "info", name = "Handling Input", skip_all)]
pub async fn handle_input(
&mut self,
metadata: &Metadata,
case_idx: CaseIdx,
input: &Input,
node: &T::Blockchain,
) -> anyhow::Result<(TransactionReceipt, GethTrace, DiffMode)> {
let deployment_receipts = self
.handle_input_contract_deployment(metadata, case_idx, input, node)
.handle_input_contract_deployment(metadata, input, node)
.await?;
let execution_receipt = self
.handle_input_execution(input, deployment_receipts, node)
@@ -117,16 +115,17 @@ where
.handle_input_call_frame_tracing(&execution_receipt, node)
.await?;
self.handle_input_variable_assignment(input, &tracing_result)?;
self.handle_input_expectations(input, &execution_receipt, node, &tracing_result)
.await?;
self.handle_input_diff(case_idx, execution_receipt, node)
.await
let (_, (geth_trace, diff_mode)) = try_join!(
self.handle_input_expectations(input, &execution_receipt, node, &tracing_result),
self.handle_input_diff(&execution_receipt, node)
)?;
Ok((execution_receipt, geth_trace, diff_mode))
}
#[instrument(level = "info", name = "Handling Balance Assertion", skip_all)]
pub async fn handle_balance_assertion(
&mut self,
metadata: &Metadata,
_: CaseIdx,
balance_assertion: &BalanceAssertion,
node: &T::Blockchain,
) -> anyhow::Result<()> {
@@ -137,10 +136,10 @@ where
Ok(())
}
#[instrument(level = "info", name = "Handling Storage Assertion", skip_all)]
pub async fn handle_storage_empty(
&mut self,
metadata: &Metadata,
_: CaseIdx,
storage_empty: &StorageEmptyAssertion,
node: &T::Blockchain,
) -> anyhow::Result<()> {
@@ -152,10 +151,10 @@ where
}
/// Handles the contract deployment for a given input performing it if it needs to be performed.
#[instrument(level = "info", skip_all)]
async fn handle_input_contract_deployment(
&mut self,
metadata: &Metadata,
_: CaseIdx,
input: &Input,
node: &T::Blockchain,
) -> anyhow::Result<HashMap<ContractInstance, TransactionReceipt>> {
@@ -170,11 +169,6 @@ where
instances_we_must_deploy.insert(input.instance.clone(), true);
}
tracing::debug!(
instances_to_deploy = instances_we_must_deploy.len(),
"Computed the number of required deployments for input"
);
let mut receipts = HashMap::new();
for (instance, deploy_with_constructor_arguments) in instances_we_must_deploy.into_iter() {
let calldata = deploy_with_constructor_arguments.then_some(&input.calldata);
@@ -201,6 +195,7 @@ where
}
/// Handles the execution of the input in terms of the calls that need to be made.
#[instrument(level = "info", skip_all)]
async fn handle_input_execution(
&mut self,
input: &Input,
@@ -218,33 +213,21 @@ where
.legacy_transaction(node, self.default_resolution_context())
.await
{
Ok(tx) => {
tracing::debug!("Legacy transaction data: {tx:#?}");
tx
}
Ok(tx) => tx,
Err(err) => {
tracing::error!("Failed to construct legacy transaction: {err:?}");
return Err(err);
}
};
tracing::trace!("Executing transaction for input: {input:?}");
match node.execute_transaction(tx).await {
Ok(receipt) => Ok(receipt),
Err(err) => {
tracing::error!(
"Failed to execute transaction when executing the contract: {}, {:?}",
&*input.instance,
err
);
Err(err)
}
Err(err) => Err(err),
}
}
}
}
#[instrument(level = "info", skip_all)]
async fn handle_input_call_frame_tracing(
&self,
execution_receipt: &TransactionReceipt,
@@ -259,7 +242,10 @@ where
tracer_config: GethDebugTracerConfig(serde_json::json! {{
"onlyTopCall": true,
"withLog": false,
"withReturnData": false
"withStorage": false,
"withMemory": false,
"withStack": false,
"withReturnData": true
}}),
..Default::default()
},
@@ -272,6 +258,7 @@ where
})
}
#[instrument(level = "info", skip_all)]
fn handle_input_variable_assignment(
&mut self,
input: &Input,
@@ -302,8 +289,9 @@ where
Ok(())
}
#[instrument(level = "info", skip_all)]
async fn handle_input_expectations(
&mut self,
&self,
input: &Input,
execution_receipt: &TransactionReceipt,
resolver: &impl ResolverApi,
@@ -337,24 +325,25 @@ where
}
}
for expectation in expectations.iter() {
self.handle_input_expectation_item(
execution_receipt,
resolver,
expectation,
tracing_result,
)
.await?;
}
Ok(())
futures::stream::iter(expectations.into_iter().map(Ok))
.try_for_each_concurrent(None, |expectation| async move {
self.handle_input_expectation_item(
execution_receipt,
resolver,
expectation,
tracing_result,
)
.await
})
.await
}
#[instrument(level = "info", skip_all)]
async fn handle_input_expectation_item(
&mut self,
&self,
execution_receipt: &TransactionReceipt,
resolver: &impl ResolverApi,
expectation: &ExpectedOutput,
expectation: ExpectedOutput,
tracing_result: &CallFrame,
) -> anyhow::Result<()> {
if let Some(ref version_requirement) = expectation.compiler_version {
@@ -492,12 +481,12 @@ where
Ok(())
}
#[instrument(level = "info", skip_all)]
async fn handle_input_diff(
&mut self,
_: CaseIdx,
execution_receipt: TransactionReceipt,
&self,
execution_receipt: &TransactionReceipt,
node: &T::Blockchain,
) -> anyhow::Result<(TransactionReceipt, GethTrace, DiffMode)> {
) -> anyhow::Result<(GethTrace, DiffMode)> {
let trace_options = GethDebugTracingOptions::prestate_tracer(PreStateConfig {
diff_mode: Some(true),
disable_code: None,
@@ -505,13 +494,14 @@ where
});
let trace = node
.trace_transaction(&execution_receipt, trace_options)
.trace_transaction(execution_receipt, trace_options)
.await?;
let diff = node.state_diff(&execution_receipt).await?;
let diff = node.state_diff(execution_receipt).await?;
Ok((execution_receipt, trace, diff))
Ok((trace, diff))
}
#[instrument(level = "info", skip_all)]
pub async fn handle_balance_assertion_contract_deployment(
&mut self,
metadata: &Metadata,
@@ -537,6 +527,7 @@ where
Ok(())
}
#[instrument(level = "info", skip_all)]
pub async fn handle_balance_assertion_execution(
&mut self,
BalanceAssertion {
@@ -572,6 +563,7 @@ where
Ok(())
}
#[instrument(level = "info", skip_all)]
pub async fn handle_storage_empty_assertion_contract_deployment(
&mut self,
metadata: &Metadata,
@@ -597,6 +589,7 @@ where
Ok(())
}
#[instrument(level = "info", skip_all)]
pub async fn handle_storage_empty_assertion_execution(
&mut self,
StorageEmptyAssertion {
@@ -658,7 +651,6 @@ where
contract_ident,
}) = metadata.contract_sources()?.remove(contract_instance)
else {
tracing::error!("Contract source not found for instance");
anyhow::bail!(
"Contract source not found for instance {:?}",
contract_instance
@@ -671,11 +663,6 @@ where
.and_then(|source_file_contracts| source_file_contracts.get(contract_ident.as_ref()))
.cloned()
else {
tracing::error!(
contract_source_path = contract_source_path.display().to_string(),
contract_ident = contract_ident.as_ref(),
"Failed to find information for contract"
);
anyhow::bail!(
"Failed to find information for contract {:?}",
contract_instance
@@ -724,7 +711,6 @@ where
};
let Some(address) = receipt.contract_address else {
tracing::error!("Contract deployment transaction didn't return an address");
anyhow::bail!("Contract deployment didn't return an address");
};
tracing::info!(
@@ -751,7 +737,6 @@ where
pub struct CaseDriver<'a, Leader: Platform, Follower: Platform> {
metadata: &'a Metadata,
case: &'a Case,
case_idx: CaseIdx,
leader_node: &'a Leader::Blockchain,
follower_node: &'a Follower::Blockchain,
leader_state: CaseState<Leader>,
@@ -767,7 +752,6 @@ where
pub fn new(
metadata: &'a Metadata,
case: &'a Case,
case_idx: impl Into<CaseIdx>,
leader_node: &'a L::Blockchain,
follower_node: &'a F::Blockchain,
leader_state: CaseState<L>,
@@ -776,7 +760,6 @@ where
Self {
metadata,
case,
case_idx: case_idx.into(),
leader_node,
follower_node,
leader_state,
@@ -784,79 +767,44 @@ where
}
}
pub fn trace_diff_mode(label: &str, diff: &DiffMode) {
tracing::trace!("{label} - PRE STATE:");
for (addr, state) in &diff.pre {
Self::trace_account_state(" [pre]", addr, state);
}
tracing::trace!("{label} - POST STATE:");
for (addr, state) in &diff.post {
Self::trace_account_state(" [post]", addr, state);
}
}
fn trace_account_state(prefix: &str, addr: &Address, state: &AccountState) {
tracing::trace!("{prefix} 0x{addr:x}");
if let Some(balance) = &state.balance {
tracing::trace!("{prefix} balance: {balance}");
}
if let Some(nonce) = &state.nonce {
tracing::trace!("{prefix} nonce: {nonce}");
}
if let Some(code) = &state.code {
tracing::trace!("{prefix} code: {code}");
}
}
#[instrument(level = "info", name = "Executing Case", skip_all)]
pub async fn execute(&mut self) -> anyhow::Result<usize> {
if !self
.leader_node
.matches_target(self.metadata.targets.as_deref())
|| !self
.follower_node
.matches_target(self.metadata.targets.as_deref())
{
tracing::warn!(
targets = ?self.metadata.targets,
"Either the leader or follower node do not support the targets of the file"
);
return Ok(0);
}
let mut steps_executed = 0;
for (step_idx, step) in self.case.steps_iterator().enumerate() {
let tracing_span = tracing::info_span!("Handling input", step_idx);
for (step_idx, step) in self
.case
.steps_iterator()
.enumerate()
.map(|(idx, v)| (StepIdx::new(idx), v))
{
let (leader_step_output, follower_step_output) = try_join!(
self.leader_state
.handle_step(self.metadata, &step, self.leader_node)
.instrument(info_span!(
"Handling Step",
%step_idx,
target = "Leader",
)),
self.follower_state
.handle_step(self.metadata, &step, self.follower_node)
.instrument(info_span!(
"Handling Step",
%step_idx,
target = "Follower",
))
)?;
let leader_step_output = self
.leader_state
.handle_step(self.metadata, self.case_idx, &step, self.leader_node)
.instrument(tracing_span.clone())
.await?;
let follower_step_output = self
.follower_state
.handle_step(self.metadata, self.case_idx, &step, self.follower_node)
.instrument(tracing_span)
.await?;
match (leader_step_output, follower_step_output) {
(
StepOutput::FunctionCall(leader_receipt, _, leader_diff),
StepOutput::FunctionCall(follower_receipt, _, follower_diff),
) => {
if leader_diff == follower_diff {
tracing::debug!("State diffs match between leader and follower.");
} else {
tracing::debug!("State diffs mismatch between leader and follower.");
Self::trace_diff_mode("Leader", &leader_diff);
Self::trace_diff_mode("Follower", &follower_diff);
}
if leader_receipt.logs() != follower_receipt.logs() {
tracing::debug!("Log/event mismatch between leader and follower.");
tracing::trace!("Leader logs: {:?}", leader_receipt.logs());
tracing::trace!("Follower logs: {:?}", follower_receipt.logs());
}
(StepOutput::FunctionCall(..), StepOutput::FunctionCall(..)) => {
// TODO: We need to actually work out how/if we will compare the diff between
// the leader and the follower. The diffs are almost guaranteed to be different
// from leader and follower and therefore without an actual strategy for this
// we have something that's guaranteed to fail. Even a simple call to some
// contract will produce two non-equal diffs because on the leader the contract
// has address X and on the follower it has address Y. On the leader contract X
// contains address A in the state and on the follower it contains address B. So
// this isn't exactly a straightforward thing to do and I'm not even sure that
// it's possible to do. Once we have an actual strategy for doing the diffs we
// will implement it here. Until then, this remains empty.
}
(StepOutput::BalanceAssertion, StepOutput::BalanceAssertion) => {}
(StepOutput::StorageEmptyAssertion, StepOutput::StorageEmptyAssertion) => {}