diff --git a/Cargo.lock b/Cargo.lock index 9d729db..9a4c924 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4001,6 +4001,7 @@ dependencies = [ "serde_json", "sp-core", "sp-runtime", + "subprocess", "temp-dir", "tracing", ] @@ -5041,6 +5042,16 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "subprocess" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c2e86926081dda636c546d8c5e641661049d7562a68f5488be4a1f7f66f6086" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "substrate-bip39" version = "0.6.0" diff --git a/Cargo.toml b/Cargo.toml index a28e194..6e139e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ tracing-subscriber = { version = "0.3.19", default-features = false, features = "json", "env-filter", ] } +subprocess = { version = "0.2.9" } # revive compiler revive-solc-json-interface = { git = "https://github.com/paritytech/revive", rev = "3389865af7c3ff6f29a586d82157e8bc573c1a8e" } diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 1a9dcb0..9d3b98b 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -12,6 +12,7 @@ rust-version.workspace = true anyhow = { workspace = true } alloy = { workspace = true } tracing = { workspace = true } +subprocess = { workspace = true } revive-dt-node-interaction = { workspace = true } revive-dt-config = { workspace = true } diff --git a/crates/node/src/geth.rs b/crates/node/src/geth.rs index c40b18f..cf40116 100644 --- a/crates/node/src/geth.rs +++ b/crates/node/src/geth.rs @@ -2,15 +2,14 @@ use std::{ collections::HashMap, - fs::{File, create_dir_all, remove_dir_all}, + fs::{File, OpenOptions, create_dir_all}, io::{BufRead, BufReader, Read, Write}, path::PathBuf, - process::{Child, Command, Stdio}, + process::{Command, Stdio}, sync::{ Mutex, atomic::{AtomicU32, Ordering}, }, - thread, time::{Duration, Instant}, }; @@ -28,6 +27,7 @@ use revive_dt_node_interaction::{ EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction, transaction::execute_transaction, }; +use subprocess::{Exec, Popen}; use crate::Node; @@ -47,7 +47,7 @@ pub struct Instance { data_directory: PathBuf, geth: PathBuf, id: u32, - handle: Option, + handle: Option, network_id: u64, start_timeout: u64, wallet: EthereumWallet, @@ -65,8 +65,10 @@ impl Instance { const ERROR_MARKER: &str = "Fatal:"; /// Create the node directory and call `geth init` to configure the genesis. + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn init(&mut self, genesis: String) -> anyhow::Result<&mut Self> { create_dir_all(&self.base_directory)?; + create_dir_all(&self.base_directory.join("logs"))?; let genesis_path = self.base_directory.join(Self::GENESIS_JSON_FILE); File::create(&genesis_path)?.write_all(genesis.as_bytes())?; @@ -96,9 +98,19 @@ impl Instance { /// Spawn the go-ethereum node child process. /// - /// [Instance::init] must be called priorly. + /// [Instance::init] must be called prior. + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn spawn_process(&mut self) -> anyhow::Result<&mut Self> { - self.handle = Command::new(&self.geth) + let node_logs_file_path = self.base_directory.join("logs").join("node.log"); + let node_logs_file = OpenOptions::new() + // Options to re-create and re-write to the file starting at offset zero. We do not want + // to re-use log files between runs. Users that want to keep their log files should pass + // in a different working directory between runs. + .create(true) + .truncate(true) + .write(true) + .open(&node_logs_file_path)?; + self.handle = Exec::cmd(&self.geth) .arg("--dev") .arg("--datadir") .arg(&self.data_directory) @@ -109,49 +121,58 @@ impl Instance { .arg("--nodiscover") .arg("--maxpeers") .arg("0") - .stderr(Stdio::piped()) - .stdout(Stdio::null()) - .spawn()? + // We pipe both stdout and stderr to the same log file and therefore we're persisting + // both. In the implementation of [`std::fs::File`] the `try_clone` method will ensure + // that both [`std::fs::File`] objects have the same seeks and offsets and therefore we + // don't have to worry about either streams overriding each other. + .stderr(node_logs_file.try_clone()?) + .stdout(node_logs_file) + .popen()? .into(); + + if let Err(error) = self.wait_ready() { + tracing::error!(?error, "Failed to start geth, shutting down gracefully"); + self.shutdown()?; + return Err(error); + } + Ok(self) } /// Wait for the g-ethereum node child process getting ready. /// /// [Instance::spawn_process] must be called priorly. + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn wait_ready(&mut self) -> anyhow::Result<&mut Self> { - // Thanks clippy but geth is a server; we don't `wait` but eventually kill it. - #[allow(clippy::zombie_processes)] - let mut child = self.handle.take().expect("should be spawned"); let start_time = Instant::now(); - let maximum_wait_time = Duration::from_millis(self.start_timeout); - let mut stderr = BufReader::new(child.stderr.take().expect("should be piped")).lines(); - let error = loop { - let Some(Ok(line)) = stderr.next() else { - break "child process stderr reading error".to_string(); - }; - if line.contains(Self::ERROR_MARKER) { - break line; - } - if line.contains(Self::READY_MARKER) { - // Keep stderr alive - // https://github.com/alloy-rs/alloy/issues/2091#issuecomment-2676134147 - thread::spawn(move || for _ in stderr.by_ref() {}); - self.handle = child.into(); - return Ok(self); + let logs_file = OpenOptions::new() + .read(true) + .write(false) + .append(false) + .truncate(false) + .open(self.base_directory.join("logs").join("node.log"))?; + + let maximum_wait_time = Duration::from_millis(self.start_timeout); + let mut stderr = BufReader::new(logs_file).lines(); + loop { + if let Some(Ok(line)) = stderr.next() { + if line.contains(Self::ERROR_MARKER) { + anyhow::bail!("Failed to start geth {line}"); + } + if line.contains(Self::READY_MARKER) { + return Ok(self); + } } if Instant::now().duration_since(start_time) > maximum_wait_time { - break "spawn timeout".to_string(); + anyhow::bail!("Timeout in starting geth"); } - }; - - let _ = child.kill(); - anyhow::bail!("geth node #{} spawn error: {error}", self.id) + } } } impl EthereumNode for Instance { + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn execute_transaction( &self, transaction: TransactionRequest, @@ -173,6 +194,7 @@ impl EthereumNode for Instance { })) } + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn trace_transaction( &self, transaction: TransactionReceipt, @@ -195,6 +217,7 @@ impl EthereumNode for Instance { })) } + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn state_diff( &self, transaction: alloy::rpc::types::TransactionReceipt, @@ -208,6 +231,7 @@ impl EthereumNode for Instance { } } + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn fetch_add_nonce(&self, address: Address) -> anyhow::Result { let connection_string = self.connection_string.clone(); let wallet = self.wallet.clone(); @@ -242,19 +266,31 @@ impl Node for Instance { } } + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn connection_string(&self) -> String { self.connection_string.clone() } - fn shutdown(self) -> anyhow::Result<()> { + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] + fn shutdown(&mut self) -> anyhow::Result<()> { + if let Some(mut child) = self.handle.take() { + child.terminate().map_err(|error| { + anyhow::anyhow!("Failed to terminate the geth process: {error:?}") + })?; + child.wait().map_err(|error| { + anyhow::anyhow!("Failed to wait for the termination of the geth process: {error:?}") + })?; + } Ok(()) } + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn spawn(&mut self, genesis: String) -> anyhow::Result<()> { - self.init(genesis)?.spawn_process()?.wait_ready()?; + self.init(genesis)?.spawn_process()?; Ok(()) } + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn version(&self) -> anyhow::Result { let output = Command::new(&self.geth) .arg("--version") @@ -269,13 +305,9 @@ impl Node for Instance { } impl Drop for Instance { + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn drop(&mut self) { - if let Some(child) = self.handle.as_mut() { - let _ = child.kill(); - } - if self.base_directory.exists() { - let _ = remove_dir_all(&self.base_directory); - } + self.shutdown().expect("Failed to shutdown") } } diff --git a/crates/node/src/kitchensink.rs b/crates/node/src/kitchensink.rs index 4f14a18..9ec80ca 100644 --- a/crates/node/src/kitchensink.rs +++ b/crates/node/src/kitchensink.rs @@ -1,9 +1,9 @@ use std::{ collections::HashMap, - fs::create_dir_all, + fs::{OpenOptions, create_dir_all}, io::BufRead, - path::PathBuf, - process::{Child, Command, Stdio}, + path::{Path, PathBuf}, + process::{Command, Stdio}, sync::{ Mutex, atomic::{AtomicU32, Ordering}, @@ -30,6 +30,7 @@ use revive_dt_node_interaction::{ EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction, transaction::execute_transaction, }; +use subprocess::{Exec, Popen}; use crate::Node; @@ -43,8 +44,8 @@ pub struct KitchensinkNode { rpc_url: String, wallet: EthereumWallet, base_directory: PathBuf, - process_substrate: Option, - process_proxy: Option, + process_substrate: Option, + process_proxy: Option, nonces: Mutex>, } @@ -59,11 +60,15 @@ impl KitchensinkNode { const SUBSTRATE_LOG_ENV: &str = "error,evm=debug,sc_rpc_server=info,runtime::revive=debug"; const PROXY_LOG_ENV: &str = "info,eth-rpc=debug"; + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn init(&mut self, genesis: &str) -> anyhow::Result<&mut Self> { create_dir_all(&self.base_directory)?; + create_dir_all(&self.base_directory.join("logs"))?; let template_chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE); + // Note: we do not pipe the logs of this process to a separate file since this is just a + // once-off export of the default chain spec and not part of the long-running node process. let output = Command::new(&self.substrate_binary) .arg("export-chain-spec") .arg("--chain") @@ -112,6 +117,7 @@ impl KitchensinkNode { Ok(self) } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn spawn_process(&mut self) -> anyhow::Result<()> { let substrate_rpc_port = Self::BASE_SUBSTRATE_RPC_PORT + self.id as u16; let proxy_rpc_port = Self::BASE_PROXY_RPC_PORT + self.id as u16; @@ -120,8 +126,19 @@ impl KitchensinkNode { let chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE); + let logs_directory_path = self.base_directory.join("logs"); + // Start Substrate node - let mut substrate_process = Command::new(&self.substrate_binary) + let substrate_logs_file_path = logs_directory_path.join("node.log"); + let substrate_logs_file = OpenOptions::new() + // Options to re-create and re-write to the file starting at offset zero. We do not want + // to re-use log files between runs. Users that want to keep their log files should pass + // in a different working directory between runs. + .create(true) + .truncate(true) + .write(true) + .open(&substrate_logs_file_path)?; + self.process_substrate = Exec::cmd(&self.substrate_binary) .arg("--chain") .arg(chainspec_path) .arg("--base-path") @@ -136,40 +153,67 @@ impl KitchensinkNode { .arg("--rpc-cors") .arg("all") .env("RUST_LOG", Self::SUBSTRATE_LOG_ENV) - .stdout(Stdio::null()) - .stderr(Stdio::piped()) - .spawn()?; + // We pipe both stdout and stderr to the same log file and therefore we're persisting + // both. In the implementation of [`std::fs::File`] the `try_clone` method will ensure + // that both [`std::fs::File`] objects have the same seeks and offsets and therefore we + // don't have to worry about either streams overriding each other. + .stdout(substrate_logs_file.try_clone()?) + .stderr(substrate_logs_file) + .popen()? + .into(); // Give the node a moment to boot - Self::wait_ready( - &mut substrate_process, + if let Err(error) = Self::wait_ready( + substrate_logs_file_path.as_path(), Self::SUBSTRATE_READY_MARKER, Duration::from_secs(30), - )?; + ) { + tracing::error!( + ?error, + "Failed to start substrate, shutting down gracefully" + ); + self.shutdown()?; + return Err(error); + }; - let mut proxy_process = Command::new(&self.eth_proxy_binary) + let proxy_logs_file_path = logs_directory_path.join("proxy.log"); + let proxy_logs_file = OpenOptions::new() + // Options to re-create and re-write to the file starting at offset zero. We do not want + // to re-use log files between runs. Users that want to keep their log files should pass + // in a different working directory between runs. + .create(true) + .truncate(true) + .write(true) + .open(&proxy_logs_file_path)?; + self.process_proxy = Exec::cmd(&self.eth_proxy_binary) .arg("--dev") .arg("--rpc-port") .arg(proxy_rpc_port.to_string()) .arg("--node-rpc-url") .arg(format!("ws://127.0.0.1:{substrate_rpc_port}")) .env("RUST_LOG", Self::PROXY_LOG_ENV) - .stdout(Stdio::null()) - .stderr(Stdio::piped()) - .spawn()?; + // We pipe both stdout and stderr to the same log file and therefore we're persisting + // both. In the implementation of [`std::fs::File`] the `try_clone` method will ensure + // that both [`std::fs::File`] objects have the same seeks and offsets and therefore we + // don't have to worry about either streams overriding each other. + .stdout(proxy_logs_file.try_clone()?) + .stderr(proxy_logs_file) + .popen()? + .into(); - Self::wait_ready( - &mut proxy_process, + if let Err(error) = Self::wait_ready( + proxy_logs_file_path.as_path(), Self::ETH_PROXY_READY_MARKER, Duration::from_secs(30), - )?; - - self.process_substrate = Some(substrate_process); - self.process_proxy = Some(proxy_process); + ) { + tracing::error!(?error, "Failed to start proxy, shutting down gracefully"); + self.shutdown()?; + return Err(error); + }; Ok(()) } - + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn extract_balance_from_genesis_file( &self, genesis_str: &str, @@ -210,27 +254,30 @@ impl KitchensinkNode { Ok(account_id.to_ss58check()) } - fn wait_ready(child: &mut Child, marker: &str, timeout: Duration) -> anyhow::Result<()> { + fn wait_ready(logs_file_path: &Path, marker: &str, timeout: Duration) -> anyhow::Result<()> { let start_time = std::time::Instant::now(); - let stderr = child.stderr.take().expect("stderr must be piped"); + let logs_file = OpenOptions::new() + .read(true) + .write(false) + .append(false) + .truncate(false) + .open(logs_file_path)?; - let mut lines = std::io::BufReader::new(stderr).lines(); + let mut lines = std::io::BufReader::new(logs_file).lines(); loop { if let Some(Ok(line)) = lines.next() { - println!("Kitchensink log: {line:?}"); if line.contains(marker) { - std::thread::spawn(move || for _ in lines.by_ref() {}); return Ok(()); } } if start_time.elapsed() > timeout { - let _ = child.kill(); anyhow::bail!("Timeout waiting for process readiness: {marker}"); } } } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] pub fn eth_rpc_version(&self) -> anyhow::Result { let output = Command::new(&self.eth_proxy_binary) .arg("--version") @@ -245,6 +292,7 @@ impl KitchensinkNode { } impl EthereumNode for KitchensinkNode { + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn execute_transaction( &self, transaction: alloy::rpc::types::TransactionRequest, @@ -266,6 +314,7 @@ impl EthereumNode for KitchensinkNode { })) } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn trace_transaction( &self, transaction: TransactionReceipt, @@ -289,6 +338,7 @@ impl EthereumNode for KitchensinkNode { })) } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn state_diff(&self, transaction: TransactionReceipt) -> anyhow::Result { match self .trace_transaction(transaction)? @@ -299,6 +349,7 @@ impl EthereumNode for KitchensinkNode { } } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn fetch_add_nonce(&self, address: Address) -> anyhow::Result { let url = self.rpc_url.clone(); let wallet = self.wallet.clone(); @@ -332,24 +383,42 @@ impl Node for KitchensinkNode { } } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn connection_string(&self) -> String { self.rpc_url.clone() } - fn shutdown(mut self) -> anyhow::Result<()> { + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] + fn shutdown(&mut self) -> anyhow::Result<()> { if let Some(mut child) = self.process_proxy.take() { - let _ = child.kill(); + child.terminate().map_err(|error| { + anyhow::anyhow!("Failed to terminate the proxy process: {error:?}") + })?; + child.wait().map_err(|error| { + anyhow::anyhow!( + "Failed to wait for the termination of the proxy process: {error:?}" + ) + })?; } if let Some(mut child) = self.process_substrate.take() { - let _ = child.kill(); + child.terminate().map_err(|error| { + anyhow::anyhow!("Failed to terminate the substrate process: {error:?}") + })?; + child.wait().map_err(|error| { + anyhow::anyhow!( + "Failed to wait for the termination of the substrate process: {error:?}" + ) + })?; } Ok(()) } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn spawn(&mut self, genesis: String) -> anyhow::Result<()> { self.init(&genesis)?.spawn_process() } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn version(&self) -> anyhow::Result { let output = Command::new(&self.substrate_binary) .arg("--version") @@ -364,13 +433,9 @@ impl Node for KitchensinkNode { } impl Drop for KitchensinkNode { + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn drop(&mut self) { - if let Some(mut child) = self.process_proxy.take() { - let _ = child.kill(); - } - if let Some(mut child) = self.process_substrate.take() { - let _ = child.kill(); - } + self.shutdown().expect("Failed to shutdown") } } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index a293c8a..7552ae6 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -23,7 +23,7 @@ pub trait Node: EthereumNode { /// Prune the node instance and related data. /// /// Blocking until it's completely stopped. - fn shutdown(self) -> anyhow::Result<()>; + fn shutdown(&mut self) -> anyhow::Result<()>; /// Returns the nodes connection string. fn connection_string(&self) -> String;