From 95d2afde0558afe6ff99d99fa6d72fe142058a41 Mon Sep 17 00:00:00 2001 From: Cyrill Leutwiler Date: Wed, 26 Mar 2025 11:58:38 +0100 Subject: [PATCH] the node pool Signed-off-by: Cyrill Leutwiler --- Cargo.lock | 108 ++++++++++++++++++++++++ Cargo.toml | 1 + crates/config/src/lib.rs | 8 +- crates/core/src/driver/mod.rs | 57 +++++-------- crates/core/src/lib.rs | 5 +- crates/core/src/main.rs | 132 +++++++++++++++++++++--------- crates/format/src/corpus.rs | 2 +- crates/format/src/metadata.rs | 21 ++++- crates/node/Cargo.toml | 1 + crates/node/src/pool.rs | 80 ++++++++++++------ crates/solc-binaries/src/cache.rs | 1 + 11 files changed, 304 insertions(+), 112 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 54c817b..d1b5acc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,16 +39,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b4ae82946772d69f868b9ef81fc66acb1b149ef9b4601849bec4bcf5da6552e" dependencies = [ "alloy-consensus", + "alloy-contract", "alloy-core", "alloy-eips", "alloy-genesis", "alloy-network", "alloy-provider", + "alloy-pubsub", "alloy-rpc-client", "alloy-rpc-types", "alloy-serde", "alloy-transport", "alloy-transport-http", + "alloy-transport-ipc", ] [[package]] @@ -99,6 +102,28 @@ dependencies = [ "serde", ] +[[package]] +name = "alloy-contract" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0a0c1ddee20ecc14308aae21c2438c994df7b39010c26d70f86e1d8fdb8db0" +dependencies = [ + "alloy-consensus", + "alloy-dyn-abi", + "alloy-json-abi", + "alloy-network", + "alloy-network-primitives", + "alloy-primitives", + "alloy-provider", + "alloy-pubsub", + "alloy-rpc-types-eth", + "alloy-sol-types", + "alloy-transport", + "futures", + "futures-util", + "thiserror", +] + [[package]] name = "alloy-core" version = "0.8.23" @@ -304,6 +329,7 @@ dependencies = [ "alloy-network", "alloy-network-primitives", "alloy-primitives", + "alloy-pubsub", "alloy-rpc-client", "alloy-rpc-types-debug", "alloy-rpc-types-eth", @@ -311,6 +337,7 @@ dependencies = [ "alloy-sol-types", "alloy-transport", "alloy-transport-http", + "alloy-transport-ipc", "async-stream", "async-trait", "auto_impl", @@ -330,6 +357,25 @@ dependencies = [ "wasmtimer", ] +[[package]] +name = "alloy-pubsub" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "721aca709a9231815ad5903a2d284042cc77e7d9d382696451b30c9ee0950001" +dependencies = [ + "alloy-json-rpc", + "alloy-primitives", + "alloy-transport", + "bimap", + "futures", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tower", + "tracing", +] + [[package]] name = "alloy-rlp" version = "0.3.11" @@ -360,8 +406,10 @@ checksum = "445a3298c14fae7afb5b9f2f735dead989f3dd83020c2ab8e48ed95d7b6d1acb" dependencies = [ "alloy-json-rpc", "alloy-primitives", + "alloy-pubsub", "alloy-transport", "alloy-transport-http", + "alloy-transport-ipc", "async-stream", "futures", "pin-project", @@ -582,6 +630,26 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport-ipc" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45a78cfda2cac16fa83f6b5dd8b4643caec6161433b25b67e484ce05d2194513" +dependencies = [ + "alloy-json-rpc", + "alloy-pubsub", + "alloy-transport", + "bytes", + "futures", + "interprocess", + "pin-project", + "serde", + "serde_json", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "alloy-trie" version = "0.7.9" @@ -891,6 +959,12 @@ version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3" +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bit-set" version = "0.8.0" @@ -1346,6 +1420,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "doctest-file" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aac81fa3e28d21450aa4d2ac065992ba96a1d7303efbce51a95f4fd175b67562" + [[package]] name = "dunce" version = "1.0.5" @@ -2083,6 +2163,21 @@ dependencies = [ "serde", ] +[[package]] +name = "interprocess" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d941b405bd2322993887859a8ee6ac9134945a24ec5ec763a8a962fc64dfec2d" +dependencies = [ + "doctest-file", + "futures-core", + "libc", + "recvmsg", + "tokio", + "widestring", + "windows-sys 0.52.0", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -2757,6 +2852,12 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "recvmsg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175" + [[package]] name = "redox_syscall" version = "0.5.10" @@ -2912,6 +3013,7 @@ version = "0.1.0" dependencies = [ "alloy", "anyhow", + "log", "revive-dt-config", "revive-dt-node-interaction", "serde_json", @@ -4020,6 +4122,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "widestring" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311" + [[package]] name = "windows-core" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index ffda817..b73b8e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ default-features = false features = [ "json-abi", "providers", + "provider-ipc", "provider-debug-api", "reqwest", "rpc-types", diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 1a5878e..7ab366f 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -42,7 +42,7 @@ pub struct Arguments { /// /// We attach it here because [TempDir] prunes itself on drop. #[clap(skip)] - pub temp_dir: Option, + pub temp_dir: Option<&'static TempDir>, /// The path to the `geth` executable. /// @@ -59,8 +59,8 @@ pub struct Arguments { pub network_id: u64, /// Configure nodes according to this genesis.json file. - #[arg(long = "genesis-file")] - pub genesis_file: Option, + #[arg(long = "genesis", default_value = "genesis.json")] + pub genesis_file: PathBuf, /// The signing account private key. #[arg( @@ -80,7 +80,7 @@ pub struct Arguments { /// Only compile against this testing platform (doesn't execute the tests). #[arg(long = "compile-only")] - pub compile_only: bool, + pub compile_only: Option, /// Determines the amount of tests that are executed in parallel. #[arg(long = "workers", default_value = "12")] diff --git a/crates/core/src/driver/mod.rs b/crates/core/src/driver/mod.rs index 7a2cf4a..00c884c 100644 --- a/crates/core/src/driver/mod.rs +++ b/crates/core/src/driver/mod.rs @@ -32,7 +32,7 @@ impl<'a, T> State<'a, T> where T: Platform, { - fn new(config: &'a Arguments) -> Self { + pub fn new(config: &'a Arguments) -> Self { Self { config, contracts: Default::default(), @@ -49,6 +49,7 @@ where let base_path = metadata.directory()?.display().to_string(); let mut compiler = Compiler::::new().base_path(base_path.clone()); for (file, _contract) in sources.values() { + log::debug!("contract source {}", file.display()); compiler = compiler.with_source(file)?; } @@ -73,15 +74,15 @@ where &self.deployed_contracts, )?)?; dbg!(&receipt); - Ok(node.trace_transaction(receipt)?) + node.trace_transaction(receipt) } } pub struct Driver<'a, Leader: Platform, Follower: Platform> { metadata: &'a Metadata, config: &'a Arguments, - leader: State<'a, Leader>, - follower: State<'a, Follower>, + leader_node: &'a Leader::Blockchain, + follower_node: &'a Follower::Blockchain, } impl<'a, L, F> Driver<'a, L, F> @@ -89,52 +90,36 @@ where L: Platform, F: Platform, { - pub fn new(metadata: &'a Metadata, config: &'a Arguments) -> Driver<'a, L, F> { + pub fn new( + metadata: &'a Metadata, + config: &'a Arguments, + leader_node: &'a L::Blockchain, + follower_node: &'a F::Blockchain, + ) -> Driver<'a, L, F> { Self { metadata, config, - leader: State::new(config), - follower: State::new(config), + leader_node, + follower_node, } } - pub fn execute( - &mut self, - leader: L::Blockchain, - follower: F::Blockchain, - ) -> anyhow::Result<()> { - for mode in self.modes() { - self.leader.build_contracts(&mode, self.metadata)?; - self.follower.build_contracts(&mode, self.metadata)?; + pub fn execute(&mut self) -> anyhow::Result<()> { + for mode in self.metadata.solc_modes() { + let mut leader_state = State::::new(self.config); + leader_state.build_contracts(&mode, self.metadata)?; - if self.config.compile_only { - continue; - } + let mut follower_state = State::::new(self.config); + follower_state.build_contracts(&mode, self.metadata)?; for case in &self.metadata.cases { for input in &case.inputs { - let expected = self.leader.execute_input(input, &leader)?; + let expected = leader_state.execute_input(input, self.leader_node)?; + let received = follower_state.execute_input(input, self.follower_node)?; } } - - *self = Self::new(self.metadata, self.config); } Ok(()) } - - fn modes(&self) -> Vec { - self.metadata - .modes() - .iter() - .filter_map(|mode| match mode { - Mode::Solidity(solc_mode) => Some(solc_mode), - Mode::Unknown(mode) => { - log::debug!("compiler: ignoring unknown mode '{mode}'"); - None - } - }) - .cloned() - .collect() - } } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index c231476..9ae8f3a 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -4,7 +4,8 @@ //! provides a helper utilty to execute tests. use revive_dt_compiler::{SolidityCompiler, solc}; -use revive_dt_node::{Node, geth}; +use revive_dt_node::geth; +use revive_dt_node_interaction::EthereumNode; pub mod driver; @@ -12,7 +13,7 @@ pub mod driver; /// /// For this we need a blockchain node implementation and a compiler. pub trait Platform { - type Blockchain: Node; + type Blockchain: EthereumNode; type Compiler: SolidityCompiler; } diff --git a/crates/core/src/main.rs b/crates/core/src/main.rs index c6217a1..e64e621 100644 --- a/crates/core/src/main.rs +++ b/crates/core/src/main.rs @@ -1,15 +1,40 @@ -use std::collections::BTreeSet; +use std::{collections::HashMap, sync::LazyLock}; use clap::Parser; use rayon::{ThreadPoolBuilder, prelude::*}; use revive_dt_config::*; -use revive_dt_core::{Geth, Kitchensink, driver::Driver}; -use revive_dt_format::corpus::Corpus; -use revive_dt_node::{Node, geth}; +use revive_dt_core::{ + Geth, Kitchensink, + driver::{Driver, State}, +}; +use revive_dt_format::{corpus::Corpus, metadata::Metadata}; +use revive_dt_node::pool::NodePool; use temp_dir::TempDir; +static TEMP_DIR: LazyLock = LazyLock::new(|| TempDir::new().unwrap()); + fn main() -> anyhow::Result<()> { + let args = init_cli()?; + + let corpora = collect_corpora(&args)?; + + if let Some(platform) = &args.compile_only { + for tests in corpora.values() { + main_compile_only(&args, tests, platform)?; + } + + return Ok(()); + } + + for tests in corpora.values() { + main_execute_differential(&args, tests)?; + } + + Ok(()) +} + +fn init_cli() -> anyhow::Result { env_logger::init(); let mut args = Arguments::parse(); @@ -17,7 +42,7 @@ fn main() -> anyhow::Result<()> { anyhow::bail!("no test corpus specified"); } if args.working_directory.is_none() { - args.temp_dir = TempDir::new()?.into() + args.temp_dir = Some(&TEMP_DIR); } ThreadPoolBuilder::new() @@ -25,45 +50,72 @@ fn main() -> anyhow::Result<()> { .build_global() .unwrap(); - for path in args.corpus.iter().collect::>() { - log::trace!("attempting corpus {path:?}"); - let corpus = Corpus::try_from_path(path)?; - log::info!("found corpus: {corpus:?}"); + Ok(args) +} +fn collect_corpora(args: &Arguments) -> anyhow::Result>> { + let mut corpora = HashMap::new(); + + for path in &args.corpus { + let corpus = Corpus::try_from_path(path)?; + log::info!("found corpus: {}", path.display()); let tests = corpus.enumerate_tests(); log::info!("corpus '{}' contains {} tests", &corpus.name, tests.len()); - - tests.par_iter().for_each(|metadata| { - let (leader, follower) = match (&args.leader, &args.follower) { - (TestingPlatform::Geth, TestingPlatform::Kitchensink) => { - (geth::Instance::new(&args), geth::Instance::new(&args)) - } - _ => unimplemented!(), - }; - let mut driver = match (&args.leader, &args.follower) { - (TestingPlatform::Geth, TestingPlatform::Kitchensink) => { - Driver::::new(metadata, &args) - } - _ => unimplemented!(), - }; - - match Driver::::new(metadata, &args).execute(leader, follower) { - Ok(build) => { - log::info!( - "metadata {} success", - metadata.directory().as_ref().unwrap().display() - ); - build - } - Err(error) => { - log::warn!( - "metadata {} failure: {error:?}", - metadata.file_path.as_ref().unwrap().display() - ); - } - } - }); + corpora.insert(corpus, tests); } + Ok(corpora) +} + +fn main_execute_differential(args: &Arguments, tests: &[Metadata]) -> anyhow::Result<()> { + let leader_nodes = NodePool::new(args)?; + let follower_nodes = NodePool::new(args)?; + + tests.par_iter().for_each(|metadata| { + let mut driver = match (&args.leader, &args.follower) { + (TestingPlatform::Geth, TestingPlatform::Kitchensink) => Driver::::new( + metadata, + args, + leader_nodes.round_robbin(), + follower_nodes.round_robbin(), + ), + _ => unimplemented!(), + }; + + match driver.execute() { + Ok(build) => { + log::info!( + "metadata {} success", + metadata.directory().as_ref().unwrap().display() + ); + build + } + Err(error) => { + log::warn!( + "metadata {} failure: {error:?}", + metadata.file_path.as_ref().unwrap().display() + ); + } + } + }); + + Ok(()) +} + +fn main_compile_only( + config: &Arguments, + tests: &[Metadata], + platform: &TestingPlatform, +) -> anyhow::Result<()> { + tests.par_iter().for_each(|metadata| { + for mode in &metadata.solc_modes() { + let mut state = match platform { + TestingPlatform::Geth => State::::new(config), + _ => todo!(), + }; + let _ = state.build_contracts(mode, metadata); + } + }); + Ok(()) } diff --git a/crates/format/src/corpus.rs b/crates/format/src/corpus.rs index 2717207..5f0fe5a 100644 --- a/crates/format/src/corpus.rs +++ b/crates/format/src/corpus.rs @@ -7,7 +7,7 @@ use serde::Deserialize; use crate::metadata::Metadata; -#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq)] +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Hash)] pub struct Corpus { pub name: String, pub path: PathBuf, diff --git a/crates/format/src/metadata.rs b/crates/format/src/metadata.rs index f072907..8d8e56d 100644 --- a/crates/format/src/metadata.rs +++ b/crates/format/src/metadata.rs @@ -6,7 +6,10 @@ use std::{ use serde::Deserialize; -use crate::{case::Case, mode::Mode}; +use crate::{ + case::Case, + mode::{Mode, SolcMode}, +}; pub const METADATA_FILE_EXTENSION: &str = "json"; pub const SOLIDITY_CASE_FILE_EXTENSION: &str = "sol"; @@ -17,16 +20,26 @@ pub struct Metadata { pub contracts: Option>, pub libraries: Option>>, pub ignore: Option, - modes: Option>, + pub modes: Option>, pub file_path: Option, } impl Metadata { - /// Returns the modes of this metadata, inserting a default mode if not present. - pub fn modes(&self) -> Vec { + /// Returns the solc modes of this metadata, inserting a default mode if not present. + pub fn solc_modes(&self) -> Vec { self.modes .to_owned() .unwrap_or_else(|| vec![Mode::Solidity(Default::default())]) + .iter() + .filter_map(|mode| match mode { + Mode::Solidity(solc_mode) => Some(solc_mode), + Mode::Unknown(mode) => { + log::debug!("compiler: ignoring unknown mode '{mode}'"); + None + } + }) + .cloned() + .collect() } /// Returns the base directory of this metadata. diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 23edde0..6ee9473 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true [dependencies] anyhow = { workspace = true } alloy = { workspace = true } +log = { workspace = true } serde_json = { workspace = true } revive-dt-node-interaction = { workspace = true } diff --git a/crates/node/src/pool.rs b/crates/node/src/pool.rs index bc6501a..0143000 100644 --- a/crates/node/src/pool.rs +++ b/crates/node/src/pool.rs @@ -1,38 +1,68 @@ -use std::sync::{ - LazyLock, Mutex, - mpsc::{Receiver, Sender}, +//! This crate implements concurrent handling of testing node. + +use std::{ + fs::read_to_string, + sync::atomic::{AtomicUsize, Ordering}, + thread, }; -pub trait NodePool { - fn access() -> &'static LazyLock>>; -} - +use anyhow::Context; use revive_dt_config::Arguments; use crate::Node; -//static POOL: LazyLock>> = LazyLock::new(Default::default); - -pub struct Handle { - node: T, - notifier: Sender<()>, +/// The node pool starts one or more [Node] which then can be accessed +/// in a round robbin fasion. +pub struct NodePool { + next: AtomicUsize, + nodes: Vec, } -pub struct Pool { - request: Receiver<()>, - nodes: usize, - handles: Vec, -} - -impl Pool +impl NodePool where - T: Node, + T: Node + Send + 'static, { - pub fn spawn() {} + /// Create a new Pool. This will start as many nodes as there are workers in `config`. + pub fn new(config: &Arguments) -> anyhow::Result { + let nodes = config.workers; + let genesis = read_to_string(&config.genesis_file).context(format!( + "can not read genesis file: {}", + config.genesis_file.display() + ))?; + + let mut handles = Vec::with_capacity(nodes); + for _ in 0..nodes { + let config = config.clone(); + let genesis = genesis.clone(); + handles.push(thread::spawn(move || spawn_node::(&config, genesis))); + } + + let mut nodes = Vec::with_capacity(nodes); + for handle in handles { + nodes.push( + handle + .join() + .map_err(|error| anyhow::anyhow!("failed to spawn node: {:?}", error))? + .map_err(|error| anyhow::anyhow!("node failed to spawn: {error}"))?, + ); + } + + Ok(Self { + nodes, + next: Default::default(), + }) + } + + /// Get a handle to the next node. + pub fn round_robbin(&self) -> &T { + let current = self.next.fetch_add(1, Ordering::SeqCst) % self.nodes.len(); + self.nodes.get(current).unwrap() + } } -// spawner: loops on a queue - -pub fn get_handle>(config: &Arguments) -> Receiver { - todo!() +fn spawn_node(args: &Arguments, genesis: String) -> anyhow::Result { + let mut node = T::new(&args); + log::info!("starting node: {}", node.connection_string()); + node.spawn(genesis)?; + Ok(node) } diff --git a/crates/solc-binaries/src/cache.rs b/crates/solc-binaries/src/cache.rs index e54cb82..75b83f3 100644 --- a/crates/solc-binaries/src/cache.rs +++ b/crates/solc-binaries/src/cache.rs @@ -25,6 +25,7 @@ pub(crate) fn get_or_download( let mut cache = SOLC_CACHER.lock().unwrap(); if cache.contains(&target_file) { + log::debug!("using cached solc: {}", target_file.display()); return Ok(target_file); }