Rework telemetry to replace the use of tracing with an object we pass around (#8143)

polkadot companion: paritytech/polkadot#2535
This commit is contained in:
Cecile Tonglet
2021-03-11 11:05:45 +01:00
committed by GitHub
parent 7aaba0c154
commit 8031b6eacb
55 changed files with 1028 additions and 838 deletions
+214 -138
View File
@@ -29,7 +29,7 @@
//! identify which substrate node is reporting the telemetry. Every task spawned using sc-service's
//! `TaskManager` automatically inherit this span.
//!
//! Substrate's nodes initialize/register with the [`TelemetryWorker`] using a [`TelemetryHandle`].
//! Substrate's nodes initialize/register with the [`TelemetryWorker`] using a [`TelemetryWorkerHandle`].
//! This handle can be cloned and passed around. It uses an asynchronous channel to communicate with
//! the running [`TelemetryWorker`] dedicated to registration. Registering can happen at any point
//! in time during the process execution.
@@ -39,61 +39,45 @@
use futures::{channel::mpsc, prelude::*};
use libp2p::Multiaddr;
use log::{error, warn};
use parking_lot::Mutex;
use serde::Serialize;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
use std::collections::HashMap;
use tracing::Id;
use std::sync::{atomic, Arc};
pub use libp2p::wasm_ext::ExtTransport;
pub use log;
pub use serde_json;
pub use tracing;
mod endpoints;
mod layer;
mod error;
mod node;
mod transport;
pub use endpoints::*;
pub use layer::*;
pub use error::*;
use node::*;
use transport::*;
/// Substrate DEBUG log level.
pub const SUBSTRATE_DEBUG: u8 = 9;
pub const SUBSTRATE_DEBUG: VerbosityLevel = 9;
/// Substrate INFO log level.
pub const SUBSTRATE_INFO: u8 = 0;
pub const SUBSTRATE_INFO: VerbosityLevel = 0;
/// Consensus TRACE log level.
pub const CONSENSUS_TRACE: u8 = 9;
pub const CONSENSUS_TRACE: VerbosityLevel = 9;
/// Consensus DEBUG log level.
pub const CONSENSUS_DEBUG: u8 = 5;
pub const CONSENSUS_DEBUG: VerbosityLevel = 5;
/// Consensus WARN log level.
pub const CONSENSUS_WARN: u8 = 4;
pub const CONSENSUS_WARN: VerbosityLevel = 4;
/// Consensus INFO log level.
pub const CONSENSUS_INFO: u8 = 1;
pub const CONSENSUS_INFO: VerbosityLevel = 1;
pub(crate) type TelemetryMessage = (Id, u8, String);
/// Telemetry message verbosity.
pub type VerbosityLevel = u8;
/// A handle representing a telemetry span, with the capability to enter the span if it exists.
#[derive(Debug, Clone)]
pub struct TelemetrySpan(tracing::Span);
impl TelemetrySpan {
/// Enters this span, returning a guard that will exit the span when dropped.
pub fn enter(&self) -> tracing::span::Entered {
self.0.enter()
}
/// Constructs a new [`TelemetrySpan`].
pub fn new() -> Self {
Self(tracing::error_span!(TELEMETRY_LOG_SPAN))
}
/// Return a clone of the underlying `tracing::Span` instance.
pub fn span(&self) -> tracing::Span {
self.0.clone()
}
}
pub(crate) type Id = u64;
pub(crate) type TelemetryPayload = serde_json::Map<String, serde_json::Value>;
pub(crate) type TelemetryMessage = (Id, VerbosityLevel, TelemetryPayload);
/// Message sent when the connection (re-)establishes.
#[derive(Debug, Serialize)]
@@ -129,64 +113,79 @@ pub struct TelemetryWorker {
message_sender: mpsc::Sender<TelemetryMessage>,
register_receiver: mpsc::UnboundedReceiver<Register>,
register_sender: mpsc::UnboundedSender<Register>,
id_counter: Arc<atomic::AtomicU64>,
transport: WsTrans,
}
impl TelemetryWorker {
pub(crate) fn new(buffer_size: usize, transport: WsTrans) -> Self {
/// Instantiate a new [`TelemetryWorker`] which can run in background.
///
/// Only one is needed per process.
pub fn new(buffer_size: usize) -> Result<Self> {
let transport = initialize_transport(None)?;
let (message_sender, message_receiver) = mpsc::channel(buffer_size);
let (register_sender, register_receiver) = mpsc::unbounded();
Self {
Ok(Self {
message_receiver,
message_sender,
register_receiver,
register_sender,
id_counter: Arc::new(atomic::AtomicU64::new(1)),
transport,
}
})
}
/// Get a new [`TelemetryHandle`].
/// Instantiate a new [`TelemetryWorker`] which can run in background.
///
/// Only one is needed per process.
pub fn with_transport(buffer_size: usize, transport: Option<ExtTransport>) -> Result<Self> {
let transport = initialize_transport(transport)?;
let (message_sender, message_receiver) = mpsc::channel(buffer_size);
let (register_sender, register_receiver) = mpsc::unbounded();
Ok(Self {
message_receiver,
message_sender,
register_receiver,
register_sender,
id_counter: Arc::new(atomic::AtomicU64::new(1)),
transport,
})
}
/// Get a new [`TelemetryWorkerHandle`].
///
/// This is used when you want to register with the [`TelemetryWorker`].
pub fn handle(&self) -> TelemetryHandle {
TelemetryHandle {
message_sender: self.register_sender.clone(),
pub fn handle(&self) -> TelemetryWorkerHandle {
TelemetryWorkerHandle {
message_sender: self.message_sender.clone(),
register_sender: self.register_sender.clone(),
id_counter: self.id_counter.clone(),
}
}
/// Get a clone of the channel's `Sender` used to send telemetry events.
pub(crate) fn message_sender(&self) -> mpsc::Sender<TelemetryMessage> {
self.message_sender.clone()
}
/// Run the telemetry worker.
///
/// This should be run in a background task.
pub async fn run(self) {
let Self {
mut message_receiver,
message_sender: _,
mut register_receiver,
register_sender: _,
transport,
} = self;
let mut node_map: HashMap<Id, Vec<(u8, Multiaddr)>> = HashMap::new();
pub async fn run(mut self) {
let mut node_map: HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>> = HashMap::new();
let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
let mut pending_connection_notifications: Vec<_> = Vec::new();
loop {
futures::select! {
message = message_receiver.next() => Self::process_message(
message = self.message_receiver.next() => Self::process_message(
message,
&mut node_pool,
&node_map,
).await,
init_payload = register_receiver.next() => Self::process_register(
init_payload = self.register_receiver.next() => Self::process_register(
init_payload,
&mut node_pool,
&mut node_map,
transport.clone(),
&mut pending_connection_notifications,
self.transport.clone(),
).await,
}
}
@@ -195,7 +194,8 @@ impl TelemetryWorker {
async fn process_register(
input: Option<Register>,
node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
node_map: &mut HashMap<Id, Vec<(u8, Multiaddr)>>,
node_map: &mut HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>,
transport: WsTrans,
) {
let input = input.expect("the stream is never closed; qed");
@@ -212,7 +212,7 @@ impl TelemetryWorker {
Ok(serde_json::Value::Object(mut value)) => {
value.insert("msg".into(), "system.connected".into());
let mut obj = serde_json::Map::new();
obj.insert("id".to_string(), id.into_u64().into());
obj.insert("id".to_string(), id.into());
obj.insert("payload".to_string(), value.into());
Some(obj)
}
@@ -245,6 +245,16 @@ impl TelemetryWorker {
});
node.connection_messages.extend(connection_message.clone());
pending_connection_notifications.retain(|(addr_b, connection_message)| {
if *addr_b == addr {
node.telemetry_connection_notifier
.push(connection_message.clone());
false
} else {
true
}
});
}
}
Register::Notifier {
@@ -252,15 +262,15 @@ impl TelemetryWorker {
connection_notifier,
} => {
for addr in addresses {
// If the Node has been initialized, we directly push the connection_notifier.
// Otherwise we push it to a queue that will be consumed when the connection
// initializes, thus ensuring that the connection notifier will be sent to the
// Node when it becomes available.
if let Some(node) = node_pool.get_mut(&addr) {
node.telemetry_connection_notifier
.push(connection_notifier.clone());
} else {
log::error!(
target: "telemetry",
"Received connection notifier for unknown node ({}). This is a bug.",
addr,
);
pending_connection_notifications.push((addr, connection_notifier.clone()));
}
}
}
@@ -271,21 +281,31 @@ impl TelemetryWorker {
async fn process_message(
input: Option<TelemetryMessage>,
node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
node_map: &HashMap<Id, Vec<(u8, Multiaddr)>>,
node_map: &HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
) {
let (id, verbosity, message) = input.expect("the stream is never closed; qed");
let (id, verbosity, payload) = input.expect("the stream is never closed; qed");
let ts = chrono::Local::now().to_rfc3339().to_string();
let mut message = serde_json::Map::new();
message.insert("id".into(), id.into());
message.insert("ts".into(), ts.into());
message.insert("payload".into(), payload.into());
let nodes = if let Some(nodes) = node_map.get(&id) {
nodes
} else {
// This is a normal error because the telemetry span is entered before the telemetry
// is initialized so it is possible that some messages in the beginning don't get
// through.
// This is a normal error because the telemetry ID exists before the telemetry is
// initialized.
log::trace!(
target: "telemetry",
"Received telemetry log for unknown id ({:?}): {}",
id,
message,
serde_json::to_string(&message)
.unwrap_or_else(|err| format!(
"could not be serialized ({}): {:?}",
err,
message,
)),
);
return;
};
@@ -304,12 +324,17 @@ impl TelemetryWorker {
if let Some(node) = node_pool.get_mut(&addr) {
let _ = node.send(message.clone()).await;
} else {
log::error!(
log::debug!(
target: "telemetry",
"Received message for unknown node ({}). This is a bug. \
Message sent: {}",
addr,
message,
serde_json::to_string(&message)
.unwrap_or_else(|err| format!(
"could not be serialized ({}): {:?}",
err,
message,
)),
);
}
}
@@ -318,11 +343,41 @@ impl TelemetryWorker {
/// Handle to the [`TelemetryWorker`] thats allows initializing the telemetry for a Substrate node.
#[derive(Debug, Clone)]
pub struct TelemetryHandle {
message_sender: mpsc::UnboundedSender<Register>,
pub struct TelemetryWorkerHandle {
message_sender: mpsc::Sender<TelemetryMessage>,
register_sender: mpsc::UnboundedSender<Register>,
id_counter: Arc<atomic::AtomicU64>,
}
impl TelemetryHandle {
impl TelemetryWorkerHandle {
/// Instantiate a new [`Telemetry`] object.
pub fn new_telemetry(&mut self, endpoints: TelemetryEndpoints) -> Telemetry {
let addresses = endpoints.0.iter().map(|(addr, _)| addr.clone()).collect();
Telemetry {
message_sender: self.message_sender.clone(),
register_sender: self.register_sender.clone(),
id: self.id_counter.fetch_add(1, atomic::Ordering::Relaxed),
connection_notifier: TelemetryConnectionNotifier {
register_sender: self.register_sender.clone(),
addresses,
},
endpoints: Some(endpoints),
}
}
}
/// A telemetry instance that can be used to send telemetry messages.
#[derive(Debug)]
pub struct Telemetry {
message_sender: mpsc::Sender<TelemetryMessage>,
register_sender: mpsc::UnboundedSender<Register>,
id: Id,
connection_notifier: TelemetryConnectionNotifier,
endpoints: Option<TelemetryEndpoints>,
}
impl Telemetry {
/// Initialize the telemetry with the endpoints provided in argument for the current substrate
/// node.
///
@@ -333,42 +388,67 @@ impl TelemetryHandle {
///
/// The `connection_message` argument is a JSON object that is sent every time the connection
/// (re-)establishes.
pub fn start_telemetry(
&mut self,
span: TelemetrySpan,
endpoints: TelemetryEndpoints,
connection_message: ConnectionMessage,
) -> TelemetryConnectionNotifier {
let Self { message_sender } = self;
let connection_notifier = TelemetryConnectionNotifier {
message_sender: message_sender.clone(),
addresses: endpoints.0.iter().map(|(addr, _)| addr.clone()).collect(),
pub fn start_telemetry(&mut self, connection_message: ConnectionMessage) -> Result<()> {
let endpoints = match self.endpoints.take() {
Some(x) => x,
None => return Err(Error::TelemetryAlreadyInitialized),
};
match span.0.id() {
Some(id) => {
match message_sender.unbounded_send(Register::Telemetry {
id,
endpoints,
connection_message,
}) {
Ok(()) => {}
Err(err) => error!(
target: "telemetry",
"Could not initialize telemetry: \
the telemetry is probably already running: {}",
err,
),
}
}
None => error!(
self.register_sender
.unbounded_send(Register::Telemetry {
id: self.id,
endpoints,
connection_message,
})
.map_err(|_| Error::TelemetryWorkerDropped)
}
/// Make a new clonable handle to this [`Telemetry`]. This is used for reporting telemetries.
pub fn handle(&self) -> TelemetryHandle {
TelemetryHandle {
message_sender: Arc::new(Mutex::new(self.message_sender.clone())),
id: self.id,
connection_notifier: self.connection_notifier.clone(),
}
}
}
/// Handle to a [`Telemetry`].
///
/// Used to report telemetry messages.
#[derive(Debug, Clone)]
pub struct TelemetryHandle {
message_sender: Arc<Mutex<mpsc::Sender<TelemetryMessage>>>,
id: Id,
connection_notifier: TelemetryConnectionNotifier,
}
impl TelemetryHandle {
/// Send telemetry messages.
pub fn send_telemetry(&self, verbosity: VerbosityLevel, payload: TelemetryPayload) {
match self
.message_sender
.lock()
.try_send((self.id, verbosity, payload))
{
Ok(()) => {}
Err(err) if err.is_full() => log::trace!(
target: "telemetry",
"Could not initialize telemetry: the span could not be entered",
"Telemetry channel full.",
),
Err(_) => log::trace!(
target: "telemetry",
"Telemetry channel closed.",
),
}
}
connection_notifier
/// Get event stream for telemetry connection established events.
///
/// This function will return an error if the telemetry has already been started by
/// [`Telemetry::start_telemetry`].
pub fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
self.connection_notifier.on_connect_stream()
}
}
@@ -376,18 +456,14 @@ impl TelemetryHandle {
/// (re-)establishes.
#[derive(Clone, Debug)]
pub struct TelemetryConnectionNotifier {
message_sender: mpsc::UnboundedSender<Register>,
register_sender: mpsc::UnboundedSender<Register>,
addresses: Vec<Multiaddr>,
}
impl TelemetryConnectionNotifier {
/// Get event stream for telemetry connection established events.
///
/// This function will return an error if the telemetry has already been started by
/// [`TelemetryHandle::start_telemetry`].
pub fn on_connect_stream(&self) -> TracingUnboundedReceiver<()> {
let (message_sender, message_receiver) = tracing_unbounded("mpsc_telemetry_on_connect");
if let Err(err) = self.message_sender.unbounded_send(Register::Notifier {
fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
let (message_sender, message_receiver) = connection_notifier_channel();
if let Err(err) = self.register_sender.unbounded_send(Register::Notifier {
addresses: self.addresses.clone(),
connection_notifier: message_sender,
}) {
@@ -428,34 +504,34 @@ enum Register {
/// # let authority_id = 42_u64;
/// # let set_id = (43_u64, 44_u64);
/// # let authorities = vec![45_u64];
/// telemetry!(CONSENSUS_INFO; "afg.authority_set";
/// "authority_id" => authority_id.to_string(),
/// "authority_set_id" => ?set_id,
/// "authorities" => authorities,
/// # let telemetry: Option<TelemetryHandle> = None;
/// telemetry!(
/// telemetry; // an `Option<TelemetryHandle>`
/// CONSENSUS_INFO;
/// "afg.authority_set";
/// "authority_id" => authority_id.to_string(),
/// "authority_set_id" => ?set_id,
/// "authorities" => authorities,
/// );
/// ```
#[macro_export(local_inner_macros)]
macro_rules! telemetry {
( $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
let verbosity: u8 = $verbosity;
match format_fields_to_json!($($t)*) {
Err(err) => {
$crate::tracing::error!(
target: "telemetry",
"Could not serialize value for telemetry: {}",
err,
);
},
Ok(mut json) => {
// NOTE: the span id will be added later in the JSON for the greater good
json.insert("msg".into(), $msg.into());
let serialized_json = $crate::serde_json::to_string(&json)
.expect("contains only string keys; qed");
$crate::tracing::info!(target: $crate::TELEMETRY_LOG_SPAN,
verbosity,
json = serialized_json.as_str(),
);
},
( $telemetry:expr; $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
if let Some(telemetry) = $telemetry.as_ref() {
let verbosity: $crate::VerbosityLevel = $verbosity;
match format_fields_to_json!($($t)*) {
Err(err) => {
$crate::log::debug!(
target: "telemetry",
"Could not serialize value for telemetry: {}",
err,
);
},
Ok(mut json) => {
json.insert("msg".into(), $msg.into());
telemetry.send_telemetry(verbosity, json);
},
}
}
}};
}