diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index e36b8fab48..e3c9131a86 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -192,6 +192,16 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "695579f0f2520f3774bb40461e5adb066459d4e0af4d59d20175484fb8e9edf1" +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "async-channel" version = "1.5.1" @@ -293,6 +303,7 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9f06685bad74e0570f5213741bea82158279a4103d988e57bfada11ad230341" dependencies = [ + "async-attributes", "async-channel", "async-global-executor", "async-io", @@ -3869,6 +3880,7 @@ name = "node-cli" version = "2.0.0" dependencies = [ "assert_cmd", + "async-std", "frame-benchmarking-cli", "frame-support", "frame-system", @@ -3918,6 +3930,7 @@ dependencies = [ "sc-transaction-pool", "serde", "serde_json", + "soketto", "sp-authority-discovery", "sp-consensus", "sp-consensus-babe", @@ -7558,6 +7571,7 @@ dependencies = [ "tokio 0.2.25", "tracing", "tracing-futures", + "tracing-log", "tracing-subscriber", "wasm-timer", ] diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index f565156a64..552705f299 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -11,6 +11,7 @@ pub use sc_executor::NativeExecutor; use sp_consensus_aura::sr25519::{AuthorityPair as AuraPair}; use sc_finality_grandpa::SharedVoterState; use sc_keystore::LocalKeystore; +use sc_telemetry::TelemetrySpan; // Our native executor instance. native_executor_instance!( @@ -162,6 +163,9 @@ pub fn new_full(mut config: Configuration) -> Result }) }; + let telemetry_span = TelemetrySpan::new(); + let _telemetry_span_entered = telemetry_span.enter(); + let (_rpc_handlers, telemetry_connection_notifier) = sc_service::spawn_tasks( sc_service::SpawnTasksParams { network: network.clone(), @@ -176,6 +180,7 @@ pub fn new_full(mut config: Configuration) -> Result network_status_sinks, system_rpc_tx, config, + telemetry_span: Some(telemetry_span.clone()), }, )?; @@ -312,6 +317,9 @@ pub fn new_light(mut config: Configuration) -> Result ); } + let telemetry_span = TelemetrySpan::new(); + let _telemetry_span_entered = telemetry_span.enter(); + sc_service::spawn_tasks(sc_service::SpawnTasksParams { remote_blockchain: Some(backend.remote_blockchain()), transaction_pool, @@ -325,6 +333,7 @@ pub fn new_light(mut config: Configuration) -> Result network, network_status_sinks, system_rpc_tx, + telemetry_span: Some(telemetry_span.clone()), })?; network_starter.start_network(); diff --git a/substrate/bin/node/cli/Cargo.toml b/substrate/bin/node/cli/Cargo.toml index 112c874573..6162726c89 100644 --- a/substrate/bin/node/cli/Cargo.toml +++ b/substrate/bin/node/cli/Cargo.toml @@ -124,6 +124,8 @@ nix = "0.19" serde_json = "1.0" regex = "1" platforms = "1.1" +async-std = { version = "1.6.5", features = ["attributes"] } +soketto = "0.4.2" [build-dependencies] structopt = { version = "0.3.8", optional = true } diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index ca647c5834..df3802d3d8 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -34,7 +34,7 @@ use sp_runtime::traits::Block as BlockT; use futures::prelude::*; use sc_client_api::{ExecutorProvider, RemoteBackend}; use node_executor::Executor; -use sc_telemetry::TelemetryConnectionNotifier; +use sc_telemetry::{TelemetryConnectionNotifier, TelemetrySpan}; type FullClient = sc_service::TFullClient; type FullBackend = sc_service::TFullBackend; @@ -226,6 +226,9 @@ pub fn new_full_base( let enable_grandpa = !config.disable_grandpa; let prometheus_registry = config.prometheus_registry().cloned(); + let telemetry_span = TelemetrySpan::new(); + let _telemetry_span_entered = telemetry_span.enter(); + let (_rpc_handlers, telemetry_connection_notifier) = sc_service::spawn_tasks( sc_service::SpawnTasksParams { config, @@ -240,6 +243,7 @@ pub fn new_full_base( remote_blockchain: None, network_status_sinks: network_status_sinks.clone(), system_rpc_tx, + telemetry_span: Some(telemetry_span.clone()), }, )?; @@ -433,6 +437,9 @@ pub fn new_light_base(mut config: Configuration) -> Result<( let rpc_extensions = node_rpc::create_light(light_deps); + let telemetry_span = TelemetrySpan::new(); + let _telemetry_span_entered = telemetry_span.enter(); + let (rpc_handlers, telemetry_connection_notifier) = sc_service::spawn_tasks(sc_service::SpawnTasksParams { on_demand: Some(on_demand), @@ -444,6 +451,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<( config, backend, network_status_sinks, system_rpc_tx, network: network.clone(), task_manager: &mut task_manager, + telemetry_span: Some(telemetry_span.clone()), })?; Ok(( diff --git a/substrate/bin/node/cli/tests/telemetry.rs b/substrate/bin/node/cli/tests/telemetry.rs new file mode 100644 index 0000000000..0b90f56a03 --- /dev/null +++ b/substrate/bin/node/cli/tests/telemetry.rs @@ -0,0 +1,102 @@ +// This file is part of Substrate. + +// Copyright (C) 2021 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use assert_cmd::cargo::cargo_bin; +use nix::sys::signal::{kill, Signal::SIGINT}; +use nix::unistd::Pid; +use std::convert::TryInto; +use std::process; + +pub mod common; +pub mod websocket_server; + +#[async_std::test] +async fn telemetry_works() { + let config = websocket_server::Config { + capacity: 1, + max_frame_size: 1048 * 1024, + send_buffer_len: 32, + bind_address: "127.0.0.1:0".parse().unwrap(), + }; + let mut server = websocket_server::WsServer::new(config).await.unwrap(); + + let addr = server.local_addr().unwrap(); + + let server_task = async_std::task::spawn(async move { + loop { + use websocket_server::Event; + match server.next_event().await { + // New connection on the listener. + Event::ConnectionOpen { address } => { + println!("New connection from {:?}", address); + server.accept(); + } + + // Received a message from a connection. + Event::BinaryFrame { message, .. } => { + let json: serde_json::Value = serde_json::from_slice(&message).unwrap(); + let object = json + .as_object() + .unwrap() + .get("payload") + .unwrap() + .as_object() + .unwrap(); + if matches!(object.get("best"), Some(serde_json::Value::String(_))) { + break; + } + } + + Event::TextFrame { .. } => panic!("Got a TextFrame over the socket, this is a bug"), + + // Connection has been closed. + Event::ConnectionError { .. } => {} + } + } + }); + + let mut substrate = process::Command::new(cargo_bin("substrate")); + + let mut substrate = substrate + .args(&["--dev", "--tmp", "--telemetry-url"]) + .arg(format!("ws://{} 10", addr)) + .stdout(process::Stdio::piped()) + .stderr(process::Stdio::piped()) + .stdin(process::Stdio::null()) + .spawn() + .unwrap(); + + server_task.await; + + assert!( + substrate.try_wait().unwrap().is_none(), + "the process should still be running" + ); + + // Stop the process + kill(Pid::from_raw(substrate.id().try_into().unwrap()), SIGINT).unwrap(); + assert!(common::wait_for(&mut substrate, 40) + .map(|x| x.success()) + .unwrap_or_default()); + + let output = substrate.wait_with_output().unwrap(); + + println!("{}", String::from_utf8(output.stdout).unwrap()); + eprintln!("{}", String::from_utf8(output.stderr).unwrap()); + assert!(output.status.success()); +} diff --git a/substrate/bin/node/cli/tests/websocket_server.rs b/substrate/bin/node/cli/tests/websocket_server.rs new file mode 100644 index 0000000000..a8af1c3599 --- /dev/null +++ b/substrate/bin/node/cli/tests/websocket_server.rs @@ -0,0 +1,281 @@ +// This file is part of Substrate. + +// Copyright (C) 2021 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use async_std::net::{TcpListener, TcpStream}; +use core::pin::Pin; +use futures::prelude::*; +use soketto::handshake::{server::Response, Server}; +use std::{io, net::SocketAddr}; + +/// Configuration for a [`WsServer`]. +pub struct Config { + /// IP address to try to bind to. + pub bind_address: SocketAddr, + + /// Maximum size, in bytes, of a frame sent by the remote. + /// + /// Since the messages are entirely buffered before being returned, a maximum value is + /// necessary in order to prevent malicious clients from sending huge frames that would + /// occupy a lot of memory. + pub max_frame_size: usize, + + /// Number of pending messages to buffer up for sending before the socket is considered + /// unresponsive. + pub send_buffer_len: usize, + + /// Pre-allocated capacity for the list of connections. + pub capacity: usize, +} + +/// Identifier for a connection with regard to a [`WsServer`]. +/// +/// After a connection has been closed, its [`ConnectionId`] might be reused. +#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] +pub struct ConnectionId(u64); + +/// A WebSocket message. +pub enum Message { + Text(String), + Binary(Vec), +} + +/// WebSockets listening socket and list of open connections. +pub struct WsServer { + /// Value passed through [`Config::max_frame_size`]. + max_frame_size: usize, + + /// Endpoint for incoming TCP sockets. + listener: TcpListener, + + /// Pending incoming connection to accept. Accepted by calling [`WsServer::accept`]. + pending_incoming: Option, + + /// List of TCP connections that are currently negotiating the WebSocket handshake. + /// + /// The output can be an error if the handshake fails. + negotiating: stream::FuturesUnordered< + Pin< + Box< + dyn Future, Box>> + + Send, + >, + >, + >, + + /// List of streams of incoming messages for all connections. + incoming_messages: stream::SelectAll< + Pin>> + Send>>, + >, + + /// Tasks dedicated to closing sockets that have been rejected. + rejected_sockets: stream::FuturesUnordered + Send>>>, +} + +impl WsServer { + /// Try opening a TCP listening socket. + /// + /// Returns an error if the listening socket fails to open. + pub async fn new(config: Config) -> Result { + let listener = TcpListener::bind(config.bind_address).await?; + + Ok(WsServer { + max_frame_size: config.max_frame_size, + listener, + pending_incoming: None, + negotiating: stream::FuturesUnordered::new(), + incoming_messages: stream::SelectAll::new(), + rejected_sockets: stream::FuturesUnordered::new(), + }) + } + + /// Address of the local TCP listening socket, as provided by the operating system. + pub fn local_addr(&self) -> Result { + self.listener.local_addr() + } + + /// Accepts the pending connection. + /// + /// Either [`WsServer::accept`] or [`WsServer::reject`] must be called after a + /// [`Event::ConnectionOpen`] event is returned. + /// + /// # Panic + /// + /// Panics if no connection is pending. + /// + pub fn accept(&mut self) { + let pending_incoming = self.pending_incoming.take().expect("no pending socket"); + + self.negotiating.push(Box::pin(async move { + let mut server = Server::new(pending_incoming); + + let websocket_key = match server.receive_request().await { + Ok(req) => req.into_key(), + Err(err) => return Err(Box::new(err) as Box<_>), + }; + + match server + .send_response(&{ + Response::Accept { + key: &websocket_key, + protocol: None, + } + }) + .await + { + Ok(()) => {} + Err(err) => return Err(Box::new(err) as Box<_>), + }; + + Ok(server) + })); + } + + /// Reject the pending connection. + /// + /// Either [`WsServer::accept`] or [`WsServer::reject`] must be called after a + /// [`Event::ConnectionOpen`] event is returned. + /// + /// # Panic + /// + /// Panics if no connection is pending. + /// + pub fn reject(&mut self) { + let _ = self.pending_incoming.take().expect("no pending socket"); + } + + /// Returns the next event happening on the server. + pub async fn next_event(&mut self) -> Event { + loop { + futures::select! { + // Only try to fetch a new incoming connection if none is pending. + socket = { + let listener = &self.listener; + let has_pending = self.pending_incoming.is_some(); + async move { + if !has_pending { + listener.accept().await + } else { + loop { futures::pending!() } + } + } + }.fuse() => { + let (socket, address) = match socket { + Ok(s) => s, + Err(_) => continue, + }; + debug_assert!(self.pending_incoming.is_none()); + self.pending_incoming = Some(socket); + return Event::ConnectionOpen { address }; + }, + + result = self.negotiating.select_next_some() => { + let server = match result { + Ok(s) => s, + Err(error) => return Event::ConnectionError { + error, + }, + }; + + let (mut _sender, receiver) = { + let mut builder = server.into_builder(); + builder.set_max_frame_size(self.max_frame_size); + builder.set_max_message_size(self.max_frame_size); + builder.finish() + }; + + // Spawn a task dedicated to receiving messages from the socket. + self.incoming_messages.push({ + // Turn `receiver` into a stream of received packets. + let socket_packets = stream::unfold((receiver, Vec::new()), move |(mut receiver, mut buf)| async { + buf.clear(); + let ret = match receiver.receive_data(&mut buf).await { + Ok(soketto::Data::Text(len)) => String::from_utf8(buf[..len].to_vec()) + .map(Message::Text) + .map_err(|err| Box::new(err) as Box<_>), + Ok(soketto::Data::Binary(len)) => Ok(buf[..len].to_vec()) + .map(Message::Binary), + Err(err) => Err(Box::new(err) as Box<_>), + }; + Some((ret, (receiver, buf))) + }); + + Box::pin(socket_packets.map(move |msg| (msg))) + }); + }, + + result = self.incoming_messages.select_next_some() => { + let message = match result { + Ok(m) => m, + Err(error) => return Event::ConnectionError { + error, + }, + }; + + match message { + Message::Text(message) => { + return Event::TextFrame { + message, + } + } + Message::Binary(message) => { + return Event::BinaryFrame { + message, + } + } + } + }, + + _ = self.rejected_sockets.select_next_some() => { + } + } + } + } +} + +/// Event that has happened on a [`WsServer`]. +#[derive(Debug)] +pub enum Event { + /// A new TCP connection has arrived on the listening socket. + /// + /// The connection *must* be accepted or rejected using [`WsServer::accept`] or + /// [`WsServer::reject`]. + /// No other [`Event::ConnectionOpen`] event will be generated until the current pending + /// connection has been either accepted or rejected. + ConnectionOpen { + /// Address of the remote, as provided by the operating system. + address: SocketAddr, + }, + + /// An error has happened on a connection. The connection is now closed and its + /// [`ConnectionId`] is now invalid. + ConnectionError { error: Box }, + + /// A text frame has been received on a connection. + TextFrame { + /// Message sent by the remote. Its content is entirely decided by the client, and + /// nothing must be assumed about the validity of this message. + message: String, + }, + + /// A text frame has been received on a connection. + BinaryFrame { + /// Message sent by the remote. Its content is entirely decided by the client, and + /// nothing must be assumed about the validity of this message. + message: Vec, + }, +} diff --git a/substrate/client/cli/src/config.rs b/substrate/client/cli/src/config.rs index f81a64bf15..748e3b1012 100644 --- a/substrate/client/cli/src/config.rs +++ b/substrate/client/cli/src/config.rs @@ -33,7 +33,7 @@ use sc_service::config::{ TaskExecutor, TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod, }; use sc_service::{ChainSpec, TracingReceiver, KeepBlocks, TransactionStorageMode}; -use sc_telemetry::{TelemetryHandle, TelemetrySpan}; +use sc_telemetry::TelemetryHandle; use sc_tracing::logging::LoggerBuilder; use std::net::SocketAddr; use std::path::PathBuf; @@ -494,7 +494,6 @@ pub trait CliConfiguration: Sized { .transpose()? // Don't initialise telemetry if `telemetry_endpoints` == Some([]) .filter(|x| !x.is_empty()); - let telemetry_span = telemetry_endpoints.as_ref().map(|_| TelemetrySpan::new()); let unsafe_pruning = self .import_params() @@ -534,7 +533,6 @@ pub trait CliConfiguration: Sized { rpc_cors: self.rpc_cors(is_dev)?, prometheus_config: self.prometheus_config(DCV::prometheus_listen_port())?, telemetry_endpoints, - telemetry_span, telemetry_external_transport: self.telemetry_external_transport()?, default_heap_pages: self.default_heap_pages()?, offchain_worker: self.offchain_worker(&role)?, diff --git a/substrate/client/service/Cargo.toml b/substrate/client/service/Cargo.toml index 0a9be763b2..c6119695ac 100644 --- a/substrate/client/service/Cargo.toml +++ b/substrate/client/service/Cargo.toml @@ -92,3 +92,4 @@ grandpa-primitives = { version = "3.0.0", package = "sp-finality-grandpa", path tokio = { version = "0.2.25", default-features = false } async-std = { version = "1.6.5", default-features = false } tracing-subscriber = "0.2.15" +tracing-log = "0.1.1" diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 486f816676..916929bff6 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -56,6 +56,7 @@ use sc_telemetry::{ telemetry, ConnectionMessage, TelemetryConnectionNotifier, + TelemetrySpan, SUBSTRATE_INFO, }; use sp_transaction_pool::MaintainedTransactionPool; @@ -308,7 +309,7 @@ pub fn new_full_parts( let task_manager = { let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry); - TaskManager::new(config.task_executor.clone(), registry, config.telemetry_span.clone())? + TaskManager::new(config.task_executor.clone(), registry)? }; let executor = NativeExecutor::::new( @@ -377,7 +378,7 @@ pub fn new_light_parts( let keystore_container = KeystoreContainer::new(&config.keystore)?; let task_manager = { let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry); - TaskManager::new(config.task_executor.clone(), registry, config.telemetry_span.clone())? + TaskManager::new(config.task_executor.clone(), registry)? }; let executor = NativeExecutor::::new( @@ -491,6 +492,10 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> { pub network_status_sinks: NetworkStatusSinks, /// A Sender for RPC requests. pub system_rpc_tx: TracingUnboundedSender>, + /// Telemetry span. + /// + /// This span needs to be entered **before** calling [`spawn_tasks()`]. + pub telemetry_span: Option, } /// Build a shared offchain workers instance. @@ -569,6 +574,7 @@ pub fn spawn_tasks( network, network_status_sinks, system_rpc_tx, + telemetry_span, } = params; let chain_info = client.usage_info().chain; @@ -581,6 +587,7 @@ pub fn spawn_tasks( let telemetry_connection_notifier = init_telemetry( &mut config, + telemetry_span, network.clone(), client.clone(), ); @@ -681,10 +688,11 @@ async fn transaction_notifications( fn init_telemetry>( config: &mut Configuration, + telemetry_span: Option, network: Arc::Hash>>, client: Arc, ) -> Option { - let telemetry_span = config.telemetry_span.clone()?; + let telemetry_span = telemetry_span?; let endpoints = config.telemetry_endpoints.clone()?; let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default(); let connection_message = ConnectionMessage { diff --git a/substrate/client/service/src/config.rs b/substrate/client/service/src/config.rs index 1e316c37dc..4f0d426bdb 100644 --- a/substrate/client/service/src/config.rs +++ b/substrate/client/service/src/config.rs @@ -101,10 +101,6 @@ pub struct Configuration { /// This is a handle to a `TelemetryWorker` instance. It is used to initialize the telemetry for /// a substrate node. pub telemetry_handle: Option, - /// Telemetry span. - /// - /// This span is entered for every background task spawned using the TaskManager. - pub telemetry_span: Option, /// The default number of 64KB pages to allocate for Wasm execution pub default_heap_pages: Option, /// Should offchain workers be executed. diff --git a/substrate/client/service/src/task_manager/mod.rs b/substrate/client/service/src/task_manager/mod.rs index 9a1fd15952..652e5d4439 100644 --- a/substrate/client/service/src/task_manager/mod.rs +++ b/substrate/client/service/src/task_manager/mod.rs @@ -24,7 +24,7 @@ use log::{debug, error}; use futures::{ Future, FutureExt, StreamExt, future::{select, Either, BoxFuture, join_all, try_join_all, pending}, - sink::SinkExt, task::{Context, Poll}, + sink::SinkExt, }; use prometheus_endpoint::{ exponential_buckets, register, @@ -34,43 +34,11 @@ use prometheus_endpoint::{ use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded}; use tracing_futures::Instrument; use crate::{config::{TaskExecutor, TaskType, JoinFuture}, Error}; -use sc_telemetry::TelemetrySpan; mod prometheus_future; #[cfg(test)] mod tests; -/// A wrapper around a `[Option]` and a [`Future`]. -/// -/// The telemetry in Substrate uses a span to identify the telemetry context. The span "infrastructure" -/// is provided by the tracing-crate. Now it is possible to have your own spans as well. To support -/// this with the [`TaskManager`] we have this wrapper. This wrapper enters the telemetry span every -/// time the future is polled and polls the inner future. So, the inner future can still have its -/// own span attached and we get our telemetry span ;) -struct WithTelemetrySpan { - span: Option, - inner: T, -} - -impl WithTelemetrySpan { - fn new(span: Option, inner: T) -> Self { - Self { - span, - inner, - } - } -} - -impl + Unpin> Future for WithTelemetrySpan { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll { - let span = self.span.clone(); - let _enter = span.as_ref().map(|s| s.enter()); - Pin::new(&mut self.inner).poll(ctx) - } -} - /// An handle for spawning tasks in the service. #[derive(Clone)] pub struct SpawnTaskHandle { @@ -78,7 +46,6 @@ pub struct SpawnTaskHandle { executor: TaskExecutor, metrics: Option, task_notifier: TracingUnboundedSender, - telemetry_span: Option, } impl SpawnTaskHandle { @@ -155,11 +122,7 @@ impl SpawnTaskHandle { } }; - let future = future.in_current_span().boxed(); - let join_handle = self.executor.spawn( - WithTelemetrySpan::new(self.telemetry_span.clone(), future).boxed(), - task_type, - ); + let join_handle = self.executor.spawn(future.in_current_span().boxed(), task_type); let mut task_notifier = self.task_notifier.clone(); self.executor.spawn( @@ -266,8 +229,6 @@ pub struct TaskManager { /// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential /// task fails. children: Vec, - /// A `TelemetrySpan` used to enter the telemetry span when a task is spawned. - telemetry_span: Option, } impl TaskManager { @@ -276,7 +237,6 @@ impl TaskManager { pub(super) fn new( executor: TaskExecutor, prometheus_registry: Option<&Registry>, - telemetry_span: Option, ) -> Result { let (signal, on_exit) = exit_future::signal(); @@ -305,7 +265,6 @@ impl TaskManager { task_notifier, completion_future, children: Vec::new(), - telemetry_span, }) } @@ -316,7 +275,6 @@ impl TaskManager { executor: self.executor.clone(), metrics: self.metrics.clone(), task_notifier: self.task_notifier.clone(), - telemetry_span: self.telemetry_span.clone(), } } diff --git a/substrate/client/service/src/task_manager/tests.rs b/substrate/client/service/src/task_manager/tests.rs index 257f7db198..762348ba9f 100644 --- a/substrate/client/service/src/task_manager/tests.rs +++ b/substrate/client/service/src/task_manager/tests.rs @@ -20,10 +20,14 @@ use crate::config::TaskExecutor; use crate::task_manager::TaskManager; use futures::{future::FutureExt, pin_mut, select}; use parking_lot::Mutex; -use std::{any::Any, sync::Arc, time::Duration}; -use tracing_subscriber::{layer::{SubscriberExt, Context}, Layer}; -use tracing::{subscriber::Subscriber, span::{Attributes, Id, Record, Span}, event::Event}; use sc_telemetry::TelemetrySpan; +use std::{any::Any, env, sync::Arc, time::Duration}; +use tracing::{event::Event, span::Id, subscriber::Subscriber}; +use tracing_subscriber::{ + layer::{Context, SubscriberExt}, + registry::LookupSpan, + Layer, +}; #[derive(Clone, Debug)] struct DropTester(Arc>); @@ -83,7 +87,7 @@ async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any) } fn new_task_manager(task_executor: TaskExecutor) -> TaskManager { - TaskManager::new(task_executor, None, None).unwrap() + TaskManager::new(task_executor, None).unwrap() } #[test] @@ -315,92 +319,92 @@ fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() { } struct TestLayer { - spans_entered: Arc>>, - spans: Arc>>, + spans_found: Arc>>>, } -impl Layer for TestLayer { - fn new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context) { - self.spans.lock().insert(id.clone(), attrs.metadata().name().to_string()); +impl Layer for TestLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_event(&self, _: &Event<'_>, ctx: Context) { + let mut spans_found = self.spans_found.lock(); + + if spans_found.is_some() { + panic!("on_event called multiple times"); + } + + *spans_found = Some(ctx.scope().map(|x| x.id()).collect()); } - - fn on_record(&self, _: &Id, _: &Record<'_>, _: Context) {} - - fn on_event(&self, _: &Event<'_>, _: Context) {} - - fn on_enter(&self, span: &Id, _: Context) { - let name = self.spans.lock().get(span).unwrap().clone(); - self.spans_entered.lock().push(name); - } - - fn on_exit(&self, _: &Id, _: Context) {} - - fn on_close(&self, _: Id, _: Context) {} } -type TestSubscriber = tracing_subscriber::layer::Layered< - TestLayer, - tracing_subscriber::fmt::Subscriber ->; - fn setup_subscriber() -> ( - TestSubscriber, - Arc>>, + impl Subscriber + for<'a> LookupSpan<'a>, + Arc>>>, ) { - let spans_entered = Arc::new(Mutex::new(Default::default())); + let spans_found = Arc::new(Mutex::new(Default::default())); let layer = TestLayer { - spans: Arc::new(Mutex::new(Default::default())), - spans_entered: spans_entered.clone(), + spans_found: spans_found.clone(), }; let subscriber = tracing_subscriber::fmt().finish().with(layer); - (subscriber, spans_entered) + (subscriber, spans_found) +} + +/// This is not an actual test, it is used by the `telemetry_span_is_forwarded_to_task` test. +/// The given test will call the test executable and only execute this one test that +/// test that the telemetry span and the prefix span are forwarded correctly. This needs to be done +/// in a separate process to avoid interfering with the other tests. +#[test] +fn subprocess_telemetry_span_is_forwarded_to_task() { + if env::var("SUBPROCESS_TEST").is_err() { + return; + } + + let (subscriber, spans_found) = setup_subscriber(); + tracing_log::LogTracer::init().unwrap(); + let _sub_guard = tracing::subscriber::set_global_default(subscriber); + + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + + let prefix_span = tracing::info_span!("prefix"); + let _enter_prefix_span = prefix_span.enter(); + + let telemetry_span = TelemetrySpan::new(); + let _enter_telemetry_span = telemetry_span.enter(); + + let handle = runtime.handle().clone(); + let task_executor = TaskExecutor::from(move |fut, _| handle.spawn(fut).map(|_| ())); + let task_manager = new_task_manager(task_executor); + + let (sender, receiver) = futures::channel::oneshot::channel(); + + task_manager.spawn_handle().spawn( + "log-something", + async move { + log::info!("boo!"); + sender.send(()).unwrap(); + } + .boxed(), + ); + + runtime.block_on(receiver).unwrap(); + runtime.block_on(task_manager.clean_shutdown()); + + let spans = spans_found.lock().take().unwrap(); + assert_eq!(2, spans.len()); + + assert_eq!(spans[0], prefix_span.id().unwrap()); + assert_eq!(spans[1], telemetry_span.span().id().unwrap()); } #[test] fn telemetry_span_is_forwarded_to_task() { - let (subscriber, spans_entered) = setup_subscriber(); - let _sub_guard = tracing::subscriber::set_global_default(subscriber); - - let telemetry_span = TelemetrySpan::new(); - - let span = tracing::info_span!("test"); - let _enter = span.enter(); - - let mut runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - let task_executor = TaskExecutor::from(move |fut, _| handle.spawn(fut).map(|_| ())); - let task_manager = TaskManager::new(task_executor, None, Some(telemetry_span.clone())).unwrap(); - - let (sender, receiver) = futures::channel::oneshot::channel(); - let spawn_handle = task_manager.spawn_handle(); - - let span = span.clone(); - task_manager.spawn_handle().spawn( - "test", - async move { - assert_eq!(span, Span::current()); - spawn_handle.spawn("test-nested", async move { - assert_eq!(span, Span::current()); - sender.send(()).unwrap(); - }.boxed()); - }.boxed(), - ); - - // We need to leave exit the span here. If tokio is not running with multithreading, this - // would lead to duplicate spans being "active" and forwarding the wrong one. - drop(_enter); - runtime.block_on(receiver).unwrap(); - runtime.block_on(task_manager.clean_shutdown()); - drop(runtime); - - let spans = spans_entered.lock(); - // We entered the telemetry span and the "test" in the future, the nested future and - // the "test" span outside of the future. So, we should have recorded 3 spans. - assert_eq!(5, spans.len()); - - assert_eq!(spans[0], "test"); - assert_eq!(spans[1], telemetry_span.span().metadata().unwrap().name()); - assert_eq!(spans[2], "test"); - assert_eq!(spans[3], telemetry_span.span().metadata().unwrap().name()); - assert_eq!(spans[4], "test"); + let executable = env::current_exe().unwrap(); + let output = std::process::Command::new(executable) + .env("SUBPROCESS_TEST", "1") + .args(&["--nocapture", "subprocess_telemetry_span_is_forwarded_to_task"]) + .output() + .unwrap(); + println!("{}", String::from_utf8(output.stdout).unwrap()); + eprintln!("{}", String::from_utf8(output.stderr).unwrap()); + assert!(output.status.success()); } diff --git a/substrate/client/service/test/src/lib.rs b/substrate/client/service/test/src/lib.rs index d286c945f0..6c99f83d4c 100644 --- a/substrate/client/service/test/src/lib.rs +++ b/substrate/client/service/test/src/lib.rs @@ -268,7 +268,6 @@ fn node_config