mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 07:41:08 +00:00
substrate runner: increase line read + dump CLI output if parsing fails (#1781)
* substrate runner: dump CLI output parsing fails * cargo fmt * Update testing/substrate-runner/src/lib.rs * fix grumbles * disable flaky test * ignore reconn test too * ignore more tests * fix tests * improve log parsing * Update testing/integration-tests/src/full_client/client/unstable_rpcs.rs * Update testing/integration-tests/src/full_client/client/unstable_rpcs.rs * fix nits * fix reconn test
This commit is contained in:
Generated
+14
-1
@@ -2960,6 +2960,15 @@ dependencies = [
|
||||
"regex-automata 0.1.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
|
||||
dependencies = [
|
||||
"regex-automata 0.1.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.7.2"
|
||||
@@ -6203,7 +6212,7 @@ dependencies = [
|
||||
"ansi_term",
|
||||
"chrono",
|
||||
"lazy_static",
|
||||
"matchers",
|
||||
"matchers 0.0.1",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -6222,10 +6231,14 @@ version = "0.3.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
|
||||
dependencies = [
|
||||
"matchers 0.1.0",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"sharded-slab",
|
||||
"smallvec",
|
||||
"thread_local",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-log 0.2.0",
|
||||
]
|
||||
|
||||
+1
-1
@@ -107,7 +107,7 @@ thiserror = "1.0.63"
|
||||
tokio = { version = "1.40", default-features = false }
|
||||
tracing = { version = "0.1.40", default-features = false }
|
||||
tracing-wasm = "0.2.1"
|
||||
tracing-subscriber = "0.3.18"
|
||||
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
|
||||
trybuild = "1.0.99"
|
||||
url = "2.5.2"
|
||||
wabt = "0.10.0"
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use crate::{subxt_test, test_context};
|
||||
use crate::{subxt_test, test_context, utils::consume_initial_blocks};
|
||||
use codec::{Compact, Encode};
|
||||
use futures::StreamExt;
|
||||
|
||||
@@ -94,6 +94,7 @@ async fn finalized_headers_subscription() -> Result<(), subxt::Error> {
|
||||
let api = ctx.client();
|
||||
|
||||
let mut sub = api.blocks().subscribe_finalized().await?;
|
||||
consume_initial_blocks(&mut sub).await;
|
||||
|
||||
// check that the finalized block reported lines up with the `latest_finalized_block_ref`.
|
||||
for _ in 0..2 {
|
||||
|
||||
@@ -420,29 +420,28 @@ async fn legacy_and_unstable_block_subscription_reconnect() {
|
||||
let api = api.clone();
|
||||
async move {
|
||||
let mut missed_blocks = false;
|
||||
(api.blocks()
|
||||
.subscribe_finalized()
|
||||
.await
|
||||
.unwrap()
|
||||
// Ignore `disconnected events`.
|
||||
// This will be emitted by the legacy backend for every reconnection.
|
||||
.filter(|item| {
|
||||
let disconnected = match item {
|
||||
Ok(_) => false,
|
||||
Err(e) => {
|
||||
if matches!(e, Error::Rpc(subxt::error::RpcError::DisconnectedWillReconnect(e)) if e.contains("Missed at least one block when the connection was lost")) {
|
||||
missed_blocks = true;
|
||||
}
|
||||
e.is_disconnected_will_reconnect()
|
||||
}
|
||||
};
|
||||
|
||||
futures::future::ready(!disconnected)
|
||||
})
|
||||
.take(num)
|
||||
.map(|x| x.unwrap().hash().to_string())
|
||||
.collect::<Vec<String>>()
|
||||
.await, missed_blocks)
|
||||
let blocks =
|
||||
// Ignore `disconnected events`.
|
||||
// This will be emitted by the legacy backend for every reconnection.
|
||||
api.blocks().subscribe_finalized().await.unwrap().filter(|item| {
|
||||
let disconnected = match item {
|
||||
Ok(_) => false,
|
||||
Err(e) => {
|
||||
if matches!(e, Error::Rpc(subxt::error::RpcError::DisconnectedWillReconnect(e)) if e.contains("Missed at least one block when the connection was lost")) {
|
||||
missed_blocks = true;
|
||||
}
|
||||
e.is_disconnected_will_reconnect()
|
||||
}
|
||||
};
|
||||
|
||||
futures::future::ready(!disconnected)
|
||||
})
|
||||
.take(num)
|
||||
.map(|x| x.unwrap().hash().to_string())
|
||||
.collect::<Vec<String>>().await;
|
||||
|
||||
(blocks, missed_blocks)
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -5,7 +5,10 @@
|
||||
//! Just sanity checking some of the new RPC methods to try and
|
||||
//! catch differences as the implementations evolve.
|
||||
|
||||
use crate::{subxt_test, test_context, utils::node_runtime};
|
||||
use crate::{
|
||||
subxt_test, test_context,
|
||||
utils::{consume_initial_blocks, node_runtime},
|
||||
};
|
||||
use assert_matches::assert_matches;
|
||||
use codec::Encode;
|
||||
use futures::Stream;
|
||||
@@ -341,8 +344,11 @@ async fn transaction_v1_broadcast() {
|
||||
|
||||
// Subscribe to finalized blocks.
|
||||
let mut finalized_sub = api.blocks().subscribe_finalized().await.unwrap();
|
||||
|
||||
consume_initial_blocks(&mut finalized_sub).await;
|
||||
|
||||
// Expect the tx to be encountered in a maximum number of blocks.
|
||||
let mut num_blocks: usize = 10;
|
||||
let mut num_blocks: usize = 20;
|
||||
|
||||
// Submit the transaction.
|
||||
let _operation_id = rpc
|
||||
|
||||
@@ -8,9 +8,8 @@ mod wait_for_blocks;
|
||||
|
||||
pub use context::*;
|
||||
pub use node_proc::TestNodeProcess;
|
||||
pub use wait_for_blocks::*;
|
||||
|
||||
pub use subxt_test_macro::subxt_test;
|
||||
pub use wait_for_blocks::*;
|
||||
|
||||
/// The test timeout is set to 1 second.
|
||||
/// However, the test is sleeping for 5 seconds.
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use subxt::{client::OnlineClientT, Config};
|
||||
use subxt::{
|
||||
backend::StreamOf, blocks::Block, client::OnlineClientT, Config, Error, OnlineClient,
|
||||
SubstrateConfig,
|
||||
};
|
||||
|
||||
/// Wait for blocks to be produced before running tests. Specifically, we
|
||||
/// wait for one more finalized block to be produced, which is important because
|
||||
@@ -23,3 +26,25 @@ pub async fn wait_for_number_of_blocks<C: Config>(
|
||||
sub.next().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Consumes the initial blocks from the stream of blocks to ensure that the stream is up-to-date.
|
||||
///
|
||||
/// This may be useful on the unstable backend when the initial blocks may be large
|
||||
/// and one relies on something to included in finalized block in ner future.
|
||||
pub async fn consume_initial_blocks(
|
||||
blocks: &mut StreamOf<Result<Block<SubstrateConfig, OnlineClient<SubstrateConfig>>, Error>>,
|
||||
) {
|
||||
use tokio::time::{interval_at, Duration, Instant};
|
||||
const MAX_DURATION: Duration = Duration::from_millis(200);
|
||||
|
||||
let mut now = interval_at(Instant::now() + MAX_DURATION, MAX_DURATION);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = now.tick() => {
|
||||
break;
|
||||
}
|
||||
_ = blocks.next() => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,26 +5,26 @@
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
Io(std::io::Error),
|
||||
CouldNotExtractPort,
|
||||
CouldNotExtractP2pAddress,
|
||||
CouldNotExtractP2pPort,
|
||||
CouldNotExtractPort(String),
|
||||
CouldNotExtractP2pAddress(String),
|
||||
CouldNotExtractP2pPort(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Error::Io(err) => write!(f, "IO error: {err}"),
|
||||
Error::CouldNotExtractPort => write!(
|
||||
Error::CouldNotExtractPort(log) => write!(
|
||||
f,
|
||||
"could not extract port from running substrate node's stdout"
|
||||
"could not extract port from running substrate node's stdout: {log}"
|
||||
),
|
||||
Error::CouldNotExtractP2pAddress => write!(
|
||||
Error::CouldNotExtractP2pAddress(log) => write!(
|
||||
f,
|
||||
"could not extract p2p address from running substrate node's stdout"
|
||||
"could not extract p2p address from running substrate node's stdout: {log}"
|
||||
),
|
||||
Error::CouldNotExtractP2pPort => write!(
|
||||
Error::CouldNotExtractP2pPort(log) => write!(
|
||||
f,
|
||||
"could not extract p2p port from running substrate node's stdout"
|
||||
"could not extract p2p port from running substrate node's stdout: {log}"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,11 +102,11 @@ impl SubstrateNodeBuilder {
|
||||
|
||||
// Wait for RPC port to be logged (it's logged to stderr).
|
||||
let stderr = proc.stderr.take().unwrap();
|
||||
let (ws_port, p2p_address, p2p_port) = try_find_substrate_port_from_output(stderr);
|
||||
let running_node = try_find_substrate_port_from_output(stderr);
|
||||
|
||||
let ws_port = ws_port.ok_or(Error::CouldNotExtractPort)?;
|
||||
let p2p_address = p2p_address.ok_or(Error::CouldNotExtractP2pAddress)?;
|
||||
let p2p_port = p2p_port.ok_or(Error::CouldNotExtractP2pPort)?;
|
||||
let ws_port = running_node.ws_port()?;
|
||||
let p2p_address = running_node.p2p_address()?;
|
||||
let p2p_port = running_node.p2p_port()?;
|
||||
|
||||
Ok(SubstrateNode {
|
||||
binary_path: bin_path,
|
||||
@@ -244,16 +244,19 @@ impl Drop for SubstrateNode {
|
||||
|
||||
// Consume a stderr reader from a spawned substrate command and
|
||||
// locate the port number that is logged out to it.
|
||||
fn try_find_substrate_port_from_output(
|
||||
r: impl Read + Send + 'static,
|
||||
) -> (Option<u16>, Option<String>, Option<u32>) {
|
||||
fn try_find_substrate_port_from_output(r: impl Read + Send + 'static) -> SubstrateNodeInfo {
|
||||
let mut port = None;
|
||||
let mut p2p_address = None;
|
||||
let mut p2p_port = None;
|
||||
|
||||
for line in BufReader::new(r).lines().take(50) {
|
||||
let mut log = String::new();
|
||||
|
||||
for line in BufReader::new(r).lines().take(100) {
|
||||
let line = line.expect("failed to obtain next line from stdout for port discovery");
|
||||
|
||||
log.push_str(&line);
|
||||
log.push('\n');
|
||||
|
||||
// Parse the port lines
|
||||
let line_port = line
|
||||
// oldest message:
|
||||
@@ -301,7 +304,43 @@ fn try_find_substrate_port_from_output(
|
||||
.unwrap_or_else(|_| panic!("valid port expected for log line, got '{port_str}'"));
|
||||
p2p_port = Some(port_num);
|
||||
}
|
||||
|
||||
if port.is_some() && p2p_address.is_some() && p2p_port.is_some() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
(port, p2p_address, p2p_port)
|
||||
SubstrateNodeInfo {
|
||||
ws_port: port,
|
||||
p2p_address,
|
||||
p2p_port,
|
||||
log,
|
||||
}
|
||||
}
|
||||
|
||||
/// Data extracted from the running node's stdout.
|
||||
#[derive(Debug)]
|
||||
pub struct SubstrateNodeInfo {
|
||||
ws_port: Option<u16>,
|
||||
p2p_address: Option<String>,
|
||||
p2p_port: Option<u32>,
|
||||
log: String,
|
||||
}
|
||||
|
||||
impl SubstrateNodeInfo {
|
||||
pub fn ws_port(&self) -> Result<u16, Error> {
|
||||
self.ws_port
|
||||
.ok_or_else(|| Error::CouldNotExtractPort(self.log.clone()))
|
||||
}
|
||||
|
||||
pub fn p2p_address(&self) -> Result<String, Error> {
|
||||
self.p2p_address
|
||||
.clone()
|
||||
.ok_or_else(|| Error::CouldNotExtractP2pAddress(self.log.clone()))
|
||||
}
|
||||
|
||||
pub fn p2p_port(&self) -> Result<u32, Error> {
|
||||
self.p2p_port
|
||||
.ok_or_else(|| Error::CouldNotExtractP2pPort(self.log.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user