Persist node logs (#36)

* Persist node logs

* Fix clippy lints

* Delete the node's db on shutdown but persist logs

* Fix tests

* Separate stdout and stderr and use more consts.

* More consistent handling of open options

* Revert the use of subprocess

* Remove outdated comment

* Flush the log files on drop

* Rename `log_files` -> `logs_file_to_flush`
This commit is contained in:
Omar
2025-07-14 19:08:47 +03:00
committed by GitHub
parent 5eb3a0e1b5
commit b204de5484
3 changed files with 250 additions and 72 deletions
+144 -35
View File
@@ -1,8 +1,8 @@
use std::{
collections::HashMap,
fs::create_dir_all,
io::BufRead,
path::PathBuf,
fs::{File, OpenOptions, create_dir_all, remove_dir_all},
io::{BufRead, Write},
path::{Path, PathBuf},
process::{Child, Command, Stdio},
sync::{
Mutex,
@@ -30,6 +30,7 @@ use serde::{Deserialize, Serialize};
use serde_json::{Value as JsonValue, json};
use sp_core::crypto::Ss58Codec;
use sp_runtime::AccountId32;
use tracing::Level;
use revive_dt_config::Arguments;
use revive_dt_node_interaction::{
@@ -49,13 +50,22 @@ pub struct KitchensinkNode {
rpc_url: String,
wallet: EthereumWallet,
base_directory: PathBuf,
logs_directory: PathBuf,
process_substrate: Option<Child>,
process_proxy: Option<Child>,
nonces: Mutex<HashMap<Address, u64>>,
/// This vector stores [`File`] objects that we use for logging which we want to flush when the
/// node object is dropped. We do not store them in a structured fashion at the moment (in
/// separate fields) as the logic that we need to apply to them is all the same regardless of
/// what it belongs to, we just want to flush them on [`Drop`] of the node.
logs_file_to_flush: Vec<File>,
}
impl KitchensinkNode {
const BASE_DIRECTORY: &str = "kitchensink";
const LOGS_DIRECTORY: &str = "logs";
const DATA_DIRECTORY: &str = "chains";
const SUBSTRATE_READY_MARKER: &str = "Running JSON-RPC server";
const ETH_PROXY_READY_MARKER: &str = "Running JSON-RPC server";
const CHAIN_SPEC_JSON_FILE: &str = "template_chainspec.json";
@@ -65,11 +75,21 @@ impl KitchensinkNode {
const SUBSTRATE_LOG_ENV: &str = "error,evm=debug,sc_rpc_server=info,runtime::revive=debug";
const PROXY_LOG_ENV: &str = "info,eth-rpc=debug";
const KITCHENSINK_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log";
const KITCHENSINK_STDERR_LOG_FILE_NAME: &str = "node_stderr.log";
const PROXY_STDOUT_LOG_FILE_NAME: &str = "proxy_stdout.log";
const PROXY_STDERR_LOG_FILE_NAME: &str = "proxy_stderr.log";
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn init(&mut self, genesis: &str) -> anyhow::Result<&mut Self> {
create_dir_all(&self.base_directory)?;
create_dir_all(&self.logs_directory)?;
let template_chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE);
// Note: we do not pipe the logs of this process to a separate file since this is just a
// once-off export of the default chain spec and not part of the long-running node process.
let output = Command::new(&self.substrate_binary)
.arg("export-chain-spec")
.arg("--chain")
@@ -118,6 +138,7 @@ impl KitchensinkNode {
Ok(self)
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn spawn_process(&mut self) -> anyhow::Result<()> {
let substrate_rpc_port = Self::BASE_SUBSTRATE_RPC_PORT + self.id as u16;
let proxy_rpc_port = Self::BASE_PROXY_RPC_PORT + self.id as u16;
@@ -126,8 +147,25 @@ impl KitchensinkNode {
let chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE);
// This is the `OpenOptions` that we wish to use for all of the log files that we will be
// opening in this method. We need to construct it in this way to:
// 1. Be consistent
// 2. Less verbose and more dry
// 3. Because the builder pattern uses mutable references so we need to get around that.
let open_options = {
let mut options = OpenOptions::new();
options.create(true).truncate(true).write(true);
options
};
// Start Substrate node
let mut substrate_process = Command::new(&self.substrate_binary)
let kitchensink_stdout_logs_file = open_options
.clone()
.open(self.kitchensink_stdout_log_file_path())?;
let kitchensink_stderr_logs_file = open_options
.clone()
.open(self.kitchensink_stderr_log_file_path())?;
self.process_substrate = Command::new(&self.substrate_binary)
.arg("--chain")
.arg(chainspec_path)
.arg("--base-path")
@@ -142,40 +180,61 @@ impl KitchensinkNode {
.arg("--rpc-cors")
.arg("all")
.env("RUST_LOG", Self::SUBSTRATE_LOG_ENV)
.stdout(Stdio::null())
.stderr(Stdio::piped())
.spawn()?;
.stdout(kitchensink_stdout_logs_file.try_clone()?)
.stderr(kitchensink_stderr_logs_file.try_clone()?)
.spawn()?
.into();
// Give the node a moment to boot
Self::wait_ready(
&mut substrate_process,
if let Err(error) = Self::wait_ready(
self.kitchensink_stderr_log_file_path().as_path(),
Self::SUBSTRATE_READY_MARKER,
Duration::from_secs(30),
)?;
) {
tracing::error!(
?error,
"Failed to start substrate, shutting down gracefully"
);
self.shutdown()?;
return Err(error);
};
let mut proxy_process = Command::new(&self.eth_proxy_binary)
let eth_proxy_stdout_logs_file = open_options
.clone()
.open(self.proxy_stdout_log_file_path())?;
let eth_proxy_stderr_logs_file = open_options.open(self.proxy_stderr_log_file_path())?;
self.process_proxy = Command::new(&self.eth_proxy_binary)
.arg("--dev")
.arg("--rpc-port")
.arg(proxy_rpc_port.to_string())
.arg("--node-rpc-url")
.arg(format!("ws://127.0.0.1:{substrate_rpc_port}"))
.env("RUST_LOG", Self::PROXY_LOG_ENV)
.stdout(Stdio::null())
.stderr(Stdio::piped())
.spawn()?;
.stdout(eth_proxy_stdout_logs_file.try_clone()?)
.stderr(eth_proxy_stderr_logs_file.try_clone()?)
.spawn()?
.into();
Self::wait_ready(
&mut proxy_process,
if let Err(error) = Self::wait_ready(
self.proxy_stderr_log_file_path().as_path(),
Self::ETH_PROXY_READY_MARKER,
Duration::from_secs(30),
)?;
) {
tracing::error!(?error, "Failed to start proxy, shutting down gracefully");
self.shutdown()?;
return Err(error);
};
self.process_substrate = Some(substrate_process);
self.process_proxy = Some(proxy_process);
self.logs_file_to_flush.extend([
kitchensink_stdout_logs_file,
kitchensink_stderr_logs_file,
eth_proxy_stdout_logs_file,
eth_proxy_stderr_logs_file,
]);
Ok(())
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn extract_balance_from_genesis_file(
&self,
genesis_str: &str,
@@ -216,27 +275,30 @@ impl KitchensinkNode {
Ok(account_id.to_ss58check())
}
fn wait_ready(child: &mut Child, marker: &str, timeout: Duration) -> anyhow::Result<()> {
fn wait_ready(logs_file_path: &Path, marker: &str, timeout: Duration) -> anyhow::Result<()> {
let start_time = std::time::Instant::now();
let stderr = child.stderr.take().expect("stderr must be piped");
let logs_file = OpenOptions::new()
.read(true)
.write(false)
.append(false)
.truncate(false)
.open(logs_file_path)?;
let mut lines = std::io::BufReader::new(stderr).lines();
let mut lines = std::io::BufReader::new(logs_file).lines();
loop {
if let Some(Ok(line)) = lines.next() {
println!("Kitchensink log: {line:?}");
if line.contains(marker) {
std::thread::spawn(move || for _ in lines.by_ref() {});
return Ok(());
}
}
if start_time.elapsed() > timeout {
let _ = child.kill();
anyhow::bail!("Timeout waiting for process readiness: {marker}");
}
}
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
pub fn eth_rpc_version(&self) -> anyhow::Result<String> {
let output = Command::new(&self.eth_proxy_binary)
.arg("--version")
@@ -248,9 +310,32 @@ impl KitchensinkNode {
.stdout;
Ok(String::from_utf8_lossy(&output).trim().to_string())
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)]
fn kitchensink_stdout_log_file_path(&self) -> PathBuf {
self.logs_directory
.join(Self::KITCHENSINK_STDOUT_LOG_FILE_NAME)
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)]
fn kitchensink_stderr_log_file_path(&self) -> PathBuf {
self.logs_directory
.join(Self::KITCHENSINK_STDERR_LOG_FILE_NAME)
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)]
fn proxy_stdout_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::PROXY_STDOUT_LOG_FILE_NAME)
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)]
fn proxy_stderr_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::PROXY_STDERR_LOG_FILE_NAME)
}
}
impl EthereumNode for KitchensinkNode {
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn execute_transaction(
&self,
transaction: alloy::rpc::types::TransactionRequest,
@@ -276,6 +361,7 @@ impl EthereumNode for KitchensinkNode {
receipt
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn trace_transaction(
&self,
transaction: TransactionReceipt,
@@ -300,6 +386,7 @@ impl EthereumNode for KitchensinkNode {
}))
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn state_diff(&self, transaction: TransactionReceipt) -> anyhow::Result<DiffMode> {
match self
.trace_transaction(transaction)?
@@ -310,6 +397,7 @@ impl EthereumNode for KitchensinkNode {
}
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn fetch_add_nonce(&self, address: Address) -> anyhow::Result<u64> {
let url = self.rpc_url.clone();
let wallet = self.wallet.clone();
@@ -329,6 +417,7 @@ impl Node for KitchensinkNode {
let kitchensink_directory = config.directory().join(Self::BASE_DIRECTORY);
let id = NODE_COUNT.fetch_add(1, Ordering::SeqCst);
let base_directory = kitchensink_directory.join(id.to_string());
let logs_directory = base_directory.join(Self::LOGS_DIRECTORY);
Self {
id,
@@ -337,30 +426,54 @@ impl Node for KitchensinkNode {
rpc_url: String::new(),
wallet: config.wallet(),
base_directory,
logs_directory,
process_substrate: None,
process_proxy: None,
nonces: Mutex::new(HashMap::new()),
// We know that we only need to be storing 4 files so we can specify that when creating
// the vector. It's the stdout and stderr of the substrate-node and the eth-rpc.
logs_file_to_flush: Vec::with_capacity(4),
}
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn connection_string(&self) -> String {
self.rpc_url.clone()
}
fn shutdown(mut self) -> anyhow::Result<()> {
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn shutdown(&mut self) -> anyhow::Result<()> {
// Terminate the processes in a graceful manner to allow for the output to be flushed.
if let Some(mut child) = self.process_proxy.take() {
let _ = child.kill();
child
.kill()
.map_err(|error| anyhow::anyhow!("Failed to kill the proxy process: {error:?}"))?;
}
if let Some(mut child) = self.process_substrate.take() {
let _ = child.kill();
child.kill().map_err(|error| {
anyhow::anyhow!("Failed to kill the substrate process: {error:?}")
})?;
}
// Flushing the files that we're using for keeping the logs before shutdown.
for file in self.logs_file_to_flush.iter_mut() {
file.flush()?
}
// Remove the node's database so that subsequent runs do not run on the same database. We
// ignore the error just in case the directory didn't exist in the first place and therefore
// there's nothing to be deleted.
let _ = remove_dir_all(self.base_directory.join(Self::DATA_DIRECTORY));
Ok(())
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn spawn(&mut self, genesis: String) -> anyhow::Result<()> {
self.init(&genesis)?.spawn_process()
}
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn version(&self) -> anyhow::Result<String> {
let output = Command::new(&self.substrate_binary)
.arg("--version")
@@ -375,13 +488,9 @@ impl Node for KitchensinkNode {
}
impl Drop for KitchensinkNode {
#[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
fn drop(&mut self) {
if let Some(mut child) = self.process_proxy.take() {
let _ = child.kill();
}
if let Some(mut child) = self.process_substrate.take() {
let _ = child.kill();
}
self.shutdown().expect("Failed to shutdown")
}
}