diff --git a/substrate/client/telemetry/src/lib.rs b/substrate/client/telemetry/src/lib.rs index 06c82d44ab..842d89d7ed 100644 --- a/substrate/client/telemetry/src/lib.rs +++ b/substrate/client/telemetry/src/lib.rs @@ -122,21 +122,11 @@ impl TelemetryWorker { /// /// Only one is needed per process. pub fn new(buffer_size: usize) -> Result { - let transport = initialize_transport(None)?; - let (message_sender, message_receiver) = mpsc::channel(buffer_size); - let (register_sender, register_receiver) = mpsc::unbounded(); - - Ok(Self { - message_receiver, - message_sender, - register_receiver, - register_sender, - id_counter: Arc::new(atomic::AtomicU64::new(1)), - transport, - }) + Self::with_transport(buffer_size, None) } - /// Instantiate a new [`TelemetryWorker`] which can run in background. + /// Instantiate a new [`TelemetryWorker`] with the given [`ExtTransport`] + /// which can run in background. /// /// Only one is needed per process. pub fn with_transport(buffer_size: usize, transport: Option) -> Result { @@ -312,12 +302,6 @@ impl TelemetryWorker { for (node_max_verbosity, addr) in nodes { if verbosity > *node_max_verbosity { - log::trace!( - target: "telemetry", - "Skipping {} for log entry with verbosity {:?}", - addr, - verbosity, - ); continue; } diff --git a/substrate/client/telemetry/src/node.rs b/substrate/client/telemetry/src/node.rs index 2d1a04b00a..9ac7ada4e5 100644 --- a/substrate/client/telemetry/src/node.rs +++ b/substrate/client/telemetry/src/node.rs @@ -73,8 +73,9 @@ enum NodeSocket { impl NodeSocket { fn wait_reconnect() -> NodeSocket { - let random_delay = rand::thread_rng().gen_range(5, 10); + let random_delay = rand::thread_rng().gen_range(10, 20); let delay = Delay::new(Duration::from_secs(random_delay)); + log::trace!(target: "telemetry", "Pausing for {} secs before reconnecting", random_delay); NodeSocket::WaitingReconnect(delay) } } @@ -214,11 +215,11 @@ where }, NodeSocket::ReconnectNow => match 
self.transport.clone().dial(self.addr.clone()) { Ok(d) => { - log::debug!(target: "telemetry", "Started dialing {}", self.addr); + log::trace!(target: "telemetry", "Re-dialing {}", self.addr); socket = NodeSocket::Dialing(d); } Err(err) => { - log::warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err); + log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err); socket = NodeSocket::wait_reconnect(); } }, @@ -236,16 +237,18 @@ where } }; - // The Dispatcher blocks when the Node sinks blocks. This is why it is important that the - // Node sinks doesn't go into "Pending" state while waiting for reconnection but rather + // The Dispatcher blocks when the Node sinks block. This is why it is important that the + // Node sinks don't go into "Pending" state while waiting for reconnection but rather // discard the excess of telemetry messages. Poll::Ready(Ok(())) } fn start_send(mut self: Pin<&mut Self>, item: TelemetryPayload) -> Result<(), Self::Error> { + // Any buffered outgoing telemetry messages are discarded while (re-)connecting. match &mut self.socket { NodeSocket::Connected(conn) => match serde_json::to_vec(&item) { Ok(data) => { + log::trace!(target: "telemetry", "Sending {} bytes", data.len()); let _ = conn.sink.start_send_unpin(data); } Err(err) => log::debug!( @@ -254,18 +257,14 @@ where err, ), }, - _socket => { - log::trace!( - target: "telemetry", - "Message has been discarded: {}", - serde_json::to_string(&item) - .unwrap_or_else(|err| format!( - "could not be serialized ({}): {:?}", - err, - item, - )), - ); - } + // We are currently dialing the node. + NodeSocket::Dialing(_) => log::trace!(target: "telemetry", "Dialing"), + // A new connection should be started as soon as possible. + NodeSocket::ReconnectNow => log::trace!(target: "telemetry", "Reconnecting"), + // Waiting before attempting to dial again. + NodeSocket::WaitingReconnect(_) => {} + // Temporary transition state.
+ NodeSocket::Poisoned => log::trace!(target: "telemetry", "Poisoned"), } Ok(()) } @@ -273,7 +272,12 @@ where fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut self.socket { NodeSocket::Connected(conn) => match conn.sink.poll_flush_unpin(cx) { - Poll::Ready(Err(_)) => { + Poll::Ready(Err(e)) => { + // When `telemetry` closes the websocket connection we end + // up here, which is sub-optimal. See + // https://github.com/libp2p/rust-libp2p/issues/2021 for + // what we could do to improve this. + log::trace!(target: "telemetry", "[poll_flush] Error: {:?}", e); self.socket = NodeSocket::wait_reconnect(); Poll::Ready(Ok(())) }