diff --git a/Cargo.lock b/Cargo.lock index 04b8867196..5b722f0e06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2960,6 +2960,15 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "memchr" version = "2.7.2" @@ -6203,7 +6212,7 @@ dependencies = [ "ansi_term", "chrono", "lazy_static", - "matchers", + "matchers 0.0.1", "regex", "serde", "serde_json", @@ -6222,10 +6231,14 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers 0.1.0", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log 0.2.0", ] diff --git a/Cargo.toml b/Cargo.toml index 2f8d9a34d4..2ae5dcace6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,7 +107,7 @@ thiserror = "1.0.63" tokio = { version = "1.40", default-features = false } tracing = { version = "0.1.40", default-features = false } tracing-wasm = "0.2.1" -tracing-subscriber = "0.3.18" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } trybuild = "1.0.99" url = "2.5.2" wabt = "0.10.0" diff --git a/testing/integration-tests/src/full_client/blocks/mod.rs b/testing/integration-tests/src/full_client/blocks/mod.rs index 4125397b21..145cb693ec 100644 --- a/testing/integration-tests/src/full_client/blocks/mod.rs +++ b/testing/integration-tests/src/full_client/blocks/mod.rs @@ -2,7 +2,7 @@ // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. -use crate::{subxt_test, test_context}; +use crate::{subxt_test, test_context, utils::consume_initial_blocks}; use codec::{Compact, Encode}; use futures::StreamExt; @@ -94,6 +94,7 @@ async fn finalized_headers_subscription() -> Result<(), subxt::Error> { let api = ctx.client(); let mut sub = api.blocks().subscribe_finalized().await?; + consume_initial_blocks(&mut sub).await; // check that the finalized block reported lines up with the `latest_finalized_block_ref`. for _ in 0..2 { diff --git a/testing/integration-tests/src/full_client/client/mod.rs b/testing/integration-tests/src/full_client/client/mod.rs index fa909ec9c5..880601dbad 100644 --- a/testing/integration-tests/src/full_client/client/mod.rs +++ b/testing/integration-tests/src/full_client/client/mod.rs @@ -420,29 +420,28 @@ async fn legacy_and_unstable_block_subscription_reconnect() { let api = api.clone(); async move { let mut missed_blocks = false; - (api.blocks() - .subscribe_finalized() - .await - .unwrap() - // Ignore `disconnected events`. - // This will be emitted by the legacy backend for every reconnection. - .filter(|item| { - let disconnected = match item { - Ok(_) => false, - Err(e) => { - if matches!(e, Error::Rpc(subxt::error::RpcError::DisconnectedWillReconnect(e)) if e.contains("Missed at least one block when the connection was lost")) { - missed_blocks = true; - } - e.is_disconnected_will_reconnect() - } - }; - futures::future::ready(!disconnected) - }) - .take(num) - .map(|x| x.unwrap().hash().to_string()) - .collect::>() - .await, missed_blocks) + let blocks = + // Ignore `disconnected events`. + // This will be emitted by the legacy backend for every reconnection. + api.blocks().subscribe_finalized().await.unwrap().filter(|item| { + let disconnected = match item { + Ok(_) => false, + Err(e) => { + if matches!(e, Error::Rpc(subxt::error::RpcError::DisconnectedWillReconnect(e)) if e.contains("Missed at least one block when the connection was lost")) { + missed_blocks = true; + } + e.is_disconnected_will_reconnect() + } + }; + + futures::future::ready(!disconnected) + }) + .take(num) + .map(|x| x.unwrap().hash().to_string()) + .collect::>().await; + + (blocks, missed_blocks) } }; diff --git a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs index 232d91490d..0e8ceb74ba 100644 --- a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs +++ b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs @@ -5,7 +5,10 @@ //! Just sanity checking some of the new RPC methods to try and //! catch differences as the implementations evolve. -use crate::{subxt_test, test_context, utils::node_runtime}; +use crate::{ + subxt_test, test_context, + utils::{consume_initial_blocks, node_runtime}, +}; use assert_matches::assert_matches; use codec::Encode; use futures::Stream; @@ -341,8 +344,11 @@ async fn transaction_v1_broadcast() { // Subscribe to finalized blocks. let mut finalized_sub = api.blocks().subscribe_finalized().await.unwrap(); + + consume_initial_blocks(&mut finalized_sub).await; + // Expect the tx to be encountered in a maximum number of blocks. - let mut num_blocks: usize = 10; + let mut num_blocks: usize = 20; // Submit the transaction. let _operation_id = rpc diff --git a/testing/integration-tests/src/utils/mod.rs b/testing/integration-tests/src/utils/mod.rs index 741e152913..1a942447fb 100644 --- a/testing/integration-tests/src/utils/mod.rs +++ b/testing/integration-tests/src/utils/mod.rs @@ -8,9 +8,8 @@ mod wait_for_blocks; pub use context::*; pub use node_proc::TestNodeProcess; -pub use wait_for_blocks::*; - pub use subxt_test_macro::subxt_test; +pub use wait_for_blocks::*; /// The test timeout is set to 1 second. /// However, the test is sleeping for 5 seconds. diff --git a/testing/integration-tests/src/utils/wait_for_blocks.rs b/testing/integration-tests/src/utils/wait_for_blocks.rs index 157bd02c17..10b1d78b03 100644 --- a/testing/integration-tests/src/utils/wait_for_blocks.rs +++ b/testing/integration-tests/src/utils/wait_for_blocks.rs @@ -2,7 +2,10 @@ // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. -use subxt::{client::OnlineClientT, Config}; +use subxt::{ + backend::StreamOf, blocks::Block, client::OnlineClientT, Config, Error, OnlineClient, + SubstrateConfig, +}; /// Wait for blocks to be produced before running tests. Specifically, we /// wait for one more finalized block to be produced, which is important because @@ -23,3 +26,25 @@ pub async fn wait_for_number_of_blocks( sub.next().await; } } + +/// Consumes the initial blocks from the stream of blocks to ensure that the stream is up-to-date. +/// +/// This may be useful on the unstable backend when the initial blocks may be large +/// and one relies on something to included in finalized block in ner future. +pub async fn consume_initial_blocks( + blocks: &mut StreamOf>, Error>>, +) { + use tokio::time::{interval_at, Duration, Instant}; + const MAX_DURATION: Duration = Duration::from_millis(200); + + let mut now = interval_at(Instant::now() + MAX_DURATION, MAX_DURATION); + + loop { + tokio::select! { + _ = now.tick() => { + break; + } + _ = blocks.next() => {} + } + } +} diff --git a/testing/substrate-runner/src/error.rs b/testing/substrate-runner/src/error.rs index c7179da2d0..fe3063021d 100644 --- a/testing/substrate-runner/src/error.rs +++ b/testing/substrate-runner/src/error.rs @@ -5,26 +5,26 @@ #[derive(Debug)] pub enum Error { Io(std::io::Error), - CouldNotExtractPort, - CouldNotExtractP2pAddress, - CouldNotExtractP2pPort, + CouldNotExtractPort(String), + CouldNotExtractP2pAddress(String), + CouldNotExtractP2pPort(String), } impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Error::Io(err) => write!(f, "IO error: {err}"), - Error::CouldNotExtractPort => write!( + Error::CouldNotExtractPort(log) => write!( f, - "could not extract port from running substrate node's stdout" + "could not extract port from running substrate node's stdout: {log}" ), - Error::CouldNotExtractP2pAddress => write!( + Error::CouldNotExtractP2pAddress(log) => write!( f, - "could not extract p2p address from running substrate node's stdout" + "could not extract p2p address from running substrate node's stdout: {log}" ), - Error::CouldNotExtractP2pPort => write!( + Error::CouldNotExtractP2pPort(log) => write!( f, - "could not extract p2p port from running substrate node's stdout" + "could not extract p2p port from running substrate node's stdout: {log}" ), } } diff --git a/testing/substrate-runner/src/lib.rs b/testing/substrate-runner/src/lib.rs index 82d3ea6258..f47347d9cd 100644 --- a/testing/substrate-runner/src/lib.rs +++ b/testing/substrate-runner/src/lib.rs @@ -102,11 +102,11 @@ impl SubstrateNodeBuilder { // Wait for RPC port to be logged (it's logged to stderr). let stderr = proc.stderr.take().unwrap(); - let (ws_port, p2p_address, p2p_port) = try_find_substrate_port_from_output(stderr); + let running_node = try_find_substrate_port_from_output(stderr); - let ws_port = ws_port.ok_or(Error::CouldNotExtractPort)?; - let p2p_address = p2p_address.ok_or(Error::CouldNotExtractP2pAddress)?; - let p2p_port = p2p_port.ok_or(Error::CouldNotExtractP2pPort)?; + let ws_port = running_node.ws_port()?; + let p2p_address = running_node.p2p_address()?; + let p2p_port = running_node.p2p_port()?; Ok(SubstrateNode { binary_path: bin_path, @@ -244,16 +244,19 @@ impl Drop for SubstrateNode { // Consume a stderr reader from a spawned substrate command and // locate the port number that is logged out to it. -fn try_find_substrate_port_from_output( - r: impl Read + Send + 'static, -) -> (Option, Option, Option) { +fn try_find_substrate_port_from_output(r: impl Read + Send + 'static) -> SubstrateNodeInfo { let mut port = None; let mut p2p_address = None; let mut p2p_port = None; - for line in BufReader::new(r).lines().take(50) { + let mut log = String::new(); + + for line in BufReader::new(r).lines().take(100) { let line = line.expect("failed to obtain next line from stdout for port discovery"); + log.push_str(&line); + log.push('\n'); + // Parse the port lines let line_port = line // oldest message: @@ -301,7 +304,43 @@ fn try_find_substrate_port_from_output( .unwrap_or_else(|_| panic!("valid port expected for log line, got '{port_str}'")); p2p_port = Some(port_num); } + + if port.is_some() && p2p_address.is_some() && p2p_port.is_some() { + break; + } } - (port, p2p_address, p2p_port) + SubstrateNodeInfo { + ws_port: port, + p2p_address, + p2p_port, + log, + } +} + +/// Data extracted from the running node's stdout. +#[derive(Debug)] +pub struct SubstrateNodeInfo { + ws_port: Option, + p2p_address: Option, + p2p_port: Option, + log: String, +} + +impl SubstrateNodeInfo { + pub fn ws_port(&self) -> Result { + self.ws_port + .ok_or_else(|| Error::CouldNotExtractPort(self.log.clone())) + } + + pub fn p2p_address(&self) -> Result { + self.p2p_address + .clone() + .ok_or_else(|| Error::CouldNotExtractP2pAddress(self.log.clone())) + } + + pub fn p2p_port(&self) -> Result { + self.p2p_port + .ok_or_else(|| Error::CouldNotExtractP2pPort(self.log.clone())) + } }