Compare commits

...

3 Commits

Author SHA1 Message Date
Omar Abdulla affc0ec7c9 Commit dev-genesis 2025-09-24 05:36:54 +03:00
Omar Abdulla aa92b5c3f3 Cleanup some of the node tests to use shared nodes 2025-09-24 05:26:57 +03:00
Omar Abdulla 11096128d9 Clean up the process flow for nodes 2025-09-22 10:01:22 +03:00
7 changed files with 356 additions and 343 deletions
+1
View File
@@ -13,3 +13,4 @@ resolc-compiler-tests
workdir workdir
!/schema.json !/schema.json
!/dev-genesis.json
+1 -1
View File
@@ -431,7 +431,7 @@ pub struct GenesisConfiguration {
impl GenesisConfiguration { impl GenesisConfiguration {
pub fn genesis(&self) -> anyhow::Result<&Genesis> { pub fn genesis(&self) -> anyhow::Result<&Genesis> {
static DEFAULT_GENESIS: LazyLock<Genesis> = LazyLock::new(|| { static DEFAULT_GENESIS: LazyLock<Genesis> = LazyLock::new(|| {
let genesis = include_str!("../../../genesis.json"); let genesis = include_str!("../../../dev-genesis.json");
serde_json::from_str(genesis).unwrap() serde_json::from_str(genesis).unwrap()
}); });
+89 -150
View File
@@ -1,17 +1,17 @@
//! The go-ethereum node implementation. //! The go-ethereum node implementation.
use std::{ use std::{
fs::{File, OpenOptions, create_dir_all, remove_dir_all}, fs::{File, create_dir_all, remove_dir_all},
io::{BufRead, BufReader, Read, Write}, io::Read,
ops::ControlFlow, ops::ControlFlow,
path::PathBuf, path::PathBuf,
pin::Pin, pin::Pin,
process::{Child, Command, Stdio}, process::{Command, Stdio},
sync::{ sync::{
Arc, Arc,
atomic::{AtomicU32, Ordering}, atomic::{AtomicU32, Ordering},
}, },
time::{Duration, Instant}, time::Duration,
}; };
use alloy::{ use alloy::{
@@ -41,7 +41,12 @@ use revive_dt_config::*;
use revive_dt_format::traits::ResolverApi; use revive_dt_format::traits::ResolverApi;
use revive_dt_node_interaction::EthereumNode; use revive_dt_node_interaction::EthereumNode;
use crate::{Node, common::FallbackGasFiller, constants::INITIAL_BALANCE}; use crate::{
Node,
common::FallbackGasFiller,
constants::INITIAL_BALANCE,
process::{Process, ProcessReadinessWaitBehavior},
};
static NODE_COUNT: AtomicU32 = AtomicU32::new(0); static NODE_COUNT: AtomicU32 = AtomicU32::new(0);
@@ -61,16 +66,11 @@ pub struct GethNode {
logs_directory: PathBuf, logs_directory: PathBuf,
geth: PathBuf, geth: PathBuf,
id: u32, id: u32,
handle: Option<Child>, handle: Option<Process>,
start_timeout: Duration, start_timeout: Duration,
wallet: Arc<EthereumWallet>, wallet: Arc<EthereumWallet>,
nonce_manager: CachedNonceManager, nonce_manager: CachedNonceManager,
chain_id_filler: ChainIdFiller, chain_id_filler: ChainIdFiller,
/// This vector stores [`File`] objects that we use for logging which we want to flush when the
/// node object is dropped. We do not store them in a structured fashion at the moment (in
/// separate fields) as the logic that we need to apply to them is all the same regardless of
/// what it belongs to, we just want to flush them on [`Drop`] of the node.
logs_file_to_flush: Vec<File>,
} }
impl GethNode { impl GethNode {
@@ -84,9 +84,6 @@ impl GethNode {
const READY_MARKER: &str = "IPC endpoint opened"; const READY_MARKER: &str = "IPC endpoint opened";
const ERROR_MARKER: &str = "Fatal:"; const ERROR_MARKER: &str = "Fatal:";
const GETH_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log";
const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log";
const TRANSACTION_INDEXING_ERROR: &str = "transaction indexing is in progress"; const TRANSACTION_INDEXING_ERROR: &str = "transaction indexing is in progress";
const TRANSACTION_TRACING_ERROR: &str = "historical state not available in path scheme yet"; const TRANSACTION_TRACING_ERROR: &str = "historical state not available in path scheme yet";
@@ -124,9 +121,6 @@ impl GethNode {
wallet: wallet.clone(), wallet: wallet.clone(),
chain_id_filler: Default::default(), chain_id_filler: Default::default(),
nonce_manager: Default::default(), nonce_manager: Default::default(),
// We know that we only need to be storing 2 files so we can specify that when creating
// the vector. It's the stdout and stderr of the geth node.
logs_file_to_flush: Vec::with_capacity(2),
} }
} }
@@ -194,118 +188,63 @@ impl GethNode {
/// [Instance::init] must be called prior. /// [Instance::init] must be called prior.
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn spawn_process(&mut self) -> anyhow::Result<&mut Self> { fn spawn_process(&mut self) -> anyhow::Result<&mut Self> {
// This is the `OpenOptions` that we wish to use for all of the log files that we will be let process = Process::new(
// opening in this method. We need to construct it in this way to: None,
// 1. Be consistent self.logs_directory.as_path(),
// 2. Less verbose and more dry self.geth.as_path(),
// 3. Because the builder pattern uses mutable references so we need to get around that. |command, stdout_file, stderr_file| {
let open_options = { command
let mut options = OpenOptions::new(); .arg("--dev")
options.create(true).truncate(true).write(true); .arg("--datadir")
options .arg(&self.data_directory)
}; .arg("--ipcpath")
.arg(&self.connection_string)
.arg("--nodiscover")
.arg("--maxpeers")
.arg("0")
.arg("--txlookuplimit")
.arg("0")
.arg("--cache.blocklogs")
.arg("512")
.arg("--state.scheme")
.arg("hash")
.arg("--syncmode")
.arg("full")
.arg("--gcmode")
.arg("archive")
.stderr(stderr_file)
.stdout(stdout_file);
},
ProcessReadinessWaitBehavior::TimeBoundedWaitFunction {
max_wait_duration: self.start_timeout,
check_function: Box::new(|_, stderr_line| match stderr_line {
Some(line) => {
if line.contains(Self::ERROR_MARKER) {
anyhow::bail!("Failed to start geth {line}");
} else if line.contains(Self::READY_MARKER) {
Ok(true)
} else {
Ok(false)
}
}
None => Ok(false),
}),
},
);
let stdout_logs_file = open_options match process {
.clone() Ok(process) => self.handle = Some(process),
.open(self.geth_stdout_log_file_path()) Err(err) => {
.context("Failed to open geth stdout logs file")?; tracing::error!(?err, "Failed to start geth, shutting down gracefully");
let stderr_logs_file = open_options self.shutdown()
.open(self.geth_stderr_log_file_path()) .context("Failed to gracefully shutdown after geth start error")?;
.context("Failed to open geth stderr logs file")?; return Err(err);
self.handle = Command::new(&self.geth) }
.arg("--dev")
.arg("--datadir")
.arg(&self.data_directory)
.arg("--ipcpath")
.arg(&self.connection_string)
.arg("--nodiscover")
.arg("--maxpeers")
.arg("0")
.arg("--txlookuplimit")
.arg("0")
.arg("--cache.blocklogs")
.arg("512")
.arg("--state.scheme")
.arg("hash")
.arg("--syncmode")
.arg("full")
.arg("--gcmode")
.arg("archive")
.stderr(
stderr_logs_file
.try_clone()
.context("Failed to clone geth stderr log file handle")?,
)
.stdout(
stdout_logs_file
.try_clone()
.context("Failed to clone geth stdout log file handle")?,
)
.spawn()
.context("Failed to spawn geth node process")?
.into();
if let Err(error) = self.wait_ready() {
tracing::error!(?error, "Failed to start geth, shutting down gracefully");
self.shutdown()
.context("Failed to gracefully shutdown after geth start error")?;
return Err(error);
} }
self.logs_file_to_flush
.extend([stderr_logs_file, stdout_logs_file]);
Ok(self) Ok(self)
} }
/// Wait for the g-ethereum node child process getting ready.
///
/// [Instance::spawn_process] must be called priorly.
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn wait_ready(&mut self) -> anyhow::Result<&mut Self> {
let start_time = Instant::now();
let logs_file = OpenOptions::new()
.read(true)
.write(false)
.append(false)
.truncate(false)
.open(self.geth_stderr_log_file_path())
.context("Failed to open geth stderr logs file for readiness check")?;
let maximum_wait_time = self.start_timeout;
let mut stderr = BufReader::new(logs_file).lines();
let mut lines = vec![];
loop {
if let Some(Ok(line)) = stderr.next() {
if line.contains(Self::ERROR_MARKER) {
anyhow::bail!("Failed to start geth {line}");
}
if line.contains(Self::READY_MARKER) {
return Ok(self);
}
lines.push(line);
}
if Instant::now().duration_since(start_time) > maximum_wait_time {
anyhow::bail!(
"Timeout in starting geth: took longer than {}ms. stdout:\n\n{}\n",
self.start_timeout.as_millis(),
lines.join("\n")
);
}
}
}
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn geth_stdout_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::GETH_STDOUT_LOG_FILE_NAME)
}
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn geth_stderr_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::GETH_STDERR_LOG_FILE_NAME)
}
async fn provider( async fn provider(
&self, &self,
) -> anyhow::Result<FillProvider<impl TxFiller<Ethereum>, impl Provider<Ethereum>, Ethereum>> ) -> anyhow::Result<FillProvider<impl TxFiller<Ethereum>, impl Provider<Ethereum>, Ethereum>>
@@ -650,17 +589,7 @@ impl<F: TxFiller<Ethereum>, P: Provider<Ethereum>> ResolverApi for GethNodeResol
impl Node for GethNode { impl Node for GethNode {
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))] #[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn shutdown(&mut self) -> anyhow::Result<()> { fn shutdown(&mut self) -> anyhow::Result<()> {
// Terminate the processes in a graceful manner to allow for the output to be flushed. drop(self.handle.take());
if let Some(mut child) = self.handle.take() {
child
.kill()
.map_err(|error| anyhow::anyhow!("Failed to kill the geth process: {error:?}"))?;
}
// Flushing the files that we're using for keeping the logs before shutdown.
for file in self.logs_file_to_flush.iter_mut() {
file.flush()?
}
// Remove the node's database so that subsequent runs do not run on the same database. We // Remove the node's database so that subsequent runs do not run on the same database. We
// ignore the error just in case the directory didn't exist in the first place and therefore // ignore the error just in case the directory didn't exist in the first place and therefore
@@ -701,6 +630,8 @@ impl Drop for GethNode {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::LazyLock;
use super::*; use super::*;
fn test_config() -> TestExecutionContext { fn test_config() -> TestExecutionContext {
@@ -717,9 +648,21 @@ mod tests {
(context, node) (context, node)
} }
fn shared_node() -> &'static GethNode {
static NODE: LazyLock<(TestExecutionContext, GethNode)> = LazyLock::new(new_node);
&NODE.1
}
#[test] #[test]
fn version_works() { fn version_works() {
let version = GethNode::new(&test_config()).version().unwrap(); // Arrange
let node = shared_node();
// Act
let version = node.version();
// Assert
let version = version.expect("Failed to get the version");
assert!( assert!(
version.starts_with("geth version"), version.starts_with("geth version"),
"expected version string, got: '{version}'" "expected version string, got: '{version}'"
@@ -729,7 +672,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn can_get_chain_id_from_node() { async fn can_get_chain_id_from_node() {
// Arrange // Arrange
let (_context, node) = new_node(); let node = shared_node();
// Act // Act
let chain_id = node.resolver().await.unwrap().chain_id().await; let chain_id = node.resolver().await.unwrap().chain_id().await;
@@ -742,7 +685,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn can_get_gas_limit_from_node() { async fn can_get_gas_limit_from_node() {
// Arrange // Arrange
let (_context, node) = new_node(); let node = shared_node();
// Act // Act
let gas_limit = node let gas_limit = node
@@ -753,14 +696,13 @@ mod tests {
.await; .await;
// Assert // Assert
let gas_limit = gas_limit.expect("Failed to get the gas limit"); let _ = gas_limit.expect("Failed to get the gas limit");
assert_eq!(gas_limit, u32::MAX as u128)
} }
#[tokio::test] #[tokio::test]
async fn can_get_coinbase_from_node() { async fn can_get_coinbase_from_node() {
// Arrange // Arrange
let (_context, node) = new_node(); let node = shared_node();
// Act // Act
let coinbase = node let coinbase = node
@@ -771,14 +713,13 @@ mod tests {
.await; .await;
// Assert // Assert
let coinbase = coinbase.expect("Failed to get the coinbase"); let _ = coinbase.expect("Failed to get the coinbase");
assert_eq!(coinbase, Address::new([0xFF; 20]))
} }
#[tokio::test] #[tokio::test]
async fn can_get_block_difficulty_from_node() { async fn can_get_block_difficulty_from_node() {
// Arrange // Arrange
let (_context, node) = new_node(); let node = shared_node();
// Act // Act
let block_difficulty = node let block_difficulty = node
@@ -789,14 +730,13 @@ mod tests {
.await; .await;
// Assert // Assert
let block_difficulty = block_difficulty.expect("Failed to get the block difficulty"); let _ = block_difficulty.expect("Failed to get the block difficulty");
assert_eq!(block_difficulty, U256::ZERO)
} }
#[tokio::test] #[tokio::test]
async fn can_get_block_hash_from_node() { async fn can_get_block_hash_from_node() {
// Arrange // Arrange
let (_context, node) = new_node(); let node = shared_node();
// Act // Act
let block_hash = node let block_hash = node
@@ -813,7 +753,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn can_get_block_timestamp_from_node() { async fn can_get_block_timestamp_from_node() {
// Arrange // Arrange
let (_context, node) = new_node(); let node = shared_node();
// Act // Act
let block_timestamp = node let block_timestamp = node
@@ -830,13 +770,12 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn can_get_block_number_from_node() { async fn can_get_block_number_from_node() {
// Arrange // Arrange
let (_context, node) = new_node(); let node = shared_node();
// Act // Act
let block_number = node.resolver().await.unwrap().last_block_number().await; let block_number = node.resolver().await.unwrap().last_block_number().await;
// Assert // Assert
let block_number = block_number.expect("Failed to get the block number"); let _ = block_number.expect("Failed to get the block number");
assert_eq!(block_number, 0)
} }
} }
+1
View File
@@ -6,6 +6,7 @@ use revive_dt_node_interaction::EthereumNode;
pub mod common; pub mod common;
pub mod constants; pub mod constants;
pub mod geth; pub mod geth;
pub mod process;
pub mod substrate; pub mod substrate;
/// An abstract interface for testing nodes. /// An abstract interface for testing nodes.
+162
View File
@@ -0,0 +1,162 @@
use std::{
fs::{File, OpenOptions},
io::{BufRead, BufReader, Write},
path::Path,
process::{Child, Command},
time::{Duration, Instant},
};
use anyhow::{Context, Result, bail};
/// A wrapper around processes which allows for their stdout and stderr to be logged and flushed
/// when the process is dropped.
#[derive(Debug)]
pub struct Process {
/// The handle of the child process.
child: Child,
/// The file that stdout is being logged to.
stdout_logs_file: File,
/// The file that stderr is being logged to.
stderr_logs_file: File,
}
impl Process {
pub fn new(
log_file_prefix: impl Into<Option<&'static str>>,
logs_directory: impl AsRef<Path>,
binary_path: impl AsRef<Path>,
command_building_callback: impl FnOnce(&mut Command, File, File),
process_readiness_wait_behavior: ProcessReadinessWaitBehavior,
) -> Result<Self> {
let log_file_prefix = log_file_prefix.into();
let (stdout_file_name, stderr_file_name) = match log_file_prefix {
Some(prefix) => (
format!("{prefix}_stdout.log"),
format!("{prefix}_stderr.log"),
),
None => ("stdout.log".to_string(), "stderr.log".to_string()),
};
let stdout_logs_file_path = logs_directory.as_ref().join(stdout_file_name);
let stderr_logs_file_path = logs_directory.as_ref().join(stderr_file_name);
let stdout_logs_file = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(stdout_logs_file_path.as_path())
.context("Failed to open the stdout logs file")?;
let stderr_logs_file = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(stderr_logs_file_path.as_path())
.context("Failed to open the stderr logs file")?;
let mut command = {
let stdout_logs_file = stdout_logs_file
.try_clone()
.context("Failed to clone the stdout logs file")?;
let stderr_logs_file = stderr_logs_file
.try_clone()
.context("Failed to clone the stderr logs file")?;
let mut command = Command::new(binary_path.as_ref());
command_building_callback(&mut command, stdout_logs_file, stderr_logs_file);
command
};
let child = command
.spawn()
.context("Failed to spawn the built command")?;
match process_readiness_wait_behavior {
ProcessReadinessWaitBehavior::NoStartupWait => {}
ProcessReadinessWaitBehavior::WaitDuration(duration) => std::thread::sleep(duration),
ProcessReadinessWaitBehavior::TimeBoundedWaitFunction {
max_wait_duration,
mut check_function,
} => {
let spawn_time = Instant::now();
let stdout_logs_file = OpenOptions::new()
.read(true)
.open(stdout_logs_file_path)
.context("Failed to open the stdout logs file")?;
let stderr_logs_file = OpenOptions::new()
.read(true)
.open(stderr_logs_file_path)
.context("Failed to open the stderr logs file")?;
let mut stdout_lines = BufReader::new(stdout_logs_file).lines();
let mut stderr_lines = BufReader::new(stderr_logs_file).lines();
loop {
let stdout_line = stdout_lines.next().and_then(Result::ok);
let stderr_line = stderr_lines.next().and_then(Result::ok);
let check_result =
check_function(stdout_line.as_deref(), stderr_line.as_deref())
.context("Failed to wait for the process to be ready")?;
if check_result {
break;
}
if Instant::now().duration_since(spawn_time) > max_wait_duration {
bail!("Waited for the process to start but it failed to start in time")
}
}
}
}
Ok(Self {
child,
stdout_logs_file,
stderr_logs_file,
})
}
}
impl Drop for Process {
fn drop(&mut self) {
self.child.kill().expect("Failed to kill the process");
self.stdout_logs_file
.flush()
.expect("Failed to flush the stdout logs file");
self.stderr_logs_file
.flush()
.expect("Failed to flush the stderr logs file");
}
}
pub enum ProcessReadinessWaitBehavior {
/// The process does not require any kind of wait after it's been spawned and can be used
/// straight away.
NoStartupWait,
/// The process does require some amount of wait duration after it's been started.
WaitDuration(Duration),
/// The process requires a time bounded wait function which is a function of the lines that
/// appear in the log files.
TimeBoundedWaitFunction {
/// The maximum amount of time to wait for the check function to return true.
max_wait_duration: Duration,
/// The function to use to check if the process spawned is ready to use or not. This
/// function should return the following in the following cases:
///
/// - `Ok(true)`: Returned when the condition the process is waiting for has been fulfilled
/// and the wait is completed.
/// - `Ok(false)`: The process is not ready yet but it might be ready in the future.
/// - `Err`: The process is not ready yet and will not be ready in the future as it appears
/// that it has encountered an error when it was being spawned.
///
/// The first argument is a line from stdout and the second argument is a line from stderr.
#[allow(clippy::type_complexity)]
check_function: Box<dyn FnMut(Option<&str>, Option<&str>) -> anyhow::Result<bool>>,
},
}
+101 -191
View File
@@ -1,9 +1,8 @@
use std::{ use std::{
fs::{File, OpenOptions, create_dir_all, remove_dir_all}, fs::{create_dir_all, remove_dir_all},
io::{BufRead, Write}, path::PathBuf,
path::{Path, PathBuf},
pin::Pin, pin::Pin,
process::{Child, Command, Stdio}, process::{Command, Stdio},
sync::{ sync::{
Arc, Arc,
atomic::{AtomicU32, Ordering}, atomic::{AtomicU32, Ordering},
@@ -47,7 +46,12 @@ use revive_dt_config::*;
use revive_dt_node_interaction::EthereumNode; use revive_dt_node_interaction::EthereumNode;
use tracing::instrument; use tracing::instrument;
use crate::{Node, common::FallbackGasFiller, constants::INITIAL_BALANCE}; use crate::{
Node,
common::FallbackGasFiller,
constants::INITIAL_BALANCE,
process::{Process, ProcessReadinessWaitBehavior},
};
static NODE_COUNT: AtomicU32 = AtomicU32::new(0); static NODE_COUNT: AtomicU32 = AtomicU32::new(0);
@@ -63,12 +67,11 @@ pub struct SubstrateNode {
rpc_url: String, rpc_url: String,
base_directory: PathBuf, base_directory: PathBuf,
logs_directory: PathBuf, logs_directory: PathBuf,
process_substrate: Option<Child>, substrate_process: Option<Process>,
process_proxy: Option<Child>, eth_proxy_process: Option<Process>,
wallet: Arc<EthereumWallet>, wallet: Arc<EthereumWallet>,
nonce_manager: CachedNonceManager, nonce_manager: CachedNonceManager,
chain_id_filler: ChainIdFiller, chain_id_filler: ChainIdFiller,
logs_file_to_flush: Vec<File>,
} }
impl SubstrateNode { impl SubstrateNode {
@@ -85,12 +88,6 @@ impl SubstrateNode {
const SUBSTRATE_LOG_ENV: &str = "error,evm=debug,sc_rpc_server=info,runtime::revive=debug"; const SUBSTRATE_LOG_ENV: &str = "error,evm=debug,sc_rpc_server=info,runtime::revive=debug";
const PROXY_LOG_ENV: &str = "info,eth-rpc=debug"; const PROXY_LOG_ENV: &str = "info,eth-rpc=debug";
const SUBSTRATE_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log";
const SUBSTRATE_STDERR_LOG_FILE_NAME: &str = "node_stderr.log";
const PROXY_STDOUT_LOG_FILE_NAME: &str = "proxy_stdout.log";
const PROXY_STDERR_LOG_FILE_NAME: &str = "proxy_stderr.log";
pub const KITCHENSINK_EXPORT_CHAINSPEC_COMMAND: &str = "export-chain-spec"; pub const KITCHENSINK_EXPORT_CHAINSPEC_COMMAND: &str = "export-chain-spec";
pub const REVIVE_DEV_NODE_EXPORT_CHAINSPEC_COMMAND: &str = "build-spec"; pub const REVIVE_DEV_NODE_EXPORT_CHAINSPEC_COMMAND: &str = "build-spec";
@@ -121,16 +118,16 @@ impl SubstrateNode {
rpc_url: String::new(), rpc_url: String::new(),
base_directory, base_directory,
logs_directory, logs_directory,
process_substrate: None, substrate_process: None,
process_proxy: None, eth_proxy_process: None,
wallet: wallet.clone(), wallet: wallet.clone(),
chain_id_filler: Default::default(), chain_id_filler: Default::default(),
nonce_manager: Default::default(), nonce_manager: Default::default(),
logs_file_to_flush: Vec::with_capacity(4),
} }
} }
fn init(&mut self, mut genesis: Genesis) -> anyhow::Result<&mut Self> { fn init(&mut self, mut genesis: Genesis) -> anyhow::Result<&mut Self> {
let _ = remove_dir_all(self.base_directory.as_path());
let _ = clear_directory(&self.base_directory); let _ = clear_directory(&self.base_directory);
let _ = clear_directory(&self.logs_directory); let _ = clear_directory(&self.logs_directory);
@@ -213,120 +210,88 @@ impl SubstrateNode {
let substrate_rpc_port = Self::BASE_SUBSTRATE_RPC_PORT + self.id as u16; let substrate_rpc_port = Self::BASE_SUBSTRATE_RPC_PORT + self.id as u16;
let proxy_rpc_port = Self::BASE_PROXY_RPC_PORT + self.id as u16; let proxy_rpc_port = Self::BASE_PROXY_RPC_PORT + self.id as u16;
self.rpc_url = format!("http://127.0.0.1:{proxy_rpc_port}");
let chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE); let chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE);
// This is the `OpenOptions` that we wish to use for all of the log files that we will be self.rpc_url = format!("http://127.0.0.1:{proxy_rpc_port}");
// opening in this method. We need to construct it in this way to:
// 1. Be consistent
// 2. Less verbose and more dry
// 3. Because the builder pattern uses mutable references so we need to get around that.
let open_options = {
let mut options = OpenOptions::new();
options.create(true).truncate(true).write(true);
options
};
// Start Substrate node let substrate_process = Process::new(
let substrate_stdout_logs_file = open_options "node",
.clone() self.logs_directory.as_path(),
.open(self.substrate_stdout_log_file_path()) self.node_binary.as_path(),
.context("Failed to open substrate stdout logs file")?; |command, stdout_file, stderr_file| {
let substrate_stderr_logs_file = open_options command
.clone() .arg("--dev")
.open(self.substrate_stderr_log_file_path()) .arg("--chain")
.context("Failed to open substrate stderr logs file")?; .arg(chainspec_path)
let node_binary_path = self.node_binary.as_path(); .arg("--base-path")
self.process_substrate = Command::new(node_binary_path) .arg(&self.base_directory)
.arg("--dev") .arg("--rpc-port")
.arg("--chain") .arg(substrate_rpc_port.to_string())
.arg(chainspec_path) .arg("--name")
.arg("--base-path") .arg(format!("revive-substrate-{}", self.id))
.arg(&self.base_directory) .arg("--force-authoring")
.arg("--rpc-port") .arg("--rpc-methods")
.arg(substrate_rpc_port.to_string()) .arg("Unsafe")
.arg("--name") .arg("--rpc-cors")
.arg(format!("revive-substrate-{}", self.id)) .arg("all")
.arg("--force-authoring") .arg("--rpc-max-connections")
.arg("--rpc-methods") .arg(u32::MAX.to_string())
.arg("Unsafe") .env("RUST_LOG", Self::SUBSTRATE_LOG_ENV)
.arg("--rpc-cors") .stdout(stdout_file)
.arg("all") .stderr(stderr_file);
.arg("--rpc-max-connections") },
.arg(u32::MAX.to_string()) ProcessReadinessWaitBehavior::TimeBoundedWaitFunction {
.env("RUST_LOG", Self::SUBSTRATE_LOG_ENV) max_wait_duration: Duration::from_secs(30),
.stdout( check_function: Box::new(|_, stderr_line| match stderr_line {
substrate_stdout_logs_file Some(line) => Ok(line.contains(Self::SUBSTRATE_READY_MARKER)),
.try_clone() None => Ok(false),
.context("Failed to clone substrate stdout log file handle")?, }),
) },
.stderr( );
substrate_stderr_logs_file match substrate_process {
.try_clone() Ok(process) => self.substrate_process = Some(process),
.context("Failed to clone substrate stderr log file handle")?, Err(err) => {
) tracing::error!(?err, "Failed to start substrate, shutting down gracefully");
.spawn() self.shutdown()
.context("Failed to spawn Substrate node process")? .context("Failed to gracefully shutdown after substrate start error")?;
.into(); return Err(err);
}
}
// Give the node a moment to boot let eth_proxy_process = Process::new(
if let Err(error) = Self::wait_ready( "proxy",
self.substrate_stderr_log_file_path().as_path(), self.logs_directory.as_path(),
Self::SUBSTRATE_READY_MARKER, self.eth_proxy_binary.as_path(),
Duration::from_secs(60), |command, stdout_file, stderr_file| {
) { command
self.shutdown() .arg("--dev")
.context("Failed to gracefully shutdown after Substrate start error")?; .arg("--rpc-port")
return Err(error); .arg(proxy_rpc_port.to_string())
}; .arg("--node-rpc-url")
.arg(format!("ws://127.0.0.1:{substrate_rpc_port}"))
let eth_proxy_stdout_logs_file = open_options .arg("--rpc-max-connections")
.clone() .arg(u32::MAX.to_string())
.open(self.proxy_stdout_log_file_path()) .env("RUST_LOG", Self::PROXY_LOG_ENV)
.context("Failed to open eth-proxy stdout logs file")?; .stdout(stdout_file)
let eth_proxy_stderr_logs_file = open_options .stderr(stderr_file);
.open(self.proxy_stderr_log_file_path()) },
.context("Failed to open eth-proxy stderr logs file")?; ProcessReadinessWaitBehavior::TimeBoundedWaitFunction {
self.process_proxy = Command::new(&self.eth_proxy_binary) max_wait_duration: Duration::from_secs(30),
.arg("--dev") check_function: Box::new(|_, stderr_line| match stderr_line {
.arg("--rpc-port") Some(line) => Ok(line.contains(Self::ETH_PROXY_READY_MARKER)),
.arg(proxy_rpc_port.to_string()) None => Ok(false),
.arg("--node-rpc-url") }),
.arg(format!("ws://127.0.0.1:{substrate_rpc_port}")) },
.arg("--rpc-max-connections") );
.arg(u32::MAX.to_string()) match eth_proxy_process {
.env("RUST_LOG", Self::PROXY_LOG_ENV) Ok(process) => self.eth_proxy_process = Some(process),
.stdout( Err(err) => {
eth_proxy_stdout_logs_file tracing::error!(?err, "Failed to start eth proxy, shutting down gracefully");
.try_clone() self.shutdown()
.context("Failed to clone eth-proxy stdout log file handle")?, .context("Failed to gracefully shutdown after eth proxy start error")?;
) return Err(err);
.stderr( }
eth_proxy_stderr_logs_file }
.try_clone()
.context("Failed to clone eth-proxy stderr log file handle")?,
)
.spawn()
.context("Failed to spawn eth-proxy process")?
.into();
if let Err(error) = Self::wait_ready(
self.proxy_stderr_log_file_path().as_path(),
Self::ETH_PROXY_READY_MARKER,
Duration::from_secs(60),
) {
self.shutdown()
.context("Failed to gracefully shutdown after eth-proxy start error")?;
return Err(error);
};
self.logs_file_to_flush.extend([
substrate_stdout_logs_file,
substrate_stderr_logs_file,
eth_proxy_stdout_logs_file,
eth_proxy_stderr_logs_file,
]);
Ok(()) Ok(())
} }
@@ -356,29 +321,6 @@ impl SubstrateNode {
account_id.to_ss58check() account_id.to_ss58check()
} }
fn wait_ready(logs_file_path: &Path, marker: &str, timeout: Duration) -> anyhow::Result<()> {
let start_time = std::time::Instant::now();
let logs_file = OpenOptions::new()
.read(true)
.write(false)
.append(false)
.truncate(false)
.open(logs_file_path)?;
let mut lines = std::io::BufReader::new(logs_file).lines();
loop {
if let Some(Ok(line)) = lines.next() {
if line.contains(marker) {
return Ok(());
}
}
if start_time.elapsed() > timeout {
anyhow::bail!("Timeout waiting for process readiness: {marker}");
}
}
}
pub fn eth_rpc_version(&self) -> anyhow::Result<String> { pub fn eth_rpc_version(&self) -> anyhow::Result<String> {
let output = Command::new(&self.eth_proxy_binary) let output = Command::new(&self.eth_proxy_binary)
.arg("--version") .arg("--version")
@@ -391,24 +333,6 @@ impl SubstrateNode {
Ok(String::from_utf8_lossy(&output).trim().to_string()) Ok(String::from_utf8_lossy(&output).trim().to_string())
} }
fn substrate_stdout_log_file_path(&self) -> PathBuf {
self.logs_directory
.join(Self::SUBSTRATE_STDOUT_LOG_FILE_NAME)
}
fn substrate_stderr_log_file_path(&self) -> PathBuf {
self.logs_directory
.join(Self::SUBSTRATE_STDERR_LOG_FILE_NAME)
}
fn proxy_stdout_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::PROXY_STDOUT_LOG_FILE_NAME)
}
fn proxy_stderr_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::PROXY_STDERR_LOG_FILE_NAME)
}
async fn provider( async fn provider(
&self, &self,
) -> anyhow::Result< ) -> anyhow::Result<
@@ -673,22 +597,8 @@ impl<F: TxFiller<ReviveNetwork>, P: Provider<ReviveNetwork>> ResolverApi
impl Node for SubstrateNode { impl Node for SubstrateNode {
fn shutdown(&mut self) -> anyhow::Result<()> { fn shutdown(&mut self) -> anyhow::Result<()> {
// Terminate the processes in a graceful manner to allow for the output to be flushed. drop(self.substrate_process.take());
if let Some(mut child) = self.process_proxy.take() { drop(self.eth_proxy_process.take());
child
.kill()
.map_err(|error| anyhow::anyhow!("Failed to kill the proxy process: {error:?}"))?;
}
if let Some(mut child) = self.process_substrate.take() {
child.kill().map_err(|error| {
anyhow::anyhow!("Failed to kill the Substrate process: {error:?}")
})?;
}
// Flushing the files that we're using for keeping the logs before shutdown.
for file in self.logs_file_to_flush.iter_mut() {
file.flush()?
}
// Remove the node's database so that subsequent runs do not run on the same database. We // Remove the node's database so that subsequent runs do not run on the same database. We
// ignore the error just in case the directory didn't exist in the first place and therefore // ignore the error just in case the directory didn't exist in the first place and therefore
@@ -1195,19 +1105,19 @@ mod tests {
(context, node) (context, node)
} }
/// A shared node that multiple tests can use. It starts up once. fn shared_state() -> &'static (TestExecutionContext, SubstrateNode) {
static STATE: LazyLock<(TestExecutionContext, SubstrateNode)> = LazyLock::new(new_node);
&STATE
}
fn shared_node() -> &'static SubstrateNode { fn shared_node() -> &'static SubstrateNode {
static NODE: LazyLock<(TestExecutionContext, SubstrateNode)> = LazyLock::new(|| { &shared_state().1
let (context, node) = new_node();
(context, node)
});
&NODE.1
} }
#[tokio::test] #[tokio::test]
async fn node_mines_simple_transfer_transaction_and_returns_receipt() { async fn node_mines_simple_transfer_transaction_and_returns_receipt() {
// Arrange // Arrange
let (context, node) = new_node(); let (context, node) = shared_state();
let provider = node.provider().await.expect("Failed to create provider"); let provider = node.provider().await.expect("Failed to create provider");
View File