mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-09 05:57:59 +00:00
Fix telemetry span not entering properly attempt 3 (#8043)
* Fix tracing tests (#8022) * Fix tracing tests The tests were not working properly. 1. Some test was setting a global subscriber, this could lead to race conditions with other tests. 2. A logging test called `process::exit` which is completely wrong. * Update client/tracing/src/lib.rs Co-authored-by: David <dvdplm@gmail.com> * Review comments Co-authored-by: David <dvdplm@gmail.com> * Fix tracing spans are not being forwarded to spawned task (#8009) * Fix tracing spans are not being forwarded to spawned task There is a bug that tracing spans are not forwarded to spawned task. The problem was that only the telemetry span was forwarded. The solution to this is to use the tracing provided `in_current_span` to capture the current active span and pass the telemetry span explicitly. We will now always enter the span when the future is polled. This is essentially the same strategy as tracing is doing with its `Instrumented`, but now extended for our use case with having multiple spans active. * More tests * Proper test for telemetry and prefix span * WIP * Fix test (need to create & enter the span at the same time) * WIP * Remove telemetry_span from sc_service config * CLEANUP * Update comment * Incorrect indent * More meaningful name * Dedent * Naming XD * Attempt to make a more complete test * lint * Missing licenses * Remove user data * CLEANUP * Apply suggestions from code review Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * CLEANUP * Apply suggestion * Update bin/node/cli/tests/telemetry.rs Co-authored-by: David <dvdplm@gmail.com> * Wrapping lines Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: David <dvdplm@gmail.com>
This commit is contained in:
Generated
+14
@@ -192,6 +192,16 @@ version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "695579f0f2520f3774bb40461e5adb066459d4e0af4d59d20175484fb8e9edf1"
|
||||
|
||||
[[package]]
|
||||
name = "async-attributes"
|
||||
version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5"
|
||||
dependencies = [
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "1.5.1"
|
||||
@@ -293,6 +303,7 @@ version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d9f06685bad74e0570f5213741bea82158279a4103d988e57bfada11ad230341"
|
||||
dependencies = [
|
||||
"async-attributes",
|
||||
"async-channel",
|
||||
"async-global-executor",
|
||||
"async-io",
|
||||
@@ -3869,6 +3880,7 @@ name = "node-cli"
|
||||
version = "2.0.0"
|
||||
dependencies = [
|
||||
"assert_cmd",
|
||||
"async-std",
|
||||
"frame-benchmarking-cli",
|
||||
"frame-support",
|
||||
"frame-system",
|
||||
@@ -3918,6 +3930,7 @@ dependencies = [
|
||||
"sc-transaction-pool",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"soketto",
|
||||
"sp-authority-discovery",
|
||||
"sp-consensus",
|
||||
"sp-consensus-babe",
|
||||
@@ -7558,6 +7571,7 @@ dependencies = [
|
||||
"tokio 0.2.25",
|
||||
"tracing",
|
||||
"tracing-futures",
|
||||
"tracing-log",
|
||||
"tracing-subscriber",
|
||||
"wasm-timer",
|
||||
]
|
||||
|
||||
@@ -11,6 +11,7 @@ pub use sc_executor::NativeExecutor;
|
||||
use sp_consensus_aura::sr25519::{AuthorityPair as AuraPair};
|
||||
use sc_finality_grandpa::SharedVoterState;
|
||||
use sc_keystore::LocalKeystore;
|
||||
use sc_telemetry::TelemetrySpan;
|
||||
|
||||
// Our native executor instance.
|
||||
native_executor_instance!(
|
||||
@@ -162,6 +163,9 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
|
||||
})
|
||||
};
|
||||
|
||||
let telemetry_span = TelemetrySpan::new();
|
||||
let _telemetry_span_entered = telemetry_span.enter();
|
||||
|
||||
let (_rpc_handlers, telemetry_connection_notifier) = sc_service::spawn_tasks(
|
||||
sc_service::SpawnTasksParams {
|
||||
network: network.clone(),
|
||||
@@ -176,6 +180,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
|
||||
network_status_sinks,
|
||||
system_rpc_tx,
|
||||
config,
|
||||
telemetry_span: Some(telemetry_span.clone()),
|
||||
},
|
||||
)?;
|
||||
|
||||
@@ -312,6 +317,9 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
|
||||
);
|
||||
}
|
||||
|
||||
let telemetry_span = TelemetrySpan::new();
|
||||
let _telemetry_span_entered = telemetry_span.enter();
|
||||
|
||||
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
|
||||
remote_blockchain: Some(backend.remote_blockchain()),
|
||||
transaction_pool,
|
||||
@@ -325,6 +333,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
|
||||
network,
|
||||
network_status_sinks,
|
||||
system_rpc_tx,
|
||||
telemetry_span: Some(telemetry_span.clone()),
|
||||
})?;
|
||||
|
||||
network_starter.start_network();
|
||||
|
||||
@@ -124,6 +124,8 @@ nix = "0.19"
|
||||
serde_json = "1.0"
|
||||
regex = "1"
|
||||
platforms = "1.1"
|
||||
async-std = { version = "1.6.5", features = ["attributes"] }
|
||||
soketto = "0.4.2"
|
||||
|
||||
[build-dependencies]
|
||||
structopt = { version = "0.3.8", optional = true }
|
||||
|
||||
@@ -34,7 +34,7 @@ use sp_runtime::traits::Block as BlockT;
|
||||
use futures::prelude::*;
|
||||
use sc_client_api::{ExecutorProvider, RemoteBackend};
|
||||
use node_executor::Executor;
|
||||
use sc_telemetry::TelemetryConnectionNotifier;
|
||||
use sc_telemetry::{TelemetryConnectionNotifier, TelemetrySpan};
|
||||
|
||||
type FullClient = sc_service::TFullClient<Block, RuntimeApi, Executor>;
|
||||
type FullBackend = sc_service::TFullBackend<Block>;
|
||||
@@ -226,6 +226,9 @@ pub fn new_full_base(
|
||||
let enable_grandpa = !config.disable_grandpa;
|
||||
let prometheus_registry = config.prometheus_registry().cloned();
|
||||
|
||||
let telemetry_span = TelemetrySpan::new();
|
||||
let _telemetry_span_entered = telemetry_span.enter();
|
||||
|
||||
let (_rpc_handlers, telemetry_connection_notifier) = sc_service::spawn_tasks(
|
||||
sc_service::SpawnTasksParams {
|
||||
config,
|
||||
@@ -240,6 +243,7 @@ pub fn new_full_base(
|
||||
remote_blockchain: None,
|
||||
network_status_sinks: network_status_sinks.clone(),
|
||||
system_rpc_tx,
|
||||
telemetry_span: Some(telemetry_span.clone()),
|
||||
},
|
||||
)?;
|
||||
|
||||
@@ -433,6 +437,9 @@ pub fn new_light_base(mut config: Configuration) -> Result<(
|
||||
|
||||
let rpc_extensions = node_rpc::create_light(light_deps);
|
||||
|
||||
let telemetry_span = TelemetrySpan::new();
|
||||
let _telemetry_span_entered = telemetry_span.enter();
|
||||
|
||||
let (rpc_handlers, telemetry_connection_notifier) =
|
||||
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
|
||||
on_demand: Some(on_demand),
|
||||
@@ -444,6 +451,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<(
|
||||
config, backend, network_status_sinks, system_rpc_tx,
|
||||
network: network.clone(),
|
||||
task_manager: &mut task_manager,
|
||||
telemetry_span: Some(telemetry_span.clone()),
|
||||
})?;
|
||||
|
||||
Ok((
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use assert_cmd::cargo::cargo_bin;
|
||||
use nix::sys::signal::{kill, Signal::SIGINT};
|
||||
use nix::unistd::Pid;
|
||||
use std::convert::TryInto;
|
||||
use std::process;
|
||||
|
||||
pub mod common;
|
||||
pub mod websocket_server;
|
||||
|
||||
#[async_std::test]
|
||||
async fn telemetry_works() {
|
||||
let config = websocket_server::Config {
|
||||
capacity: 1,
|
||||
max_frame_size: 1048 * 1024,
|
||||
send_buffer_len: 32,
|
||||
bind_address: "127.0.0.1:0".parse().unwrap(),
|
||||
};
|
||||
let mut server = websocket_server::WsServer::new(config).await.unwrap();
|
||||
|
||||
let addr = server.local_addr().unwrap();
|
||||
|
||||
let server_task = async_std::task::spawn(async move {
|
||||
loop {
|
||||
use websocket_server::Event;
|
||||
match server.next_event().await {
|
||||
// New connection on the listener.
|
||||
Event::ConnectionOpen { address } => {
|
||||
println!("New connection from {:?}", address);
|
||||
server.accept();
|
||||
}
|
||||
|
||||
// Received a message from a connection.
|
||||
Event::BinaryFrame { message, .. } => {
|
||||
let json: serde_json::Value = serde_json::from_slice(&message).unwrap();
|
||||
let object = json
|
||||
.as_object()
|
||||
.unwrap()
|
||||
.get("payload")
|
||||
.unwrap()
|
||||
.as_object()
|
||||
.unwrap();
|
||||
if matches!(object.get("best"), Some(serde_json::Value::String(_))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Event::TextFrame { .. } => panic!("Got a TextFrame over the socket, this is a bug"),
|
||||
|
||||
// Connection has been closed.
|
||||
Event::ConnectionError { .. } => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut substrate = process::Command::new(cargo_bin("substrate"));
|
||||
|
||||
let mut substrate = substrate
|
||||
.args(&["--dev", "--tmp", "--telemetry-url"])
|
||||
.arg(format!("ws://{} 10", addr))
|
||||
.stdout(process::Stdio::piped())
|
||||
.stderr(process::Stdio::piped())
|
||||
.stdin(process::Stdio::null())
|
||||
.spawn()
|
||||
.unwrap();
|
||||
|
||||
server_task.await;
|
||||
|
||||
assert!(
|
||||
substrate.try_wait().unwrap().is_none(),
|
||||
"the process should still be running"
|
||||
);
|
||||
|
||||
// Stop the process
|
||||
kill(Pid::from_raw(substrate.id().try_into().unwrap()), SIGINT).unwrap();
|
||||
assert!(common::wait_for(&mut substrate, 40)
|
||||
.map(|x| x.success())
|
||||
.unwrap_or_default());
|
||||
|
||||
let output = substrate.wait_with_output().unwrap();
|
||||
|
||||
println!("{}", String::from_utf8(output.stdout).unwrap());
|
||||
eprintln!("{}", String::from_utf8(output.stderr).unwrap());
|
||||
assert!(output.status.success());
|
||||
}
|
||||
@@ -0,0 +1,281 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use async_std::net::{TcpListener, TcpStream};
|
||||
use core::pin::Pin;
|
||||
use futures::prelude::*;
|
||||
use soketto::handshake::{server::Response, Server};
|
||||
use std::{io, net::SocketAddr};
|
||||
|
||||
/// Configuration for a [`WsServer`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Socket address (IP address and port) to try to bind to.
    pub bind_address: SocketAddr,

    /// Maximum size, in bytes, of a frame sent by the remote.
    ///
    /// Since the messages are entirely buffered before being returned, a maximum value is
    /// necessary in order to prevent malicious clients from sending huge frames that would
    /// occupy a lot of memory.
    pub max_frame_size: usize,

    /// Number of pending messages to buffer up for sending before the socket is considered
    /// unresponsive.
    ///
    /// NOTE(review): `WsServer::new` in this file does not read this field — confirm whether
    /// it is still needed.
    pub send_buffer_len: usize,

    /// Pre-allocated capacity for the list of connections.
    ///
    /// NOTE(review): `WsServer::new` in this file does not read this field — confirm whether
    /// it is still needed.
    pub capacity: usize,
}
|
||||
|
||||
/// Identifier for a connection with regard to a [`WsServer`].
///
/// After a connection has been closed, its [`ConnectionId`] might be reused.
///
/// NOTE(review): nothing else in this file constructs or returns a `ConnectionId`; the events
/// produced by the server do not carry one.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct ConnectionId(u64);
|
||||
|
||||
/// A WebSocket message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Message {
    /// Payload of a text frame, already validated as UTF-8.
    Text(String),
    /// Payload of a binary frame.
    Binary(Vec<u8>),
}
|
||||
|
||||
/// WebSockets listening socket and list of open connections.
|
||||
pub struct WsServer {
|
||||
/// Value passed through [`Config::max_frame_size`].
|
||||
max_frame_size: usize,
|
||||
|
||||
/// Endpoint for incoming TCP sockets.
|
||||
listener: TcpListener,
|
||||
|
||||
/// Pending incoming connection to accept. Accepted by calling [`WsServer::accept`].
|
||||
pending_incoming: Option<TcpStream>,
|
||||
|
||||
/// List of TCP connections that are currently negotiating the WebSocket handshake.
|
||||
///
|
||||
/// The output can be an error if the handshake fails.
|
||||
negotiating: stream::FuturesUnordered<
|
||||
Pin<
|
||||
Box<
|
||||
dyn Future<Output = Result<Server<'static, TcpStream>, Box<dyn std::error::Error>>>
|
||||
+ Send,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
|
||||
/// List of streams of incoming messages for all connections.
|
||||
incoming_messages: stream::SelectAll<
|
||||
Pin<Box<dyn Stream<Item = Result<Message, Box<dyn std::error::Error>>> + Send>>,
|
||||
>,
|
||||
|
||||
/// Tasks dedicated to closing sockets that have been rejected.
|
||||
rejected_sockets: stream::FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
}
|
||||
|
||||
impl WsServer {
|
||||
/// Try opening a TCP listening socket.
|
||||
///
|
||||
/// Returns an error if the listening socket fails to open.
|
||||
pub async fn new(config: Config) -> Result<Self, io::Error> {
|
||||
let listener = TcpListener::bind(config.bind_address).await?;
|
||||
|
||||
Ok(WsServer {
|
||||
max_frame_size: config.max_frame_size,
|
||||
listener,
|
||||
pending_incoming: None,
|
||||
negotiating: stream::FuturesUnordered::new(),
|
||||
incoming_messages: stream::SelectAll::new(),
|
||||
rejected_sockets: stream::FuturesUnordered::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Address of the local TCP listening socket, as provided by the operating system.
|
||||
pub fn local_addr(&self) -> Result<SocketAddr, io::Error> {
|
||||
self.listener.local_addr()
|
||||
}
|
||||
|
||||
/// Accepts the pending connection.
|
||||
///
|
||||
/// Either [`WsServer::accept`] or [`WsServer::reject`] must be called after a
|
||||
/// [`Event::ConnectionOpen`] event is returned.
|
||||
///
|
||||
/// # Panic
|
||||
///
|
||||
/// Panics if no connection is pending.
|
||||
///
|
||||
pub fn accept(&mut self) {
|
||||
let pending_incoming = self.pending_incoming.take().expect("no pending socket");
|
||||
|
||||
self.negotiating.push(Box::pin(async move {
|
||||
let mut server = Server::new(pending_incoming);
|
||||
|
||||
let websocket_key = match server.receive_request().await {
|
||||
Ok(req) => req.into_key(),
|
||||
Err(err) => return Err(Box::new(err) as Box<_>),
|
||||
};
|
||||
|
||||
match server
|
||||
.send_response(&{
|
||||
Response::Accept {
|
||||
key: &websocket_key,
|
||||
protocol: None,
|
||||
}
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(err) => return Err(Box::new(err) as Box<_>),
|
||||
};
|
||||
|
||||
Ok(server)
|
||||
}));
|
||||
}
|
||||
|
||||
/// Reject the pending connection.
|
||||
///
|
||||
/// Either [`WsServer::accept`] or [`WsServer::reject`] must be called after a
|
||||
/// [`Event::ConnectionOpen`] event is returned.
|
||||
///
|
||||
/// # Panic
|
||||
///
|
||||
/// Panics if no connection is pending.
|
||||
///
|
||||
pub fn reject(&mut self) {
|
||||
let _ = self.pending_incoming.take().expect("no pending socket");
|
||||
}
|
||||
|
||||
/// Returns the next event happening on the server.
|
||||
pub async fn next_event(&mut self) -> Event {
|
||||
loop {
|
||||
futures::select! {
|
||||
// Only try to fetch a new incoming connection if none is pending.
|
||||
socket = {
|
||||
let listener = &self.listener;
|
||||
let has_pending = self.pending_incoming.is_some();
|
||||
async move {
|
||||
if !has_pending {
|
||||
listener.accept().await
|
||||
} else {
|
||||
loop { futures::pending!() }
|
||||
}
|
||||
}
|
||||
}.fuse() => {
|
||||
let (socket, address) = match socket {
|
||||
Ok(s) => s,
|
||||
Err(_) => continue,
|
||||
};
|
||||
debug_assert!(self.pending_incoming.is_none());
|
||||
self.pending_incoming = Some(socket);
|
||||
return Event::ConnectionOpen { address };
|
||||
},
|
||||
|
||||
result = self.negotiating.select_next_some() => {
|
||||
let server = match result {
|
||||
Ok(s) => s,
|
||||
Err(error) => return Event::ConnectionError {
|
||||
error,
|
||||
},
|
||||
};
|
||||
|
||||
let (mut _sender, receiver) = {
|
||||
let mut builder = server.into_builder();
|
||||
builder.set_max_frame_size(self.max_frame_size);
|
||||
builder.set_max_message_size(self.max_frame_size);
|
||||
builder.finish()
|
||||
};
|
||||
|
||||
// Spawn a task dedicated to receiving messages from the socket.
|
||||
self.incoming_messages.push({
|
||||
// Turn `receiver` into a stream of received packets.
|
||||
let socket_packets = stream::unfold((receiver, Vec::new()), move |(mut receiver, mut buf)| async {
|
||||
buf.clear();
|
||||
let ret = match receiver.receive_data(&mut buf).await {
|
||||
Ok(soketto::Data::Text(len)) => String::from_utf8(buf[..len].to_vec())
|
||||
.map(Message::Text)
|
||||
.map_err(|err| Box::new(err) as Box<_>),
|
||||
Ok(soketto::Data::Binary(len)) => Ok(buf[..len].to_vec())
|
||||
.map(Message::Binary),
|
||||
Err(err) => Err(Box::new(err) as Box<_>),
|
||||
};
|
||||
Some((ret, (receiver, buf)))
|
||||
});
|
||||
|
||||
Box::pin(socket_packets.map(move |msg| (msg)))
|
||||
});
|
||||
},
|
||||
|
||||
result = self.incoming_messages.select_next_some() => {
|
||||
let message = match result {
|
||||
Ok(m) => m,
|
||||
Err(error) => return Event::ConnectionError {
|
||||
error,
|
||||
},
|
||||
};
|
||||
|
||||
match message {
|
||||
Message::Text(message) => {
|
||||
return Event::TextFrame {
|
||||
message,
|
||||
}
|
||||
}
|
||||
Message::Binary(message) => {
|
||||
return Event::BinaryFrame {
|
||||
message,
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
_ = self.rejected_sockets.select_next_some() => {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Event that has happened on a [`WsServer`].
#[derive(Debug)]
pub enum Event {
    /// A new TCP connection has arrived on the listening socket.
    ///
    /// The connection *must* be accepted or rejected using [`WsServer::accept`] or
    /// [`WsServer::reject`].
    /// No other [`Event::ConnectionOpen`] event will be generated until the current pending
    /// connection has been either accepted or rejected.
    ConnectionOpen {
        /// Address of the remote, as provided by the operating system.
        address: SocketAddr,
    },

    /// An error has happened on a connection, either during the WebSocket handshake or while
    /// receiving a message.
    ///
    /// The event does not identify which connection failed.
    ConnectionError {
        /// The underlying error.
        error: Box<dyn std::error::Error>,
    },

    /// A text frame has been received on a connection.
    TextFrame {
        /// Message sent by the remote. Its content is entirely decided by the client, and
        /// nothing must be assumed about the validity of this message.
        message: String,
    },

    /// A binary frame has been received on a connection.
    BinaryFrame {
        /// Message sent by the remote. Its content is entirely decided by the client, and
        /// nothing must be assumed about the validity of this message.
        message: Vec<u8>,
    },
}
|
||||
@@ -33,7 +33,7 @@ use sc_service::config::{
|
||||
TaskExecutor, TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
|
||||
};
|
||||
use sc_service::{ChainSpec, TracingReceiver, KeepBlocks, TransactionStorageMode};
|
||||
use sc_telemetry::{TelemetryHandle, TelemetrySpan};
|
||||
use sc_telemetry::TelemetryHandle;
|
||||
use sc_tracing::logging::LoggerBuilder;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
@@ -494,7 +494,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
|
||||
.transpose()?
|
||||
// Don't initialise telemetry if `telemetry_endpoints` == Some([])
|
||||
.filter(|x| !x.is_empty());
|
||||
let telemetry_span = telemetry_endpoints.as_ref().map(|_| TelemetrySpan::new());
|
||||
|
||||
let unsafe_pruning = self
|
||||
.import_params()
|
||||
@@ -534,7 +533,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
|
||||
rpc_cors: self.rpc_cors(is_dev)?,
|
||||
prometheus_config: self.prometheus_config(DCV::prometheus_listen_port())?,
|
||||
telemetry_endpoints,
|
||||
telemetry_span,
|
||||
telemetry_external_transport: self.telemetry_external_transport()?,
|
||||
default_heap_pages: self.default_heap_pages()?,
|
||||
offchain_worker: self.offchain_worker(&role)?,
|
||||
|
||||
@@ -92,3 +92,4 @@ grandpa-primitives = { version = "3.0.0", package = "sp-finality-grandpa", path
|
||||
tokio = { version = "0.2.25", default-features = false }
|
||||
async-std = { version = "1.6.5", default-features = false }
|
||||
tracing-subscriber = "0.2.15"
|
||||
tracing-log = "0.1.1"
|
||||
|
||||
@@ -56,6 +56,7 @@ use sc_telemetry::{
|
||||
telemetry,
|
||||
ConnectionMessage,
|
||||
TelemetryConnectionNotifier,
|
||||
TelemetrySpan,
|
||||
SUBSTRATE_INFO,
|
||||
};
|
||||
use sp_transaction_pool::MaintainedTransactionPool;
|
||||
@@ -308,7 +309,7 @@ pub fn new_full_parts<TBl, TRtApi, TExecDisp>(
|
||||
|
||||
let task_manager = {
|
||||
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
|
||||
TaskManager::new(config.task_executor.clone(), registry, config.telemetry_span.clone())?
|
||||
TaskManager::new(config.task_executor.clone(), registry)?
|
||||
};
|
||||
|
||||
let executor = NativeExecutor::<TExecDisp>::new(
|
||||
@@ -377,7 +378,7 @@ pub fn new_light_parts<TBl, TRtApi, TExecDisp>(
|
||||
let keystore_container = KeystoreContainer::new(&config.keystore)?;
|
||||
let task_manager = {
|
||||
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
|
||||
TaskManager::new(config.task_executor.clone(), registry, config.telemetry_span.clone())?
|
||||
TaskManager::new(config.task_executor.clone(), registry)?
|
||||
};
|
||||
|
||||
let executor = NativeExecutor::<TExecDisp>::new(
|
||||
@@ -491,6 +492,10 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
|
||||
pub network_status_sinks: NetworkStatusSinks<TBl>,
|
||||
/// A Sender for RPC requests.
|
||||
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
|
||||
/// Telemetry span.
|
||||
///
|
||||
/// This span needs to be entered **before** calling [`spawn_tasks()`].
|
||||
pub telemetry_span: Option<TelemetrySpan>,
|
||||
}
|
||||
|
||||
/// Build a shared offchain workers instance.
|
||||
@@ -569,6 +574,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
|
||||
network,
|
||||
network_status_sinks,
|
||||
system_rpc_tx,
|
||||
telemetry_span,
|
||||
} = params;
|
||||
|
||||
let chain_info = client.usage_info().chain;
|
||||
@@ -581,6 +587,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
|
||||
|
||||
let telemetry_connection_notifier = init_telemetry(
|
||||
&mut config,
|
||||
telemetry_span,
|
||||
network.clone(),
|
||||
client.clone(),
|
||||
);
|
||||
@@ -681,10 +688,11 @@ async fn transaction_notifications<TBl, TExPool>(
|
||||
|
||||
fn init_telemetry<TBl: BlockT, TCl: BlockBackend<TBl>>(
|
||||
config: &mut Configuration,
|
||||
telemetry_span: Option<TelemetrySpan>,
|
||||
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
|
||||
client: Arc<TCl>,
|
||||
) -> Option<TelemetryConnectionNotifier> {
|
||||
let telemetry_span = config.telemetry_span.clone()?;
|
||||
let telemetry_span = telemetry_span?;
|
||||
let endpoints = config.telemetry_endpoints.clone()?;
|
||||
let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
|
||||
let connection_message = ConnectionMessage {
|
||||
|
||||
@@ -101,10 +101,6 @@ pub struct Configuration {
|
||||
/// This is a handle to a `TelemetryWorker` instance. It is used to initialize the telemetry for
|
||||
/// a substrate node.
|
||||
pub telemetry_handle: Option<sc_telemetry::TelemetryHandle>,
|
||||
/// Telemetry span.
|
||||
///
|
||||
/// This span is entered for every background task spawned using the TaskManager.
|
||||
pub telemetry_span: Option<sc_telemetry::TelemetrySpan>,
|
||||
/// The default number of 64KB pages to allocate for Wasm execution
|
||||
pub default_heap_pages: Option<u64>,
|
||||
/// Should offchain workers be executed.
|
||||
|
||||
@@ -24,7 +24,7 @@ use log::{debug, error};
|
||||
use futures::{
|
||||
Future, FutureExt, StreamExt,
|
||||
future::{select, Either, BoxFuture, join_all, try_join_all, pending},
|
||||
sink::SinkExt, task::{Context, Poll},
|
||||
sink::SinkExt,
|
||||
};
|
||||
use prometheus_endpoint::{
|
||||
exponential_buckets, register,
|
||||
@@ -34,43 +34,11 @@ use prometheus_endpoint::{
|
||||
use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded};
|
||||
use tracing_futures::Instrument;
|
||||
use crate::{config::{TaskExecutor, TaskType, JoinFuture}, Error};
|
||||
use sc_telemetry::TelemetrySpan;
|
||||
|
||||
mod prometheus_future;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// A wrapper around an [`Option<TelemetrySpan>`] and a [`Future`].
|
||||
///
|
||||
/// The telemetry in Substrate uses a span to identify the telemetry context. The span "infrastructure"
|
||||
/// is provided by the tracing-crate. Now it is possible to have your own spans as well. To support
|
||||
/// this with the [`TaskManager`] we have this wrapper. This wrapper enters the telemetry span every
|
||||
/// time the future is polled and polls the inner future. So, the inner future can still have its
|
||||
/// own span attached and we get our telemetry span ;)
|
||||
struct WithTelemetrySpan<T> {
|
||||
span: Option<TelemetrySpan>,
|
||||
inner: T,
|
||||
}
|
||||
|
||||
impl<T> WithTelemetrySpan<T> {
|
||||
fn new(span: Option<TelemetrySpan>, inner: T) -> Self {
|
||||
Self {
|
||||
span,
|
||||
inner,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Future<Output = ()> + Unpin> Future for WithTelemetrySpan<T> {
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
|
||||
let span = self.span.clone();
|
||||
let _enter = span.as_ref().map(|s| s.enter());
|
||||
Pin::new(&mut self.inner).poll(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
/// A handle for spawning tasks in the service.
|
||||
#[derive(Clone)]
|
||||
pub struct SpawnTaskHandle {
|
||||
@@ -78,7 +46,6 @@ pub struct SpawnTaskHandle {
|
||||
executor: TaskExecutor,
|
||||
metrics: Option<Metrics>,
|
||||
task_notifier: TracingUnboundedSender<JoinFuture>,
|
||||
telemetry_span: Option<TelemetrySpan>,
|
||||
}
|
||||
|
||||
impl SpawnTaskHandle {
|
||||
@@ -155,11 +122,7 @@ impl SpawnTaskHandle {
|
||||
}
|
||||
};
|
||||
|
||||
let future = future.in_current_span().boxed();
|
||||
let join_handle = self.executor.spawn(
|
||||
WithTelemetrySpan::new(self.telemetry_span.clone(), future).boxed(),
|
||||
task_type,
|
||||
);
|
||||
let join_handle = self.executor.spawn(future.in_current_span().boxed(), task_type);
|
||||
|
||||
let mut task_notifier = self.task_notifier.clone();
|
||||
self.executor.spawn(
|
||||
@@ -266,8 +229,6 @@ pub struct TaskManager {
|
||||
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
|
||||
/// task fails.
|
||||
children: Vec<TaskManager>,
|
||||
/// A `TelemetrySpan` used to enter the telemetry span when a task is spawned.
|
||||
telemetry_span: Option<TelemetrySpan>,
|
||||
}
|
||||
|
||||
impl TaskManager {
|
||||
@@ -276,7 +237,6 @@ impl TaskManager {
|
||||
pub(super) fn new(
|
||||
executor: TaskExecutor,
|
||||
prometheus_registry: Option<&Registry>,
|
||||
telemetry_span: Option<TelemetrySpan>,
|
||||
) -> Result<Self, PrometheusError> {
|
||||
let (signal, on_exit) = exit_future::signal();
|
||||
|
||||
@@ -305,7 +265,6 @@ impl TaskManager {
|
||||
task_notifier,
|
||||
completion_future,
|
||||
children: Vec::new(),
|
||||
telemetry_span,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -316,7 +275,6 @@ impl TaskManager {
|
||||
executor: self.executor.clone(),
|
||||
metrics: self.metrics.clone(),
|
||||
task_notifier: self.task_notifier.clone(),
|
||||
telemetry_span: self.telemetry_span.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,10 +20,14 @@ use crate::config::TaskExecutor;
|
||||
use crate::task_manager::TaskManager;
|
||||
use futures::{future::FutureExt, pin_mut, select};
|
||||
use parking_lot::Mutex;
|
||||
use std::{any::Any, sync::Arc, time::Duration};
|
||||
use tracing_subscriber::{layer::{SubscriberExt, Context}, Layer};
|
||||
use tracing::{subscriber::Subscriber, span::{Attributes, Id, Record, Span}, event::Event};
|
||||
use sc_telemetry::TelemetrySpan;
|
||||
use std::{any::Any, env, sync::Arc, time::Duration};
|
||||
use tracing::{event::Event, span::Id, subscriber::Subscriber};
|
||||
use tracing_subscriber::{
|
||||
layer::{Context, SubscriberExt},
|
||||
registry::LookupSpan,
|
||||
Layer,
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct DropTester(Arc<Mutex<usize>>);
|
||||
@@ -83,7 +87,7 @@ async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any)
|
||||
}
|
||||
|
||||
fn new_task_manager(task_executor: TaskExecutor) -> TaskManager {
|
||||
TaskManager::new(task_executor, None, None).unwrap()
|
||||
TaskManager::new(task_executor, None).unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -315,92 +319,92 @@ fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() {
|
||||
}
|
||||
|
||||
struct TestLayer {
|
||||
spans_entered: Arc<Mutex<Vec<String>>>,
|
||||
spans: Arc<Mutex<std::collections::HashMap<Id, String>>>,
|
||||
spans_found: Arc<Mutex<Option<Vec<Id>>>>,
|
||||
}
|
||||
|
||||
impl<S: Subscriber> Layer<S> for TestLayer {
|
||||
fn new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<S>) {
|
||||
self.spans.lock().insert(id.clone(), attrs.metadata().name().to_string());
|
||||
impl<S> Layer<S> for TestLayer
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
fn on_event(&self, _: &Event<'_>, ctx: Context<S>) {
|
||||
let mut spans_found = self.spans_found.lock();
|
||||
|
||||
if spans_found.is_some() {
|
||||
panic!("on_event called multiple times");
|
||||
}
|
||||
|
||||
*spans_found = Some(ctx.scope().map(|x| x.id()).collect());
|
||||
}
|
||||
|
||||
fn on_record(&self, _: &Id, _: &Record<'_>, _: Context<S>) {}
|
||||
|
||||
fn on_event(&self, _: &Event<'_>, _: Context<S>) {}
|
||||
|
||||
fn on_enter(&self, span: &Id, _: Context<S>) {
|
||||
let name = self.spans.lock().get(span).unwrap().clone();
|
||||
self.spans_entered.lock().push(name);
|
||||
}
|
||||
|
||||
fn on_exit(&self, _: &Id, _: Context<S>) {}
|
||||
|
||||
fn on_close(&self, _: Id, _: Context<S>) {}
|
||||
}
|
||||
|
||||
type TestSubscriber = tracing_subscriber::layer::Layered<
|
||||
TestLayer,
|
||||
tracing_subscriber::fmt::Subscriber
|
||||
>;
|
||||
|
||||
fn setup_subscriber() -> (
|
||||
TestSubscriber,
|
||||
Arc<Mutex<Vec<String>>>,
|
||||
impl Subscriber + for<'a> LookupSpan<'a>,
|
||||
Arc<Mutex<Option<Vec<Id>>>>,
|
||||
) {
|
||||
let spans_entered = Arc::new(Mutex::new(Default::default()));
|
||||
let spans_found = Arc::new(Mutex::new(Default::default()));
|
||||
let layer = TestLayer {
|
||||
spans: Arc::new(Mutex::new(Default::default())),
|
||||
spans_entered: spans_entered.clone(),
|
||||
spans_found: spans_found.clone(),
|
||||
};
|
||||
let subscriber = tracing_subscriber::fmt().finish().with(layer);
|
||||
(subscriber, spans_entered)
|
||||
(subscriber, spans_found)
|
||||
}
|
||||
|
||||
/// This is not an actual test, it is used by the `telemetry_span_is_forwarded_to_task` test.
|
||||
/// The given test will call the test executable and only execute this one test that
|
||||
/// checks that the telemetry span and the prefix span are forwarded correctly. This needs to be done
|
||||
/// in a separate process to avoid interfering with the other tests.
|
||||
#[test]
|
||||
fn subprocess_telemetry_span_is_forwarded_to_task() {
|
||||
if env::var("SUBPROCESS_TEST").is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
let (subscriber, spans_found) = setup_subscriber();
|
||||
tracing_log::LogTracer::init().unwrap();
|
||||
let _sub_guard = tracing::subscriber::set_global_default(subscriber);
|
||||
|
||||
let mut runtime = tokio::runtime::Runtime::new().unwrap();
|
||||
|
||||
let prefix_span = tracing::info_span!("prefix");
|
||||
let _enter_prefix_span = prefix_span.enter();
|
||||
|
||||
let telemetry_span = TelemetrySpan::new();
|
||||
let _enter_telemetry_span = telemetry_span.enter();
|
||||
|
||||
let handle = runtime.handle().clone();
|
||||
let task_executor = TaskExecutor::from(move |fut, _| handle.spawn(fut).map(|_| ()));
|
||||
let task_manager = new_task_manager(task_executor);
|
||||
|
||||
let (sender, receiver) = futures::channel::oneshot::channel();
|
||||
|
||||
task_manager.spawn_handle().spawn(
|
||||
"log-something",
|
||||
async move {
|
||||
log::info!("boo!");
|
||||
sender.send(()).unwrap();
|
||||
}
|
||||
.boxed(),
|
||||
);
|
||||
|
||||
runtime.block_on(receiver).unwrap();
|
||||
runtime.block_on(task_manager.clean_shutdown());
|
||||
|
||||
let spans = spans_found.lock().take().unwrap();
|
||||
assert_eq!(2, spans.len());
|
||||
|
||||
assert_eq!(spans[0], prefix_span.id().unwrap());
|
||||
assert_eq!(spans[1], telemetry_span.span().id().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telemetry_span_is_forwarded_to_task() {
|
||||
let (subscriber, spans_entered) = setup_subscriber();
|
||||
let _sub_guard = tracing::subscriber::set_global_default(subscriber);
|
||||
|
||||
let telemetry_span = TelemetrySpan::new();
|
||||
|
||||
let span = tracing::info_span!("test");
|
||||
let _enter = span.enter();
|
||||
|
||||
let mut runtime = tokio::runtime::Runtime::new().unwrap();
|
||||
let handle = runtime.handle().clone();
|
||||
let task_executor = TaskExecutor::from(move |fut, _| handle.spawn(fut).map(|_| ()));
|
||||
let task_manager = TaskManager::new(task_executor, None, Some(telemetry_span.clone())).unwrap();
|
||||
|
||||
let (sender, receiver) = futures::channel::oneshot::channel();
|
||||
let spawn_handle = task_manager.spawn_handle();
|
||||
|
||||
let span = span.clone();
|
||||
task_manager.spawn_handle().spawn(
|
||||
"test",
|
||||
async move {
|
||||
assert_eq!(span, Span::current());
|
||||
spawn_handle.spawn("test-nested", async move {
|
||||
assert_eq!(span, Span::current());
|
||||
sender.send(()).unwrap();
|
||||
}.boxed());
|
||||
}.boxed(),
|
||||
);
|
||||
|
||||
// We need to exit the span here. If tokio is not running with multithreading, this
|
||||
// would lead to duplicate spans being "active" and forwarding the wrong one.
|
||||
drop(_enter);
|
||||
runtime.block_on(receiver).unwrap();
|
||||
runtime.block_on(task_manager.clean_shutdown());
|
||||
drop(runtime);
|
||||
|
||||
let spans = spans_entered.lock();
|
||||
// We entered the telemetry span and the "test" in the future, the nested future and
|
||||
// the "test" span outside of the future. So, we should have recorded 5 spans.
|
||||
assert_eq!(5, spans.len());
|
||||
|
||||
assert_eq!(spans[0], "test");
|
||||
assert_eq!(spans[1], telemetry_span.span().metadata().unwrap().name());
|
||||
assert_eq!(spans[2], "test");
|
||||
assert_eq!(spans[3], telemetry_span.span().metadata().unwrap().name());
|
||||
assert_eq!(spans[4], "test");
|
||||
let executable = env::current_exe().unwrap();
|
||||
let output = std::process::Command::new(executable)
|
||||
.env("SUBPROCESS_TEST", "1")
|
||||
.args(&["--nocapture", "subprocess_telemetry_span_is_forwarded_to_task"])
|
||||
.output()
|
||||
.unwrap();
|
||||
println!("{}", String::from_utf8(output.stdout).unwrap());
|
||||
eprintln!("{}", String::from_utf8(output.stderr).unwrap());
|
||||
assert!(output.status.success());
|
||||
}
|
||||
|
||||
@@ -268,7 +268,6 @@ fn node_config<G: RuntimeGenesis + 'static, E: ChainSpecExtension + Clone + 'sta
|
||||
telemetry_endpoints: None,
|
||||
telemetry_external_transport: None,
|
||||
telemetry_handle: None,
|
||||
telemetry_span: None,
|
||||
default_heap_pages: None,
|
||||
offchain_worker: Default::default(),
|
||||
force_authoring: false,
|
||||
|
||||
@@ -24,7 +24,7 @@ use sc_service::{
|
||||
GenericChainSpec, RuntimeGenesis,
|
||||
KeepBlocks, TransactionStorageMode,
|
||||
};
|
||||
use sc_telemetry::{TelemetryHandle, TelemetrySpan};
|
||||
use sc_telemetry::TelemetryHandle;
|
||||
use sc_tracing::logging::LoggerBuilder;
|
||||
use wasm_bindgen::prelude::*;
|
||||
use futures::{
|
||||
@@ -72,7 +72,6 @@ where
|
||||
allow_private_ipv4: true,
|
||||
enable_mdns: false,
|
||||
};
|
||||
let telemetry_span = telemetry_handle.as_ref().map(|_| TelemetrySpan::new());
|
||||
|
||||
let config = Configuration {
|
||||
network,
|
||||
@@ -84,7 +83,6 @@ where
|
||||
}).into(),
|
||||
telemetry_external_transport: Some(transport),
|
||||
telemetry_handle,
|
||||
telemetry_span,
|
||||
role: Role::Light,
|
||||
database: {
|
||||
info!("Opening Indexed DB database '{}'...", name);
|
||||
|
||||
Reference in New Issue
Block a user