Change names from ZombieNet to Zombienet and switch from Command to Process

This commit is contained in:
Marios Christou
2025-10-03 16:03:04 +03:00
parent 23c3da4956
commit 2e3a81cdd3
5 changed files with 76 additions and 139 deletions
+49 -107
View File
@@ -27,9 +27,8 @@
//! the full paths in your configuration.
use std::{
fs::{OpenOptions, create_dir_all, remove_dir_all},
io::BufRead,
path::{Path, PathBuf},
fs::{create_dir_all, remove_dir_all},
path::PathBuf,
pin::Pin,
process::{Command, Stdio},
sync::{
@@ -68,7 +67,11 @@ use tracing::instrument;
use zombienet_sdk::{LocalFileSystem, NetworkConfigBuilder, NetworkConfigExt};
use crate::{
Node, common::FallbackGasFiller, constants::INITIAL_BALANCE, substrate::ReviveNetwork,
Node,
common::FallbackGasFiller,
constants::INITIAL_BALANCE,
process::{Process, ProcessReadinessWaitBehavior},
substrate::ReviveNetwork,
};
static NODE_COUNT: AtomicU32 = AtomicU32::new(0);
@@ -89,7 +92,7 @@ pub struct ZombieNode {
chain_id_filler: ChainIdFiller,
network_config: Option<zombienet_sdk::NetworkConfig>,
network: Option<zombienet_sdk::Network<LocalFileSystem>>,
eth_rpc_process: Option<std::process::Child>,
eth_rpc_process: Option<Process>,
node_rpc_port: Option<u16>,
}
@@ -103,8 +106,6 @@ impl ZombieNode {
const ETH_RPC_BASE_PORT: u16 = 8545;
const ETH_RPC_READY_MARKER: &str = "Running JSON-RPC server";
const ETH_RPC_STDOUT_LOG_FILE_NAME: &str = "eth_rpc_stdout.log";
const ETH_RPC_STDERR_LOG_FILE_NAME: &str = "eth_rpc_stderr.log";
const EXPORT_CHAINSPEC_COMMAND: &str = "build-spec";
const CHAIN_SPEC_JSON_FILE: &str = "template_chainspec.json";
@@ -189,9 +190,11 @@ impl ZombieNode {
}
fn spawn_process(&mut self) -> anyhow::Result<()> {
let Some(network_config) = self.network_config.clone() else {
anyhow::bail!("Node not initialized, call init() first");
};
let network_config = self
.network_config
.clone()
.context("Node not initialized, call init() first")?;
let rt = tokio::runtime::Runtime::new().unwrap();
let network = rt.block_on(async {
network_config
@@ -202,103 +205,53 @@ impl ZombieNode {
tracing::debug!("Zombienet network is up");
let open_options = {
let mut options = OpenOptions::new();
options.create(true).truncate(true).write(true);
options
};
let node_url = format!("ws://localhost:{}", self.node_rpc_port.unwrap());
let eth_rpc_port = Self::ETH_RPC_BASE_PORT + self.id as u16;
let eth_rpc_stdout_log = format!(
"{}-{}.log",
self.eth_rpc_stdout_log_file_path().display(),
eth_rpc_port
);
let eth_rpc_stderr_log = format!(
"{}-{}.log",
self.eth_rpc_stderr_log_file_path().display(),
eth_rpc_port
let eth_rpc_process = Process::new(
"proxy",
self.logs_directory.as_path(),
self.eth_proxy_binary.as_path(),
|command, stdout_file, stderr_file| {
command
.arg("--node-rpc-url")
.arg(node_url)
.arg("--rpc-cors")
.arg("all")
.arg("--rpc-max-connections")
.arg(u32::MAX.to_string())
.arg("--rpc-port")
.arg(eth_rpc_port.to_string())
.stdout(stdout_file)
.stderr(stderr_file);
},
ProcessReadinessWaitBehavior::TimeBoundedWaitFunction {
max_wait_duration: Duration::from_secs(30),
check_function: Box::new(|_, stderr_line| match stderr_line {
Some(line) => Ok(line.contains(Self::ETH_RPC_READY_MARKER)),
None => Ok(false),
}),
},
);
let eth_rpc_stdout_logs_file = open_options
.clone()
.open(&eth_rpc_stdout_log)
.context("Failed to open eth-rpc stdout logs file")?;
let eth_rpc_stderr_logs_file = open_options
.open(&eth_rpc_stderr_log)
.context("Failed to open eth-rpc stderr logs file")?;
let child = Command::new("eth-rpc")
.arg("--node-rpc-url")
.arg(node_url)
.arg("--rpc-cors")
.arg("all")
.arg("--rpc-max-connections")
.arg(u32::MAX.to_string())
.arg("--rpc-port")
.arg(eth_rpc_port.to_string())
.stdout(
eth_rpc_stdout_logs_file
.try_clone()
.context("Failed to clone eth-rpc stdout logs file")?,
)
.stderr(
eth_rpc_stderr_logs_file
.try_clone()
.context("Failed to clone eth-rpc stderr logs file")?,
)
.spawn()
.context("Failed to spawn eth-rpc process")?;
// Give the node a moment to boot
let ready_result = Self::wait_ready(
Path::new(&eth_rpc_stderr_log),
Self::ETH_RPC_READY_MARKER,
Duration::from_secs(60),
);
if let Err(error) = ready_result {
tracing::error!("eth-rpc failed to start: {error:?}");
self.shutdown()
.context("Failed to gracefully shutdown after Substrate start error")?;
return Err(error);
};
match eth_rpc_process {
Ok(process) => self.eth_rpc_process = Some(process),
Err(err) => {
tracing::error!(?err, "Failed to start eth proxy, shutting down gracefully");
self.shutdown()
.context("Failed to gracefully shutdown after eth proxy start error")?;
return Err(err);
}
}
tracing::info!("eth-rpc is up");
tracing::debug!("Monitoring eth-rpc stderr logs at: {}", eth_rpc_stderr_log);
tracing::debug!("Monitoring eth-rpc stdout logs at: {}", eth_rpc_stdout_log);
self.connection_string = format!("http://localhost:{}", eth_rpc_port);
self.eth_rpc_process = Some(child);
self.network = Some(network);
Ok(())
}
fn wait_ready(logs_file_path: &Path, marker: &str, timeout: Duration) -> anyhow::Result<()> {
let start_time = std::time::Instant::now();
let logs_file = std::fs::OpenOptions::new()
.read(true)
.write(false)
.append(false)
.truncate(false)
.open(logs_file_path)?;
let mut lines = std::io::BufReader::new(logs_file).lines();
loop {
if let Some(Ok(line)) = lines.next() {
if line.contains(marker) {
return Ok(());
}
}
if start_time.elapsed() > timeout {
anyhow::bail!("Timeout waiting for process readiness: {marker}");
}
}
}
fn prepare_chainspec(
&mut self,
template_chainspec_path: PathBuf,
@@ -434,14 +387,6 @@ impl ZombieNode {
.await
.context("Failed to connect to parachain Ethereum RPC")
}
fn eth_rpc_stdout_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::ETH_RPC_STDOUT_LOG_FILE_NAME)
}
fn eth_rpc_stderr_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::ETH_RPC_STDERR_LOG_FILE_NAME)
}
}
impl EthereumNode for ZombieNode {
@@ -688,9 +633,7 @@ impl<F: TxFiller<ReviveNetwork>, P: Provider<ReviveNetwork>> ResolverApi
impl Node for ZombieNode {
fn shutdown(&mut self) -> anyhow::Result<()> {
// Kill the eth_rpc process
if let Some(mut child) = self.eth_rpc_process.take() {
child.kill().context("Failed to kill eth-rpc process")?;
}
drop(self.eth_rpc_process.take());
// Destroy the network
if let Some(network) = self.network.take() {
@@ -753,8 +696,7 @@ mod tests {
use tokio::sync::OnceCell;
pub fn test_config() -> TestExecutionContext {
let mut context = TestExecutionContext::default();
context.zombienet_configuration.use_zombienet = true;
let context = TestExecutionContext::default();
context
}