From c4877bc05b241920a27c4b4f62ca8d733d939f2e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 19 Jun 2019 15:41:35 +0200 Subject: [PATCH] Remove on_connect from TelemetryConfig (#2888) --- substrate/core/service/src/lib.rs | 13 +++++--- substrate/core/telemetry/src/lib.rs | 48 ++++++++++++++--------------- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 644341f91a..6de8070830 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -416,7 +416,12 @@ impl Service { let telemetry = tel::init_telemetry(tel::TelemetryConfig { endpoints, wasm_external_transport: None, - on_connect: Box::new(move || { + }); + let future = telemetry.clone() + .for_each(move |event| { + // Safe-guard in case we add more events in the future. + let tel::TelemetryEvent::Connected = event; + telemetry!(SUBSTRATE_INFO; "system.connected"; "name" => name.clone(), "implementation" => impl_name.clone(), @@ -431,9 +436,9 @@ impl Service { telemetry_connection_sinks_.lock().retain(|sink| { sink.unbounded_send(()).is_ok() }); - }), - }); - task_executor.spawn(telemetry.clone() + Ok(()) + }); + task_executor.spawn(future .select(exit.clone()) .then(|_| Ok(()))); telemetry diff --git a/substrate/core/telemetry/src/lib.rs b/substrate/core/telemetry/src/lib.rs index d17dd43169..b9d9954851 100644 --- a/substrate/core/telemetry/src/lib.rs +++ b/substrate/core/telemetry/src/lib.rs @@ -25,26 +25,30 @@ //! the moment. Substate may eventually be reworked to get proper `slog` support, including sending //! information to the telemetry. //! -//! The `Telemetry` struct implements `Future` and must be polled regularly (or sent to a +//! The [`Telemetry`] struct implements `Stream` and must be polled regularly (or sent to a //! background thread/task) in order for the telemetry to properly function. Dropping the object //! will also deregister the global logger and replace it with a logger that discards messages. +//! The `Stream` generates [`TelemetryEvent`]s. +//! +//! > **Note**: Cloning the [`Telemetry`] and polling from multiple clones has an unspecified behaviour. //! //! # Example //! //! ```no_run +//! use futures::prelude::*; +//! //! let telemetry = substrate_telemetry::init_telemetry(substrate_telemetry::TelemetryConfig { //! endpoints: substrate_telemetry::TelemetryEndpoints::new(vec![ //! // The `0` is the maximum verbosity level of messages to send to this endpoint. //! ("wss://example.com".into(), 0) //! ]), -//! on_connect: Box::new(|| {}), //! // Can be used to pass an external implementation of WebSockets. //! wasm_external_transport: None, //! }); //! -//! // The `telemetry` object implements `Future` and must be processed. +//! // The `telemetry` object implements `Stream` and must be processed. //! std::thread::spawn(move || { -//! tokio::run(telemetry); +//! tokio::run(telemetry.for_each(|_| Ok(()))); //! }); //! //! // Sends a message on the telemetry. @@ -56,7 +60,7 @@ use futures::{prelude::*, task::AtomicTask}; use libp2p::{Multiaddr, wasm_ext}; -use log::{trace, warn}; +use log::warn; use parking_lot::Mutex; use serde::{Serialize, Deserialize}; use std::sync::{Arc, Weak}; @@ -72,12 +76,6 @@ pub struct TelemetryConfig { /// Collection of telemetry WebSocket servers with a corresponding verbosity level. pub endpoints: TelemetryEndpoints, - /// What to do when we connect to a server. - /// - /// This closure is executed each time we connect to a telemetry endpoint, either for the first - /// time or after being disconnected. - pub on_connect: Box, - /// Optional external implementation of a libp2p transport. Used in WASM contexts where we need /// some binding between the networking provided by the operating system or environment and /// libp2p. @@ -133,8 +131,6 @@ pub struct Telemetry { struct TelemetryInner { /// Worker for the telemetry. worker: Mutex, - /// Same field as in the configuration. Called when we connected to an endpoint. - on_connect: Box, /// Task to wake up when we add a log entry to the worker. polling_task: AtomicTask, } @@ -160,7 +156,6 @@ pub fn init_telemetry(config: TelemetryConfig) -> Telemetry { let inner = Arc::new(TelemetryInner { worker: Mutex::new(worker::TelemetryWorker::new(endpoints, config.wasm_external_transport)), - on_connect: config.on_connect, polling_task: AtomicTask::new(), }); @@ -176,11 +171,19 @@ pub fn init_telemetry(config: TelemetryConfig) -> Telemetry { } } -impl Future for Telemetry { - type Item = (); +/// Event generated when polling the worker. +#[derive(Debug)] +pub enum TelemetryEvent { + /// We have established a connection to one of the telemetry endpoint, either for the first + /// time or after having been disconnected earlier. + Connected, +} + +impl Stream for Telemetry { + type Item = TelemetryEvent; type Error = (); - fn poll(&mut self) -> Poll<(), ()> { + fn poll(&mut self) -> Poll, Self::Error> { let before = Instant::now(); let mut has_connected = false; @@ -195,15 +198,12 @@ impl Future for Telemetry { warn!(target: "telemetry", "Polling the telemetry took more than 200ms"); } - // We use an intermediary variable `has_connected` so that the lock is released when we - // call `on_connect`. if has_connected { - trace!(target: "telemetry", "Running on_connect handlers"); - (self.inner.on_connect)(); + Ok(Async::Ready(Some(TelemetryEvent::Connected))) + } else { + self.inner.polling_task.register(); + Ok(Async::NotReady) } - - self.inner.polling_task.register(); - Ok(Async::NotReady) } }