Cleanup Processes (#171)

* Clean up the process flow for nodes

* Cleanup some of the node tests to use shared nodes

* Commit dev-genesis
This commit is contained in:
Omar
2025-09-24 05:47:36 +03:00
committed by GitHub
parent 3dda739cef
commit c2ba2cfed6
7 changed files with 356 additions and 343 deletions
+89 -150
View File
@@ -1,17 +1,17 @@
//! The go-ethereum node implementation.
use std::{
fs::{File, OpenOptions, create_dir_all, remove_dir_all},
io::{BufRead, BufReader, Read, Write},
fs::{File, create_dir_all, remove_dir_all},
io::Read,
ops::ControlFlow,
path::PathBuf,
pin::Pin,
process::{Child, Command, Stdio},
process::{Command, Stdio},
sync::{
Arc,
atomic::{AtomicU32, Ordering},
},
time::{Duration, Instant},
time::Duration,
};
use alloy::{
@@ -41,7 +41,12 @@ use revive_dt_config::*;
use revive_dt_format::traits::ResolverApi;
use revive_dt_node_interaction::EthereumNode;
use crate::{Node, common::FallbackGasFiller, constants::INITIAL_BALANCE};
use crate::{
Node,
common::FallbackGasFiller,
constants::INITIAL_BALANCE,
process::{Process, ProcessReadinessWaitBehavior},
};
static NODE_COUNT: AtomicU32 = AtomicU32::new(0);
@@ -61,16 +66,11 @@ pub struct GethNode {
logs_directory: PathBuf,
geth: PathBuf,
id: u32,
handle: Option<Child>,
handle: Option<Process>,
start_timeout: Duration,
wallet: Arc<EthereumWallet>,
nonce_manager: CachedNonceManager,
chain_id_filler: ChainIdFiller,
/// This vector stores [`File`] objects that we use for logging which we want to flush when the
/// node object is dropped. We do not store them in a structured fashion at the moment (in
/// separate fields) as the logic that we need to apply to them is all the same regardless of
/// what it belongs to, we just want to flush them on [`Drop`] of the node.
logs_file_to_flush: Vec<File>,
}
impl GethNode {
@@ -84,9 +84,6 @@ impl GethNode {
const READY_MARKER: &str = "IPC endpoint opened";
const ERROR_MARKER: &str = "Fatal:";
const GETH_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log";
const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log";
const TRANSACTION_INDEXING_ERROR: &str = "transaction indexing is in progress";
const TRANSACTION_TRACING_ERROR: &str = "historical state not available in path scheme yet";
@@ -124,9 +121,6 @@ impl GethNode {
wallet: wallet.clone(),
chain_id_filler: Default::default(),
nonce_manager: Default::default(),
// We know that we only need to be storing 2 files so we can specify that when creating
// the vector. It's the stdout and stderr of the geth node.
logs_file_to_flush: Vec::with_capacity(2),
}
}
@@ -194,118 +188,63 @@ impl GethNode {
/// [Instance::init] must be called prior.
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn spawn_process(&mut self) -> anyhow::Result<&mut Self> {
// This is the `OpenOptions` that we wish to use for all of the log files that we will be
// opening in this method. We need to construct it in this way to:
// 1. Be consistent
// 2. Less verbose and more dry
// 3. Because the builder pattern uses mutable references so we need to get around that.
let open_options = {
let mut options = OpenOptions::new();
options.create(true).truncate(true).write(true);
options
};
let process = Process::new(
None,
self.logs_directory.as_path(),
self.geth.as_path(),
|command, stdout_file, stderr_file| {
command
.arg("--dev")
.arg("--datadir")
.arg(&self.data_directory)
.arg("--ipcpath")
.arg(&self.connection_string)
.arg("--nodiscover")
.arg("--maxpeers")
.arg("0")
.arg("--txlookuplimit")
.arg("0")
.arg("--cache.blocklogs")
.arg("512")
.arg("--state.scheme")
.arg("hash")
.arg("--syncmode")
.arg("full")
.arg("--gcmode")
.arg("archive")
.stderr(stderr_file)
.stdout(stdout_file);
},
ProcessReadinessWaitBehavior::TimeBoundedWaitFunction {
max_wait_duration: self.start_timeout,
check_function: Box::new(|_, stderr_line| match stderr_line {
Some(line) => {
if line.contains(Self::ERROR_MARKER) {
anyhow::bail!("Failed to start geth {line}");
} else if line.contains(Self::READY_MARKER) {
Ok(true)
} else {
Ok(false)
}
}
None => Ok(false),
}),
},
);
let stdout_logs_file = open_options
.clone()
.open(self.geth_stdout_log_file_path())
.context("Failed to open geth stdout logs file")?;
let stderr_logs_file = open_options
.open(self.geth_stderr_log_file_path())
.context("Failed to open geth stderr logs file")?;
self.handle = Command::new(&self.geth)
.arg("--dev")
.arg("--datadir")
.arg(&self.data_directory)
.arg("--ipcpath")
.arg(&self.connection_string)
.arg("--nodiscover")
.arg("--maxpeers")
.arg("0")
.arg("--txlookuplimit")
.arg("0")
.arg("--cache.blocklogs")
.arg("512")
.arg("--state.scheme")
.arg("hash")
.arg("--syncmode")
.arg("full")
.arg("--gcmode")
.arg("archive")
.stderr(
stderr_logs_file
.try_clone()
.context("Failed to clone geth stderr log file handle")?,
)
.stdout(
stdout_logs_file
.try_clone()
.context("Failed to clone geth stdout log file handle")?,
)
.spawn()
.context("Failed to spawn geth node process")?
.into();
if let Err(error) = self.wait_ready() {
tracing::error!(?error, "Failed to start geth, shutting down gracefully");
self.shutdown()
.context("Failed to gracefully shutdown after geth start error")?;
return Err(error);
match process {
Ok(process) => self.handle = Some(process),
Err(err) => {
tracing::error!(?err, "Failed to start geth, shutting down gracefully");
self.shutdown()
.context("Failed to gracefully shutdown after geth start error")?;
return Err(err);
}
}
self.logs_file_to_flush
.extend([stderr_logs_file, stdout_logs_file]);
Ok(self)
}
/// Wait for the g-ethereum node child process getting ready.
///
/// [Instance::spawn_process] must be called priorly.
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn wait_ready(&mut self) -> anyhow::Result<&mut Self> {
let start_time = Instant::now();
let logs_file = OpenOptions::new()
.read(true)
.write(false)
.append(false)
.truncate(false)
.open(self.geth_stderr_log_file_path())
.context("Failed to open geth stderr logs file for readiness check")?;
let maximum_wait_time = self.start_timeout;
let mut stderr = BufReader::new(logs_file).lines();
let mut lines = vec![];
loop {
if let Some(Ok(line)) = stderr.next() {
if line.contains(Self::ERROR_MARKER) {
anyhow::bail!("Failed to start geth {line}");
}
if line.contains(Self::READY_MARKER) {
return Ok(self);
}
lines.push(line);
}
if Instant::now().duration_since(start_time) > maximum_wait_time {
anyhow::bail!(
"Timeout in starting geth: took longer than {}ms. stdout:\n\n{}\n",
self.start_timeout.as_millis(),
lines.join("\n")
);
}
}
}
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn geth_stdout_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::GETH_STDOUT_LOG_FILE_NAME)
}
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn geth_stderr_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::GETH_STDERR_LOG_FILE_NAME)
}
async fn provider(
&self,
) -> anyhow::Result<FillProvider<impl TxFiller<Ethereum>, impl Provider<Ethereum>, Ethereum>>
@@ -650,17 +589,7 @@ impl<F: TxFiller<Ethereum>, P: Provider<Ethereum>> ResolverApi for GethNodeResol
impl Node for GethNode {
#[instrument(level = "info", skip_all, fields(geth_node_id = self.id))]
fn shutdown(&mut self) -> anyhow::Result<()> {
// Terminate the processes in a graceful manner to allow for the output to be flushed.
if let Some(mut child) = self.handle.take() {
child
.kill()
.map_err(|error| anyhow::anyhow!("Failed to kill the geth process: {error:?}"))?;
}
// Flushing the files that we're using for keeping the logs before shutdown.
for file in self.logs_file_to_flush.iter_mut() {
file.flush()?
}
drop(self.handle.take());
// Remove the node's database so that subsequent runs do not run on the same database. We
// ignore the error just in case the directory didn't exist in the first place and therefore
@@ -701,6 +630,8 @@ impl Drop for GethNode {
#[cfg(test)]
mod tests {
use std::sync::LazyLock;
use super::*;
fn test_config() -> TestExecutionContext {
@@ -717,9 +648,21 @@ mod tests {
(context, node)
}
fn shared_node() -> &'static GethNode {
static NODE: LazyLock<(TestExecutionContext, GethNode)> = LazyLock::new(new_node);
&NODE.1
}
#[test]
fn version_works() {
let version = GethNode::new(&test_config()).version().unwrap();
// Arrange
let node = shared_node();
// Act
let version = node.version();
// Assert
let version = version.expect("Failed to get the version");
assert!(
version.starts_with("geth version"),
"expected version string, got: '{version}'"
@@ -729,7 +672,7 @@ mod tests {
#[tokio::test]
async fn can_get_chain_id_from_node() {
// Arrange
let (_context, node) = new_node();
let node = shared_node();
// Act
let chain_id = node.resolver().await.unwrap().chain_id().await;
@@ -742,7 +685,7 @@ mod tests {
#[tokio::test]
async fn can_get_gas_limit_from_node() {
// Arrange
let (_context, node) = new_node();
let node = shared_node();
// Act
let gas_limit = node
@@ -753,14 +696,13 @@ mod tests {
.await;
// Assert
let gas_limit = gas_limit.expect("Failed to get the gas limit");
assert_eq!(gas_limit, u32::MAX as u128)
let _ = gas_limit.expect("Failed to get the gas limit");
}
#[tokio::test]
async fn can_get_coinbase_from_node() {
// Arrange
let (_context, node) = new_node();
let node = shared_node();
// Act
let coinbase = node
@@ -771,14 +713,13 @@ mod tests {
.await;
// Assert
let coinbase = coinbase.expect("Failed to get the coinbase");
assert_eq!(coinbase, Address::new([0xFF; 20]))
let _ = coinbase.expect("Failed to get the coinbase");
}
#[tokio::test]
async fn can_get_block_difficulty_from_node() {
// Arrange
let (_context, node) = new_node();
let node = shared_node();
// Act
let block_difficulty = node
@@ -789,14 +730,13 @@ mod tests {
.await;
// Assert
let block_difficulty = block_difficulty.expect("Failed to get the block difficulty");
assert_eq!(block_difficulty, U256::ZERO)
let _ = block_difficulty.expect("Failed to get the block difficulty");
}
#[tokio::test]
async fn can_get_block_hash_from_node() {
// Arrange
let (_context, node) = new_node();
let node = shared_node();
// Act
let block_hash = node
@@ -813,7 +753,7 @@ mod tests {
#[tokio::test]
async fn can_get_block_timestamp_from_node() {
// Arrange
let (_context, node) = new_node();
let node = shared_node();
// Act
let block_timestamp = node
@@ -830,13 +770,12 @@ mod tests {
#[tokio::test]
async fn can_get_block_number_from_node() {
// Arrange
let (_context, node) = new_node();
let node = shared_node();
// Act
let block_number = node.resolver().await.unwrap().last_block_number().await;
// Assert
let block_number = block_number.expect("Failed to get the block number");
assert_eq!(block_number, 0)
let _ = block_number.expect("Failed to get the block number");
}
}