the node pool

Signed-off-by: Cyrill Leutwiler <bigcyrill@hotmail.com>
This commit is contained in:
Cyrill Leutwiler
2025-03-26 11:58:38 +01:00
parent 34b8879b15
commit 95d2afde05
11 changed files with 304 additions and 112 deletions
Generated
+108
View File
@@ -39,16 +39,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b4ae82946772d69f868b9ef81fc66acb1b149ef9b4601849bec4bcf5da6552e"
dependencies = [
"alloy-consensus",
"alloy-contract",
"alloy-core",
"alloy-eips",
"alloy-genesis",
"alloy-network",
"alloy-provider",
"alloy-pubsub",
"alloy-rpc-client",
"alloy-rpc-types",
"alloy-serde",
"alloy-transport",
"alloy-transport-http",
"alloy-transport-ipc",
]
[[package]]
@@ -99,6 +102,28 @@ dependencies = [
"serde",
]
[[package]]
name = "alloy-contract"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd0a0c1ddee20ecc14308aae21c2438c994df7b39010c26d70f86e1d8fdb8db0"
dependencies = [
"alloy-consensus",
"alloy-dyn-abi",
"alloy-json-abi",
"alloy-network",
"alloy-network-primitives",
"alloy-primitives",
"alloy-provider",
"alloy-pubsub",
"alloy-rpc-types-eth",
"alloy-sol-types",
"alloy-transport",
"futures",
"futures-util",
"thiserror",
]
[[package]]
name = "alloy-core"
version = "0.8.23"
@@ -304,6 +329,7 @@ dependencies = [
"alloy-network",
"alloy-network-primitives",
"alloy-primitives",
"alloy-pubsub",
"alloy-rpc-client",
"alloy-rpc-types-debug",
"alloy-rpc-types-eth",
@@ -311,6 +337,7 @@ dependencies = [
"alloy-sol-types",
"alloy-transport",
"alloy-transport-http",
"alloy-transport-ipc",
"async-stream",
"async-trait",
"auto_impl",
@@ -330,6 +357,25 @@ dependencies = [
"wasmtimer",
]
[[package]]
name = "alloy-pubsub"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "721aca709a9231815ad5903a2d284042cc77e7d9d382696451b30c9ee0950001"
dependencies = [
"alloy-json-rpc",
"alloy-primitives",
"alloy-transport",
"bimap",
"futures",
"serde",
"serde_json",
"tokio",
"tokio-stream",
"tower",
"tracing",
]
[[package]]
name = "alloy-rlp"
version = "0.3.11"
@@ -360,8 +406,10 @@ checksum = "445a3298c14fae7afb5b9f2f735dead989f3dd83020c2ab8e48ed95d7b6d1acb"
dependencies = [
"alloy-json-rpc",
"alloy-primitives",
"alloy-pubsub",
"alloy-transport",
"alloy-transport-http",
"alloy-transport-ipc",
"async-stream",
"futures",
"pin-project",
@@ -582,6 +630,26 @@ dependencies = [
"url",
]
[[package]]
name = "alloy-transport-ipc"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45a78cfda2cac16fa83f6b5dd8b4643caec6161433b25b67e484ce05d2194513"
dependencies = [
"alloy-json-rpc",
"alloy-pubsub",
"alloy-transport",
"bytes",
"futures",
"interprocess",
"pin-project",
"serde",
"serde_json",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "alloy-trie"
version = "0.7.9"
@@ -891,6 +959,12 @@ version = "1.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3"
[[package]]
name = "bimap"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7"
[[package]]
name = "bit-set"
version = "0.8.0"
@@ -1346,6 +1420,12 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "doctest-file"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aac81fa3e28d21450aa4d2ac065992ba96a1d7303efbce51a95f4fd175b67562"
[[package]]
name = "dunce"
version = "1.0.5"
@@ -2083,6 +2163,21 @@ dependencies = [
"serde",
]
[[package]]
name = "interprocess"
version = "2.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d941b405bd2322993887859a8ee6ac9134945a24ec5ec763a8a962fc64dfec2d"
dependencies = [
"doctest-file",
"futures-core",
"libc",
"recvmsg",
"tokio",
"widestring",
"windows-sys 0.52.0",
]
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -2757,6 +2852,12 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "recvmsg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175"
[[package]]
name = "redox_syscall"
version = "0.5.10"
@@ -2912,6 +3013,7 @@ version = "0.1.0"
dependencies = [
"alloy",
"anyhow",
"log",
"revive-dt-config",
"revive-dt-node-interaction",
"serde_json",
@@ -4020,6 +4122,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "widestring"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311"
[[package]]
name = "windows-core"
version = "0.52.0"
+1
View File
@@ -48,6 +48,7 @@ default-features = false
features = [
"json-abi",
"providers",
"provider-ipc",
"provider-debug-api",
"reqwest",
"rpc-types",
+4 -4
View File
@@ -42,7 +42,7 @@ pub struct Arguments {
///
/// We attach it here because [TempDir] prunes itself on drop.
#[clap(skip)]
pub temp_dir: Option<TempDir>,
pub temp_dir: Option<&'static TempDir>,
/// The path to the `geth` executable.
///
@@ -59,8 +59,8 @@ pub struct Arguments {
pub network_id: u64,
/// Configure nodes according to this genesis.json file.
#[arg(long = "genesis-file")]
pub genesis_file: Option<PathBuf>,
#[arg(long = "genesis", default_value = "genesis.json")]
pub genesis_file: PathBuf,
/// The signing account private key.
#[arg(
@@ -80,7 +80,7 @@ pub struct Arguments {
/// Only compile against this testing platform (doesn't execute the tests).
#[arg(long = "compile-only")]
pub compile_only: bool,
pub compile_only: Option<TestingPlatform>,
/// Determines the amount of tests that are executed in parallel.
#[arg(long = "workers", default_value = "12")]
+21 -36
View File
@@ -32,7 +32,7 @@ impl<'a, T> State<'a, T>
where
T: Platform,
{
fn new(config: &'a Arguments) -> Self {
pub fn new(config: &'a Arguments) -> Self {
Self {
config,
contracts: Default::default(),
@@ -49,6 +49,7 @@ where
let base_path = metadata.directory()?.display().to_string();
let mut compiler = Compiler::<T::Compiler>::new().base_path(base_path.clone());
for (file, _contract) in sources.values() {
log::debug!("contract source {}", file.display());
compiler = compiler.with_source(file)?;
}
@@ -73,15 +74,15 @@ where
&self.deployed_contracts,
)?)?;
dbg!(&receipt);
Ok(node.trace_transaction(receipt)?)
node.trace_transaction(receipt)
}
}
pub struct Driver<'a, Leader: Platform, Follower: Platform> {
metadata: &'a Metadata,
config: &'a Arguments,
leader: State<'a, Leader>,
follower: State<'a, Follower>,
leader_node: &'a Leader::Blockchain,
follower_node: &'a Follower::Blockchain,
}
impl<'a, L, F> Driver<'a, L, F>
@@ -89,52 +90,36 @@ where
L: Platform,
F: Platform,
{
pub fn new(metadata: &'a Metadata, config: &'a Arguments) -> Driver<'a, L, F> {
pub fn new(
metadata: &'a Metadata,
config: &'a Arguments,
leader_node: &'a L::Blockchain,
follower_node: &'a F::Blockchain,
) -> Driver<'a, L, F> {
Self {
metadata,
config,
leader: State::new(config),
follower: State::new(config),
leader_node,
follower_node,
}
}
pub fn execute(
&mut self,
leader: L::Blockchain,
follower: F::Blockchain,
) -> anyhow::Result<()> {
for mode in self.modes() {
self.leader.build_contracts(&mode, self.metadata)?;
self.follower.build_contracts(&mode, self.metadata)?;
pub fn execute(&mut self) -> anyhow::Result<()> {
for mode in self.metadata.solc_modes() {
let mut leader_state = State::<L>::new(self.config);
leader_state.build_contracts(&mode, self.metadata)?;
if self.config.compile_only {
continue;
}
let mut follower_state = State::<F>::new(self.config);
follower_state.build_contracts(&mode, self.metadata)?;
for case in &self.metadata.cases {
for input in &case.inputs {
let expected = self.leader.execute_input(input, &leader)?;
let expected = leader_state.execute_input(input, self.leader_node)?;
let received = follower_state.execute_input(input, self.follower_node)?;
}
}
*self = Self::new(self.metadata, self.config);
}
Ok(())
}
fn modes(&self) -> Vec<SolcMode> {
self.metadata
.modes()
.iter()
.filter_map(|mode| match mode {
Mode::Solidity(solc_mode) => Some(solc_mode),
Mode::Unknown(mode) => {
log::debug!("compiler: ignoring unknown mode '{mode}'");
None
}
})
.cloned()
.collect()
}
}
+3 -2
View File
@@ -4,7 +4,8 @@
//! provides a helper utilty to execute tests.
use revive_dt_compiler::{SolidityCompiler, solc};
use revive_dt_node::{Node, geth};
use revive_dt_node::geth;
use revive_dt_node_interaction::EthereumNode;
pub mod driver;
@@ -12,7 +13,7 @@ pub mod driver;
///
/// For this we need a blockchain node implementation and a compiler.
pub trait Platform {
type Blockchain: Node;
type Blockchain: EthereumNode;
type Compiler: SolidityCompiler;
}
+92 -40
View File
@@ -1,15 +1,40 @@
use std::collections::BTreeSet;
use std::{collections::HashMap, sync::LazyLock};
use clap::Parser;
use rayon::{ThreadPoolBuilder, prelude::*};
use revive_dt_config::*;
use revive_dt_core::{Geth, Kitchensink, driver::Driver};
use revive_dt_format::corpus::Corpus;
use revive_dt_node::{Node, geth};
use revive_dt_core::{
Geth, Kitchensink,
driver::{Driver, State},
};
use revive_dt_format::{corpus::Corpus, metadata::Metadata};
use revive_dt_node::pool::NodePool;
use temp_dir::TempDir;
static TEMP_DIR: LazyLock<TempDir> = LazyLock::new(|| TempDir::new().unwrap());
fn main() -> anyhow::Result<()> {
let args = init_cli()?;
let corpora = collect_corpora(&args)?;
if let Some(platform) = &args.compile_only {
for tests in corpora.values() {
main_compile_only(&args, tests, platform)?;
}
return Ok(());
}
for tests in corpora.values() {
main_execute_differential(&args, tests)?;
}
Ok(())
}
fn init_cli() -> anyhow::Result<Arguments> {
env_logger::init();
let mut args = Arguments::parse();
@@ -17,7 +42,7 @@ fn main() -> anyhow::Result<()> {
anyhow::bail!("no test corpus specified");
}
if args.working_directory.is_none() {
args.temp_dir = TempDir::new()?.into()
args.temp_dir = Some(&TEMP_DIR);
}
ThreadPoolBuilder::new()
@@ -25,45 +50,72 @@ fn main() -> anyhow::Result<()> {
.build_global()
.unwrap();
for path in args.corpus.iter().collect::<BTreeSet<_>>() {
log::trace!("attempting corpus {path:?}");
let corpus = Corpus::try_from_path(path)?;
log::info!("found corpus: {corpus:?}");
Ok(args)
}
fn collect_corpora(args: &Arguments) -> anyhow::Result<HashMap<Corpus, Vec<Metadata>>> {
let mut corpora = HashMap::new();
for path in &args.corpus {
let corpus = Corpus::try_from_path(path)?;
log::info!("found corpus: {}", path.display());
let tests = corpus.enumerate_tests();
log::info!("corpus '{}' contains {} tests", &corpus.name, tests.len());
tests.par_iter().for_each(|metadata| {
let (leader, follower) = match (&args.leader, &args.follower) {
(TestingPlatform::Geth, TestingPlatform::Kitchensink) => {
(geth::Instance::new(&args), geth::Instance::new(&args))
}
_ => unimplemented!(),
};
let mut driver = match (&args.leader, &args.follower) {
(TestingPlatform::Geth, TestingPlatform::Kitchensink) => {
Driver::<Geth, Geth>::new(metadata, &args)
}
_ => unimplemented!(),
};
match Driver::<Geth, Geth>::new(metadata, &args).execute(leader, follower) {
Ok(build) => {
log::info!(
"metadata {} success",
metadata.directory().as_ref().unwrap().display()
);
build
}
Err(error) => {
log::warn!(
"metadata {} failure: {error:?}",
metadata.file_path.as_ref().unwrap().display()
);
}
}
});
corpora.insert(corpus, tests);
}
Ok(corpora)
}
fn main_execute_differential(args: &Arguments, tests: &[Metadata]) -> anyhow::Result<()> {
let leader_nodes = NodePool::new(args)?;
let follower_nodes = NodePool::new(args)?;
tests.par_iter().for_each(|metadata| {
let mut driver = match (&args.leader, &args.follower) {
(TestingPlatform::Geth, TestingPlatform::Kitchensink) => Driver::<Geth, Geth>::new(
metadata,
args,
leader_nodes.round_robbin(),
follower_nodes.round_robbin(),
),
_ => unimplemented!(),
};
match driver.execute() {
Ok(build) => {
log::info!(
"metadata {} success",
metadata.directory().as_ref().unwrap().display()
);
build
}
Err(error) => {
log::warn!(
"metadata {} failure: {error:?}",
metadata.file_path.as_ref().unwrap().display()
);
}
}
});
Ok(())
}
fn main_compile_only(
config: &Arguments,
tests: &[Metadata],
platform: &TestingPlatform,
) -> anyhow::Result<()> {
tests.par_iter().for_each(|metadata| {
for mode in &metadata.solc_modes() {
let mut state = match platform {
TestingPlatform::Geth => State::<Geth>::new(config),
_ => todo!(),
};
let _ = state.build_contracts(mode, metadata);
}
});
Ok(())
}
+1 -1
View File
@@ -7,7 +7,7 @@ use serde::Deserialize;
use crate::metadata::Metadata;
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Hash)]
pub struct Corpus {
pub name: String,
pub path: PathBuf,
+17 -4
View File
@@ -6,7 +6,10 @@ use std::{
use serde::Deserialize;
use crate::{case::Case, mode::Mode};
use crate::{
case::Case,
mode::{Mode, SolcMode},
};
pub const METADATA_FILE_EXTENSION: &str = "json";
pub const SOLIDITY_CASE_FILE_EXTENSION: &str = "sol";
@@ -17,16 +20,26 @@ pub struct Metadata {
pub contracts: Option<BTreeMap<String, String>>,
pub libraries: Option<BTreeMap<String, BTreeMap<String, String>>>,
pub ignore: Option<bool>,
modes: Option<Vec<Mode>>,
pub modes: Option<Vec<Mode>>,
pub file_path: Option<PathBuf>,
}
impl Metadata {
/// Returns the modes of this metadata, inserting a default mode if not present.
pub fn modes(&self) -> Vec<Mode> {
/// Returns the solc modes of this metadata, inserting a default mode if not present.
pub fn solc_modes(&self) -> Vec<SolcMode> {
self.modes
.to_owned()
.unwrap_or_else(|| vec![Mode::Solidity(Default::default())])
.iter()
.filter_map(|mode| match mode {
Mode::Solidity(solc_mode) => Some(solc_mode),
Mode::Unknown(mode) => {
log::debug!("compiler: ignoring unknown mode '{mode}'");
None
}
})
.cloned()
.collect()
}
/// Returns the base directory of this metadata.
+1
View File
@@ -11,6 +11,7 @@ rust-version.workspace = true
[dependencies]
anyhow = { workspace = true }
alloy = { workspace = true }
log = { workspace = true }
serde_json = { workspace = true }
revive-dt-node-interaction = { workspace = true }
+55 -25
View File
@@ -1,38 +1,68 @@
use std::sync::{
LazyLock, Mutex,
mpsc::{Receiver, Sender},
//! This crate implements concurrent handling of testing node.
use std::{
fs::read_to_string,
sync::atomic::{AtomicUsize, Ordering},
thread,
};
pub trait NodePool<T: Node> {
fn access() -> &'static LazyLock<Mutex<Vec<T>>>;
}
use anyhow::Context;
use revive_dt_config::Arguments;
use crate::Node;
//static POOL: LazyLock<Mutex<Pool<T>>> = LazyLock::new(Default::default);
pub struct Handle<T> {
node: T,
notifier: Sender<()>,
/// The node pool starts one or more [Node] which then can be accessed
/// in a round robbin fasion.
pub struct NodePool<T> {
next: AtomicUsize,
nodes: Vec<T>,
}
pub struct Pool<T> {
request: Receiver<()>,
nodes: usize,
handles: Vec<T>,
}
impl<T> Pool<T>
impl<T> NodePool<T>
where
T: Node,
T: Node + Send + 'static,
{
pub fn spawn() {}
/// Create a new Pool. This will start as many nodes as there are workers in `config`.
pub fn new(config: &Arguments) -> anyhow::Result<Self> {
let nodes = config.workers;
let genesis = read_to_string(&config.genesis_file).context(format!(
"can not read genesis file: {}",
config.genesis_file.display()
))?;
let mut handles = Vec::with_capacity(nodes);
for _ in 0..nodes {
let config = config.clone();
let genesis = genesis.clone();
handles.push(thread::spawn(move || spawn_node::<T>(&config, genesis)));
}
let mut nodes = Vec::with_capacity(nodes);
for handle in handles {
nodes.push(
handle
.join()
.map_err(|error| anyhow::anyhow!("failed to spawn node: {:?}", error))?
.map_err(|error| anyhow::anyhow!("node failed to spawn: {error}"))?,
);
}
Ok(Self {
nodes,
next: Default::default(),
})
}
/// Get a handle to the next node.
pub fn round_robbin(&self) -> &T {
let current = self.next.fetch_add(1, Ordering::SeqCst) % self.nodes.len();
self.nodes.get(current).unwrap()
}
}
// spawner: loops on a queue
pub fn get_handle<T: Node + NodePool<T>>(config: &Arguments) -> Receiver<T> {
todo!()
fn spawn_node<T: Node + Send>(args: &Arguments, genesis: String) -> anyhow::Result<T> {
let mut node = T::new(&args);
log::info!("starting node: {}", node.connection_string());
node.spawn(genesis)?;
Ok(node)
}
+1
View File
@@ -25,6 +25,7 @@ pub(crate) fn get_or_download(
let mut cache = SOLC_CACHER.lock().unwrap();
if cache.contains(&target_file) {
log::debug!("using cached solc: {}", target_file.display());
return Ok(target_file);
}