Tests: parse port from substrate binary output to avoid races (#501)

* parse port from substrate binary output to avoid races

* cargo fmt

* clippy

* remove "rt" feature from tokio

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
This commit is contained in:
James Wilson
2022-04-06 14:57:14 +01:00
committed by GitHub
parent f7e9491b35
commit 10627fb715
3 changed files with 48 additions and 79 deletions
+47 -77
View File
@@ -20,10 +20,12 @@ use std::{
OsStr,
OsString,
},
net::TcpListener,
io::{
BufRead,
BufReader,
Read,
},
process,
thread,
time,
};
use subxt::{
Client,
@@ -79,7 +81,6 @@ where
pub struct TestNodeProcessBuilder {
node_path: OsString,
authority: Option<AccountKeyring>,
scan_port_range: bool,
}
impl TestNodeProcessBuilder {
@@ -90,7 +91,6 @@ impl TestNodeProcessBuilder {
Self {
node_path: node_path.as_ref().into(),
authority: None,
scan_port_range: false,
}
}
@@ -100,21 +100,20 @@ impl TestNodeProcessBuilder {
self
}
/// Enable port scanning to scan for open ports.
///
/// Allows spawning multiple node instances for tests to run in parallel.
pub fn scan_for_open_ports(&mut self) -> &mut Self {
self.scan_port_range = true;
self
}
/// Spawn the substrate node at the given path, and wait for rpc to be initialized.
pub async fn spawn<R>(&self) -> Result<TestNodeProcess<R>, String>
where
R: Config,
{
let mut cmd = process::Command::new(&self.node_path);
cmd.env("RUST_LOG", "error").arg("--dev").arg("--tmp");
cmd.env("RUST_LOG", "info")
.arg("--dev")
.arg("--tmp")
.stdout(process::Stdio::piped())
.stderr(process::Stdio::piped())
.arg("--port=0")
.arg("--rpc-port=0")
.arg("--ws-port=0");
if let Some(authority) = self.authority {
let authority = format!("{:?}", authority);
@@ -122,21 +121,6 @@ impl TestNodeProcessBuilder {
cmd.arg(arg);
}
let ws_port = if self.scan_port_range {
let (p2p_port, http_port, ws_port) = next_open_port()
.ok_or_else(|| "No available ports in the given port range".to_owned())?;
cmd.arg(format!("--port={}", p2p_port));
cmd.arg(format!("--rpc-port={}", http_port));
cmd.arg(format!("--ws-port={}", ws_port));
ws_port
} else {
// the default Websockets port
9944
};
let ws_url = format!("ws://127.0.0.1:{}", ws_port);
let mut proc = cmd.spawn().map_err(|e| {
format!(
"Error spawning substrate node '{}': {}",
@@ -144,37 +128,18 @@ impl TestNodeProcessBuilder {
e
)
})?;
// wait for rpc to be initialized
const MAX_ATTEMPTS: u32 = 6;
let mut attempts = 1;
let mut wait_secs = 1;
let client = loop {
thread::sleep(time::Duration::from_secs(wait_secs));
log::info!(
"Connecting to contracts enabled node, attempt {}/{}",
attempts,
MAX_ATTEMPTS
);
let result = ClientBuilder::new().set_url(ws_url.clone()).build().await;
match result {
Ok(client) => break Ok(client),
Err(err) => {
if attempts < MAX_ATTEMPTS {
attempts += 1;
wait_secs *= 2; // backoff
continue
}
break Err(err)
}
}
};
// Wait for RPC port to be logged (it's logged to stderr):
let stderr = proc.stderr.take().unwrap();
let ws_port = find_substrate_port_from_output(stderr);
let ws_url = format!("ws://127.0.0.1:{}", ws_port);
// Connect to the node with a subxt client:
let client = ClientBuilder::new().set_url(ws_url.clone()).build().await;
match client {
Ok(client) => Ok(TestNodeProcess { proc, client }),
Err(err) => {
let err = format!(
"Failed to connect to node rpc at {} after {} attempts: {}",
ws_url, attempts, err
);
let err = format!("Failed to connect to node rpc at {}: {}", ws_url, err);
log::error!("{}", err);
proc.kill().map_err(|e| {
format!("Error killing substrate process '{}': {}", proc.id(), e)
@@ -185,25 +150,30 @@ impl TestNodeProcessBuilder {
}
}
/// Returns the next set of 3 open ports.
///
/// Returns None if there are not 3 open ports available.
fn next_open_port() -> Option<(u16, u16, u16)> {
// Ask the kernel to allocate a port.
let next_port = || {
match TcpListener::bind(("127.0.0.1", 0)) {
Ok(listener) => {
if let Ok(address) = listener.local_addr() {
Some(address.port())
} else {
None
}
}
Err(_) => None,
}
};
// Consume a stderr reader from a spawned substrate command and
// locate the port number that is logged out to it.
fn find_substrate_port_from_output(r: impl Read + Send + 'static) -> u16 {
BufReader::new(r)
.lines()
.find_map(|line| {
let line = line
.expect("failed to obtain next line from stdout for port discovery");
// The ports allocated should be different, unless in
// the unlikely case that the system has less than 3 available ports.
Some((next_port()?, next_port()?, next_port()?))
// does the line contain our port (we expect this specific output from substrate).
let line_end = match line.rsplit_once("Listening for new connections on 127.0.0.1:") {
None => return None,
Some((_, after)) => after
};
// trim non-numeric chars from the end of the port part of the line.
let port_str = line_end.trim_end_matches(|b| !('0'..='9').contains(&b));
// expect to have a number here (the chars after '127.0.0.1:') and parse them into a u16.
let port_num = port_str
.parse()
.unwrap_or_else(|_| panic!("valid port expected on 'Listening for new connections' line, got '{port_str}'"));
Some(port_num)
})
.expect("We should find a port before the reader ends")
}