Remove on_connect from TelemetryConfig (#2888)

This commit is contained in:
Pierre Krieger
2019-06-19 15:41:35 +02:00
committed by Bastian Köcher
parent 80a4cd2b0d
commit c4877bc05b
2 changed files with 33 additions and 28 deletions
+9 -4
View File
@@ -416,7 +416,12 @@ impl<Components: components::Components> Service<Components> {
let telemetry = tel::init_telemetry(tel::TelemetryConfig {
endpoints,
wasm_external_transport: None,
on_connect: Box::new(move || {
});
let future = telemetry.clone()
.for_each(move |event| {
// Safe-guard in case we add more events in the future.
let tel::TelemetryEvent::Connected = event;
telemetry!(SUBSTRATE_INFO; "system.connected";
"name" => name.clone(),
"implementation" => impl_name.clone(),
@@ -431,9 +436,9 @@ impl<Components: components::Components> Service<Components> {
telemetry_connection_sinks_.lock().retain(|sink| {
sink.unbounded_send(()).is_ok()
});
}),
});
task_executor.spawn(telemetry.clone()
Ok(())
});
task_executor.spawn(future
.select(exit.clone())
.then(|_| Ok(())));
telemetry
+24 -24
View File
@@ -25,26 +25,30 @@
//! the moment. Substate may eventually be reworked to get proper `slog` support, including sending
//! information to the telemetry.
//!
//! The `Telemetry` struct implements `Future` and must be polled regularly (or sent to a
//! The [`Telemetry`] struct implements `Stream` and must be polled regularly (or sent to a
//! background thread/task) in order for the telemetry to properly function. Dropping the object
//! will also deregister the global logger and replace it with a logger that discards messages.
//! The `Stream` generates [`TelemetryEvent`]s.
//!
//! > **Note**: Cloning the [`Telemetry`] and polling from multiple clones has an unspecified behaviour.
//!
//! # Example
//!
//! ```no_run
//! use futures::prelude::*;
//!
//! let telemetry = substrate_telemetry::init_telemetry(substrate_telemetry::TelemetryConfig {
//! endpoints: substrate_telemetry::TelemetryEndpoints::new(vec![
//! // The `0` is the maximum verbosity level of messages to send to this endpoint.
//! ("wss://example.com".into(), 0)
//! ]),
//! on_connect: Box::new(|| {}),
//! // Can be used to pass an external implementation of WebSockets.
//! wasm_external_transport: None,
//! });
//!
//! // The `telemetry` object implements `Future` and must be processed.
//! // The `telemetry` object implements `Stream` and must be processed.
//! std::thread::spawn(move || {
//! tokio::run(telemetry);
//! tokio::run(telemetry.for_each(|_| Ok(())));
//! });
//!
//! // Sends a message on the telemetry.
@@ -56,7 +60,7 @@
use futures::{prelude::*, task::AtomicTask};
use libp2p::{Multiaddr, wasm_ext};
use log::{trace, warn};
use log::warn;
use parking_lot::Mutex;
use serde::{Serialize, Deserialize};
use std::sync::{Arc, Weak};
@@ -72,12 +76,6 @@ pub struct TelemetryConfig {
/// Collection of telemetry WebSocket servers with a corresponding verbosity level.
pub endpoints: TelemetryEndpoints,
/// What to do when we connect to a server.
///
/// This closure is executed each time we connect to a telemetry endpoint, either for the first
/// time or after being disconnected.
pub on_connect: Box<dyn Fn() + Send + Sync + 'static>,
/// Optional external implementation of a libp2p transport. Used in WASM contexts where we need
/// some binding between the networking provided by the operating system or environment and
/// libp2p.
@@ -133,8 +131,6 @@ pub struct Telemetry {
struct TelemetryInner {
/// Worker for the telemetry.
worker: Mutex<worker::TelemetryWorker>,
/// Same field as in the configuration. Called when we connected to an endpoint.
on_connect: Box<dyn Fn() + Send + Sync + 'static>,
/// Task to wake up when we add a log entry to the worker.
polling_task: AtomicTask,
}
@@ -160,7 +156,6 @@ pub fn init_telemetry(config: TelemetryConfig) -> Telemetry {
let inner = Arc::new(TelemetryInner {
worker: Mutex::new(worker::TelemetryWorker::new(endpoints, config.wasm_external_transport)),
on_connect: config.on_connect,
polling_task: AtomicTask::new(),
});
@@ -176,11 +171,19 @@ pub fn init_telemetry(config: TelemetryConfig) -> Telemetry {
}
}
impl Future for Telemetry {
type Item = ();
/// Event generated when polling the worker.
#[derive(Debug)]
pub enum TelemetryEvent {
/// We have established a connection to one of the telemetry endpoint, either for the first
/// time or after having been disconnected earlier.
Connected,
}
impl Stream for Telemetry {
type Item = TelemetryEvent;
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let before = Instant::now();
let mut has_connected = false;
@@ -195,15 +198,12 @@ impl Future for Telemetry {
warn!(target: "telemetry", "Polling the telemetry took more than 200ms");
}
// We use an intermediary variable `has_connected` so that the lock is released when we
// call `on_connect`.
if has_connected {
trace!(target: "telemetry", "Running on_connect handlers");
(self.inner.on_connect)();
Ok(Async::Ready(Some(TelemetryEvent::Connected)))
} else {
self.inner.polling_task.register();
Ok(Async::NotReady)
}
self.inner.polling_task.register();
Ok(Async::NotReady)
}
}