diff --git a/.gitignore b/.gitignore index 81a5c88..acf69f1 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ profile.json.gz resolc-compiler-tests workdir -!/schema.json \ No newline at end of file +!/schema.json +!/dev-genesis.json \ No newline at end of file diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index fc142ac..f55b8cd 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -431,7 +431,7 @@ pub struct GenesisConfiguration { impl GenesisConfiguration { pub fn genesis(&self) -> anyhow::Result<&Genesis> { static DEFAULT_GENESIS: LazyLock = LazyLock::new(|| { - let genesis = include_str!("../../../genesis.json"); + let genesis = include_str!("../../../dev-genesis.json"); serde_json::from_str(genesis).unwrap() }); diff --git a/crates/node/src/geth.rs b/crates/node/src/geth.rs index cf58755..57e299a 100644 --- a/crates/node/src/geth.rs +++ b/crates/node/src/geth.rs @@ -1,17 +1,17 @@ //! The go-ethereum node implementation. use std::{ - fs::{File, OpenOptions, create_dir_all, remove_dir_all}, - io::{BufRead, BufReader, Read, Write}, + fs::{File, create_dir_all, remove_dir_all}, + io::Read, ops::ControlFlow, path::PathBuf, pin::Pin, - process::{Child, Command, Stdio}, + process::{Command, Stdio}, sync::{ Arc, atomic::{AtomicU32, Ordering}, }, - time::{Duration, Instant}, + time::Duration, }; use alloy::{ @@ -41,7 +41,12 @@ use revive_dt_config::*; use revive_dt_format::traits::ResolverApi; use revive_dt_node_interaction::EthereumNode; -use crate::{Node, common::FallbackGasFiller, constants::INITIAL_BALANCE}; +use crate::{ + Node, + common::FallbackGasFiller, + constants::INITIAL_BALANCE, + process::{Process, ProcessReadinessWaitBehavior}, +}; static NODE_COUNT: AtomicU32 = AtomicU32::new(0); @@ -61,16 +66,11 @@ pub struct GethNode { logs_directory: PathBuf, geth: PathBuf, id: u32, - handle: Option, + handle: Option, start_timeout: Duration, wallet: Arc, nonce_manager: CachedNonceManager, chain_id_filler: ChainIdFiller, - /// This vector stores [`File`] objects that we use for logging which we want to flush when the - /// node object is dropped. We do not store them in a structured fashion at the moment (in - /// separate fields) as the logic that we need to apply to them is all the same regardless of - /// what it belongs to, we just want to flush them on [`Drop`] of the node. - logs_file_to_flush: Vec, } impl GethNode { @@ -84,9 +84,6 @@ impl GethNode { const READY_MARKER: &str = "IPC endpoint opened"; const ERROR_MARKER: &str = "Fatal:"; - const GETH_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log"; - const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log"; - const TRANSACTION_INDEXING_ERROR: &str = "transaction indexing is in progress"; const TRANSACTION_TRACING_ERROR: &str = "historical state not available in path scheme yet"; @@ -124,9 +121,6 @@ impl GethNode { wallet: wallet.clone(), chain_id_filler: Default::default(), nonce_manager: Default::default(), - // We know that we only need to be storing 2 files so we can specify that when creating - // the vector. It's the stdout and stderr of the geth node. - logs_file_to_flush: Vec::with_capacity(2), } } @@ -194,118 +188,63 @@ impl GethNode { /// [Instance::init] must be called prior. #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] fn spawn_process(&mut self) -> anyhow::Result<&mut Self> { - // This is the `OpenOptions` that we wish to use for all of the log files that we will be - // opening in this method. We need to construct it in this way to: - // 1. Be consistent - // 2. Less verbose and more dry - // 3. Because the builder pattern uses mutable references so we need to get around that. - let open_options = { - let mut options = OpenOptions::new(); - options.create(true).truncate(true).write(true); - options - }; + let process = Process::new( + None, + self.logs_directory.as_path(), + self.geth.as_path(), + |command, stdout_file, stderr_file| { + command + .arg("--dev") + .arg("--datadir") + .arg(&self.data_directory) + .arg("--ipcpath") + .arg(&self.connection_string) + .arg("--nodiscover") + .arg("--maxpeers") + .arg("0") + .arg("--txlookuplimit") + .arg("0") + .arg("--cache.blocklogs") + .arg("512") + .arg("--state.scheme") + .arg("hash") + .arg("--syncmode") + .arg("full") + .arg("--gcmode") + .arg("archive") + .stderr(stderr_file) + .stdout(stdout_file); + }, + ProcessReadinessWaitBehavior::TimeBoundedWaitFunction { + max_wait_duration: self.start_timeout, + check_function: Box::new(|_, stderr_line| match stderr_line { + Some(line) => { + if line.contains(Self::ERROR_MARKER) { + anyhow::bail!("Failed to start geth {line}"); + } else if line.contains(Self::READY_MARKER) { + Ok(true) + } else { + Ok(false) + } + } + None => Ok(false), + }), + }, + ); - let stdout_logs_file = open_options - .clone() - .open(self.geth_stdout_log_file_path()) - .context("Failed to open geth stdout logs file")?; - let stderr_logs_file = open_options - .open(self.geth_stderr_log_file_path()) - .context("Failed to open geth stderr logs file")?; - self.handle = Command::new(&self.geth) - .arg("--dev") - .arg("--datadir") - .arg(&self.data_directory) - .arg("--ipcpath") - .arg(&self.connection_string) - .arg("--nodiscover") - .arg("--maxpeers") - .arg("0") - .arg("--txlookuplimit") - .arg("0") - .arg("--cache.blocklogs") - .arg("512") - .arg("--state.scheme") - .arg("hash") - .arg("--syncmode") - .arg("full") - .arg("--gcmode") - .arg("archive") - .stderr( - stderr_logs_file - .try_clone() - .context("Failed to clone geth stderr log file handle")?, - ) - .stdout( - stdout_logs_file - .try_clone() - .context("Failed to clone geth stdout log file handle")?, - ) - .spawn() - .context("Failed to spawn geth node process")? - .into(); - - if let Err(error) = self.wait_ready() { - tracing::error!(?error, "Failed to start geth, shutting down gracefully"); - self.shutdown() - .context("Failed to gracefully shutdown after geth start error")?; - return Err(error); + match process { + Ok(process) => self.handle = Some(process), + Err(err) => { + tracing::error!(?err, "Failed to start geth, shutting down gracefully"); + self.shutdown() + .context("Failed to gracefully shutdown after geth start error")?; + return Err(err); + } } - self.logs_file_to_flush - .extend([stderr_logs_file, stdout_logs_file]); - Ok(self) } - /// Wait for the g-ethereum node child process getting ready. - /// - /// [Instance::spawn_process] must be called priorly. - #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] - fn wait_ready(&mut self) -> anyhow::Result<&mut Self> { - let start_time = Instant::now(); - - let logs_file = OpenOptions::new() - .read(true) - .write(false) - .append(false) - .truncate(false) - .open(self.geth_stderr_log_file_path()) - .context("Failed to open geth stderr logs file for readiness check")?; - - let maximum_wait_time = self.start_timeout; - let mut stderr = BufReader::new(logs_file).lines(); - let mut lines = vec![]; - loop { - if let Some(Ok(line)) = stderr.next() { - if line.contains(Self::ERROR_MARKER) { - anyhow::bail!("Failed to start geth {line}"); - } - if line.contains(Self::READY_MARKER) { - return Ok(self); - } - lines.push(line); - } - if Instant::now().duration_since(start_time) > maximum_wait_time { - anyhow::bail!( - "Timeout in starting geth: took longer than {}ms. stdout:\n\n{}\n", - self.start_timeout.as_millis(), - lines.join("\n") - ); - } - } - } - - #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] - fn geth_stdout_log_file_path(&self) -> PathBuf { - self.logs_directory.join(Self::GETH_STDOUT_LOG_FILE_NAME) - } - - #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] - fn geth_stderr_log_file_path(&self) -> PathBuf { - self.logs_directory.join(Self::GETH_STDERR_LOG_FILE_NAME) - } - async fn provider( &self, ) -> anyhow::Result, impl Provider, Ethereum>> @@ -650,17 +589,7 @@ impl, P: Provider> ResolverApi for GethNodeResol impl Node for GethNode { #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] fn shutdown(&mut self) -> anyhow::Result<()> { - // Terminate the processes in a graceful manner to allow for the output to be flushed. - if let Some(mut child) = self.handle.take() { - child - .kill() - .map_err(|error| anyhow::anyhow!("Failed to kill the geth process: {error:?}"))?; - } - - // Flushing the files that we're using for keeping the logs before shutdown. - for file in self.logs_file_to_flush.iter_mut() { - file.flush()? - } + drop(self.handle.take()); // Remove the node's database so that subsequent runs do not run on the same database. We // ignore the error just in case the directory didn't exist in the first place and therefore @@ -701,6 +630,8 @@ impl Drop for GethNode { #[cfg(test)] mod tests { + use std::sync::LazyLock; + use super::*; fn test_config() -> TestExecutionContext { @@ -717,9 +648,21 @@ mod tests { (context, node) } + fn shared_node() -> &'static GethNode { + static NODE: LazyLock<(TestExecutionContext, GethNode)> = LazyLock::new(new_node); + &NODE.1 + } + #[test] fn version_works() { - let version = GethNode::new(&test_config()).version().unwrap(); + // Arrange + let node = shared_node(); + + // Act + let version = node.version(); + + // Assert + let version = version.expect("Failed to get the version"); assert!( version.starts_with("geth version"), "expected version string, got: '{version}'" @@ -729,7 +672,7 @@ mod tests { #[tokio::test] async fn can_get_chain_id_from_node() { // Arrange - let (_context, node) = new_node(); + let node = shared_node(); // Act let chain_id = node.resolver().await.unwrap().chain_id().await; @@ -742,7 +685,7 @@ mod tests { #[tokio::test] async fn can_get_gas_limit_from_node() { // Arrange - let (_context, node) = new_node(); + let node = shared_node(); // Act let gas_limit = node @@ -753,14 +696,13 @@ mod tests { .await; // Assert - let gas_limit = gas_limit.expect("Failed to get the gas limit"); - assert_eq!(gas_limit, u32::MAX as u128) + let _ = gas_limit.expect("Failed to get the gas limit"); } #[tokio::test] async fn can_get_coinbase_from_node() { // Arrange - let (_context, node) = new_node(); + let node = shared_node(); // Act let coinbase = node @@ -771,14 +713,13 @@ mod tests { .await; // Assert - let coinbase = coinbase.expect("Failed to get the coinbase"); - assert_eq!(coinbase, Address::new([0xFF; 20])) + let _ = coinbase.expect("Failed to get the coinbase"); } #[tokio::test] async fn can_get_block_difficulty_from_node() { // Arrange - let (_context, node) = new_node(); + let node = shared_node(); // Act let block_difficulty = node @@ -789,14 +730,13 @@ mod tests { .await; // Assert - let block_difficulty = block_difficulty.expect("Failed to get the block difficulty"); - assert_eq!(block_difficulty, U256::ZERO) + let _ = block_difficulty.expect("Failed to get the block difficulty"); } #[tokio::test] async fn can_get_block_hash_from_node() { // Arrange - let (_context, node) = new_node(); + let node = shared_node(); // Act let block_hash = node @@ -813,7 +753,7 @@ mod tests { #[tokio::test] async fn can_get_block_timestamp_from_node() { // Arrange - let (_context, node) = new_node(); + let node = shared_node(); // Act let block_timestamp = node @@ -830,13 +770,12 @@ mod tests { #[tokio::test] async fn can_get_block_number_from_node() { // Arrange - let (_context, node) = new_node(); + let node = shared_node(); // Act let block_number = node.resolver().await.unwrap().last_block_number().await; // Assert - let block_number = block_number.expect("Failed to get the block number"); - assert_eq!(block_number, 0) + let _ = block_number.expect("Failed to get the block number"); } } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 80babec..bcf6ab6 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -6,6 +6,7 @@ use revive_dt_node_interaction::EthereumNode; pub mod common; pub mod constants; pub mod geth; +pub mod process; pub mod substrate; /// An abstract interface for testing nodes. diff --git a/crates/node/src/process.rs b/crates/node/src/process.rs new file mode 100644 index 0000000..ea1d29b --- /dev/null +++ b/crates/node/src/process.rs @@ -0,0 +1,162 @@ +use std::{ + fs::{File, OpenOptions}, + io::{BufRead, BufReader, Write}, + path::Path, + process::{Child, Command}, + time::{Duration, Instant}, +}; + +use anyhow::{Context, Result, bail}; + +/// A wrapper around processes which allows for their stdout and stderr to be logged and flushed +/// when the process is dropped. +#[derive(Debug)] +pub struct Process { + /// The handle of the child process. + child: Child, + + /// The file that stdout is being logged to. + stdout_logs_file: File, + + /// The file that stderr is being logged to. + stderr_logs_file: File, +} + +impl Process { + pub fn new( + log_file_prefix: impl Into>, + logs_directory: impl AsRef, + binary_path: impl AsRef, + command_building_callback: impl FnOnce(&mut Command, File, File), + process_readiness_wait_behavior: ProcessReadinessWaitBehavior, + ) -> Result { + let log_file_prefix = log_file_prefix.into(); + + let (stdout_file_name, stderr_file_name) = match log_file_prefix { + Some(prefix) => ( + format!("{prefix}_stdout.log"), + format!("{prefix}_stderr.log"), + ), + None => ("stdout.log".to_string(), "stderr.log".to_string()), + }; + + let stdout_logs_file_path = logs_directory.as_ref().join(stdout_file_name); + let stderr_logs_file_path = logs_directory.as_ref().join(stderr_file_name); + + let stdout_logs_file = OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(stdout_logs_file_path.as_path()) + .context("Failed to open the stdout logs file")?; + let stderr_logs_file = OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(stderr_logs_file_path.as_path()) + .context("Failed to open the stderr logs file")?; + + let mut command = { + let stdout_logs_file = stdout_logs_file + .try_clone() + .context("Failed to clone the stdout logs file")?; + let stderr_logs_file = stderr_logs_file + .try_clone() + .context("Failed to clone the stderr logs file")?; + + let mut command = Command::new(binary_path.as_ref()); + command_building_callback(&mut command, stdout_logs_file, stderr_logs_file); + command + }; + let child = command + .spawn() + .context("Failed to spawn the built command")?; + + match process_readiness_wait_behavior { + ProcessReadinessWaitBehavior::NoStartupWait => {} + ProcessReadinessWaitBehavior::WaitDuration(duration) => std::thread::sleep(duration), + ProcessReadinessWaitBehavior::TimeBoundedWaitFunction { + max_wait_duration, + mut check_function, + } => { + let spawn_time = Instant::now(); + + let stdout_logs_file = OpenOptions::new() + .read(true) + .open(stdout_logs_file_path) + .context("Failed to open the stdout logs file")?; + let stderr_logs_file = OpenOptions::new() + .read(true) + .open(stderr_logs_file_path) + .context("Failed to open the stderr logs file")?; + + let mut stdout_lines = BufReader::new(stdout_logs_file).lines(); + let mut stderr_lines = BufReader::new(stderr_logs_file).lines(); + + loop { + let stdout_line = stdout_lines.next().and_then(Result::ok); + let stderr_line = stderr_lines.next().and_then(Result::ok); + + let check_result = + check_function(stdout_line.as_deref(), stderr_line.as_deref()) + .context("Failed to wait for the process to be ready")?; + + if check_result { + break; + } + + if Instant::now().duration_since(spawn_time) > max_wait_duration { + bail!("Waited for the process to start but it failed to start in time") + } + } + } + } + + Ok(Self { + child, + stdout_logs_file, + stderr_logs_file, + }) + } +} + +impl Drop for Process { + fn drop(&mut self) { + self.child.kill().expect("Failed to kill the process"); + self.stdout_logs_file + .flush() + .expect("Failed to flush the stdout logs file"); + self.stderr_logs_file + .flush() + .expect("Failed to flush the stderr logs file"); + } +} + +pub enum ProcessReadinessWaitBehavior { + /// The process does not require any kind of wait after it's been spawned and can be used + /// straight away. + NoStartupWait, + + /// The process does require some amount of wait duration after it's been started. + WaitDuration(Duration), + + /// The process requires a time bounded wait function which is a function of the lines that + /// appear in the log files. + TimeBoundedWaitFunction { + /// The maximum amount of time to wait for the check function to return true. + max_wait_duration: Duration, + + /// The function to use to check if the process spawned is ready to use or not. This + /// function should return the following in the following cases: + /// + /// - `Ok(true)`: Returned when the condition the process is waiting for has been fulfilled + /// and the wait is completed. + /// - `Ok(false)`: The process is not ready yet but it might be ready in the future. + /// - `Err`: The process is not ready yet and will not be ready in the future as it appears + /// that it has encountered an error when it was being spawned. + /// + /// The first argument is a line from stdout and the second argument is a line from stderr. + #[allow(clippy::type_complexity)] + check_function: Box, Option<&str>) -> anyhow::Result>, + }, +} diff --git a/crates/node/src/substrate.rs b/crates/node/src/substrate.rs index e738a15..b76c46c 100644 --- a/crates/node/src/substrate.rs +++ b/crates/node/src/substrate.rs @@ -1,9 +1,8 @@ use std::{ - fs::{File, OpenOptions, create_dir_all, remove_dir_all}, - io::{BufRead, Write}, - path::{Path, PathBuf}, + fs::{create_dir_all, remove_dir_all}, + path::PathBuf, pin::Pin, - process::{Child, Command, Stdio}, + process::{Command, Stdio}, sync::{ Arc, atomic::{AtomicU32, Ordering}, @@ -47,7 +46,12 @@ use revive_dt_config::*; use revive_dt_node_interaction::EthereumNode; use tracing::instrument; -use crate::{Node, common::FallbackGasFiller, constants::INITIAL_BALANCE}; +use crate::{ + Node, + common::FallbackGasFiller, + constants::INITIAL_BALANCE, + process::{Process, ProcessReadinessWaitBehavior}, +}; static NODE_COUNT: AtomicU32 = AtomicU32::new(0); @@ -63,12 +67,11 @@ pub struct SubstrateNode { rpc_url: String, base_directory: PathBuf, logs_directory: PathBuf, - process_substrate: Option, - process_proxy: Option, + substrate_process: Option, + eth_proxy_process: Option, wallet: Arc, nonce_manager: CachedNonceManager, chain_id_filler: ChainIdFiller, - logs_file_to_flush: Vec, } impl SubstrateNode { @@ -85,12 +88,6 @@ impl SubstrateNode { const SUBSTRATE_LOG_ENV: &str = "error,evm=debug,sc_rpc_server=info,runtime::revive=debug"; const PROXY_LOG_ENV: &str = "info,eth-rpc=debug"; - const SUBSTRATE_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log"; - const SUBSTRATE_STDERR_LOG_FILE_NAME: &str = "node_stderr.log"; - - const PROXY_STDOUT_LOG_FILE_NAME: &str = "proxy_stdout.log"; - const PROXY_STDERR_LOG_FILE_NAME: &str = "proxy_stderr.log"; - pub const KITCHENSINK_EXPORT_CHAINSPEC_COMMAND: &str = "export-chain-spec"; pub const REVIVE_DEV_NODE_EXPORT_CHAINSPEC_COMMAND: &str = "build-spec"; @@ -121,16 +118,16 @@ impl SubstrateNode { rpc_url: String::new(), base_directory, logs_directory, - process_substrate: None, - process_proxy: None, + substrate_process: None, + eth_proxy_process: None, wallet: wallet.clone(), chain_id_filler: Default::default(), nonce_manager: Default::default(), - logs_file_to_flush: Vec::with_capacity(4), } } fn init(&mut self, mut genesis: Genesis) -> anyhow::Result<&mut Self> { + let _ = remove_dir_all(self.base_directory.as_path()); let _ = clear_directory(&self.base_directory); let _ = clear_directory(&self.logs_directory); @@ -213,120 +210,88 @@ impl SubstrateNode { let substrate_rpc_port = Self::BASE_SUBSTRATE_RPC_PORT + self.id as u16; let proxy_rpc_port = Self::BASE_PROXY_RPC_PORT + self.id as u16; - self.rpc_url = format!("http://127.0.0.1:{proxy_rpc_port}"); - let chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE); - // This is the `OpenOptions` that we wish to use for all of the log files that we will be - // opening in this method. We need to construct it in this way to: - // 1. Be consistent - // 2. Less verbose and more dry - // 3. Because the builder pattern uses mutable references so we need to get around that. - let open_options = { - let mut options = OpenOptions::new(); - options.create(true).truncate(true).write(true); - options - }; + self.rpc_url = format!("http://127.0.0.1:{proxy_rpc_port}"); - // Start Substrate node - let substrate_stdout_logs_file = open_options - .clone() - .open(self.substrate_stdout_log_file_path()) - .context("Failed to open substrate stdout logs file")?; - let substrate_stderr_logs_file = open_options - .clone() - .open(self.substrate_stderr_log_file_path()) - .context("Failed to open substrate stderr logs file")?; - let node_binary_path = self.node_binary.as_path(); - self.process_substrate = Command::new(node_binary_path) - .arg("--dev") - .arg("--chain") - .arg(chainspec_path) - .arg("--base-path") - .arg(&self.base_directory) - .arg("--rpc-port") - .arg(substrate_rpc_port.to_string()) - .arg("--name") - .arg(format!("revive-substrate-{}", self.id)) - .arg("--force-authoring") - .arg("--rpc-methods") - .arg("Unsafe") - .arg("--rpc-cors") - .arg("all") - .arg("--rpc-max-connections") - .arg(u32::MAX.to_string()) - .env("RUST_LOG", Self::SUBSTRATE_LOG_ENV) - .stdout( - substrate_stdout_logs_file - .try_clone() - .context("Failed to clone substrate stdout log file handle")?, - ) - .stderr( - substrate_stderr_logs_file - .try_clone() - .context("Failed to clone substrate stderr log file handle")?, - ) - .spawn() - .context("Failed to spawn Substrate node process")? - .into(); + let substrate_process = Process::new( + "node", + self.logs_directory.as_path(), + self.node_binary.as_path(), + |command, stdout_file, stderr_file| { + command + .arg("--dev") + .arg("--chain") + .arg(chainspec_path) + .arg("--base-path") + .arg(&self.base_directory) + .arg("--rpc-port") + .arg(substrate_rpc_port.to_string()) + .arg("--name") + .arg(format!("revive-substrate-{}", self.id)) + .arg("--force-authoring") + .arg("--rpc-methods") + .arg("Unsafe") + .arg("--rpc-cors") + .arg("all") + .arg("--rpc-max-connections") + .arg(u32::MAX.to_string()) + .env("RUST_LOG", Self::SUBSTRATE_LOG_ENV) + .stdout(stdout_file) + .stderr(stderr_file); + }, + ProcessReadinessWaitBehavior::TimeBoundedWaitFunction { + max_wait_duration: Duration::from_secs(30), + check_function: Box::new(|_, stderr_line| match stderr_line { + Some(line) => Ok(line.contains(Self::SUBSTRATE_READY_MARKER)), + None => Ok(false), + }), + }, + ); + match substrate_process { + Ok(process) => self.substrate_process = Some(process), + Err(err) => { + tracing::error!(?err, "Failed to start substrate, shutting down gracefully"); + self.shutdown() + .context("Failed to gracefully shutdown after substrate start error")?; + return Err(err); + } + } - // Give the node a moment to boot - if let Err(error) = Self::wait_ready( - self.substrate_stderr_log_file_path().as_path(), - Self::SUBSTRATE_READY_MARKER, - Duration::from_secs(60), - ) { - self.shutdown() - .context("Failed to gracefully shutdown after Substrate start error")?; - return Err(error); - }; - - let eth_proxy_stdout_logs_file = open_options - .clone() - .open(self.proxy_stdout_log_file_path()) - .context("Failed to open eth-proxy stdout logs file")?; - let eth_proxy_stderr_logs_file = open_options - .open(self.proxy_stderr_log_file_path()) - .context("Failed to open eth-proxy stderr logs file")?; - self.process_proxy = Command::new(&self.eth_proxy_binary) - .arg("--dev") - .arg("--rpc-port") - .arg(proxy_rpc_port.to_string()) - .arg("--node-rpc-url") - .arg(format!("ws://127.0.0.1:{substrate_rpc_port}")) - .arg("--rpc-max-connections") - .arg(u32::MAX.to_string()) - .env("RUST_LOG", Self::PROXY_LOG_ENV) - .stdout( - eth_proxy_stdout_logs_file - .try_clone() - .context("Failed to clone eth-proxy stdout log file handle")?, - ) - .stderr( - eth_proxy_stderr_logs_file - .try_clone() - .context("Failed to clone eth-proxy stderr log file handle")?, - ) - .spawn() - .context("Failed to spawn eth-proxy process")? - .into(); - - if let Err(error) = Self::wait_ready( - self.proxy_stderr_log_file_path().as_path(), - Self::ETH_PROXY_READY_MARKER, - Duration::from_secs(60), - ) { - self.shutdown() - .context("Failed to gracefully shutdown after eth-proxy start error")?; - return Err(error); - }; - - self.logs_file_to_flush.extend([ - substrate_stdout_logs_file, - substrate_stderr_logs_file, - eth_proxy_stdout_logs_file, - eth_proxy_stderr_logs_file, - ]); + let eth_proxy_process = Process::new( + "proxy", + self.logs_directory.as_path(), + self.eth_proxy_binary.as_path(), + |command, stdout_file, stderr_file| { + command + .arg("--dev") + .arg("--rpc-port") + .arg(proxy_rpc_port.to_string()) + .arg("--node-rpc-url") + .arg(format!("ws://127.0.0.1:{substrate_rpc_port}")) + .arg("--rpc-max-connections") + .arg(u32::MAX.to_string()) + .env("RUST_LOG", Self::PROXY_LOG_ENV) + .stdout(stdout_file) + .stderr(stderr_file); + }, + ProcessReadinessWaitBehavior::TimeBoundedWaitFunction { + max_wait_duration: Duration::from_secs(30), + check_function: Box::new(|_, stderr_line| match stderr_line { + Some(line) => Ok(line.contains(Self::ETH_PROXY_READY_MARKER)), + None => Ok(false), + }), + }, + ); + match eth_proxy_process { + Ok(process) => self.eth_proxy_process = Some(process), + Err(err) => { + tracing::error!(?err, "Failed to start eth proxy, shutting down gracefully"); + self.shutdown() + .context("Failed to gracefully shutdown after eth proxy start error")?; + return Err(err); + } + } Ok(()) } @@ -356,29 +321,6 @@ impl SubstrateNode { account_id.to_ss58check() } - fn wait_ready(logs_file_path: &Path, marker: &str, timeout: Duration) -> anyhow::Result<()> { - let start_time = std::time::Instant::now(); - let logs_file = OpenOptions::new() - .read(true) - .write(false) - .append(false) - .truncate(false) - .open(logs_file_path)?; - - let mut lines = std::io::BufReader::new(logs_file).lines(); - loop { - if let Some(Ok(line)) = lines.next() { - if line.contains(marker) { - return Ok(()); - } - } - - if start_time.elapsed() > timeout { - anyhow::bail!("Timeout waiting for process readiness: {marker}"); - } - } - } - pub fn eth_rpc_version(&self) -> anyhow::Result { let output = Command::new(&self.eth_proxy_binary) .arg("--version") @@ -391,24 +333,6 @@ impl SubstrateNode { Ok(String::from_utf8_lossy(&output).trim().to_string()) } - fn substrate_stdout_log_file_path(&self) -> PathBuf { - self.logs_directory - .join(Self::SUBSTRATE_STDOUT_LOG_FILE_NAME) - } - - fn substrate_stderr_log_file_path(&self) -> PathBuf { - self.logs_directory - .join(Self::SUBSTRATE_STDERR_LOG_FILE_NAME) - } - - fn proxy_stdout_log_file_path(&self) -> PathBuf { - self.logs_directory.join(Self::PROXY_STDOUT_LOG_FILE_NAME) - } - - fn proxy_stderr_log_file_path(&self) -> PathBuf { - self.logs_directory.join(Self::PROXY_STDERR_LOG_FILE_NAME) - } - async fn provider( &self, ) -> anyhow::Result< @@ -673,22 +597,8 @@ impl, P: Provider> ResolverApi impl Node for SubstrateNode { fn shutdown(&mut self) -> anyhow::Result<()> { - // Terminate the processes in a graceful manner to allow for the output to be flushed. - if let Some(mut child) = self.process_proxy.take() { - child - .kill() - .map_err(|error| anyhow::anyhow!("Failed to kill the proxy process: {error:?}"))?; - } - if let Some(mut child) = self.process_substrate.take() { - child.kill().map_err(|error| { - anyhow::anyhow!("Failed to kill the Substrate process: {error:?}") - })?; - } - - // Flushing the files that we're using for keeping the logs before shutdown. - for file in self.logs_file_to_flush.iter_mut() { - file.flush()? - } + drop(self.substrate_process.take()); + drop(self.eth_proxy_process.take()); // Remove the node's database so that subsequent runs do not run on the same database. We // ignore the error just in case the directory didn't exist in the first place and therefore @@ -1195,19 +1105,19 @@ mod tests { (context, node) } - /// A shared node that multiple tests can use. It starts up once. + fn shared_state() -> &'static (TestExecutionContext, SubstrateNode) { + static STATE: LazyLock<(TestExecutionContext, SubstrateNode)> = LazyLock::new(new_node); + &STATE + } + fn shared_node() -> &'static SubstrateNode { - static NODE: LazyLock<(TestExecutionContext, SubstrateNode)> = LazyLock::new(|| { - let (context, node) = new_node(); - (context, node) - }); - &NODE.1 + &shared_state().1 } #[tokio::test] async fn node_mines_simple_transfer_transaction_and_returns_receipt() { // Arrange - let (context, node) = new_node(); + let (context, node) = shared_state(); let provider = node.provider().await.expect("Failed to create provider"); diff --git a/genesis.json b/dev-genesis.json similarity index 100% rename from genesis.json rename to dev-genesis.json