Persist node logs

This commit is contained in:
Omar Abdulla
2025-07-12 21:04:29 +03:00
parent 0513a4befb
commit 50e1f0ccdf
6 changed files with 189 additions and 79 deletions
+72 -40
View File
@@ -2,15 +2,14 @@
use std::{
collections::HashMap,
fs::{File, create_dir_all, remove_dir_all},
fs::{File, OpenOptions, create_dir_all},
io::{BufRead, BufReader, Read, Write},
path::PathBuf,
process::{Child, Command, Stdio},
process::{Command, Stdio},
sync::{
Mutex,
atomic::{AtomicU32, Ordering},
},
thread,
time::{Duration, Instant},
};
@@ -28,6 +27,7 @@ use revive_dt_node_interaction::{
EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction,
transaction::execute_transaction,
};
use subprocess::{Exec, Popen};
use crate::Node;
@@ -47,7 +47,7 @@ pub struct Instance {
data_directory: PathBuf,
geth: PathBuf,
id: u32,
handle: Option<Child>,
handle: Option<Popen>,
network_id: u64,
start_timeout: u64,
wallet: EthereumWallet,
@@ -65,8 +65,10 @@ impl Instance {
const ERROR_MARKER: &str = "Fatal:";
/// Create the node directory and call `geth init` to configure the genesis.
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn init(&mut self, genesis: String) -> anyhow::Result<&mut Self> {
create_dir_all(&self.base_directory)?;
create_dir_all(&self.base_directory.join("logs"))?;
let genesis_path = self.base_directory.join(Self::GENESIS_JSON_FILE);
File::create(&genesis_path)?.write_all(genesis.as_bytes())?;
@@ -96,9 +98,19 @@ impl Instance {
/// Spawn the go-ethereum node child process.
///
/// [Instance::init] must be called priorly.
/// [Instance::init] must be called prior.
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn spawn_process(&mut self) -> anyhow::Result<&mut Self> {
self.handle = Command::new(&self.geth)
let node_logs_file_path = self.base_directory.join("logs").join("node.log");
let node_logs_file = OpenOptions::new()
// Options to re-create and re-write to the file starting at offset zero. We do not want
// to re-use log files between runs. Users that want to keep their log files should pass
// in a different working directory between runs.
.create(true)
.truncate(true)
.write(true)
.open(&node_logs_file_path)?;
self.handle = Exec::cmd(&self.geth)
.arg("--dev")
.arg("--datadir")
.arg(&self.data_directory)
@@ -109,49 +121,58 @@ impl Instance {
.arg("--nodiscover")
.arg("--maxpeers")
.arg("0")
.stderr(Stdio::piped())
.stdout(Stdio::null())
.spawn()?
// We pipe both stdout and stderr to the same log file and therefore we're persisting
// both. In the implementation of [`std::fs::File`] the `try_clone` method will ensure
// that both [`std::fs::File`] objects have the same seeks and offsets and therefore we
// don't have to worry about either streams overriding each other.
.stderr(node_logs_file.try_clone()?)
.stdout(node_logs_file)
.popen()?
.into();
if let Err(error) = self.wait_ready() {
tracing::error!(?error, "Failed to start geth, shutting down gracefully");
self.shutdown()?;
return Err(error);
}
Ok(self)
}
/// Wait for the g-ethereum node child process getting ready.
///
/// [Instance::spawn_process] must be called priorly.
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn wait_ready(&mut self) -> anyhow::Result<&mut Self> {
// Thanks clippy but geth is a server; we don't `wait` but eventually kill it.
#[allow(clippy::zombie_processes)]
let mut child = self.handle.take().expect("should be spawned");
let start_time = Instant::now();
let maximum_wait_time = Duration::from_millis(self.start_timeout);
let mut stderr = BufReader::new(child.stderr.take().expect("should be piped")).lines();
let error = loop {
let Some(Ok(line)) = stderr.next() else {
break "child process stderr reading error".to_string();
};
if line.contains(Self::ERROR_MARKER) {
break line;
}
if line.contains(Self::READY_MARKER) {
// Keep stderr alive
// https://github.com/alloy-rs/alloy/issues/2091#issuecomment-2676134147
thread::spawn(move || for _ in stderr.by_ref() {});
self.handle = child.into();
return Ok(self);
let logs_file = OpenOptions::new()
.read(true)
.write(false)
.append(false)
.truncate(false)
.open(self.base_directory.join("logs").join("node.log"))?;
let maximum_wait_time = Duration::from_millis(self.start_timeout);
let mut stderr = BufReader::new(logs_file).lines();
loop {
if let Some(Ok(line)) = stderr.next() {
if line.contains(Self::ERROR_MARKER) {
anyhow::bail!("Failed to start geth {line}");
}
if line.contains(Self::READY_MARKER) {
return Ok(self);
}
}
if Instant::now().duration_since(start_time) > maximum_wait_time {
break "spawn timeout".to_string();
anyhow::bail!("Timeout in starting geth");
}
};
let _ = child.kill();
anyhow::bail!("geth node #{} spawn error: {error}", self.id)
}
}
}
impl EthereumNode for Instance {
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn execute_transaction(
&self,
transaction: TransactionRequest,
@@ -173,6 +194,7 @@ impl EthereumNode for Instance {
}))
}
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn trace_transaction(
&self,
transaction: TransactionReceipt,
@@ -195,6 +217,7 @@ impl EthereumNode for Instance {
}))
}
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn state_diff(
&self,
transaction: alloy::rpc::types::TransactionReceipt,
@@ -208,6 +231,7 @@ impl EthereumNode for Instance {
}
}
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn fetch_add_nonce(&self, address: Address) -> anyhow::Result<u64> {
let connection_string = self.connection_string.clone();
let wallet = self.wallet.clone();
@@ -242,19 +266,31 @@ impl Node for Instance {
}
}
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn connection_string(&self) -> String {
self.connection_string.clone()
}
fn shutdown(self) -> anyhow::Result<()> {
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn shutdown(&mut self) -> anyhow::Result<()> {
if let Some(mut child) = self.handle.take() {
child.terminate().map_err(|error| {
anyhow::anyhow!("Failed to terminate the geth process: {error:?}")
})?;
child.wait().map_err(|error| {
anyhow::anyhow!("Failed to wait for the termination of the geth process: {error:?}")
})?;
}
Ok(())
}
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn spawn(&mut self, genesis: String) -> anyhow::Result<()> {
self.init(genesis)?.spawn_process()?.wait_ready()?;
self.init(genesis)?.spawn_process()?;
Ok(())
}
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn version(&self) -> anyhow::Result<String> {
let output = Command::new(&self.geth)
.arg("--version")
@@ -269,13 +305,9 @@ impl Node for Instance {
}
impl Drop for Instance {
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn drop(&mut self) {
if let Some(child) = self.handle.as_mut() {
let _ = child.kill();
}
if self.base_directory.exists() {
let _ = remove_dir_all(&self.base_directory);
}
self.shutdown().expect("Failed to shutdown")
}
}
+103 -38
View File
@@ -1,9 +1,9 @@
use std::{
collections::HashMap,
fs::create_dir_all,
fs::{OpenOptions, create_dir_all},
io::BufRead,
path::PathBuf,
process::{Child, Command, Stdio},
path::{Path, PathBuf},
process::{Command, Stdio},
sync::{
Mutex,
atomic::{AtomicU32, Ordering},
@@ -30,6 +30,7 @@ use revive_dt_node_interaction::{
EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction,
transaction::execute_transaction,
};
use subprocess::{Exec, Popen};
use crate::Node;
@@ -43,8 +44,8 @@ pub struct KitchensinkNode {
rpc_url: String,
wallet: EthereumWallet,
base_directory: PathBuf,
process_substrate: Option<Child>,
process_proxy: Option<Child>,
process_substrate: Option<Popen>,
process_proxy: Option<Popen>,
nonces: Mutex<HashMap<Address, u64>>,
}
@@ -59,11 +60,15 @@ impl KitchensinkNode {
const SUBSTRATE_LOG_ENV: &str = "error,evm=debug,sc_rpc_server=info,runtime::revive=debug";
const PROXY_LOG_ENV: &str = "info,eth-rpc=debug";
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn init(&mut self, genesis: &str) -> anyhow::Result<&mut Self> {
create_dir_all(&self.base_directory)?;
create_dir_all(&self.base_directory.join("logs"))?;
let template_chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE);
// Note: we do not pipe the logs of this process to a separate file since this is just a
// once-off export of the default chain spec and not part of the long-running node process.
let output = Command::new(&self.substrate_binary)
.arg("export-chain-spec")
.arg("--chain")
@@ -112,6 +117,7 @@ impl KitchensinkNode {
Ok(self)
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn spawn_process(&mut self) -> anyhow::Result<()> {
let substrate_rpc_port = Self::BASE_SUBSTRATE_RPC_PORT + self.id as u16;
let proxy_rpc_port = Self::BASE_PROXY_RPC_PORT + self.id as u16;
@@ -120,8 +126,19 @@ impl KitchensinkNode {
let chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE);
let logs_directory_path = self.base_directory.join("logs");
// Start Substrate node
let mut substrate_process = Command::new(&self.substrate_binary)
let substrate_logs_file_path = logs_directory_path.join("node.log");
let substrate_logs_file = OpenOptions::new()
// Options to re-create and re-write to the file starting at offset zero. We do not want
// to re-use log files between runs. Users that want to keep their log files should pass
// in a different working directory between runs.
.create(true)
.truncate(true)
.write(true)
.open(&substrate_logs_file_path)?;
self.process_substrate = Exec::cmd(&self.substrate_binary)
.arg("--chain")
.arg(chainspec_path)
.arg("--base-path")
@@ -136,40 +153,67 @@ impl KitchensinkNode {
.arg("--rpc-cors")
.arg("all")
.env("RUST_LOG", Self::SUBSTRATE_LOG_ENV)
.stdout(Stdio::null())
.stderr(Stdio::piped())
.spawn()?;
// We pipe both stdout and stderr to the same log file and therefore we're persisting
// both. In the implementation of [`std::fs::File`] the `try_clone` method will ensure
// that both [`std::fs::File`] objects have the same seeks and offsets and therefore we
// don't have to worry about either streams overriding each other.
.stdout(substrate_logs_file.try_clone()?)
.stderr(substrate_logs_file)
.popen()?
.into();
// Give the node a moment to boot
Self::wait_ready(
&mut substrate_process,
if let Err(error) = Self::wait_ready(
substrate_logs_file_path.as_path(),
Self::SUBSTRATE_READY_MARKER,
Duration::from_secs(30),
)?;
) {
tracing::error!(
?error,
"Failed to start substrate, shutting down gracefully"
);
self.shutdown()?;
return Err(error);
};
let mut proxy_process = Command::new(&self.eth_proxy_binary)
let proxy_logs_file_path = logs_directory_path.join("proxy.log");
let proxy_logs_file = OpenOptions::new()
// Options to re-create and re-write to the file starting at offset zero. We do not want
// to re-use log files between runs. Users that want to keep their log files should pass
// in a different working directory between runs.
.create(true)
.truncate(true)
.write(true)
.open(&proxy_logs_file_path)?;
self.process_proxy = Exec::cmd(&self.eth_proxy_binary)
.arg("--dev")
.arg("--rpc-port")
.arg(proxy_rpc_port.to_string())
.arg("--node-rpc-url")
.arg(format!("ws://127.0.0.1:{substrate_rpc_port}"))
.env("RUST_LOG", Self::PROXY_LOG_ENV)
.stdout(Stdio::null())
.stderr(Stdio::piped())
.spawn()?;
// We pipe both stdout and stderr to the same log file and therefore we're persisting
// both. In the implementation of [`std::fs::File`] the `try_clone` method will ensure
// that both [`std::fs::File`] objects have the same seeks and offsets and therefore we
// don't have to worry about either streams overriding each other.
.stdout(proxy_logs_file.try_clone()?)
.stderr(proxy_logs_file)
.popen()?
.into();
Self::wait_ready(
&mut proxy_process,
if let Err(error) = Self::wait_ready(
proxy_logs_file_path.as_path(),
Self::ETH_PROXY_READY_MARKER,
Duration::from_secs(30),
)?;
self.process_substrate = Some(substrate_process);
self.process_proxy = Some(proxy_process);
) {
tracing::error!(?error, "Failed to start proxy, shutting down gracefully");
self.shutdown()?;
return Err(error);
};
Ok(())
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn extract_balance_from_genesis_file(
&self,
genesis_str: &str,
@@ -210,27 +254,30 @@ impl KitchensinkNode {
Ok(account_id.to_ss58check())
}
fn wait_ready(child: &mut Child, marker: &str, timeout: Duration) -> anyhow::Result<()> {
fn wait_ready(logs_file_path: &Path, marker: &str, timeout: Duration) -> anyhow::Result<()> {
let start_time = std::time::Instant::now();
let stderr = child.stderr.take().expect("stderr must be piped");
let logs_file = OpenOptions::new()
.read(true)
.write(false)
.append(false)
.truncate(false)
.open(logs_file_path)?;
let mut lines = std::io::BufReader::new(stderr).lines();
let mut lines = std::io::BufReader::new(logs_file).lines();
loop {
if let Some(Ok(line)) = lines.next() {
println!("Kitchensink log: {line:?}");
if line.contains(marker) {
std::thread::spawn(move || for _ in lines.by_ref() {});
return Ok(());
}
}
if start_time.elapsed() > timeout {
let _ = child.kill();
anyhow::bail!("Timeout waiting for process readiness: {marker}");
}
}
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
pub fn eth_rpc_version(&self) -> anyhow::Result<String> {
let output = Command::new(&self.eth_proxy_binary)
.arg("--version")
@@ -245,6 +292,7 @@ impl KitchensinkNode {
}
impl EthereumNode for KitchensinkNode {
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn execute_transaction(
&self,
transaction: alloy::rpc::types::TransactionRequest,
@@ -266,6 +314,7 @@ impl EthereumNode for KitchensinkNode {
}))
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn trace_transaction(
&self,
transaction: TransactionReceipt,
@@ -289,6 +338,7 @@ impl EthereumNode for KitchensinkNode {
}))
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn state_diff(&self, transaction: TransactionReceipt) -> anyhow::Result<DiffMode> {
match self
.trace_transaction(transaction)?
@@ -299,6 +349,7 @@ impl EthereumNode for KitchensinkNode {
}
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn fetch_add_nonce(&self, address: Address) -> anyhow::Result<u64> {
let url = self.rpc_url.clone();
let wallet = self.wallet.clone();
@@ -332,24 +383,42 @@ impl Node for KitchensinkNode {
}
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn connection_string(&self) -> String {
self.rpc_url.clone()
}
fn shutdown(mut self) -> anyhow::Result<()> {
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn shutdown(&mut self) -> anyhow::Result<()> {
if let Some(mut child) = self.process_proxy.take() {
let _ = child.kill();
child.terminate().map_err(|error| {
anyhow::anyhow!("Failed to terminate the proxy process: {error:?}")
})?;
child.wait().map_err(|error| {
anyhow::anyhow!(
"Failed to wait for the termination of the proxy process: {error:?}"
)
})?;
}
if let Some(mut child) = self.process_substrate.take() {
let _ = child.kill();
child.terminate().map_err(|error| {
anyhow::anyhow!("Failed to terminate the substrate process: {error:?}")
})?;
child.wait().map_err(|error| {
anyhow::anyhow!(
"Failed to wait for the termination of the substrate process: {error:?}"
)
})?;
}
Ok(())
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn spawn(&mut self, genesis: String) -> anyhow::Result<()> {
self.init(&genesis)?.spawn_process()
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn version(&self) -> anyhow::Result<String> {
let output = Command::new(&self.substrate_binary)
.arg("--version")
@@ -364,13 +433,9 @@ impl Node for KitchensinkNode {
}
impl Drop for KitchensinkNode {
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn drop(&mut self) {
if let Some(mut child) = self.process_proxy.take() {
let _ = child.kill();
}
if let Some(mut child) = self.process_substrate.take() {
let _ = child.kill();
}
self.shutdown().expect("Failed to shutdown")
}
}
+1 -1
View File
@@ -23,7 +23,7 @@ pub trait Node: EthereumNode {
/// Prune the node instance and related data.
///
/// Blocking until it's completely stopped.
fn shutdown(self) -> anyhow::Result<()>;
fn shutdown(&mut self) -> anyhow::Result<()>;
/// Returns the nodes connection string.
fn connection_string(&self) -> String;