mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 00:01:09 +00:00
Telemetry per node (#7463)
This commit is contained in:
@@ -1,155 +0,0 @@
|
||||
//! # Internal types to ssync drain slog
|
||||
//! FIXME: REMOVE THIS ONCE THE PR WAS MERGE
|
||||
//! <https://github.com/slog-rs/async/pull/14>
|
||||
|
||||
use slog::{Record, RecordStatic, Level, SingleKV, KV, BorrowedKV};
|
||||
use slog::{Serializer, OwnedKVList, Key};
|
||||
|
||||
use std::fmt;
|
||||
use take_mut::take;
|
||||
|
||||
struct ToSendSerializer {
|
||||
kv: Box<dyn KV + Send>,
|
||||
}
|
||||
|
||||
impl ToSendSerializer {
|
||||
fn new() -> Self {
|
||||
ToSendSerializer { kv: Box::new(()) }
|
||||
}
|
||||
|
||||
fn finish(self) -> Box<dyn KV + Send> {
|
||||
self.kv
|
||||
}
|
||||
}
|
||||
|
||||
impl Serializer for ToSendSerializer {
|
||||
fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_unit(&mut self, key: Key) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, ()))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_none(&mut self, key: Key) -> slog::Result {
|
||||
let val: Option<()> = None;
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
|
||||
let val = val.to_owned();
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
fn emit_arguments(
|
||||
&mut self,
|
||||
key: Key,
|
||||
val: &fmt::Arguments,
|
||||
) -> slog::Result {
|
||||
let val = fmt::format(*val);
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn emit_serde(&mut self, key: Key, value: &dyn slog::SerdeValue) -> slog::Result {
|
||||
let val = value.to_sendable();
|
||||
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct AsyncRecord {
|
||||
msg: String,
|
||||
level: Level,
|
||||
location: Box<slog::RecordLocation>,
|
||||
tag: String,
|
||||
logger_values: OwnedKVList,
|
||||
kv: Box<dyn KV + Send>,
|
||||
}
|
||||
|
||||
impl AsyncRecord {
|
||||
/// Serializes a `Record` and an `OwnedKVList`.
|
||||
pub fn from(record: &Record, logger_values: &OwnedKVList) -> Self {
|
||||
let mut ser = ToSendSerializer::new();
|
||||
record
|
||||
.kv()
|
||||
.serialize(record, &mut ser)
|
||||
.expect("`ToSendSerializer` can't fail");
|
||||
|
||||
AsyncRecord {
|
||||
msg: fmt::format(*record.msg()),
|
||||
level: record.level(),
|
||||
location: Box::new(*record.location()),
|
||||
tag: String::from(record.tag()),
|
||||
logger_values: logger_values.clone(),
|
||||
kv: ser.finish(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Deconstruct this `AsyncRecord` into a record and `OwnedKVList`.
|
||||
pub fn as_record_values(&self, mut f: impl FnMut(&Record, &OwnedKVList)) {
|
||||
let rs = RecordStatic {
|
||||
location: &*self.location,
|
||||
level: self.level,
|
||||
tag: &self.tag,
|
||||
};
|
||||
|
||||
f(&Record::new(
|
||||
&rs,
|
||||
&format_args!("{}", self.msg),
|
||||
BorrowedKV(&self.kv),
|
||||
), &self.logger_values)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,125 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use libp2p::Multiaddr;
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
|
||||
/// List of telemetry servers we want to talk to. Contains the URL of the server, and the
|
||||
/// maximum verbosity level.
|
||||
///
|
||||
/// The URL string can be either a URL or a multiaddress.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
pub struct TelemetryEndpoints(
|
||||
#[serde(deserialize_with = "url_or_multiaddr_deser")] pub(crate) Vec<(Multiaddr, u8)>,
|
||||
);
|
||||
|
||||
/// Custom deserializer for TelemetryEndpoints, used to convert urls or multiaddr to multiaddr.
|
||||
fn url_or_multiaddr_deser<'de, D>(deserializer: D) -> Result<Vec<(Multiaddr, u8)>, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
Vec::<(String, u8)>::deserialize(deserializer)?
|
||||
.iter()
|
||||
.map(|e| {
|
||||
url_to_multiaddr(&e.0)
|
||||
.map_err(serde::de::Error::custom)
|
||||
.map(|m| (m, e.1))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
impl TelemetryEndpoints {
|
||||
/// Create a `TelemetryEndpoints` based on a list of `(String, u8)`.
|
||||
pub fn new(endpoints: Vec<(String, u8)>) -> Result<Self, libp2p::multiaddr::Error> {
|
||||
let endpoints: Result<Vec<(Multiaddr, u8)>, libp2p::multiaddr::Error> = endpoints
|
||||
.iter()
|
||||
.map(|e| Ok((url_to_multiaddr(&e.0)?, e.1)))
|
||||
.collect();
|
||||
endpoints.map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl TelemetryEndpoints {
|
||||
/// Return `true` if there are no telemetry endpoints, `false` otherwise.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a WebSocket URL into a libp2p `Multiaddr`.
|
||||
fn url_to_multiaddr(url: &str) -> Result<Multiaddr, libp2p::multiaddr::Error> {
|
||||
// First, assume that we have a `Multiaddr`.
|
||||
let parse_error = match url.parse() {
|
||||
Ok(ma) => return Ok(ma),
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
// If not, try the `ws://path/url` format.
|
||||
if let Ok(ma) = libp2p::multiaddr::from_url(url) {
|
||||
return Ok(ma);
|
||||
}
|
||||
|
||||
// If we have no clue about the format of that string, assume that we were expecting a
|
||||
// `Multiaddr`.
|
||||
Err(parse_error)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::url_to_multiaddr;
|
||||
use super::TelemetryEndpoints;
|
||||
use libp2p::Multiaddr;
|
||||
|
||||
#[test]
|
||||
fn valid_endpoints() {
|
||||
let endp = vec![
|
||||
("wss://telemetry.polkadot.io/submit/".into(), 3),
|
||||
("/ip4/80.123.90.4/tcp/5432".into(), 4),
|
||||
];
|
||||
let telem =
|
||||
TelemetryEndpoints::new(endp.clone()).expect("Telemetry endpoint should be valid");
|
||||
let mut res: Vec<(Multiaddr, u8)> = vec![];
|
||||
for (a, b) in endp.iter() {
|
||||
res.push((
|
||||
url_to_multiaddr(a).expect("provided url should be valid"),
|
||||
*b,
|
||||
))
|
||||
}
|
||||
assert_eq!(telem.0, res);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_endpoints() {
|
||||
let endp = vec![
|
||||
("/ip4/...80.123.90.4/tcp/5432".into(), 3),
|
||||
("/ip4/no:!?;rlkqre;;::::///tcp/5432".into(), 4),
|
||||
];
|
||||
let telem = TelemetryEndpoints::new(endp);
|
||||
assert!(telem.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn valid_and_invalid_endpoints() {
|
||||
let endp = vec![
|
||||
("/ip4/80.123.90.4/tcp/5432".into(), 3),
|
||||
("/ip4/no:!?;rlkqre;;::::///tcp/5432".into(), 4),
|
||||
];
|
||||
let telem = TelemetryEndpoints::new(endp);
|
||||
assert!(telem.is_err());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,149 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::{initialize_transport, TelemetryWorker};
|
||||
use futures::channel::mpsc;
|
||||
use libp2p::wasm_ext::ExtTransport;
|
||||
use parking_lot::Mutex;
|
||||
use std::convert::TryInto;
|
||||
use std::io;
|
||||
use tracing::{Event, Id, Subscriber};
|
||||
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};
|
||||
|
||||
/// Span name used to report the telemetry.
|
||||
pub const TELEMETRY_LOG_SPAN: &str = "telemetry-logger";
|
||||
|
||||
/// `Layer` that handles the logs for telemetries.
|
||||
#[derive(Debug)]
|
||||
pub struct TelemetryLayer(Mutex<mpsc::Sender<(Id, u8, String)>>);
|
||||
|
||||
impl TelemetryLayer {
|
||||
/// Create a new [`TelemetryLayer`] and [`TelemetryWorker`].
|
||||
///
|
||||
/// If not provided, the `buffer_size` will be 16 by default.
|
||||
///
|
||||
/// The [`ExtTransport`] is used in WASM contexts where we need some binding between the
|
||||
/// networking provided by the operating system or environment and libp2p.
|
||||
///
|
||||
/// > **Important**: Each individual call to `write` corresponds to one message. There is no
|
||||
/// > internal buffering going on. In the context of WebSockets, each `write`
|
||||
/// > must be one individual WebSockets frame.
|
||||
pub fn new(
|
||||
buffer_size: Option<usize>,
|
||||
telemetry_external_transport: Option<ExtTransport>,
|
||||
) -> io::Result<(Self, TelemetryWorker)> {
|
||||
let transport = initialize_transport(telemetry_external_transport)?;
|
||||
let worker = TelemetryWorker::new(buffer_size.unwrap_or(16), transport);
|
||||
let sender = worker.message_sender();
|
||||
Ok((Self(Mutex::new(sender)), worker))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Layer<S> for TelemetryLayer
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
fn on_event(&self, event: &Event<'_>, ctx: Context<S>) {
|
||||
if event.metadata().target() != TELEMETRY_LOG_SPAN {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(span) = ctx.lookup_current() {
|
||||
let parents = span.parents();
|
||||
|
||||
if let Some(span) = std::iter::once(span)
|
||||
.chain(parents)
|
||||
.find(|x| x.name() == TELEMETRY_LOG_SPAN)
|
||||
{
|
||||
let id = span.id();
|
||||
let mut attrs = TelemetryAttrs::new(id.clone());
|
||||
let mut vis = TelemetryAttrsVisitor(&mut attrs);
|
||||
event.record(&mut vis);
|
||||
|
||||
if let TelemetryAttrs {
|
||||
verbosity: Some(verbosity),
|
||||
json: Some(json),
|
||||
..
|
||||
} = attrs
|
||||
{
|
||||
match self.0.lock().try_send((
|
||||
id,
|
||||
verbosity
|
||||
.try_into()
|
||||
.expect("telemetry log message verbosity are u8; qed"),
|
||||
json,
|
||||
)) {
|
||||
Err(err) if err.is_full() => eprintln!("Telemetry buffer overflowed!"),
|
||||
_ => {}
|
||||
}
|
||||
} else {
|
||||
// NOTE: logging in this function doesn't work
|
||||
eprintln!(
|
||||
"missing fields in telemetry log: {:?}. This can happen if \
|
||||
`tracing::info_span!` is (mis-)used with the telemetry target \
|
||||
directly; you should use the `telemetry!` macro.",
|
||||
event,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TelemetryAttrs {
|
||||
verbosity: Option<u64>,
|
||||
json: Option<String>,
|
||||
id: Id,
|
||||
}
|
||||
|
||||
impl TelemetryAttrs {
|
||||
fn new(id: Id) -> Self {
|
||||
Self {
|
||||
verbosity: None,
|
||||
json: None,
|
||||
id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TelemetryAttrsVisitor<'a>(&'a mut TelemetryAttrs);
|
||||
|
||||
impl<'a> tracing::field::Visit for TelemetryAttrsVisitor<'a> {
|
||||
fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {
|
||||
// noop
|
||||
}
|
||||
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
if field.name() == "verbosity" {
|
||||
(*self.0).verbosity = Some(value);
|
||||
}
|
||||
}
|
||||
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
if field.name() == "json" {
|
||||
(*self.0).json = Some(format!(
|
||||
r#"{{"id":{},"ts":{:?},"payload":{}}}"#,
|
||||
self.0.id.into_u64(),
|
||||
chrono::Local::now().to_rfc3339().to_string(),
|
||||
value,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,339 +16,472 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
//! Telemetry utilities.
|
||||
//! Substrate's client telemetry is a part of substrate that allows logging telemetry information
|
||||
//! with a [Polkadot telemetry](https://github.com/paritytech/substrate-telemetry).
|
||||
//!
|
||||
//! Calling `init_telemetry` registers a global `slog` logger using `slog_scope::set_global_logger`.
|
||||
//! After that, calling `slog_scope::with_logger` will return a logger that sends information to
|
||||
//! the telemetry endpoints. The `telemetry!` macro is a short-cut for calling
|
||||
//! `slog_scope::with_logger` followed with `slog_log!`.
|
||||
//! It works using Tokio's [tracing](https://github.com/tokio-rs/tracing/). The telemetry
|
||||
//! information uses tracing's logging to report the telemetry which is then retrieved by a
|
||||
//! tracing's `Layer`. This layer will then send the data through an asynchronous channel and to a
|
||||
//! background task called [`TelemetryWorker`] which will send the information to the telemetry
|
||||
//! server.
|
||||
//!
|
||||
//! Note that you are supposed to only ever use `telemetry!` and not `slog_scope::with_logger` at
|
||||
//! the moment. Substrate may eventually be reworked to get proper `slog` support, including sending
|
||||
//! information to the telemetry.
|
||||
//!
|
||||
//! The [`Telemetry`] struct implements `Stream` and must be polled regularly (or sent to a
|
||||
//! background thread/task) in order for the telemetry to properly function. Dropping the object
|
||||
//! will also deregister the global logger and replace it with a logger that discards messages.
|
||||
//! The `Stream` generates [`TelemetryEvent`]s.
|
||||
//!
|
||||
//! > **Note**: Cloning the [`Telemetry`] and polling from multiple clones has an unspecified behaviour.
|
||||
//!
|
||||
//! # Example
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use futures::prelude::*;
|
||||
//!
|
||||
//! let telemetry = sc_telemetry::init_telemetry(sc_telemetry::TelemetryConfig {
|
||||
//! endpoints: sc_telemetry::TelemetryEndpoints::new(vec![
|
||||
//! // The `0` is the maximum verbosity level of messages to send to this endpoint.
|
||||
//! ("wss://example.com".into(), 0)
|
||||
//! ]).expect("Invalid URL or multiaddr provided"),
|
||||
//! // Can be used to pass an external implementation of WebSockets.
|
||||
//! wasm_external_transport: None,
|
||||
//! });
|
||||
//!
|
||||
//! // The `telemetry` object implements `Stream` and must be processed.
|
||||
//! std::thread::spawn(move || {
|
||||
//! futures::executor::block_on(telemetry.for_each(|_| future::ready(())));
|
||||
//! });
|
||||
//!
|
||||
//! // Sends a message on the telemetry.
|
||||
//! sc_telemetry::telemetry!(sc_telemetry::SUBSTRATE_INFO; "test";
|
||||
//! "foo" => "bar",
|
||||
//! )
|
||||
//! ```
|
||||
//! If multiple substrate nodes are running, it uses a tracing's `Span` to identify which substrate
|
||||
//! node is reporting the telemetry. Every task spawned using sc-service's `TaskManager`
|
||||
//! automatically inherit this span.
|
||||
//!
|
||||
//! Substrate's nodes initialize/register to the [`TelemetryWorker`] using a [`TelemetryHandle`].
|
||||
//! This handle can be cloned and passed around. It uses an asynchronous channel to communicate with
|
||||
//! the running [`TelemetryWorker`] dedicated to registration. Registering a telemetry can happen at
|
||||
//! any point in time during the execution.
|
||||
|
||||
use futures::{prelude::*, channel::mpsc};
|
||||
use libp2p::{Multiaddr, wasm_ext};
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use futures::{channel::mpsc, prelude::*};
|
||||
use libp2p::Multiaddr;
|
||||
use log::{error, warn};
|
||||
use parking_lot::Mutex;
|
||||
use serde::{Serialize, Deserialize, Deserializer};
|
||||
use std::{pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration};
|
||||
use wasm_timer::Instant;
|
||||
use serde::Serialize;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
|
||||
use std::collections::HashMap;
|
||||
use tracing::Id;
|
||||
|
||||
pub use libp2p::wasm_ext::ExtTransport;
|
||||
pub use slog_scope::with_logger;
|
||||
pub use slog;
|
||||
pub use serde_json;
|
||||
pub use tracing;
|
||||
|
||||
mod async_record;
|
||||
mod worker;
|
||||
mod endpoints;
|
||||
mod layer;
|
||||
mod node;
|
||||
mod transport;
|
||||
|
||||
/// Configuration for telemetry.
|
||||
pub struct TelemetryConfig {
|
||||
/// Collection of telemetry WebSocket servers with a corresponding verbosity level.
|
||||
pub endpoints: TelemetryEndpoints,
|
||||
pub use endpoints::*;
|
||||
pub use layer::*;
|
||||
use node::*;
|
||||
use transport::*;
|
||||
|
||||
/// Optional external implementation of a libp2p transport. Used in WASM contexts where we need
|
||||
/// some binding between the networking provided by the operating system or environment and
|
||||
/// libp2p.
|
||||
///
|
||||
/// This parameter exists whatever the target platform is, but it is expected to be set to
|
||||
/// `Some` only when compiling for WASM.
|
||||
///
|
||||
/// > **Important**: Each individual call to `write` corresponds to one message. There is no
|
||||
/// > internal buffering going on. In the context of WebSockets, each `write`
|
||||
/// > must be one individual WebSockets frame.
|
||||
pub wasm_external_transport: Option<wasm_ext::ExtTransport>,
|
||||
/// Substrate DEBUG log level.
|
||||
pub const SUBSTRATE_DEBUG: u8 = 9;
|
||||
/// Substrate INFO log level.
|
||||
pub const SUBSTRATE_INFO: u8 = 0;
|
||||
|
||||
/// Consensus TRACE log level.
|
||||
pub const CONSENSUS_TRACE: u8 = 9;
|
||||
/// Consensus DEBUG log level.
|
||||
pub const CONSENSUS_DEBUG: u8 = 5;
|
||||
/// Consensus WARN log level.
|
||||
pub const CONSENSUS_WARN: u8 = 4;
|
||||
/// Consensus INFO log level.
|
||||
pub const CONSENSUS_INFO: u8 = 1;
|
||||
|
||||
pub(crate) type TelemetryMessage = (Id, u8, String);
|
||||
|
||||
/// A handle representing a telemetry span, with the capability to enter the span if it exists.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TelemetrySpan(tracing::Span);
|
||||
|
||||
impl TelemetrySpan {
|
||||
/// Enters this span, returning a guard that will exit the span when dropped.
|
||||
pub fn enter(&self) -> tracing::span::Entered {
|
||||
self.0.enter()
|
||||
}
|
||||
|
||||
/// Constructs a new [`TelemetrySpan`].
|
||||
pub fn new() -> Self {
|
||||
Self(tracing::info_span!(TELEMETRY_LOG_SPAN))
|
||||
}
|
||||
}
|
||||
|
||||
/// List of telemetry servers we want to talk to. Contains the URL of the server, and the
|
||||
/// maximum verbosity level.
|
||||
/// Message sent when the connection (re-)establishes.
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ConnectionMessage {
|
||||
/// Node's name.
|
||||
pub name: String,
|
||||
/// Node's implementation.
|
||||
pub implementation: String,
|
||||
/// Node's version.
|
||||
pub version: String,
|
||||
/// Node's configuration.
|
||||
pub config: String,
|
||||
/// Node's chain.
|
||||
pub chain: String,
|
||||
/// Node's genesis hash.
|
||||
pub genesis_hash: String,
|
||||
/// Node is an authority.
|
||||
pub authority: bool,
|
||||
/// Node's startup time.
|
||||
pub startup_time: String,
|
||||
/// Node's network ID.
|
||||
pub network_id: String,
|
||||
}
|
||||
|
||||
/// Telemetry worker.
|
||||
///
|
||||
/// The URL string can be either a URL or a multiaddress.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct TelemetryEndpoints(
|
||||
#[serde(deserialize_with = "url_or_multiaddr_deser")]
|
||||
Vec<(Multiaddr, u8)>
|
||||
);
|
||||
|
||||
/// Custom deserializer for TelemetryEndpoints, used to convert urls or multiaddr to multiaddr.
|
||||
fn url_or_multiaddr_deser<'de, D>(deserializer: D) -> Result<Vec<(Multiaddr, u8)>, D::Error>
|
||||
where D: Deserializer<'de>
|
||||
{
|
||||
Vec::<(String, u8)>::deserialize(deserializer)?
|
||||
.iter()
|
||||
.map(|e| Ok((url_to_multiaddr(&e.0)
|
||||
.map_err(serde::de::Error::custom)?, e.1)))
|
||||
.collect()
|
||||
}
|
||||
|
||||
impl TelemetryEndpoints {
|
||||
pub fn new(endpoints: Vec<(String, u8)>) -> Result<Self, libp2p::multiaddr::Error> {
|
||||
let endpoints: Result<Vec<(Multiaddr, u8)>, libp2p::multiaddr::Error> = endpoints.iter()
|
||||
.map(|e| Ok((url_to_multiaddr(&e.0)?, e.1)))
|
||||
.collect();
|
||||
endpoints.map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl TelemetryEndpoints {
|
||||
/// Return `true` if there are no telemetry endpoints, `false` otherwise.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a WebSocket URL into a libp2p `Multiaddr`.
|
||||
fn url_to_multiaddr(url: &str) -> Result<Multiaddr, libp2p::multiaddr::Error> {
|
||||
// First, assume that we have a `Multiaddr`.
|
||||
let parse_error = match url.parse() {
|
||||
Ok(ma) => return Ok(ma),
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
// If not, try the `ws://path/url` format.
|
||||
if let Ok(ma) = libp2p::multiaddr::from_url(url) {
|
||||
return Ok(ma)
|
||||
}
|
||||
|
||||
// If we have no clue about the format of that string, assume that we were expecting a
|
||||
// `Multiaddr`.
|
||||
Err(parse_error)
|
||||
}
|
||||
|
||||
/// Log levels.
|
||||
pub const SUBSTRATE_DEBUG: &str = "9";
|
||||
pub const SUBSTRATE_INFO: &str = "0";
|
||||
|
||||
pub const CONSENSUS_TRACE: &str = "9";
|
||||
pub const CONSENSUS_DEBUG: &str = "5";
|
||||
pub const CONSENSUS_WARN: &str = "4";
|
||||
pub const CONSENSUS_INFO: &str = "1";
|
||||
|
||||
/// Telemetry object. Implements `Future` and must be polled regularly.
|
||||
/// Contains an `Arc` and can be cloned and pass around. Only one clone needs to be polled
|
||||
/// regularly and should be polled regularly.
|
||||
/// Dropping all the clones unregisters the telemetry.
|
||||
#[derive(Clone)]
|
||||
pub struct Telemetry {
|
||||
inner: Arc<Mutex<TelemetryInner>>,
|
||||
/// Slog guard so that we don't get deregistered.
|
||||
_guard: Arc<slog_scope::GlobalLoggerGuard>,
|
||||
}
|
||||
|
||||
/// Behind the `Mutex` in `Telemetry`.
|
||||
///
|
||||
/// Note that ideally we wouldn't have to make the `Telemetry` cloneable, as that would remove the
|
||||
/// need for a `Mutex`. However there is currently a weird hack in place in `sc-service`
|
||||
/// where we extract the telemetry registration so that it continues running during the shutdown
|
||||
/// process.
|
||||
struct TelemetryInner {
|
||||
/// Worker for the telemetry. `None` if it failed to initialize.
|
||||
worker: Option<worker::TelemetryWorker>,
|
||||
/// Receives log entries for them to be dispatched to the worker.
|
||||
receiver: mpsc::Receiver<async_record::AsyncRecord>,
|
||||
}
|
||||
|
||||
/// Implements `slog::Drain`.
|
||||
struct TelemetryDrain {
|
||||
/// Sends log entries.
|
||||
sender: std::panic::AssertUnwindSafe<mpsc::Sender<async_record::AsyncRecord>>,
|
||||
}
|
||||
|
||||
/// Initializes the telemetry. See the crate root documentation for more information.
|
||||
///
|
||||
/// Please be careful to not call this function twice in the same program. The `slog` crate
|
||||
/// doesn't provide any way of knowing whether a global logger has already been registered.
|
||||
pub fn init_telemetry(config: TelemetryConfig) -> Telemetry {
|
||||
// Build the list of telemetry endpoints.
|
||||
let (endpoints, wasm_external_transport) = (config.endpoints.0, config.wasm_external_transport);
|
||||
|
||||
let (sender, receiver) = mpsc::channel(16);
|
||||
let guard = {
|
||||
let logger = TelemetryDrain { sender: std::panic::AssertUnwindSafe(sender) };
|
||||
let root = slog::Logger::root(slog::Drain::fuse(logger), slog::o!());
|
||||
slog_scope::set_global_logger(root)
|
||||
};
|
||||
|
||||
let worker = match worker::TelemetryWorker::new(endpoints, wasm_external_transport) {
|
||||
Ok(w) => Some(w),
|
||||
Err(err) => {
|
||||
error!(target: "telemetry", "Failed to initialize telemetry worker: {:?}", err);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
Telemetry {
|
||||
inner: Arc::new(Mutex::new(TelemetryInner {
|
||||
worker,
|
||||
receiver,
|
||||
})),
|
||||
_guard: Arc::new(guard),
|
||||
}
|
||||
}
|
||||
|
||||
/// Event generated when polling the worker.
|
||||
/// It should be ran as a background task using the [`TelemetryWorker::run`] method. This method
|
||||
/// will consume the object and any further attempts of initializing a new telemetry through its
|
||||
/// handle will fail (without being fatal).
|
||||
#[derive(Debug)]
|
||||
pub enum TelemetryEvent {
|
||||
/// We have established a connection to one of the telemetry endpoint, either for the first
|
||||
/// time or after having been disconnected earlier.
|
||||
Connected,
|
||||
pub struct TelemetryWorker {
|
||||
message_receiver: mpsc::Receiver<TelemetryMessage>,
|
||||
message_sender: mpsc::Sender<TelemetryMessage>,
|
||||
register_receiver: mpsc::UnboundedReceiver<Register>,
|
||||
register_sender: mpsc::UnboundedSender<Register>,
|
||||
transport: WsTrans,
|
||||
}
|
||||
|
||||
impl Stream for Telemetry {
|
||||
type Item = TelemetryEvent;
|
||||
impl TelemetryWorker {
|
||||
pub(crate) fn new(buffer_size: usize, transport: WsTrans) -> Self {
|
||||
let (message_sender, message_receiver) = mpsc::channel(buffer_size);
|
||||
let (register_sender, register_receiver) = mpsc::unbounded();
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
let before = Instant::now();
|
||||
Self {
|
||||
message_receiver,
|
||||
message_sender,
|
||||
register_receiver,
|
||||
register_sender,
|
||||
transport,
|
||||
}
|
||||
}
|
||||
|
||||
// Because the `Telemetry` is cloneable, we need to put the actual fields behind a `Mutex`.
|
||||
// However, the user is only ever supposed to poll from one instance of `Telemetry`, while
|
||||
// the other instances are used only for RAII purposes.
|
||||
// We assume that the user is following this advice and therefore that the `Mutex` is only
|
||||
// ever locked once at a time.
|
||||
let mut inner = match self.inner.try_lock() {
|
||||
Some(l) => l,
|
||||
None => {
|
||||
warn!(
|
||||
target: "telemetry",
|
||||
"The telemetry seems to be polled multiple times simultaneously"
|
||||
);
|
||||
// Returning `Pending` here means that we may never get polled again, but this is
|
||||
// ok because we're in a situation where something else is actually currently doing
|
||||
// the polling.
|
||||
return Poll::Pending;
|
||||
/// Get a new [`TelemetryHandle`].
|
||||
///
|
||||
/// This is used when you want to register a new telemetry for a Substrate node.
|
||||
pub fn handle(&self) -> TelemetryHandle {
|
||||
TelemetryHandle {
|
||||
message_sender: self.register_sender.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a clone of the channel's `Sender` used to send telemetry events.
|
||||
pub(crate) fn message_sender(&self) -> mpsc::Sender<TelemetryMessage> {
|
||||
self.message_sender.clone()
|
||||
}
|
||||
|
||||
/// Run the telemetry worker.
|
||||
///
|
||||
/// This should be run in a background task.
|
||||
pub async fn run(self) {
|
||||
let Self {
|
||||
mut message_receiver,
|
||||
message_sender: _,
|
||||
mut register_receiver,
|
||||
register_sender: _,
|
||||
transport,
|
||||
} = self;
|
||||
|
||||
let mut node_map: HashMap<Id, Vec<(u8, Multiaddr)>> = HashMap::new();
|
||||
let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
|
||||
|
||||
loop {
|
||||
futures::select! {
|
||||
message = message_receiver.next() => Self::process_message(
|
||||
message,
|
||||
&mut node_pool,
|
||||
&node_map,
|
||||
).await,
|
||||
init_payload = register_receiver.next() => Self::process_register(
|
||||
init_payload,
|
||||
&mut node_pool,
|
||||
&mut node_map,
|
||||
transport.clone(),
|
||||
).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_register(
|
||||
input: Option<Register>,
|
||||
node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
|
||||
node_map: &mut HashMap<Id, Vec<(u8, Multiaddr)>>,
|
||||
transport: WsTrans,
|
||||
) {
|
||||
let input = input.expect("the stream is never closed; qed");
|
||||
|
||||
match input {
|
||||
Register::Telemetry {
|
||||
id,
|
||||
endpoints,
|
||||
connection_message,
|
||||
} => {
|
||||
let endpoints = endpoints.0;
|
||||
|
||||
let connection_message = match serde_json::to_value(&connection_message) {
|
||||
Ok(serde_json::Value::Object(mut value)) => {
|
||||
value.insert("msg".into(), "system.connected".into());
|
||||
let mut obj = serde_json::Map::new();
|
||||
obj.insert("id".to_string(), id.into_u64().into());
|
||||
obj.insert("payload".to_string(), value.into());
|
||||
Some(obj)
|
||||
}
|
||||
Ok(_) => {
|
||||
unreachable!("ConnectionMessage always serialize to an object; qed")
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
target: "telemetry",
|
||||
"Could not serialize connection message: {}",
|
||||
err,
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
for (addr, verbosity) in endpoints {
|
||||
node_map
|
||||
.entry(id.clone())
|
||||
.or_default()
|
||||
.push((verbosity, addr.clone()));
|
||||
|
||||
let node = node_pool.entry(addr.clone()).or_insert_with(|| {
|
||||
Node::new(transport.clone(), addr.clone(), Vec::new(), Vec::new())
|
||||
});
|
||||
|
||||
node.connection_messages.extend(connection_message.clone());
|
||||
}
|
||||
}
|
||||
Register::Notifier {
|
||||
addresses,
|
||||
connection_notifier,
|
||||
} => {
|
||||
for addr in addresses {
|
||||
if let Some(node) = node_pool.get_mut(&addr) {
|
||||
node.telemetry_connection_notifier
|
||||
.push(connection_notifier.clone());
|
||||
} else {
|
||||
log::error!(
|
||||
target: "telemetry",
|
||||
"Received connection notifier for unknown node ({}). This is a bug.",
|
||||
addr,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dispatch messages to the telemetry nodes
|
||||
async fn process_message(
|
||||
input: Option<TelemetryMessage>,
|
||||
node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
|
||||
node_map: &HashMap<Id, Vec<(u8, Multiaddr)>>,
|
||||
) {
|
||||
let (id, verbosity, message) = input.expect("the stream is never closed; qed");
|
||||
|
||||
let nodes = if let Some(nodes) = node_map.get(&id) {
|
||||
nodes
|
||||
} else {
|
||||
// This is a normal error because the telemetry span is entered before the telemetry
|
||||
// is initialized so it is possible that some messages in the beginning don't get
|
||||
// through.
|
||||
log::trace!(
|
||||
target: "telemetry",
|
||||
"Received telemetry log for unknown id ({:?}): {}",
|
||||
id,
|
||||
message,
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
let mut has_connected = false;
|
||||
|
||||
// The polling pattern is: poll the worker so that it processes its queue, then add one
|
||||
// message from the receiver (if possible), then poll the worker again, and so on.
|
||||
loop {
|
||||
if let Some(worker) = inner.worker.as_mut() {
|
||||
while let Poll::Ready(event) = worker.poll(cx) {
|
||||
// Right now we only have one possible event. This line is here in order to not
|
||||
// forget to handle any possible new event type.
|
||||
let worker::TelemetryWorkerEvent::Connected = event;
|
||||
has_connected = true;
|
||||
}
|
||||
for (node_max_verbosity, addr) in nodes {
|
||||
if verbosity > *node_max_verbosity {
|
||||
log::trace!(
|
||||
target: "telemetry",
|
||||
"Skipping {} for log entry with verbosity {:?}",
|
||||
addr,
|
||||
verbosity,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Poll::Ready(Some(log_entry)) = Stream::poll_next(Pin::new(&mut inner.receiver), cx) {
|
||||
if let Some(worker) = inner.worker.as_mut() {
|
||||
log_entry.as_record_values(|rec, val| { let _ = worker.log(rec, val); });
|
||||
}
|
||||
if let Some(node) = node_pool.get_mut(&addr) {
|
||||
let _ = node.send(message.clone()).await;
|
||||
} else {
|
||||
break;
|
||||
log::error!(
|
||||
target: "telemetry",
|
||||
"Received message for unknown node ({}). This is a bug. \
|
||||
Message sent: {}",
|
||||
addr,
|
||||
message,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if before.elapsed() > Duration::from_millis(200) {
|
||||
warn!(target: "telemetry", "Polling the telemetry took more than 200ms");
|
||||
}
|
||||
|
||||
if has_connected {
|
||||
Poll::Ready(Some(TelemetryEvent::Connected))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl slog::Drain for TelemetryDrain {
|
||||
type Ok = ();
|
||||
type Err = ();
|
||||
/// Handle to the [`TelemetryWorker`] thats allows initializing the telemetry for a Substrate node.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TelemetryHandle {
|
||||
message_sender: mpsc::UnboundedSender<Register>,
|
||||
}
|
||||
|
||||
fn log(&self, record: &slog::Record, values: &slog::OwnedKVList) -> Result<Self::Ok, Self::Err> {
|
||||
let before = Instant::now();
|
||||
impl TelemetryHandle {
|
||||
/// Initialize the telemetry with the endpoints provided in argument for the current substrate
|
||||
/// node.
|
||||
///
|
||||
/// This method must be called during the substrate node initialization.
|
||||
///
|
||||
/// The `endpoints` argument is a collection of telemetry WebSocket servers with a corresponding
|
||||
/// verbosity level.
|
||||
///
|
||||
/// The `connection_message` argument is a JSON object that is sent every time the connection
|
||||
/// (re-)establishes.
|
||||
pub fn start_telemetry(
|
||||
&mut self,
|
||||
span: TelemetrySpan,
|
||||
endpoints: TelemetryEndpoints,
|
||||
connection_message: ConnectionMessage,
|
||||
) -> TelemetryConnectionNotifier {
|
||||
let Self { message_sender } = self;
|
||||
|
||||
let serialized = async_record::AsyncRecord::from(record, values);
|
||||
// Note: interestingly, `try_send` requires a `&mut` because it modifies some internal value, while `clone()`
|
||||
// is lock-free.
|
||||
if let Err(err) = self.sender.clone().try_send(serialized) {
|
||||
warn!(target: "telemetry", "Ignored telemetry message because of error on channel: {:?}", err);
|
||||
let connection_notifier = TelemetryConnectionNotifier {
|
||||
message_sender: message_sender.clone(),
|
||||
addresses: endpoints.0.iter().map(|(addr, _)| addr.clone()).collect(),
|
||||
};
|
||||
|
||||
match span.0.id() {
|
||||
Some(id) => {
|
||||
match message_sender.unbounded_send(Register::Telemetry {
|
||||
id,
|
||||
endpoints,
|
||||
connection_message,
|
||||
}) {
|
||||
Ok(()) => {}
|
||||
Err(err) => error!(
|
||||
target: "telemetry",
|
||||
"Could not initialize telemetry: \
|
||||
the telemetry is probably already running: {}",
|
||||
err,
|
||||
),
|
||||
}
|
||||
}
|
||||
None => error!(
|
||||
target: "telemetry",
|
||||
"Could not initialize telemetry: the span could not be entered",
|
||||
),
|
||||
}
|
||||
|
||||
if before.elapsed() > Duration::from_millis(50) {
|
||||
warn!(target: "telemetry", "Writing a telemetry log took more than 50ms");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
connection_notifier
|
||||
}
|
||||
}
|
||||
|
||||
/// Translates to `slog_scope::info`, but contains an additional verbosity
|
||||
/// parameter which the log record is tagged with. Additionally the verbosity
|
||||
/// parameter is added to the record as a key-value pair.
|
||||
#[macro_export]
|
||||
/// Used to create a stream of events with only one event: when a telemetry connection
|
||||
/// (re-)establishes.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TelemetryConnectionNotifier {
|
||||
message_sender: mpsc::UnboundedSender<Register>,
|
||||
addresses: Vec<Multiaddr>,
|
||||
}
|
||||
|
||||
impl TelemetryConnectionNotifier {
|
||||
/// Get event stream for telemetry connection established events.
|
||||
///
|
||||
/// This function will return an error if the telemetry has already been started by
|
||||
/// [`TelemetryHandle::start_telemetry`].
|
||||
pub fn on_connect_stream(&self) -> TracingUnboundedReceiver<()> {
|
||||
let (message_sender, message_receiver) = tracing_unbounded("mpsc_telemetry_on_connect");
|
||||
if let Err(err) = self.message_sender.unbounded_send(Register::Notifier {
|
||||
addresses: self.addresses.clone(),
|
||||
connection_notifier: message_sender,
|
||||
}) {
|
||||
error!(
|
||||
target: "telemetry",
|
||||
"Could not create a telemetry connection notifier: \
|
||||
the telemetry is probably already running: {}",
|
||||
err,
|
||||
);
|
||||
}
|
||||
message_receiver
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Register {
|
||||
Telemetry {
|
||||
id: Id,
|
||||
endpoints: TelemetryEndpoints,
|
||||
connection_message: ConnectionMessage,
|
||||
},
|
||||
Notifier {
|
||||
addresses: Vec<Multiaddr>,
|
||||
connection_notifier: ConnectionNotifierSender,
|
||||
},
|
||||
}
|
||||
|
||||
/// Report a telemetry.
|
||||
///
|
||||
/// Translates to [`tracing::info`], but contains an additional verbosity parameter which the log
|
||||
/// record is tagged with. Additionally the verbosity parameter is added to the record as a
|
||||
/// key-value pair.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// # use sc_telemetry::*;
|
||||
/// # let authority_id = 42_u64;
|
||||
/// # let set_id = (43_u64, 44_u64);
|
||||
/// # let authorities = vec![45_u64];
|
||||
/// telemetry!(CONSENSUS_INFO; "afg.authority_set";
|
||||
/// "authority_id" => authority_id.to_string(),
|
||||
/// "authority_set_id" => ?set_id,
|
||||
/// "authorities" => authorities,
|
||||
/// );
|
||||
/// ```
|
||||
#[macro_export(local_inner_macros)]
|
||||
macro_rules! telemetry {
|
||||
( $a:expr; $b:expr; $( $t:tt )* ) => {
|
||||
$crate::with_logger(|l| {
|
||||
$crate::slog::slog_info!(l, #$a, $b; $($t)* )
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod telemetry_endpoints_tests {
|
||||
use libp2p::Multiaddr;
|
||||
use super::TelemetryEndpoints;
|
||||
use super::url_to_multiaddr;
|
||||
|
||||
#[test]
|
||||
fn valid_endpoints() {
|
||||
let endp = vec![("wss://telemetry.polkadot.io/submit/".into(), 3), ("/ip4/80.123.90.4/tcp/5432".into(), 4)];
|
||||
let telem = TelemetryEndpoints::new(endp.clone()).expect("Telemetry endpoint should be valid");
|
||||
let mut res: Vec<(Multiaddr, u8)> = vec![];
|
||||
for (a, b) in endp.iter() {
|
||||
res.push((url_to_multiaddr(a).expect("provided url should be valid"), *b))
|
||||
( $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
|
||||
let verbosity: u8 = $verbosity;
|
||||
match format_fields_to_json!($($t)*) {
|
||||
Err(err) => {
|
||||
$crate::tracing::error!(
|
||||
target: "telemetry",
|
||||
"Could not serialize value for telemetry: {}",
|
||||
err,
|
||||
);
|
||||
},
|
||||
Ok(mut json) => {
|
||||
// NOTE: the span id will be added later in the JSON for the greater good
|
||||
json.insert("msg".into(), $msg.into());
|
||||
let serialized_json = $crate::serde_json::to_string(&json)
|
||||
.expect("contains only string keys; qed");
|
||||
$crate::tracing::info!(target: $crate::TELEMETRY_LOG_SPAN,
|
||||
verbosity,
|
||||
json = serialized_json.as_str(),
|
||||
);
|
||||
},
|
||||
}
|
||||
assert_eq!(telem.0, res);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_endpoints() {
|
||||
let endp = vec![("/ip4/...80.123.90.4/tcp/5432".into(), 3), ("/ip4/no:!?;rlkqre;;::::///tcp/5432".into(), 4)];
|
||||
let telem = TelemetryEndpoints::new(endp);
|
||||
assert!(telem.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn valid_and_invalid_endpoints() {
|
||||
let endp = vec![("/ip4/80.123.90.4/tcp/5432".into(), 3), ("/ip4/no:!?;rlkqre;;::::///tcp/5432".into(), 4)];
|
||||
let telem = TelemetryEndpoints::new(endp);
|
||||
assert!(telem.is_err());
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export(local_inner_macros)]
|
||||
#[doc(hidden)]
|
||||
macro_rules! format_fields_to_json {
|
||||
( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
|
||||
$crate::serde_json::to_value(&$v)
|
||||
.map(|value| {
|
||||
let mut map = $crate::serde_json::Map::new();
|
||||
map.insert($k.into(), value);
|
||||
map
|
||||
})
|
||||
$(
|
||||
.and_then(|mut prev_map| {
|
||||
format_fields_to_json!($($t)*)
|
||||
.map(move |mut other_map| {
|
||||
prev_map.append(&mut other_map);
|
||||
prev_map
|
||||
})
|
||||
})
|
||||
)*
|
||||
}};
|
||||
( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
|
||||
let mut map = $crate::serde_json::Map::new();
|
||||
map.insert($k.into(), std::format!("{:?}", &$v).into());
|
||||
$crate::serde_json::Result::Ok(map)
|
||||
$(
|
||||
.and_then(|mut prev_map| {
|
||||
format_fields_to_json!($($t)*)
|
||||
.map(move |mut other_map| {
|
||||
prev_map.append(&mut other_map);
|
||||
prev_map
|
||||
})
|
||||
})
|
||||
)*
|
||||
}};
|
||||
}
|
||||
|
||||
@@ -0,0 +1,286 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use futures::prelude::*;
|
||||
use libp2p::core::transport::Transport;
|
||||
use libp2p::Multiaddr;
|
||||
use rand::Rng as _;
|
||||
use std::{fmt, mem, pin::Pin, task::Context, task::Poll, time::Duration};
|
||||
use wasm_timer::Delay;
|
||||
|
||||
pub(crate) type ConnectionNotifierSender = sp_utils::mpsc::TracingUnboundedSender<()>;
|
||||
|
||||
/// Handler for a single telemetry node.
|
||||
///
|
||||
/// This is a wrapper `Sink` around a network `Sink` with 3 particularities:
|
||||
/// - It is infallible: if the connection stops, it will reconnect automatically when the server
|
||||
/// becomes available again.
|
||||
/// - It holds a list of "connection messages" which are sent automatically when the connection is
|
||||
/// (re-)established. This is used for the "system.connected" message that needs to be send for
|
||||
/// every substrate node that connects.
|
||||
/// - It doesn't stay in pending while waiting for connection. Instead, it moves data into the
|
||||
/// void if the connection could not be established. This is important for the `Dispatcher`
|
||||
/// `Sink` which we don't want to block if one connection is broken.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Node<TTrans: Transport> {
|
||||
/// Address of the node.
|
||||
addr: Multiaddr,
|
||||
/// State of the connection.
|
||||
socket: NodeSocket<TTrans>,
|
||||
/// Transport used to establish new connections.
|
||||
transport: TTrans,
|
||||
/// Messages that are sent when the connection (re-)establishes.
|
||||
pub(crate) connection_messages: Vec<serde_json::Map<String, serde_json::Value>>,
|
||||
/// Notifier for when the connection (re-)establishes.
|
||||
pub(crate) telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
|
||||
}
|
||||
|
||||
enum NodeSocket<TTrans: Transport> {
|
||||
/// We're connected to the node. This is the normal state.
|
||||
Connected(NodeSocketConnected<TTrans>),
|
||||
/// We are currently dialing the node.
|
||||
Dialing(TTrans::Dial),
|
||||
/// A new connection should be started as soon as possible.
|
||||
ReconnectNow,
|
||||
/// Waiting before attempting to dial again.
|
||||
WaitingReconnect(Delay),
|
||||
/// Temporary transition state.
|
||||
Poisoned,
|
||||
}
|
||||
|
||||
impl<TTrans: Transport> NodeSocket<TTrans> {
|
||||
fn wait_reconnect() -> NodeSocket<TTrans> {
|
||||
let random_delay = rand::thread_rng().gen_range(5, 10);
|
||||
let delay = Delay::new(Duration::from_secs(random_delay));
|
||||
NodeSocket::WaitingReconnect(delay)
|
||||
}
|
||||
}
|
||||
|
||||
struct NodeSocketConnected<TTrans: Transport> {
|
||||
/// Where to send data.
|
||||
sink: TTrans::Output,
|
||||
/// Queue of packets to send before accepting new packets.
|
||||
buf: Vec<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl<TTrans: Transport> Node<TTrans> {
|
||||
/// Builds a new node handler.
|
||||
pub(crate) fn new(
|
||||
transport: TTrans,
|
||||
addr: Multiaddr,
|
||||
connection_messages: Vec<serde_json::Map<String, serde_json::Value>>,
|
||||
telemetry_connection_notifier: Vec<ConnectionNotifierSender>,
|
||||
) -> Self {
|
||||
Node {
|
||||
addr,
|
||||
socket: NodeSocket::ReconnectNow,
|
||||
transport,
|
||||
connection_messages,
|
||||
telemetry_connection_notifier,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TTrans: Transport, TSinkErr> Node<TTrans>
|
||||
where
|
||||
TTrans: Clone + Unpin,
|
||||
TTrans::Dial: Unpin,
|
||||
TTrans::Output:
|
||||
Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
|
||||
TSinkErr: fmt::Debug,
|
||||
{
|
||||
// NOTE: this code has been inspired from `Buffer` (`futures_util::sink::Buffer`).
|
||||
// https://docs.rs/futures-util/0.3.8/src/futures_util/sink/buffer.rs.html#32
|
||||
fn try_send_connection_messages(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
conn: &mut NodeSocketConnected<TTrans>,
|
||||
) -> Poll<Result<(), TSinkErr>> {
|
||||
while let Some(item) = conn.buf.pop() {
|
||||
if let Err(e) = conn.sink.start_send_unpin(item) {
|
||||
return Poll::Ready(Err(e));
|
||||
}
|
||||
futures::ready!(conn.sink.poll_ready_unpin(cx))?;
|
||||
}
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum Infallible {}
|
||||
|
||||
impl<TTrans: Transport, TSinkErr> Sink<String> for Node<TTrans>
|
||||
where
|
||||
TTrans: Clone + Unpin,
|
||||
TTrans::Dial: Unpin,
|
||||
TTrans::Output:
|
||||
Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
|
||||
TSinkErr: fmt::Debug,
|
||||
{
|
||||
type Error = Infallible;
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned);
|
||||
self.socket = loop {
|
||||
match socket {
|
||||
NodeSocket::Connected(mut conn) => match conn.sink.poll_ready_unpin(cx) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
match self.as_mut().try_send_connection_messages(cx, &mut conn) {
|
||||
Poll::Ready(Err(err)) => {
|
||||
log::warn!(target: "telemetry", "⚠️ Disconnected from {}: {:?}", self.addr, err);
|
||||
socket = NodeSocket::wait_reconnect();
|
||||
}
|
||||
Poll::Ready(Ok(())) => {
|
||||
self.socket = NodeSocket::Connected(conn);
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
Poll::Pending => {
|
||||
self.socket = NodeSocket::Connected(conn);
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Ready(Err(err)) => {
|
||||
log::warn!(target: "telemetry", "⚠️ Disconnected from {}: {:?}", self.addr, err);
|
||||
socket = NodeSocket::wait_reconnect();
|
||||
}
|
||||
Poll::Pending => {
|
||||
self.socket = NodeSocket::Connected(conn);
|
||||
return Poll::Pending;
|
||||
}
|
||||
},
|
||||
NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) {
|
||||
Poll::Ready(Ok(sink)) => {
|
||||
log::debug!(target: "telemetry", "✅ Connected to {}", self.addr);
|
||||
|
||||
for sender in self.telemetry_connection_notifier.iter_mut() {
|
||||
let _ = sender.send(());
|
||||
}
|
||||
|
||||
let buf = self
|
||||
.connection_messages
|
||||
.iter()
|
||||
.map(|json| {
|
||||
let mut json = json.clone();
|
||||
json.insert(
|
||||
"ts".to_string(),
|
||||
chrono::Local::now().to_rfc3339().into(),
|
||||
);
|
||||
json
|
||||
})
|
||||
.filter_map(|json| match serde_json::to_vec(&json) {
|
||||
Ok(message) => Some(message),
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
target: "telemetry",
|
||||
"An error occurred while generating new connection \
|
||||
messages: {}",
|
||||
err,
|
||||
);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
socket = NodeSocket::Connected(NodeSocketConnected { sink, buf });
|
||||
}
|
||||
Poll::Pending => break NodeSocket::Dialing(s),
|
||||
Poll::Ready(Err(err)) => {
|
||||
log::warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
|
||||
socket = NodeSocket::wait_reconnect();
|
||||
}
|
||||
},
|
||||
NodeSocket::ReconnectNow => match self.transport.clone().dial(self.addr.clone()) {
|
||||
Ok(d) => {
|
||||
log::debug!(target: "telemetry", "Started dialing {}", self.addr);
|
||||
socket = NodeSocket::Dialing(d);
|
||||
}
|
||||
Err(err) => {
|
||||
log::warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
|
||||
socket = NodeSocket::wait_reconnect();
|
||||
}
|
||||
},
|
||||
NodeSocket::WaitingReconnect(mut s) => {
|
||||
if let Poll::Ready(_) = Future::poll(Pin::new(&mut s), cx) {
|
||||
socket = NodeSocket::ReconnectNow;
|
||||
} else {
|
||||
break NodeSocket::WaitingReconnect(s);
|
||||
}
|
||||
}
|
||||
NodeSocket::Poisoned => {
|
||||
log::error!(target: "telemetry", "‼️ Poisoned connection with {}", self.addr);
|
||||
break NodeSocket::Poisoned;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// The Dispatcher blocks when the Node sinks blocks. This is why it is important that the
|
||||
// Node sinks doesn't go into "Pending" state while waiting for reconnection but rather
|
||||
// discard the excess of telemetry messages.
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
|
||||
match &mut self.socket {
|
||||
NodeSocket::Connected(conn) => {
|
||||
let _ = conn.sink.start_send_unpin(item.into()).expect("boo");
|
||||
}
|
||||
_socket => {
|
||||
log::trace!(
|
||||
target: "telemetry",
|
||||
"Message has been discarded: {}",
|
||||
item,
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match &mut self.socket {
|
||||
NodeSocket::Connected(conn) => match conn.sink.poll_flush_unpin(cx) {
|
||||
Poll::Ready(Err(_)) => {
|
||||
self.socket = NodeSocket::wait_reconnect();
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
|
||||
Poll::Pending => Poll::Pending,
|
||||
},
|
||||
_ => Poll::Ready(Ok(())),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match &mut self.socket {
|
||||
NodeSocket::Connected(conn) => conn.sink.poll_close_unpin(cx).map(|_| Ok(())),
|
||||
_ => Poll::Ready(Ok(())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TTrans: Transport> fmt::Debug for NodeSocket<TTrans> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
use NodeSocket::*;
|
||||
f.write_str(match self {
|
||||
Connected(_) => "Connected",
|
||||
Dialing(_) => "Dialing",
|
||||
ReconnectNow => "ReconnectNow",
|
||||
WaitingReconnect(_) => "WaitingReconnect",
|
||||
Poisoned => "Poisoned",
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,163 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use futures::{
|
||||
prelude::*,
|
||||
ready,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use libp2p::{
|
||||
core::transport::{timeout::TransportTimeout, OptionalTransport},
|
||||
wasm_ext, Transport,
|
||||
};
|
||||
use std::io;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Timeout after which a connection attempt is considered failed. Includes the WebSocket HTTP
|
||||
/// upgrading.
|
||||
const CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
|
||||
pub(crate) fn initialize_transport(
|
||||
wasm_external_transport: Option<wasm_ext::ExtTransport>,
|
||||
) -> Result<WsTrans, io::Error> {
|
||||
let transport = match wasm_external_transport.clone() {
|
||||
Some(t) => OptionalTransport::some(t),
|
||||
None => OptionalTransport::none(),
|
||||
}
|
||||
.map((|inner, _| StreamSink::from(inner)) as fn(_, _) -> _);
|
||||
|
||||
// The main transport is the `wasm_external_transport`, but if we're on desktop we add
|
||||
// support for TCP+WebSocket+DNS as a fallback. In practice, you're not expected to pass
|
||||
// an external transport on desktop and the fallback is used all the time.
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
let transport = transport.or_transport({
|
||||
let inner = libp2p::dns::DnsConfig::new(libp2p::tcp::TcpConfig::new())?;
|
||||
libp2p::websocket::framed::WsConfig::new(inner).and_then(|connec, _| {
|
||||
let connec = connec
|
||||
.with(|item| {
|
||||
let item = libp2p::websocket::framed::OutgoingData::Binary(item);
|
||||
future::ready(Ok::<_, io::Error>(item))
|
||||
})
|
||||
.try_filter(|item| future::ready(item.is_data()))
|
||||
.map_ok(|data| data.into_bytes());
|
||||
future::ready(Ok::<_, io::Error>(connec))
|
||||
})
|
||||
});
|
||||
|
||||
Ok(TransportTimeout::new(
|
||||
transport.map(|out, _| {
|
||||
let out = out
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
||||
.sink_map_err(|err| io::Error::new(io::ErrorKind::Other, err));
|
||||
Box::pin(out) as Pin<Box<_>>
|
||||
}),
|
||||
CONNECT_TIMEOUT,
|
||||
)
|
||||
.boxed())
|
||||
}
|
||||
|
||||
/// A trait that implements `Stream` and `Sink`.
|
||||
pub(crate) trait StreamAndSink<I>: Stream + Sink<I> {}
|
||||
impl<T: ?Sized + Stream + Sink<I>, I> StreamAndSink<I> for T {}
|
||||
|
||||
/// A type alias for the WebSocket transport.
|
||||
pub(crate) type WsTrans = libp2p::core::transport::Boxed<
|
||||
Pin<
|
||||
Box<
|
||||
dyn StreamAndSink<Vec<u8>, Item = Result<Vec<u8>, io::Error>, Error = io::Error> + Send,
|
||||
>,
|
||||
>,
|
||||
>;
|
||||
|
||||
/// Wraps around an `AsyncWrite` and implements `Sink`. Guarantees that each item being sent maps
|
||||
/// to one call of `write`.
|
||||
///
|
||||
/// For some context, we put this object around the `wasm_ext::ExtTransport` in order to make sure
|
||||
/// that each telemetry message maps to one single call to `write` in the WASM FFI.
|
||||
#[pin_project::pin_project]
|
||||
pub(crate) struct StreamSink<T>(#[pin] T, Option<Vec<u8>>);
|
||||
|
||||
impl<T> From<T> for StreamSink<T> {
|
||||
fn from(inner: T) -> StreamSink<T> {
|
||||
StreamSink(inner, None)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead> Stream for StreamSink<T> {
|
||||
type Item = Result<Vec<u8>, io::Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
let this = self.project();
|
||||
let mut buf = vec![0; 128];
|
||||
match ready!(AsyncRead::poll_read(this.0, cx, &mut buf)) {
|
||||
Ok(0) => Poll::Ready(None),
|
||||
Ok(n) => {
|
||||
buf.truncate(n);
|
||||
Poll::Ready(Some(Ok(buf)))
|
||||
}
|
||||
Err(err) => Poll::Ready(Some(Err(err))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncWrite> StreamSink<T> {
|
||||
fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
|
||||
let this = self.project();
|
||||
|
||||
if let Some(buffer) = this.1 {
|
||||
if ready!(this.0.poll_write(cx, &buffer[..]))? != buffer.len() {
|
||||
log::error!(target: "telemetry",
|
||||
"Detected some internal buffering happening in the telemetry");
|
||||
let err = io::Error::new(io::ErrorKind::Other, "Internal buffering detected");
|
||||
return Poll::Ready(Err(err));
|
||||
}
|
||||
}
|
||||
|
||||
*this.1 = None;
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncWrite> Sink<Vec<u8>> for StreamSink<T> {
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
ready!(StreamSink::poll_flush_buffer(self, cx))?;
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
|
||||
let this = self.project();
|
||||
debug_assert!(this.1.is_none());
|
||||
*this.1 = Some(item);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
ready!(self.as_mut().poll_flush_buffer(cx))?;
|
||||
let this = self.project();
|
||||
AsyncWrite::poll_flush(this.0, cx)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
ready!(self.as_mut().poll_flush_buffer(cx))?;
|
||||
let this = self.project();
|
||||
AsyncWrite::poll_close(this.0, cx)
|
||||
}
|
||||
}
|
||||
@@ -1,263 +0,0 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
//! Contains the object that makes the telemetry work.
|
||||
//!
|
||||
//! # Usage
|
||||
//!
|
||||
//! - Create a `TelemetryWorker` with `TelemetryWorker::new`.
|
||||
//! - Send messages to the telemetry with `TelemetryWorker::send_message`. Messages will only be
|
||||
//! sent to the appropriate targets. Messages may be ignored if the target happens to be
|
||||
//! temporarily unreachable.
|
||||
//! - You must appropriately poll the worker with `TelemetryWorker::poll`. Polling will/may produce
|
||||
//! events indicating what happened since the latest polling.
|
||||
//!
|
||||
|
||||
use futures::{prelude::*, ready};
|
||||
use libp2p::{
|
||||
core::transport::{OptionalTransport, timeout::TransportTimeout},
|
||||
Multiaddr,
|
||||
Transport,
|
||||
wasm_ext
|
||||
};
|
||||
use log::{trace, warn, error};
|
||||
use slog::Drain;
|
||||
use std::{io, pin::Pin, task::Context, task::Poll, time};
|
||||
|
||||
mod node;
|
||||
|
||||
/// Timeout after which a connection attempt is considered failed. Includes the WebSocket HTTP
|
||||
/// upgrading.
|
||||
const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(20);
|
||||
|
||||
/// Event generated when polling the worker.
|
||||
#[derive(Debug)]
|
||||
pub enum TelemetryWorkerEvent {
|
||||
/// We have established a connection to one of the telemetry endpoint, either for the first
|
||||
/// time or after having been disconnected earlier.
|
||||
Connected,
|
||||
}
|
||||
|
||||
/// Telemetry processing machine.
|
||||
#[derive(Debug)]
|
||||
pub struct TelemetryWorker {
|
||||
/// List of nodes with their maximum verbosity level.
|
||||
nodes: Vec<(node::Node<WsTrans>, u8)>,
|
||||
}
|
||||
|
||||
trait StreamAndSink<I>: Stream + Sink<I> {}
|
||||
impl<T: ?Sized + Stream + Sink<I>, I> StreamAndSink<I> for T {}
|
||||
|
||||
type WsTrans = libp2p::core::transport::Boxed<
|
||||
Pin<Box<dyn StreamAndSink<
|
||||
Vec<u8>,
|
||||
Item = Result<Vec<u8>, io::Error>,
|
||||
Error = io::Error
|
||||
> + Send>>
|
||||
>;
|
||||
|
||||
impl TelemetryWorker {
|
||||
/// Builds a new `TelemetryWorker`.
|
||||
///
|
||||
/// The endpoints must be a list of targets, plus a verbosity level. When you send a message
|
||||
/// to the telemetry, only the targets whose verbosity is higher than the verbosity of the
|
||||
/// message will receive it.
|
||||
pub fn new(
|
||||
endpoints: impl IntoIterator<Item = (Multiaddr, u8)>,
|
||||
wasm_external_transport: impl Into<Option<wasm_ext::ExtTransport>>
|
||||
) -> Result<Self, io::Error> {
|
||||
let transport = match wasm_external_transport.into() {
|
||||
Some(t) => OptionalTransport::some(t),
|
||||
None => OptionalTransport::none()
|
||||
}.map((|inner, _| StreamSink::from(inner)) as fn(_, _) -> _);
|
||||
|
||||
// The main transport is the `wasm_external_transport`, but if we're on desktop we add
|
||||
// support for TCP+WebSocket+DNS as a fallback. In practice, you're not expected to pass
|
||||
// an external transport on desktop and the fallback is used all the time.
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
let transport = transport.or_transport({
|
||||
let inner = libp2p::dns::DnsConfig::new(libp2p::tcp::TcpConfig::new())?;
|
||||
libp2p::websocket::framed::WsConfig::new(inner)
|
||||
.and_then(|connec, _| {
|
||||
let connec = connec
|
||||
.with(|item| {
|
||||
let item = libp2p::websocket::framed::OutgoingData::Binary(item);
|
||||
future::ready(Ok::<_, io::Error>(item))
|
||||
})
|
||||
.try_filter(|item| future::ready(item.is_data()))
|
||||
.map_ok(|data| data.into_bytes());
|
||||
future::ready(Ok::<_, io::Error>(connec))
|
||||
})
|
||||
});
|
||||
|
||||
let transport = TransportTimeout::new(
|
||||
transport.map(|out, _| {
|
||||
let out = out
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
||||
.sink_map_err(|err| io::Error::new(io::ErrorKind::Other, err));
|
||||
Box::pin(out) as Pin<Box<_>>
|
||||
}),
|
||||
CONNECT_TIMEOUT
|
||||
).boxed();
|
||||
|
||||
Ok(TelemetryWorker {
|
||||
nodes: endpoints.into_iter().map(|(addr, verbosity)| {
|
||||
let node = node::Node::new(transport.clone(), addr);
|
||||
(node, verbosity)
|
||||
}).collect()
|
||||
})
|
||||
}
|
||||
|
||||
/// Polls the worker for events that happened.
|
||||
pub fn poll(&mut self, cx: &mut Context) -> Poll<TelemetryWorkerEvent> {
|
||||
for (node, _) in &mut self.nodes {
|
||||
loop {
|
||||
match node::Node::poll(Pin::new(node), cx) {
|
||||
Poll::Ready(node::NodeEvent::Connected) =>
|
||||
return Poll::Ready(TelemetryWorkerEvent::Connected),
|
||||
Poll::Ready(node::NodeEvent::Disconnected(_)) => continue,
|
||||
Poll::Pending => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/// Equivalent to `slog::Drain::log`, but takes `self` by `&mut` instead, which is more convenient.
|
||||
///
|
||||
/// Keep in mind that you should call `TelemetryWorker::poll` in order to process the messages.
|
||||
/// You should call this function right after calling `slog::Drain::log`.
|
||||
pub fn log(&mut self, record: &slog::Record, values: &slog::OwnedKVList) -> Result<(), ()> {
|
||||
let msg_verbosity = match record.tag().parse::<u8>() {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
warn!(target: "telemetry", "Failed to parse telemetry tag {:?}: {:?}",
|
||||
record.tag(), err);
|
||||
return Err(())
|
||||
}
|
||||
};
|
||||
|
||||
// None of the nodes want that verbosity, so just return without doing any serialization.
|
||||
if self.nodes.iter().all(|(_, node_max_verbosity)| msg_verbosity > *node_max_verbosity) {
|
||||
trace!(
|
||||
target: "telemetry",
|
||||
"Skipping log entry because verbosity {:?} is too high for all endpoints",
|
||||
msg_verbosity
|
||||
);
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
// Turn the message into JSON.
|
||||
let serialized = {
|
||||
let mut out = Vec::new();
|
||||
slog_json::Json::default(&mut out).log(record, values).map_err(|_| ())?;
|
||||
out
|
||||
};
|
||||
|
||||
for (node, node_max_verbosity) in &mut self.nodes {
|
||||
if msg_verbosity > *node_max_verbosity {
|
||||
trace!(target: "telemetry", "Skipping {:?} for log entry with verbosity {:?}",
|
||||
node.addr(), msg_verbosity);
|
||||
continue;
|
||||
}
|
||||
|
||||
// `send_message` returns an error if we're not connected, which we silently ignore.
|
||||
let _ = node.send_message(&serialized.clone()[..]);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps around an `AsyncWrite` and implements `Sink`. Guarantees that each item being sent maps
|
||||
/// to one call of `write`.
|
||||
///
|
||||
/// For some context, we put this object around the `wasm_ext::ExtTransport` in order to make sure
|
||||
/// that each telemetry message maps to one single call to `write` in the WASM FFI.
|
||||
#[pin_project::pin_project]
|
||||
struct StreamSink<T>(#[pin] T, Option<Vec<u8>>);
|
||||
|
||||
impl<T> From<T> for StreamSink<T> {
|
||||
fn from(inner: T) -> StreamSink<T> {
|
||||
StreamSink(inner, None)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead> Stream for StreamSink<T> {
|
||||
type Item = Result<Vec<u8>, io::Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
let this = self.project();
|
||||
let mut buf = vec![0; 128];
|
||||
match ready!(AsyncRead::poll_read(this.0, cx, &mut buf)) {
|
||||
Ok(0) => Poll::Ready(None),
|
||||
Ok(n) => {
|
||||
buf.truncate(n);
|
||||
Poll::Ready(Some(Ok(buf)))
|
||||
},
|
||||
Err(err) => Poll::Ready(Some(Err(err))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncWrite> StreamSink<T> {
|
||||
fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
|
||||
let this = self.project();
|
||||
|
||||
if let Some(buffer) = this.1 {
|
||||
if ready!(this.0.poll_write(cx, &buffer[..]))? != buffer.len() {
|
||||
error!(target: "telemetry",
|
||||
"Detected some internal buffering happening in the telemetry");
|
||||
let err = io::Error::new(io::ErrorKind::Other, "Internal buffering detected");
|
||||
return Poll::Ready(Err(err));
|
||||
}
|
||||
}
|
||||
|
||||
*this.1 = None;
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncWrite> Sink<Vec<u8>> for StreamSink<T> {
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
ready!(StreamSink::poll_flush_buffer(self, cx))?;
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
|
||||
let this = self.project();
|
||||
debug_assert!(this.1.is_none());
|
||||
*this.1 = Some(item);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
ready!(self.as_mut().poll_flush_buffer(cx))?;
|
||||
let this = self.project();
|
||||
AsyncWrite::poll_flush(this.0, cx)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
ready!(self.as_mut().poll_flush_buffer(cx))?;
|
||||
let this = self.project();
|
||||
AsyncWrite::poll_close(this.0, cx)
|
||||
}
|
||||
}
|
||||
@@ -1,305 +0,0 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
//! Contains the `Node` struct, which handles communications with a single telemetry endpoint.
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures_timer::Delay;
|
||||
use libp2p::Multiaddr;
|
||||
use libp2p::core::transport::Transport;
|
||||
use log::{trace, debug, warn, error};
|
||||
use rand::Rng as _;
|
||||
use std::{collections::VecDeque, fmt, mem, pin::Pin, task::Context, task::Poll, time::Duration};
|
||||
|
||||
/// Maximum number of pending telemetry messages.
|
||||
const MAX_PENDING: usize = 10;
|
||||
|
||||
/// Handler for a single telemetry node.
|
||||
pub struct Node<TTrans: Transport> {
|
||||
/// Address of the node.
|
||||
addr: Multiaddr,
|
||||
/// State of the connection.
|
||||
socket: NodeSocket<TTrans>,
|
||||
/// Transport used to establish new connections.
|
||||
transport: TTrans,
|
||||
}
|
||||
|
||||
enum NodeSocket<TTrans: Transport> {
|
||||
/// We're connected to the node. This is the normal state.
|
||||
Connected(NodeSocketConnected<TTrans>),
|
||||
/// We are currently dialing the node.
|
||||
Dialing(TTrans::Dial),
|
||||
/// A new connection should be started as soon as possible.
|
||||
ReconnectNow,
|
||||
/// Waiting before attempting to dial again.
|
||||
WaitingReconnect(Delay),
|
||||
/// Temporary transition state.
|
||||
Poisoned,
|
||||
}
|
||||
|
||||
struct NodeSocketConnected<TTrans: Transport> {
|
||||
/// Where to send data.
|
||||
sink: TTrans::Output,
|
||||
/// Queue of packets to send.
|
||||
pending: VecDeque<Vec<u8>>,
|
||||
/// If true, we need to flush the sink.
|
||||
need_flush: bool,
|
||||
/// A timeout for the socket to write data.
|
||||
timeout: Option<Delay>,
|
||||
}
|
||||
|
||||
/// Event that can happen with this node.
|
||||
#[derive(Debug)]
|
||||
pub enum NodeEvent<TSinkErr> {
|
||||
/// We are now connected to this node.
|
||||
Connected,
|
||||
/// We are now disconnected from this node.
|
||||
Disconnected(ConnectionError<TSinkErr>),
|
||||
}
|
||||
|
||||
/// Reason for disconnecting from a node.
|
||||
#[derive(Debug)]
|
||||
pub enum ConnectionError<TSinkErr> {
|
||||
/// The connection timed-out.
|
||||
Timeout,
|
||||
/// Reading from the socket returned and end-of-file, indicating that the socket has been
|
||||
/// closed.
|
||||
Closed,
|
||||
/// The sink errored.
|
||||
Sink(TSinkErr),
|
||||
}
|
||||
|
||||
impl<TTrans: Transport> Node<TTrans> {
|
||||
/// Builds a new node handler.
|
||||
pub fn new(transport: TTrans, addr: Multiaddr) -> Self {
|
||||
Node {
|
||||
addr,
|
||||
socket: NodeSocket::ReconnectNow,
|
||||
transport,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the address that was passed to `new`.
|
||||
pub fn addr(&self) -> &Multiaddr {
|
||||
&self.addr
|
||||
}
|
||||
}
|
||||
|
||||
impl<TTrans: Transport, TSinkErr> Node<TTrans>
|
||||
where TTrans: Clone + Unpin, TTrans::Dial: Unpin,
|
||||
TTrans::Output: Sink<Vec<u8>, Error = TSinkErr>
|
||||
+ Stream<Item=Result<Vec<u8>, TSinkErr>>
|
||||
+ Unpin,
|
||||
TSinkErr: fmt::Debug
|
||||
{
|
||||
/// Sends a WebSocket frame to the node. Returns an error if we are not connected to the node.
|
||||
///
|
||||
/// After calling this method, you should call `poll` in order for it to be properly processed.
|
||||
pub fn send_message(&mut self, payload: impl Into<Vec<u8>>) -> Result<(), ()> {
|
||||
if let NodeSocket::Connected(NodeSocketConnected { pending, .. }) = &mut self.socket {
|
||||
if pending.len() <= MAX_PENDING {
|
||||
trace!(target: "telemetry", "Adding log entry to queue for {:?}", self.addr);
|
||||
pending.push_back(payload.into());
|
||||
Ok(())
|
||||
} else {
|
||||
warn!(target: "telemetry", "⚠️ Rejected log entry because queue is full for {:?}",
|
||||
self.addr);
|
||||
Err(())
|
||||
}
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Polls the node for updates. Must be performed regularly.
|
||||
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<NodeEvent<TSinkErr>> {
|
||||
let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned);
|
||||
self.socket = loop {
|
||||
match socket {
|
||||
NodeSocket::Connected(mut conn) => {
|
||||
match NodeSocketConnected::poll(Pin::new(&mut conn), cx, &self.addr) {
|
||||
Poll::Ready(Ok(v)) => match v {},
|
||||
Poll::Pending => {
|
||||
break NodeSocket::Connected(conn)
|
||||
},
|
||||
Poll::Ready(Err(err)) => {
|
||||
warn!(target: "telemetry", "⚠️ Disconnected from {}: {:?}", self.addr, err);
|
||||
let timeout = gen_rand_reconnect_delay();
|
||||
self.socket = NodeSocket::WaitingReconnect(timeout);
|
||||
return Poll::Ready(NodeEvent::Disconnected(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) {
|
||||
Poll::Ready(Ok(sink)) => {
|
||||
debug!(target: "telemetry", "✅ Connected to {}", self.addr);
|
||||
let conn = NodeSocketConnected {
|
||||
sink,
|
||||
pending: VecDeque::new(),
|
||||
need_flush: false,
|
||||
timeout: None,
|
||||
};
|
||||
self.socket = NodeSocket::Connected(conn);
|
||||
return Poll::Ready(NodeEvent::Connected)
|
||||
},
|
||||
Poll::Pending => break NodeSocket::Dialing(s),
|
||||
Poll::Ready(Err(err)) => {
|
||||
warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
|
||||
let timeout = gen_rand_reconnect_delay();
|
||||
socket = NodeSocket::WaitingReconnect(timeout);
|
||||
}
|
||||
}
|
||||
NodeSocket::ReconnectNow => match self.transport.clone().dial(self.addr.clone()) {
|
||||
Ok(d) => {
|
||||
debug!(target: "telemetry", "Started dialing {}", self.addr);
|
||||
socket = NodeSocket::Dialing(d);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
|
||||
let timeout = gen_rand_reconnect_delay();
|
||||
socket = NodeSocket::WaitingReconnect(timeout);
|
||||
}
|
||||
}
|
||||
NodeSocket::WaitingReconnect(mut s) =>
|
||||
if let Poll::Ready(_) = Future::poll(Pin::new(&mut s), cx) {
|
||||
socket = NodeSocket::ReconnectNow;
|
||||
} else {
|
||||
break NodeSocket::WaitingReconnect(s)
|
||||
}
|
||||
NodeSocket::Poisoned => {
|
||||
error!(target: "telemetry", "‼️ Poisoned connection with {}", self.addr);
|
||||
break NodeSocket::Poisoned
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates a `Delay` object with a random timeout.
|
||||
///
|
||||
/// If there are general connection issues, not all endpoints should be synchronized in their
|
||||
/// re-connection time.
|
||||
fn gen_rand_reconnect_delay() -> Delay {
|
||||
let random_delay = rand::thread_rng().gen_range(5, 10);
|
||||
Delay::new(Duration::from_secs(random_delay))
|
||||
}
|
||||
|
||||
impl<TTrans: Transport, TSinkErr> NodeSocketConnected<TTrans>
|
||||
where TTrans::Output: Sink<Vec<u8>, Error = TSinkErr>
|
||||
+ Stream<Item=Result<Vec<u8>, TSinkErr>>
|
||||
+ Unpin
|
||||
{
|
||||
/// Processes the queue of messages for the connected socket.
|
||||
///
|
||||
/// The address is passed for logging purposes only.
|
||||
fn poll(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
my_addr: &Multiaddr,
|
||||
) -> Poll<Result<futures::never::Never, ConnectionError<TSinkErr>>> {
|
||||
|
||||
while let Some(item) = self.pending.pop_front() {
|
||||
if let Poll::Ready(result) = Sink::poll_ready(Pin::new(&mut self.sink), cx) {
|
||||
if let Err(err) = result {
|
||||
return Poll::Ready(Err(ConnectionError::Sink(err)))
|
||||
}
|
||||
|
||||
let item_len = item.len();
|
||||
if let Err(err) = Sink::start_send(Pin::new(&mut self.sink), item) {
|
||||
return Poll::Ready(Err(ConnectionError::Sink(err)))
|
||||
}
|
||||
trace!(
|
||||
target: "telemetry", "Successfully sent {:?} bytes message to {}",
|
||||
item_len, my_addr
|
||||
);
|
||||
self.need_flush = true;
|
||||
|
||||
} else {
|
||||
self.pending.push_front(item);
|
||||
if self.timeout.is_none() {
|
||||
self.timeout = Some(Delay::new(Duration::from_secs(10)));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if self.need_flush {
|
||||
match Sink::poll_flush(Pin::new(&mut self.sink), cx) {
|
||||
Poll::Pending => {
|
||||
if self.timeout.is_none() {
|
||||
self.timeout = Some(Delay::new(Duration::from_secs(10)));
|
||||
}
|
||||
},
|
||||
Poll::Ready(Err(err)) => {
|
||||
self.timeout = None;
|
||||
return Poll::Ready(Err(ConnectionError::Sink(err)))
|
||||
},
|
||||
Poll::Ready(Ok(())) => {
|
||||
self.timeout = None;
|
||||
self.need_flush = false;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(timeout) = self.timeout.as_mut() {
|
||||
match Future::poll(Pin::new(timeout), cx) {
|
||||
Poll::Pending => {},
|
||||
Poll::Ready(()) => {
|
||||
self.timeout = None;
|
||||
return Poll::Ready(Err(ConnectionError::Timeout))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match Stream::poll_next(Pin::new(&mut self.sink), cx) {
|
||||
Poll::Ready(Some(Ok(_))) => {
|
||||
// We poll the telemetry `Stream` because the underlying implementation relies on
|
||||
// this in order to answer PINGs.
|
||||
// We don't do anything with incoming messages, however.
|
||||
},
|
||||
Poll::Ready(Some(Err(err))) => {
|
||||
return Poll::Ready(Err(ConnectionError::Sink(err)))
|
||||
},
|
||||
Poll::Ready(None) => {
|
||||
return Poll::Ready(Err(ConnectionError::Closed))
|
||||
},
|
||||
Poll::Pending => {},
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl<TTrans: Transport> fmt::Debug for Node<TTrans> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let state = match self.socket {
|
||||
NodeSocket::Connected(_) => "Connected",
|
||||
NodeSocket::Dialing(_) => "Dialing",
|
||||
NodeSocket::ReconnectNow => "Pending reconnect",
|
||||
NodeSocket::WaitingReconnect(_) => "Pending reconnect",
|
||||
NodeSocket::Poisoned => "Poisoned",
|
||||
};
|
||||
|
||||
f.debug_struct("Node")
|
||||
.field("addr", &self.addr)
|
||||
.field("state", &state)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user