// This file is part of Substrate. // Copyright (C) 2017-2020 Parity Technologies (UK) Ltd. // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with this program. If not, see . //! Contains the object that makes the telemetry work. //! //! # Usage //! //! - Create a `TelemetryWorker` with `TelemetryWorker::new`. //! - Send messages to the telemetry with `TelemetryWorker::send_message`. Messages will only be //! sent to the appropriate targets. Messages may be ignored if the target happens to be //! temporarily unreachable. //! - You must appropriately poll the worker with `TelemetryWorker::poll`. Polling will/may produce //! events indicating what happened since the latest polling. //! use bytes::BytesMut; use futures::{prelude::*, ready}; use libp2p::{core::transport::OptionalTransport, Multiaddr, Transport, wasm_ext}; use log::{trace, warn, error}; use slog::Drain; use std::{io, pin::Pin, task::Context, task::Poll, time}; mod node; /// Timeout after which a connection attempt is considered failed. Includes the WebSocket HTTP /// upgrading. const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(20); /// Event generated when polling the worker. #[derive(Debug)] pub enum TelemetryWorkerEvent { /// We have established a connection to one of the telemetry endpoint, either for the first /// time or after having been disconnected earlier. Connected, } /// Telemetry processing machine. #[derive(Debug)] pub struct TelemetryWorker { /// List of nodes with their maximum verbosity level. nodes: Vec<(node::Node, u8)>, } trait StreamAndSink: Stream + Sink {} impl, I> StreamAndSink for T {} type WsTrans = libp2p::core::transport::boxed::Boxed< Pin, Error = io::Error > + Send>>, io::Error >; impl TelemetryWorker { /// Builds a new `TelemetryWorker`. /// /// The endpoints must be a list of targets, plus a verbosity level. When you send a message /// to the telemetry, only the targets whose verbosity is higher than the verbosity of the /// message will receive it. pub fn new( endpoints: impl IntoIterator, wasm_external_transport: impl Into> ) -> Result { let transport = match wasm_external_transport.into() { Some(t) => OptionalTransport::some(t), None => OptionalTransport::none() }.map((|inner, _| StreamSink::from(inner)) as fn(_, _) -> _); // The main transport is the `wasm_external_transport`, but if we're on desktop we add // support for TCP+WebSocket+DNS as a fallback. In practice, you're not expected to pass // an external transport on desktop and the fallback is used all the time. #[cfg(not(target_os = "unknown"))] let transport = transport.or_transport({ let inner = libp2p::dns::DnsConfig::new(libp2p::tcp::TcpConfig::new())?; libp2p::websocket::framed::WsConfig::new(inner) .and_then(|connec, _| { let connec = connec .with(|item: BytesMut| { let item = libp2p::websocket::framed::OutgoingData::Binary(item); future::ready(Ok::<_, io::Error>(item)) }) .try_filter(|item| future::ready(item.is_data())) .map_ok(|data| BytesMut::from(data.as_ref())); future::ready(Ok::<_, io::Error>(connec)) }) }); let transport = transport .timeout(CONNECT_TIMEOUT) .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) .map(|out, _| { let out = out .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) .sink_map_err(|err| io::Error::new(io::ErrorKind::Other, err)); Box::pin(out) as Pin> }) .boxed(); Ok(TelemetryWorker { nodes: endpoints.into_iter().map(|(addr, verbosity)| { let node = node::Node::new(transport.clone(), addr); (node, verbosity) }).collect() }) } /// Polls the worker for events that happened. pub fn poll(&mut self, cx: &mut Context) -> Poll { for (node, _) in &mut self.nodes { loop { match node::Node::poll(Pin::new(node), cx) { Poll::Ready(node::NodeEvent::Connected) => return Poll::Ready(TelemetryWorkerEvent::Connected), Poll::Ready(node::NodeEvent::Disconnected(_)) => continue, Poll::Pending => break, } } } Poll::Pending } /// Equivalent to `slog::Drain::log`, but takes `self` by `&mut` instead, which is more convenient. /// /// Keep in mind that you should call `TelemetryWorker::poll` in order to process the messages. /// You should call this function right after calling `slog::Drain::log`. pub fn log(&mut self, record: &slog::Record, values: &slog::OwnedKVList) -> Result<(), ()> { let msg_verbosity = match record.tag().parse::() { Ok(v) => v, Err(err) => { warn!(target: "telemetry", "Failed to parse telemetry tag {:?}: {:?}", record.tag(), err); return Err(()) } }; // None of the nodes want that verbosity, so just return without doing any serialization. if self.nodes.iter().all(|(_, node_max_verbosity)| msg_verbosity > *node_max_verbosity) { trace!( target: "telemetry", "Skipping log entry because verbosity {:?} is too high for all endpoints", msg_verbosity ); return Ok(()) } // Turn the message into JSON. let serialized = { let mut out = Vec::new(); slog_json::Json::default(&mut out).log(record, values).map_err(|_| ())?; out }; for (node, node_max_verbosity) in &mut self.nodes { if msg_verbosity > *node_max_verbosity { trace!(target: "telemetry", "Skipping {:?} for log entry with verbosity {:?}", node.addr(), msg_verbosity); continue; } // `send_message` returns an error if we're not connected, which we silently ignore. let _ = node.send_message(&serialized.clone()[..]); } Ok(()) } } /// Wraps around an `AsyncWrite` and implements `Sink`. Guarantees that each item being sent maps /// to one call of `write`. /// /// For some context, we put this object around the `wasm_ext::ExtTransport` in order to make sure /// that each telemetry message maps to one single call to `write` in the WASM FFI. #[pin_project::pin_project] struct StreamSink(#[pin] T, Option); impl From for StreamSink { fn from(inner: T) -> StreamSink { StreamSink(inner, None) } } impl Stream for StreamSink { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); let mut buf = [0; 128]; match ready!(AsyncRead::poll_read(this.0, cx, &mut buf)) { Ok(0) => Poll::Ready(None), Ok(n) => { let buf: BytesMut = buf[..n].into(); Poll::Ready(Some(Ok(buf))) }, Err(err) => Poll::Ready(Some(Err(err))), } } } impl StreamSink { fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); if let Some(buffer) = this.1 { if ready!(this.0.poll_write(cx, &buffer[..]))? != buffer.len() { error!(target: "telemetry", "Detected some internal buffering happening in the telemetry"); let err = io::Error::new(io::ErrorKind::Other, "Internal buffering detected"); return Poll::Ready(Err(err)); } } *this.1 = None; Poll::Ready(Ok(())) } } impl Sink for StreamSink { type Error = io::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { ready!(StreamSink::poll_flush_buffer(self, cx))?; Poll::Ready(Ok(())) } fn start_send(self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> { let this = self.project(); debug_assert!(this.1.is_none()); *this.1 = Some(item); Ok(()) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { ready!(self.as_mut().poll_flush_buffer(cx))?; let this = self.project(); AsyncWrite::poll_flush(this.0, cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { ready!(self.as_mut().poll_flush_buffer(cx))?; let this = self.project(); AsyncWrite::poll_close(this.0, cx) } }