mirror of
https://github.com/pezkuwichain/revive-differential-tests.git
synced 2026-06-13 05:51:01 +00:00
Clean up the process flow for nodes
This commit is contained in:
+93
-184
@@ -1,9 +1,8 @@
|
||||
use std::{
|
||||
fs::{File, OpenOptions, create_dir_all, remove_dir_all},
|
||||
io::{BufRead, Write},
|
||||
path::{Path, PathBuf},
|
||||
fs::{create_dir_all, remove_dir_all},
|
||||
path::PathBuf,
|
||||
pin::Pin,
|
||||
process::{Child, Command, Stdio},
|
||||
process::{Command, Stdio},
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU32, Ordering},
|
||||
@@ -47,7 +46,12 @@ use revive_dt_config::*;
|
||||
use revive_dt_node_interaction::EthereumNode;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::{Node, common::FallbackGasFiller, constants::INITIAL_BALANCE};
|
||||
use crate::{
|
||||
Node,
|
||||
common::FallbackGasFiller,
|
||||
constants::INITIAL_BALANCE,
|
||||
process::{Process, ProcessReadinessWaitBehavior},
|
||||
};
|
||||
|
||||
static NODE_COUNT: AtomicU32 = AtomicU32::new(0);
|
||||
|
||||
@@ -63,12 +67,11 @@ pub struct SubstrateNode {
|
||||
rpc_url: String,
|
||||
base_directory: PathBuf,
|
||||
logs_directory: PathBuf,
|
||||
process_substrate: Option<Child>,
|
||||
process_proxy: Option<Child>,
|
||||
substrate_process: Option<Process>,
|
||||
eth_proxy_process: Option<Process>,
|
||||
wallet: Arc<EthereumWallet>,
|
||||
nonce_manager: CachedNonceManager,
|
||||
chain_id_filler: ChainIdFiller,
|
||||
logs_file_to_flush: Vec<File>,
|
||||
}
|
||||
|
||||
impl SubstrateNode {
|
||||
@@ -85,12 +88,6 @@ impl SubstrateNode {
|
||||
const SUBSTRATE_LOG_ENV: &str = "error,evm=debug,sc_rpc_server=info,runtime::revive=debug";
|
||||
const PROXY_LOG_ENV: &str = "info,eth-rpc=debug";
|
||||
|
||||
const SUBSTRATE_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log";
|
||||
const SUBSTRATE_STDERR_LOG_FILE_NAME: &str = "node_stderr.log";
|
||||
|
||||
const PROXY_STDOUT_LOG_FILE_NAME: &str = "proxy_stdout.log";
|
||||
const PROXY_STDERR_LOG_FILE_NAME: &str = "proxy_stderr.log";
|
||||
|
||||
pub const KITCHENSINK_EXPORT_CHAINSPEC_COMMAND: &str = "export-chain-spec";
|
||||
pub const REVIVE_DEV_NODE_EXPORT_CHAINSPEC_COMMAND: &str = "build-spec";
|
||||
|
||||
@@ -121,12 +118,11 @@ impl SubstrateNode {
|
||||
rpc_url: String::new(),
|
||||
base_directory,
|
||||
logs_directory,
|
||||
process_substrate: None,
|
||||
process_proxy: None,
|
||||
substrate_process: None,
|
||||
eth_proxy_process: None,
|
||||
wallet: wallet.clone(),
|
||||
chain_id_filler: Default::default(),
|
||||
nonce_manager: Default::default(),
|
||||
logs_file_to_flush: Vec::with_capacity(4),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -213,120 +209,88 @@ impl SubstrateNode {
|
||||
let substrate_rpc_port = Self::BASE_SUBSTRATE_RPC_PORT + self.id as u16;
|
||||
let proxy_rpc_port = Self::BASE_PROXY_RPC_PORT + self.id as u16;
|
||||
|
||||
self.rpc_url = format!("http://127.0.0.1:{proxy_rpc_port}");
|
||||
|
||||
let chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE);
|
||||
|
||||
// This is the `OpenOptions` that we wish to use for all of the log files that we will be
|
||||
// opening in this method. We need to construct it in this way to:
|
||||
// 1. Be consistent
|
||||
// 2. Less verbose and more dry
|
||||
// 3. Because the builder pattern uses mutable references so we need to get around that.
|
||||
let open_options = {
|
||||
let mut options = OpenOptions::new();
|
||||
options.create(true).truncate(true).write(true);
|
||||
options
|
||||
};
|
||||
self.rpc_url = format!("http://127.0.0.1:{proxy_rpc_port}");
|
||||
|
||||
// Start Substrate node
|
||||
let substrate_stdout_logs_file = open_options
|
||||
.clone()
|
||||
.open(self.substrate_stdout_log_file_path())
|
||||
.context("Failed to open substrate stdout logs file")?;
|
||||
let substrate_stderr_logs_file = open_options
|
||||
.clone()
|
||||
.open(self.substrate_stderr_log_file_path())
|
||||
.context("Failed to open substrate stderr logs file")?;
|
||||
let node_binary_path = self.node_binary.as_path();
|
||||
self.process_substrate = Command::new(node_binary_path)
|
||||
.arg("--dev")
|
||||
.arg("--chain")
|
||||
.arg(chainspec_path)
|
||||
.arg("--base-path")
|
||||
.arg(&self.base_directory)
|
||||
.arg("--rpc-port")
|
||||
.arg(substrate_rpc_port.to_string())
|
||||
.arg("--name")
|
||||
.arg(format!("revive-substrate-{}", self.id))
|
||||
.arg("--force-authoring")
|
||||
.arg("--rpc-methods")
|
||||
.arg("Unsafe")
|
||||
.arg("--rpc-cors")
|
||||
.arg("all")
|
||||
.arg("--rpc-max-connections")
|
||||
.arg(u32::MAX.to_string())
|
||||
.env("RUST_LOG", Self::SUBSTRATE_LOG_ENV)
|
||||
.stdout(
|
||||
substrate_stdout_logs_file
|
||||
.try_clone()
|
||||
.context("Failed to clone substrate stdout log file handle")?,
|
||||
)
|
||||
.stderr(
|
||||
substrate_stderr_logs_file
|
||||
.try_clone()
|
||||
.context("Failed to clone substrate stderr log file handle")?,
|
||||
)
|
||||
.spawn()
|
||||
.context("Failed to spawn Substrate node process")?
|
||||
.into();
|
||||
let substrate_process = Process::new(
|
||||
"node",
|
||||
self.logs_directory.as_path(),
|
||||
self.node_binary.as_path(),
|
||||
|command, stdout_file, stderr_file| {
|
||||
command
|
||||
.arg("--dev")
|
||||
.arg("--chain")
|
||||
.arg(chainspec_path)
|
||||
.arg("--base-path")
|
||||
.arg(&self.base_directory)
|
||||
.arg("--rpc-port")
|
||||
.arg(substrate_rpc_port.to_string())
|
||||
.arg("--name")
|
||||
.arg(format!("revive-substrate-{}", self.id))
|
||||
.arg("--force-authoring")
|
||||
.arg("--rpc-methods")
|
||||
.arg("Unsafe")
|
||||
.arg("--rpc-cors")
|
||||
.arg("all")
|
||||
.arg("--rpc-max-connections")
|
||||
.arg(u32::MAX.to_string())
|
||||
.env("RUST_LOG", Self::SUBSTRATE_LOG_ENV)
|
||||
.stdout(stdout_file)
|
||||
.stderr(stderr_file);
|
||||
},
|
||||
ProcessReadinessWaitBehavior::TimeBoundedWaitFunction {
|
||||
max_wait_duration: Duration::from_secs(30),
|
||||
check_function: Box::new(|_, stderr_line| match stderr_line {
|
||||
Some(line) => Ok(line.contains(Self::SUBSTRATE_READY_MARKER)),
|
||||
None => Ok(false),
|
||||
}),
|
||||
},
|
||||
);
|
||||
match substrate_process {
|
||||
Ok(process) => self.substrate_process = Some(process),
|
||||
Err(err) => {
|
||||
tracing::error!(?err, "Failed to start substrate, shutting down gracefully");
|
||||
self.shutdown()
|
||||
.context("Failed to gracefully shutdown after substrate start error")?;
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
|
||||
// Give the node a moment to boot
|
||||
if let Err(error) = Self::wait_ready(
|
||||
self.substrate_stderr_log_file_path().as_path(),
|
||||
Self::SUBSTRATE_READY_MARKER,
|
||||
Duration::from_secs(60),
|
||||
) {
|
||||
self.shutdown()
|
||||
.context("Failed to gracefully shutdown after Substrate start error")?;
|
||||
return Err(error);
|
||||
};
|
||||
|
||||
let eth_proxy_stdout_logs_file = open_options
|
||||
.clone()
|
||||
.open(self.proxy_stdout_log_file_path())
|
||||
.context("Failed to open eth-proxy stdout logs file")?;
|
||||
let eth_proxy_stderr_logs_file = open_options
|
||||
.open(self.proxy_stderr_log_file_path())
|
||||
.context("Failed to open eth-proxy stderr logs file")?;
|
||||
self.process_proxy = Command::new(&self.eth_proxy_binary)
|
||||
.arg("--dev")
|
||||
.arg("--rpc-port")
|
||||
.arg(proxy_rpc_port.to_string())
|
||||
.arg("--node-rpc-url")
|
||||
.arg(format!("ws://127.0.0.1:{substrate_rpc_port}"))
|
||||
.arg("--rpc-max-connections")
|
||||
.arg(u32::MAX.to_string())
|
||||
.env("RUST_LOG", Self::PROXY_LOG_ENV)
|
||||
.stdout(
|
||||
eth_proxy_stdout_logs_file
|
||||
.try_clone()
|
||||
.context("Failed to clone eth-proxy stdout log file handle")?,
|
||||
)
|
||||
.stderr(
|
||||
eth_proxy_stderr_logs_file
|
||||
.try_clone()
|
||||
.context("Failed to clone eth-proxy stderr log file handle")?,
|
||||
)
|
||||
.spawn()
|
||||
.context("Failed to spawn eth-proxy process")?
|
||||
.into();
|
||||
|
||||
if let Err(error) = Self::wait_ready(
|
||||
self.proxy_stderr_log_file_path().as_path(),
|
||||
Self::ETH_PROXY_READY_MARKER,
|
||||
Duration::from_secs(60),
|
||||
) {
|
||||
self.shutdown()
|
||||
.context("Failed to gracefully shutdown after eth-proxy start error")?;
|
||||
return Err(error);
|
||||
};
|
||||
|
||||
self.logs_file_to_flush.extend([
|
||||
substrate_stdout_logs_file,
|
||||
substrate_stderr_logs_file,
|
||||
eth_proxy_stdout_logs_file,
|
||||
eth_proxy_stderr_logs_file,
|
||||
]);
|
||||
let eth_proxy_process = Process::new(
|
||||
"proxy",
|
||||
self.logs_directory.as_path(),
|
||||
self.eth_proxy_binary.as_path(),
|
||||
|command, stdout_file, stderr_file| {
|
||||
command
|
||||
.arg("--dev")
|
||||
.arg("--rpc-port")
|
||||
.arg(proxy_rpc_port.to_string())
|
||||
.arg("--node-rpc-url")
|
||||
.arg(format!("ws://127.0.0.1:{substrate_rpc_port}"))
|
||||
.arg("--rpc-max-connections")
|
||||
.arg(u32::MAX.to_string())
|
||||
.env("RUST_LOG", Self::PROXY_LOG_ENV)
|
||||
.stdout(stdout_file)
|
||||
.stderr(stderr_file);
|
||||
},
|
||||
ProcessReadinessWaitBehavior::TimeBoundedWaitFunction {
|
||||
max_wait_duration: Duration::from_secs(30),
|
||||
check_function: Box::new(|_, stderr_line| match stderr_line {
|
||||
Some(line) => Ok(line.contains(Self::ETH_PROXY_READY_MARKER)),
|
||||
None => Ok(false),
|
||||
}),
|
||||
},
|
||||
);
|
||||
match eth_proxy_process {
|
||||
Ok(process) => self.eth_proxy_process = Some(process),
|
||||
Err(err) => {
|
||||
tracing::error!(?err, "Failed to start eth proxy, shutting down gracefully");
|
||||
self.shutdown()
|
||||
.context("Failed to gracefully shutdown after eth proxy start error")?;
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -356,29 +320,6 @@ impl SubstrateNode {
|
||||
account_id.to_ss58check()
|
||||
}
|
||||
|
||||
fn wait_ready(logs_file_path: &Path, marker: &str, timeout: Duration) -> anyhow::Result<()> {
|
||||
let start_time = std::time::Instant::now();
|
||||
let logs_file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(false)
|
||||
.append(false)
|
||||
.truncate(false)
|
||||
.open(logs_file_path)?;
|
||||
|
||||
let mut lines = std::io::BufReader::new(logs_file).lines();
|
||||
loop {
|
||||
if let Some(Ok(line)) = lines.next() {
|
||||
if line.contains(marker) {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if start_time.elapsed() > timeout {
|
||||
anyhow::bail!("Timeout waiting for process readiness: {marker}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn eth_rpc_version(&self) -> anyhow::Result<String> {
|
||||
let output = Command::new(&self.eth_proxy_binary)
|
||||
.arg("--version")
|
||||
@@ -391,24 +332,6 @@ impl SubstrateNode {
|
||||
Ok(String::from_utf8_lossy(&output).trim().to_string())
|
||||
}
|
||||
|
||||
fn substrate_stdout_log_file_path(&self) -> PathBuf {
|
||||
self.logs_directory
|
||||
.join(Self::SUBSTRATE_STDOUT_LOG_FILE_NAME)
|
||||
}
|
||||
|
||||
fn substrate_stderr_log_file_path(&self) -> PathBuf {
|
||||
self.logs_directory
|
||||
.join(Self::SUBSTRATE_STDERR_LOG_FILE_NAME)
|
||||
}
|
||||
|
||||
fn proxy_stdout_log_file_path(&self) -> PathBuf {
|
||||
self.logs_directory.join(Self::PROXY_STDOUT_LOG_FILE_NAME)
|
||||
}
|
||||
|
||||
fn proxy_stderr_log_file_path(&self) -> PathBuf {
|
||||
self.logs_directory.join(Self::PROXY_STDERR_LOG_FILE_NAME)
|
||||
}
|
||||
|
||||
async fn provider(
|
||||
&self,
|
||||
) -> anyhow::Result<
|
||||
@@ -673,22 +596,8 @@ impl<F: TxFiller<ReviveNetwork>, P: Provider<ReviveNetwork>> ResolverApi
|
||||
|
||||
impl Node for SubstrateNode {
|
||||
fn shutdown(&mut self) -> anyhow::Result<()> {
|
||||
// Terminate the processes in a graceful manner to allow for the output to be flushed.
|
||||
if let Some(mut child) = self.process_proxy.take() {
|
||||
child
|
||||
.kill()
|
||||
.map_err(|error| anyhow::anyhow!("Failed to kill the proxy process: {error:?}"))?;
|
||||
}
|
||||
if let Some(mut child) = self.process_substrate.take() {
|
||||
child.kill().map_err(|error| {
|
||||
anyhow::anyhow!("Failed to kill the Substrate process: {error:?}")
|
||||
})?;
|
||||
}
|
||||
|
||||
// Flushing the files that we're using for keeping the logs before shutdown.
|
||||
for file in self.logs_file_to_flush.iter_mut() {
|
||||
file.flush()?
|
||||
}
|
||||
drop(self.substrate_process.take());
|
||||
drop(self.eth_proxy_process.take());
|
||||
|
||||
// Remove the node's database so that subsequent runs do not run on the same database. We
|
||||
// ignore the error just in case the directory didn't exist in the first place and therefore
|
||||
|
||||
Reference in New Issue
Block a user