diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fb4b7a1..0eeb93f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -99,9 +99,28 @@ jobs: - name: Install Geth on Ubuntu if: matrix.os == 'ubuntu-24.04' run: | - sudo add-apt-repository -y ppa:ethereum/ethereum sudo apt-get update - sudo apt-get install -y ethereum protobuf-compiler + sudo apt-get install -y protobuf-compiler + + # We were facing some issues in CI with the 1.16.* versions of geth, and specifically on + # Ubuntu. Eventually, we found out that the last version of geth that worked in our CI was + # version 1.15.11. Thus, this is the version that we want to use in CI. The PPA sadly does + # not have historic versions of Geth and therefore we need to resort to downloading pre + # built binaries for Geth and the surrounding tools which is what the following parts of + # the script do. + + sudo apt-get install -y wget ca-certificates tar + ARCH=$(uname -m) + if [ "$ARCH" = "x86_64" ]; then + URL="https://gethstore.blob.core.windows.net/builds/geth-alltools-linux-amd64-1.15.11-36b2371c.tar.gz" + elif [ "$ARCH" = "aarch64" ]; then + URL="https://gethstore.blob.core.windows.net/builds/geth-alltools-linux-arm64-1.15.11-36b2371c.tar.gz" + else + echo "Unsupported architecture: $ARCH" + exit 1 + fi + wget -qO- "$URL" | sudo tar xz -C /usr/local/bin --strip-components=1 + geth --version - name: Install Geth on macOS if: matrix.os == 'macos-14' diff --git a/.gitignore b/.gitignore index 10a2f71..cf26eca 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,7 @@ .DS_Store node_modules /*.json + +# We do not want to commit any log files that we produce from running the code locally so this is +# added to the .gitignore file. 
+*.log \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 9d729db..42b6164 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,9 +67,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "alloy" -version = "1.0.9" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0093d23bf026b580c1f66ed3a053d8209c104a446c5264d3ad99587f6edef24e" +checksum = "ae58d888221eecf621595e2096836ce7cfc37be06bfa39d7f64aa6a3ea4c9e5b" dependencies = [ "alloy-consensus", "alloy-contract", @@ -162,9 +162,9 @@ dependencies = [ [[package]] name = "alloy-core" -version = "1.1.2" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3c5a28f166629752f2e7246b813cdea3243cca59aab2d4264b1fd68392c10eb" +checksum = "ad31216895d27d307369daa1393f5850b50bbbd372478a9fa951c095c210627e" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", @@ -175,9 +175,9 @@ dependencies = [ [[package]] name = "alloy-dyn-abi" -version = "1.1.2" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18cc14d832bc3331ca22a1c7819de1ede99f58f61a7d123952af7dde8de124a6" +checksum = "7b95b3deca680efc7e9cba781f1a1db352fa1ea50e6384a514944dcf4419e652" dependencies = [ "alloy-json-abi", "alloy-primitives", @@ -4002,6 +4002,7 @@ dependencies = [ "sp-core", "sp-runtime", "temp-dir", + "tokio", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index a28e194..5d1d6a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,9 @@ features = [ "rpc-types", "signer-local", "std", + "network", + "serde", + "rpc-types-eth", ] [profile.bench] diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index e72e8f6..7f1c92e 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -1,4 +1,4 @@ -//! The global configuration used accross all revive differential testing crates. +//! 
The global configuration used across all revive differential testing crates. use std::{ fmt::Display, diff --git a/crates/core/src/driver/mod.rs b/crates/core/src/driver/mod.rs index 8c41f96..ba7020d 100644 --- a/crates/core/src/driver/mod.rs +++ b/crates/core/src/driver/mod.rs @@ -1,11 +1,11 @@ //! The test driver handles the compilation and execution of the test cases. use alloy::json_abi::JsonAbi; -use alloy::primitives::Bytes; +use alloy::network::TransactionBuilder; +use alloy::rpc::types::TransactionReceipt; use alloy::rpc::types::trace::geth::GethTrace; -use alloy::rpc::types::{TransactionInput, TransactionReceipt}; use alloy::{ - primitives::{Address, TxKind, map::HashMap}, + primitives::{Address, map::HashMap}, rpc::types::{ TransactionRequest, trace::geth::{AccountState, DiffMode}, @@ -135,21 +135,17 @@ where std::any::type_name::() ); - let tx = match input.legacy_transaction( - self.config.network_id, - nonce, - &self.deployed_contracts, - &self.deployed_abis, - ) { - Ok(tx) => { - tracing::debug!("Legacy transaction data: {tx:#?}"); - tx - } - Err(err) => { - tracing::error!("Failed to construct legacy transaction: {err:?}"); - return Err(err); - } - }; + let tx = + match input.legacy_transaction(nonce, &self.deployed_contracts, &self.deployed_abis) { + Ok(tx) => { + tracing::debug!("Legacy transaction data: {tx:#?}"); + tx + } + Err(err) => { + tracing::error!("Failed to construct legacy transaction: {err:?}"); + return Err(err); + } + }; tracing::trace!("Executing transaction for input: {input:?}"); @@ -201,6 +197,9 @@ where for contracts in contract_map.values() { for (contract_name, contract) in contracts { + let tracing_span = tracing::info_span!("Deploying contract", contract_name); + let _guard = tracing_span.enter(); + tracing::debug!( "Contract name is: {:?} and the input name is: {:?}", &contract_name, @@ -228,16 +227,14 @@ where std::any::type_name::() ); - let tx = TransactionRequest { - from: Some(input.caller), - to: Some(TxKind::Create), 
- gas_price: Some(5_000_000), - gas: Some(5_000_000), - chain_id: Some(self.config.network_id), - nonce: Some(nonce), - input: TransactionInput::new(Bytes::from(code.into_bytes())), - ..Default::default() - }; + // We are using alloy for building and submitting the transactions and it will + // automatically fill in all of the missing fields from the provider that we + // are using. + let code = alloy::hex::decode(&code)?; + let tx = TransactionRequest::default() + .nonce(nonce) + .from(input.caller) + .with_deploy_code(code); let receipt = match node.execute_transaction(tx) { Ok(receipt) => receipt, diff --git a/crates/format/src/input.rs b/crates/format/src/input.rs index 127640a..0e4b802 100644 --- a/crates/format/src/input.rs +++ b/crates/format/src/input.rs @@ -2,10 +2,10 @@ use std::{collections::HashMap, str::FromStr}; use alloy::{ json_abi::JsonAbi, + network::TransactionBuilder, primitives::{Address, Bytes, U256}, - rpc::types::{TransactionInput, TransactionRequest}, + rpc::types::TransactionRequest, }; -use alloy_primitives::TxKind; use semver::VersionReq; use serde::Deserialize; use serde_json::Value; @@ -157,30 +157,18 @@ impl Input { /// Parse this input into a legacy transaction. 
pub fn legacy_transaction( &self, - chain_id: u64, nonce: u64, deployed_contracts: &HashMap, deployed_abis: &HashMap, ) -> anyhow::Result { - let to = match self.method { - Method::Deployer => Some(TxKind::Create), - _ => Some(TxKind::Call( - self.instance_to_address(&self.instance, deployed_contracts)?, - )), - }; - let input_data = self.encoded_input(deployed_abis, deployed_contracts)?; - - Ok(TransactionRequest { - from: Some(self.caller), - to, - nonce: Some(nonce), - chain_id: Some(chain_id), - gas_price: Some(5_000_000), - gas: Some(5_000_000), - input: TransactionInput::new(input_data), - ..Default::default() - }) + let transaction_request = TransactionRequest::default().nonce(nonce); + match self.method { + Method::Deployer => Ok(transaction_request.with_deploy_code(input_data)), + _ => Ok(transaction_request + .to(self.instance_to_address(&self.instance, deployed_contracts)?) + .input(input_data.into())), + } } } diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 1a9dcb0..790c5ba 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -12,6 +12,7 @@ rust-version.workspace = true anyhow = { workspace = true } alloy = { workspace = true } tracing = { workspace = true } +tokio = { workspace = true } revive-dt-node-interaction = { workspace = true } revive-dt-config = { workspace = true } diff --git a/crates/node/src/geth.rs b/crates/node/src/geth.rs index c40b18f..99893a0 100644 --- a/crates/node/src/geth.rs +++ b/crates/node/src/geth.rs @@ -2,7 +2,7 @@ use std::{ collections::HashMap, - fs::{File, create_dir_all, remove_dir_all}, + fs::{File, OpenOptions, create_dir_all, remove_dir_all}, io::{BufRead, BufReader, Read, Write}, path::PathBuf, process::{Child, Command, Stdio}, @@ -10,7 +10,6 @@ use std::{ Mutex, atomic::{AtomicU32, Ordering}, }, - thread, time::{Duration, Instant}, }; @@ -28,6 +27,7 @@ use revive_dt_node_interaction::{ EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction, 
transaction::execute_transaction, }; +use tracing::Level; use crate::Node; @@ -45,6 +45,7 @@ pub struct Instance { connection_string: String, base_directory: PathBuf, data_directory: PathBuf, + logs_directory: PathBuf, geth: PathBuf, id: u32, handle: Option, @@ -52,11 +53,17 @@ pub struct Instance { start_timeout: u64, wallet: EthereumWallet, nonces: Mutex>, + /// This vector stores [`File`] objects that we use for logging which we want to flush when the + /// node object is dropped. We do not store them in a structured fashion at the moment (in + /// separate fields) as the logic that we need to apply to them is all the same regardless of + /// what it belongs to, we just want to flush them on [`Drop`] of the node. + logs_file_to_flush: Vec, } impl Instance { const BASE_DIRECTORY: &str = "geth"; const DATA_DIRECTORY: &str = "data"; + const LOGS_DIRECTORY: &str = "logs"; const IPC_FILE: &str = "geth.ipc"; const GENESIS_JSON_FILE: &str = "genesis.json"; @@ -64,9 +71,14 @@ impl Instance { const READY_MARKER: &str = "IPC endpoint opened"; const ERROR_MARKER: &str = "Fatal:"; + const GETH_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log"; + const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log"; + /// Create the node directory and call `geth init` to configure the genesis. + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn init(&mut self, genesis: String) -> anyhow::Result<&mut Self> { create_dir_all(&self.base_directory)?; + create_dir_all(&self.logs_directory)?; let genesis_path = self.base_directory.join(Self::GENESIS_JSON_FILE); File::create(&genesis_path)?.write_all(genesis.as_bytes())?; @@ -96,8 +108,24 @@ impl Instance { /// Spawn the go-ethereum node child process. /// - /// [Instance::init] must be called priorly. + /// [Instance::init] must be called prior. 
+ #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn spawn_process(&mut self) -> anyhow::Result<&mut Self> { + // This is the `OpenOptions` that we wish to use for all of the log files that we will be + // opening in this method. We need to construct it in this way to: + // 1. Be consistent + // 2. Be less verbose and more DRY + // 3. Because the builder pattern uses mutable references so we need to get around that. + let open_options = { + let mut options = OpenOptions::new(); + options.create(true).truncate(true).write(true); + options + }; + + let stdout_logs_file = open_options + .clone() + .open(self.geth_stdout_log_file_path())?; + let stderr_logs_file = open_options.open(self.geth_stderr_log_file_path())?; self.handle = Command::new(&self.geth) .arg("--dev") .arg("--datadir") @@ -109,49 +137,67 @@ impl Instance { .arg("--nodiscover") .arg("--maxpeers") .arg("0") - .stderr(Stdio::piped()) - .stdout(Stdio::null()) + .stderr(stderr_logs_file.try_clone()?) + .stdout(stdout_logs_file.try_clone()?) .spawn()? .into(); + + if let Err(error) = self.wait_ready() { + tracing::error!(?error, "Failed to start geth, shutting down gracefully"); + self.shutdown()?; + return Err(error); + } + + self.logs_file_to_flush + .extend([stderr_logs_file, stdout_logs_file]); + Ok(self) } /// Wait for the g-ethereum node child process getting ready. /// /// [Instance::spawn_process] must be called priorly. + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn wait_ready(&mut self) -> anyhow::Result<&mut Self> { - // Thanks clippy but geth is a server; we don't `wait` but eventually kill it.
- #[allow(clippy::zombie_processes)] - let mut child = self.handle.take().expect("should be spawned"); let start_time = Instant::now(); - let maximum_wait_time = Duration::from_millis(self.start_timeout); - let mut stderr = BufReader::new(child.stderr.take().expect("should be piped")).lines(); - let error = loop { - let Some(Ok(line)) = stderr.next() else { - break "child process stderr reading error".to_string(); - }; - if line.contains(Self::ERROR_MARKER) { - break line; - } - if line.contains(Self::READY_MARKER) { - // Keep stderr alive - // https://github.com/alloy-rs/alloy/issues/2091#issuecomment-2676134147 - thread::spawn(move || for _ in stderr.by_ref() {}); - self.handle = child.into(); - return Ok(self); + let logs_file = OpenOptions::new() + .read(true) + .write(false) + .append(false) + .truncate(false) + .open(self.geth_stderr_log_file_path())?; + + let maximum_wait_time = Duration::from_millis(self.start_timeout); + let mut stderr = BufReader::new(logs_file).lines(); + loop { + if let Some(Ok(line)) = stderr.next() { + if line.contains(Self::ERROR_MARKER) { + anyhow::bail!("Failed to start geth {line}"); + } + if line.contains(Self::READY_MARKER) { + return Ok(self); + } } if Instant::now().duration_since(start_time) > maximum_wait_time { - break "spawn timeout".to_string(); + anyhow::bail!("Timeout in starting geth"); } - }; + } + } - let _ = child.kill(); - anyhow::bail!("geth node #{} spawn error: {error}", self.id) + #[tracing::instrument(skip_all, fields(geth_node_id = self.id), level = Level::TRACE)] + fn geth_stdout_log_file_path(&self) -> PathBuf { + self.logs_directory.join(Self::GETH_STDOUT_LOG_FILE_NAME) + } + + #[tracing::instrument(skip_all, fields(geth_node_id = self.id), level = Level::TRACE)] + fn geth_stderr_log_file_path(&self) -> PathBuf { + self.logs_directory.join(Self::GETH_STDERR_LOG_FILE_NAME) } } impl EthereumNode for Instance { + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn execute_transaction( &self, 
transaction: TransactionRequest, @@ -159,20 +205,89 @@ impl EthereumNode for Instance { let connection_string = self.connection_string(); let wallet = self.wallet.clone(); - tracing::debug!("Submitting transaction: {transaction:#?}"); - execute_transaction(Box::pin(async move { - Ok(ProviderBuilder::new() + let outer_span = tracing::debug_span!("Submitting transaction", ?transaction,); + let _outer_guard = outer_span.enter(); + + let provider = ProviderBuilder::new() .wallet(wallet) .connect(&connection_string) - .await? - .send_transaction(transaction) - .await? - .get_receipt() - .await?) + .await?; + + let pending_transaction = provider.send_transaction(transaction).await?; + let transaction_hash = pending_transaction.tx_hash(); + + let span = tracing::info_span!("Awaiting transaction receipt", ?transaction_hash); + let _guard = span.enter(); + + // The following is a fix for the "transaction indexing is in progress" error that we + // used to get. You can find more information on this in the following GH issue in geth + // https://github.com/ethereum/go-ethereum/issues/28877. To summarize what's going on, + // before we can get the receipt of the transaction it needs to have been indexed by the + // node's indexer. Just because the transaction has been confirmed it doesn't mean that + // it has been indexed. When we call alloy's `get_receipt` it checks if the transaction + // was confirmed. If it has been, then it will call `eth_getTransactionReceipt` method + // which _might_ return the above error if the tx has not yet been indexed yet. So, we + // need to implement a retry mechanism for the receipt to keep retrying to get it until + // it eventually works, but we only do that if the error we get back is the "transaction + // indexing is in progress" error or if the receipt is None. 
+ // + // At the moment we do not allow for the 60 seconds to be modified and we take it as + // being an implementation detail that's invisible to anything outside of this module. + // + // We allow a total of 60 retries for getting the receipt with one second between each + // retry and the next which means that we allow for a total of 60 seconds of waiting + // before we consider that we're unable to get the transaction receipt. + let mut retries = 0; + loop { + match provider.get_transaction_receipt(*transaction_hash).await { + Ok(Some(receipt)) => { + tracing::info!("Obtained the transaction receipt"); + break Ok(receipt); + } + Ok(None) => { + if retries == 60 { + tracing::error!( + "Polled for transaction receipt for 60 seconds but failed to get it" + ); + break Err(anyhow::anyhow!("Failed to get the transaction receipt")); + } else { + tracing::trace!( + retries, + "Sleeping for 1 second and trying to get the receipt again" + ); + retries += 1; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + } + Err(error) => { + let error_string = error.to_string(); + if error_string.contains("transaction indexing is in progress") { + if retries == 60 { + tracing::error!( + "Polled for transaction receipt for 60 seconds but failed to get it" + ); + break Err(error.into()); + } else { + tracing::trace!( + retries, + "Sleeping for 1 second and trying to get the receipt again" + ); + retries += 1; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + } else { + break Err(error.into()); + } + } + } + } })) } + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn trace_transaction( &self, transaction: TransactionReceipt, @@ -195,6 +310,7 @@ impl EthereumNode for Instance { })) } + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn state_diff( &self, transaction: alloy::rpc::types::TransactionReceipt, @@ -208,6 +324,7 @@ impl EthereumNode for Instance { } } + 
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn fetch_add_nonce(&self, address: Address) -> anyhow::Result { let connection_string = self.connection_string.clone(); let wallet = self.wallet.clone(); @@ -231,6 +348,7 @@ impl Node for Instance { Self { connection_string: base_directory.join(Self::IPC_FILE).display().to_string(), data_directory: base_directory.join(Self::DATA_DIRECTORY), + logs_directory: base_directory.join(Self::LOGS_DIRECTORY), base_directory, geth: config.geth.clone(), id, @@ -239,22 +357,46 @@ impl Node for Instance { start_timeout: config.geth_start_timeout, wallet: config.wallet(), nonces: Mutex::new(HashMap::new()), + // We know that we only need to be storing 2 files so we can specify that when creating + // the vector. It's the stdout and stderr of the geth node. + logs_file_to_flush: Vec::with_capacity(2), } } + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn connection_string(&self) -> String { self.connection_string.clone() } - fn shutdown(self) -> anyhow::Result<()> { + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] + fn shutdown(&mut self) -> anyhow::Result<()> { + // Terminate the processes in a graceful manner to allow for the output to be flushed. + if let Some(mut child) = self.handle.take() { + child + .kill() + .map_err(|error| anyhow::anyhow!("Failed to kill the geth process: {error:?}"))?; + } + + // Flushing the files that we're using for keeping the logs before shutdown. + for file in self.logs_file_to_flush.iter_mut() { + file.flush()? + } + + // Remove the node's database so that subsequent runs do not run on the same database. We + // ignore the error just in case the directory didn't exist in the first place and therefore + // there's nothing to be deleted. 
+ let _ = remove_dir_all(self.base_directory.join(Self::DATA_DIRECTORY)); + Ok(()) } + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn spawn(&mut self, genesis: String) -> anyhow::Result<()> { - self.init(genesis)?.spawn_process()?.wait_ready()?; + self.init(genesis)?.spawn_process()?; Ok(()) } + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn version(&self) -> anyhow::Result { let output = Command::new(&self.geth) .arg("--version") @@ -269,13 +411,9 @@ impl Node for Instance { } impl Drop for Instance { + #[tracing::instrument(skip_all, fields(geth_node_id = self.id))] fn drop(&mut self) { - if let Some(child) = self.handle.as_mut() { - let _ = child.kill(); - } - if self.base_directory.exists() { - let _ = remove_dir_all(&self.base_directory); - } + self.shutdown().expect("Failed to shutdown") } } diff --git a/crates/node/src/kitchensink.rs b/crates/node/src/kitchensink.rs index 4f14a18..c05f54c 100644 --- a/crates/node/src/kitchensink.rs +++ b/crates/node/src/kitchensink.rs @@ -1,8 +1,8 @@ use std::{ collections::HashMap, - fs::create_dir_all, - io::BufRead, - path::PathBuf, + fs::{File, OpenOptions, create_dir_all, remove_dir_all}, + io::{BufRead, Write}, + path::{Path, PathBuf}, process::{Child, Command, Stdio}, sync::{ Mutex, @@ -24,6 +24,7 @@ use alloy::{ use serde_json::{Value as JsonValue, json}; use sp_core::crypto::Ss58Codec; use sp_runtime::AccountId32; +use tracing::Level; use revive_dt_config::Arguments; use revive_dt_node_interaction::{ @@ -43,13 +44,22 @@ pub struct KitchensinkNode { rpc_url: String, wallet: EthereumWallet, base_directory: PathBuf, + logs_directory: PathBuf, process_substrate: Option, process_proxy: Option, nonces: Mutex>, + /// This vector stores [`File`] objects that we use for logging which we want to flush when the + /// node object is dropped. 
We do not store them in a structured fashion at the moment (in + /// separate fields) as the logic that we need to apply to them is all the same regardless of + /// what it belongs to, we just want to flush them on [`Drop`] of the node. + logs_file_to_flush: Vec, } impl KitchensinkNode { const BASE_DIRECTORY: &str = "kitchensink"; + const LOGS_DIRECTORY: &str = "logs"; + const DATA_DIRECTORY: &str = "chains"; + const SUBSTRATE_READY_MARKER: &str = "Running JSON-RPC server"; const ETH_PROXY_READY_MARKER: &str = "Running JSON-RPC server"; const CHAIN_SPEC_JSON_FILE: &str = "template_chainspec.json"; @@ -59,11 +69,21 @@ impl KitchensinkNode { const SUBSTRATE_LOG_ENV: &str = "error,evm=debug,sc_rpc_server=info,runtime::revive=debug"; const PROXY_LOG_ENV: &str = "info,eth-rpc=debug"; + const KITCHENSINK_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log"; + const KITCHENSINK_STDERR_LOG_FILE_NAME: &str = "node_stderr.log"; + + const PROXY_STDOUT_LOG_FILE_NAME: &str = "proxy_stdout.log"; + const PROXY_STDERR_LOG_FILE_NAME: &str = "proxy_stderr.log"; + + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn init(&mut self, genesis: &str) -> anyhow::Result<&mut Self> { create_dir_all(&self.base_directory)?; + create_dir_all(&self.logs_directory)?; let template_chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE); + // Note: we do not pipe the logs of this process to a separate file since this is just a + // once-off export of the default chain spec and not part of the long-running node process. 
let output = Command::new(&self.substrate_binary) .arg("export-chain-spec") .arg("--chain") @@ -112,6 +132,7 @@ impl KitchensinkNode { Ok(self) } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn spawn_process(&mut self) -> anyhow::Result<()> { let substrate_rpc_port = Self::BASE_SUBSTRATE_RPC_PORT + self.id as u16; let proxy_rpc_port = Self::BASE_PROXY_RPC_PORT + self.id as u16; @@ -120,8 +141,25 @@ let chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE); + // This is the `OpenOptions` that we wish to use for all of the log files that we will be + // opening in this method. We need to construct it in this way to: + // 1. Be consistent + // 2. Be less verbose and more DRY + // 3. Because the builder pattern uses mutable references so we need to get around that. + let open_options = { + let mut options = OpenOptions::new(); + options.create(true).truncate(true).write(true); + options + }; + // Start Substrate node - let mut substrate_process = Command::new(&self.substrate_binary) + let kitchensink_stdout_logs_file = open_options + .clone() + .open(self.kitchensink_stdout_log_file_path())?; + let kitchensink_stderr_logs_file = open_options + .clone() + .open(self.kitchensink_stderr_log_file_path())?; + self.process_substrate = Command::new(&self.substrate_binary) .arg("--chain") .arg(chainspec_path) .arg("--base-path") @@ -136,40 +174,61 @@ impl KitchensinkNode { .arg("--rpc-cors") .arg("all") .env("RUST_LOG", Self::SUBSTRATE_LOG_ENV) - .stdout(Stdio::null()) - .stderr(Stdio::piped()) - .spawn()?; + .stdout(kitchensink_stdout_logs_file.try_clone()?) + .stderr(kitchensink_stderr_logs_file.try_clone()?) + .spawn()?
+ .into(); // Give the node a moment to boot - Self::wait_ready( - &mut substrate_process, + if let Err(error) = Self::wait_ready( + self.kitchensink_stderr_log_file_path().as_path(), Self::SUBSTRATE_READY_MARKER, Duration::from_secs(30), - )?; + ) { + tracing::error!( + ?error, + "Failed to start substrate, shutting down gracefully" + ); + self.shutdown()?; + return Err(error); + }; - let mut proxy_process = Command::new(&self.eth_proxy_binary) + let eth_proxy_stdout_logs_file = open_options + .clone() + .open(self.proxy_stdout_log_file_path())?; + let eth_proxy_stderr_logs_file = open_options.open(self.proxy_stderr_log_file_path())?; + self.process_proxy = Command::new(&self.eth_proxy_binary) .arg("--dev") .arg("--rpc-port") .arg(proxy_rpc_port.to_string()) .arg("--node-rpc-url") .arg(format!("ws://127.0.0.1:{substrate_rpc_port}")) .env("RUST_LOG", Self::PROXY_LOG_ENV) - .stdout(Stdio::null()) - .stderr(Stdio::piped()) - .spawn()?; + .stdout(eth_proxy_stdout_logs_file.try_clone()?) + .stderr(eth_proxy_stderr_logs_file.try_clone()?) + .spawn()? 
+ .into(); - Self::wait_ready( - &mut proxy_process, + if let Err(error) = Self::wait_ready( + self.proxy_stderr_log_file_path().as_path(), Self::ETH_PROXY_READY_MARKER, Duration::from_secs(30), - )?; + ) { + tracing::error!(?error, "Failed to start proxy, shutting down gracefully"); + self.shutdown()?; + return Err(error); + }; - self.process_substrate = Some(substrate_process); - self.process_proxy = Some(proxy_process); + self.logs_file_to_flush.extend([ + kitchensink_stdout_logs_file, + kitchensink_stderr_logs_file, + eth_proxy_stdout_logs_file, + eth_proxy_stderr_logs_file, + ]); Ok(()) } - + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn extract_balance_from_genesis_file( &self, genesis_str: &str, @@ -210,27 +269,30 @@ impl KitchensinkNode { Ok(account_id.to_ss58check()) } - fn wait_ready(child: &mut Child, marker: &str, timeout: Duration) -> anyhow::Result<()> { + fn wait_ready(logs_file_path: &Path, marker: &str, timeout: Duration) -> anyhow::Result<()> { let start_time = std::time::Instant::now(); - let stderr = child.stderr.take().expect("stderr must be piped"); + let logs_file = OpenOptions::new() + .read(true) + .write(false) + .append(false) + .truncate(false) + .open(logs_file_path)?; - let mut lines = std::io::BufReader::new(stderr).lines(); + let mut lines = std::io::BufReader::new(logs_file).lines(); loop { if let Some(Ok(line)) = lines.next() { - println!("Kitchensink log: {line:?}"); if line.contains(marker) { - std::thread::spawn(move || for _ in lines.by_ref() {}); return Ok(()); } } if start_time.elapsed() > timeout { - let _ = child.kill(); anyhow::bail!("Timeout waiting for process readiness: {marker}"); } } } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] pub fn eth_rpc_version(&self) -> anyhow::Result { let output = Command::new(&self.eth_proxy_binary) .arg("--version") @@ -242,9 +304,32 @@ impl KitchensinkNode { .stdout; Ok(String::from_utf8_lossy(&output).trim().to_string()) } + + 
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)] + fn kitchensink_stdout_log_file_path(&self) -> PathBuf { + self.logs_directory + .join(Self::KITCHENSINK_STDOUT_LOG_FILE_NAME) + } + + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)] + fn kitchensink_stderr_log_file_path(&self) -> PathBuf { + self.logs_directory + .join(Self::KITCHENSINK_STDERR_LOG_FILE_NAME) + } + + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)] + fn proxy_stdout_log_file_path(&self) -> PathBuf { + self.logs_directory.join(Self::PROXY_STDOUT_LOG_FILE_NAME) + } + + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)] + fn proxy_stderr_log_file_path(&self) -> PathBuf { + self.logs_directory.join(Self::PROXY_STDERR_LOG_FILE_NAME) + } } impl EthereumNode for KitchensinkNode { + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn execute_transaction( &self, transaction: alloy::rpc::types::TransactionRequest, @@ -254,7 +339,8 @@ impl EthereumNode for KitchensinkNode { tracing::debug!("Submitting transaction: {transaction:#?}"); - execute_transaction(Box::pin(async move { + tracing::info!("Submitting tx to kitchensink"); + let receipt = execute_transaction(Box::pin(async move { Ok(ProviderBuilder::new() .wallet(wallet) .connect(&url) @@ -263,9 +349,12 @@ impl EthereumNode for KitchensinkNode { .await? .get_receipt() .await?) - })) + })); + tracing::info!(?receipt, "Submitted tx to kitchensink"); + receipt } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn trace_transaction( &self, transaction: TransactionReceipt, @@ -289,6 +378,7 @@ impl EthereumNode for KitchensinkNode { })) } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn state_diff(&self, transaction: TransactionReceipt) -> anyhow::Result { match self .trace_transaction(transaction)? 
@@ -299,6 +389,7 @@ impl EthereumNode for KitchensinkNode { } } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn fetch_add_nonce(&self, address: Address) -> anyhow::Result { let url = self.rpc_url.clone(); let wallet = self.wallet.clone(); @@ -318,6 +409,7 @@ impl Node for KitchensinkNode { let kitchensink_directory = config.directory().join(Self::BASE_DIRECTORY); let id = NODE_COUNT.fetch_add(1, Ordering::SeqCst); let base_directory = kitchensink_directory.join(id.to_string()); + let logs_directory = base_directory.join(Self::LOGS_DIRECTORY); Self { id, @@ -326,30 +418,54 @@ impl Node for KitchensinkNode { rpc_url: String::new(), wallet: config.wallet(), base_directory, + logs_directory, process_substrate: None, process_proxy: None, nonces: Mutex::new(HashMap::new()), + // We know that we only need to be storing 4 files so we can specify that when creating + // the vector. It's the stdout and stderr of the substrate-node and the eth-rpc. + logs_file_to_flush: Vec::with_capacity(4), } } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn connection_string(&self) -> String { self.rpc_url.clone() } - fn shutdown(mut self) -> anyhow::Result<()> { + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] + fn shutdown(&mut self) -> anyhow::Result<()> { + // Terminate the processes in a graceful manner to allow for the output to be flushed. if let Some(mut child) = self.process_proxy.take() { - let _ = child.kill(); + child + .kill() + .map_err(|error| anyhow::anyhow!("Failed to kill the proxy process: {error:?}"))?; } if let Some(mut child) = self.process_substrate.take() { - let _ = child.kill(); + child.kill().map_err(|error| { + anyhow::anyhow!("Failed to kill the substrate process: {error:?}") + })?; } + + // Flushing the files that we're using for keeping the logs before shutdown. + for file in self.logs_file_to_flush.iter_mut() { + file.flush()? 
+ } + + // Remove the node's database so that subsequent runs do not run on the same database. We + // ignore the error just in case the directory didn't exist in the first place and therefore + // there's nothing to be deleted. + let _ = remove_dir_all(self.base_directory.join(Self::DATA_DIRECTORY)); + Ok(()) } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn spawn(&mut self, genesis: String) -> anyhow::Result<()> { self.init(&genesis)?.spawn_process() } + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn version(&self) -> anyhow::Result { let output = Command::new(&self.substrate_binary) .arg("--version") @@ -364,13 +480,9 @@ impl Node for KitchensinkNode { } impl Drop for KitchensinkNode { + #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))] fn drop(&mut self) { - if let Some(mut child) = self.process_proxy.take() { - let _ = child.kill(); - } - if let Some(mut child) = self.process_substrate.take() { - let _ = child.kill(); - } + self.shutdown().expect("Failed to shutdown") } } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index a293c8a..7552ae6 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -23,7 +23,7 @@ pub trait Node: EthereumNode { /// Prune the node instance and related data. /// /// Blocking until it's completely stopped. - fn shutdown(self) -> anyhow::Result<()>; + fn shutdown(&mut self) -> anyhow::Result<()>; /// Returns the nodes connection string. fn connection_string(&self) -> String;