Telemetry improvements (#1886)

* Fix typo

* Support multiple telemetry endpoints and verbosity levels

* Bump substrate-telemetry version

* Telemetrify Aura consensus

* Telemetrify Grandpa

* Fix CI version conflicts

* Implement style remarks

* Fix fixture

* Implement style remarks

* Clone only when necessary

* Get rid of Arc for URL

* Handle connection issues better
This commit is contained in:
Michael Müller
2019-02-28 12:22:05 +01:00
committed by Bastian Köcher
parent 8a72abffdd
commit 90e5c5ddfb
18 changed files with 285 additions and 66 deletions
+10 -4
View File
@@ -1899,6 +1899,7 @@ dependencies = [
"substrate-primitives 0.1.0",
"substrate-service 0.3.0",
"substrate-service-test 0.3.0",
"substrate-telemetry 0.3.1",
"substrate-transaction-pool 0.1.0",
"tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -3644,7 +3645,7 @@ dependencies = [
"substrate-primitives 0.1.0",
"substrate-service 0.3.0",
"substrate-state-machine 0.1.0",
"substrate-telemetry 0.3.0",
"substrate-telemetry 0.3.1",
"sysinfo 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3676,7 +3677,7 @@ dependencies = [
"substrate-keyring 0.1.0",
"substrate-primitives 0.1.0",
"substrate-state-machine 0.1.0",
"substrate-telemetry 0.3.0",
"substrate-telemetry 0.3.1",
"substrate-test-client 0.1.0",
"substrate-trie 0.4.0",
]
@@ -3732,6 +3733,7 @@ dependencies = [
"substrate-network 0.1.0",
"substrate-primitives 0.1.0",
"substrate-service 0.3.0",
"substrate-telemetry 0.3.1",
"substrate-test-client 0.1.0",
"tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -3853,6 +3855,7 @@ dependencies = [
"substrate-network 0.1.0",
"substrate-primitives 0.1.0",
"substrate-service 0.3.0",
"substrate-telemetry 0.3.1",
"substrate-test-client 0.1.0",
"tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -4073,7 +4076,7 @@ dependencies = [
"substrate-network 0.1.0",
"substrate-primitives 0.1.0",
"substrate-rpc-servers 0.1.0",
"substrate-telemetry 0.3.0",
"substrate-telemetry 0.3.1",
"substrate-test-client 0.1.0",
"substrate-transaction-pool 0.1.0",
"target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -4129,11 +4132,14 @@ dependencies = [
[[package]]
name = "substrate-telemetry"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)",
"slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"slog-async 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+3 -2
View File
@@ -25,7 +25,7 @@ use tokio::timer::Interval;
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use network::{SyncState, SyncProvider};
use client::{backend::Backend, BlockchainEvents};
use substrate_telemetry::telemetry;
use substrate_telemetry::*;
use log::{debug, info, warn};
use runtime_primitives::generic::BlockId;
@@ -86,6 +86,7 @@ pub fn start<C>(service: &Service<C>, exit: ::exit_future::Exit, handle: TaskExe
} else { (0.0, 0) };
telemetry!(
SUBSTRATE_INFO;
"system.interval";
"status" => format!("{}{}", status, target),
"peers" => num_peers,
@@ -144,7 +145,7 @@ pub fn start<C>(service: &Service<C>, exit: ::exit_future::Exit, handle: TaskExe
let txpool = service.transaction_pool();
let display_txpool_import = txpool.import_notification_stream().for_each(move |_| {
let status = txpool.status();
telemetry!("txpool.import"; "ready" => status.ready, "future" => status.future);
telemetry!(SUBSTRATE_INFO; "txpool.import"; "ready" => status.ready, "future" => status.future);
Ok(())
});
+4 -3
View File
@@ -59,6 +59,7 @@ use log::info;
use lazy_static::lazy_static;
use futures::Future;
use substrate_telemetry::TelemetryEndpoints;
const MAX_NODE_NAME_LENGTH: usize = 32;
@@ -401,9 +402,9 @@ where
// Override telemetry
if cli.no_telemetry {
config.telemetry_url = None;
} else if let Some(url) = cli.telemetry_url {
config.telemetry_url = Some(url);
config.telemetry_endpoints = None;
} else if !cli.telemetry_endpoints.is_empty() {
config.telemetry_endpoints = Some(TelemetryEndpoints::new(cli.telemetry_endpoints));
}
Ok(config)
+20 -3
View File
@@ -173,9 +173,11 @@ pub struct RunCmd {
#[structopt(long = "no-telemetry")]
pub no_telemetry: bool,
/// The URL of the telemetry server to connect to
#[structopt(long = "telemetry-url", value_name = "TELEMETRY_URL")]
pub telemetry_url: Option<String>,
/// The URL of the telemetry server to connect to. This flag can be passed multiple times
/// as a mean to specify multiple telemetry endpoints. Verbosity levels range from 0-9, with
/// 0 denoting the least verbosity. If no verbosity level is specified the default is 0.
#[structopt(long = "telemetry-url", value_name = "URL VERBOSITY", parse(try_from_str = "parse_telemetry_endpoints"))]
pub telemetry_endpoints: Vec<(String, u8)>,
/// The means of execution used when calling into the runtime while syncing blocks.
#[structopt(
@@ -239,6 +241,21 @@ pub struct RunCmd {
pub pool_config: TransactionPoolParams,
}
/// Default to verbosity level 0, if none is provided.
fn parse_telemetry_endpoints(s: &str) -> Result<(String, u8), Box<std::error::Error>> {
let pos = s.find(' ');
match pos {
None => {
Ok((s.to_owned(), 0))
},
Some(pos_) => {
let verbosity = s[pos_ + 1..].parse()?;
let url = s[..pos_].parse()?;
Ok((url, verbosity))
}
}
}
impl_augment_clap!(RunCmd);
impl_get_log_filter!(RunCmd);
+3 -3
View File
@@ -61,7 +61,7 @@ use crate::in_mem;
use crate::block_builder::{self, api::BlockBuilder as BlockBuilderAPI};
use crate::genesis;
use consensus;
use substrate_telemetry::telemetry;
use substrate_telemetry::*;
use log::{info, trace, warn};
use error_chain::bail;
@@ -729,7 +729,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
fork_choice,
);
telemetry!("block.import";
telemetry!(SUBSTRATE_INFO; "block.import";
"height" => height,
"best" => ?hash,
"origin" => ?origin
@@ -859,7 +859,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
warn!(" Header {:?}", header);
warn!(" Native result {:?}", native_result);
warn!(" Wasm result {:?}", wasm_result);
telemetry!("block.execute.consensus_failure";
telemetry!(SUBSTRATE_INFO; "block.execute.consensus_failure";
"hash" => ?hash,
"origin" => ?origin,
"header" => ?header
+1
View File
@@ -18,6 +18,7 @@ aura_primitives = { package = "substrate-consensus-aura-primitives", path = "pri
inherents = { package = "substrate-inherents", path = "../../inherents" }
srml-consensus = { path = "../../../srml/consensus" }
srml-aura = { path = "../../../srml/aura" }
substrate-telemetry = { path = "../../telemetry" }
futures = "0.1.17"
tokio = "0.1.7"
parking_lot = "0.7.1"
+33 -1
View File
@@ -52,6 +52,7 @@ use srml_aura::{
InherentType as AuraInherent, AuraInherentData,
timestamp::{TimestampInherentData, InherentType as TimestampInherent, InherentError as TIError}
};
use substrate_telemetry::*;
use aura_slots::{CheckedHeader, SlotWorker, SlotInfo, SlotCompatible};
@@ -265,12 +266,18 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
chain_head.hash(),
e
);
telemetry!(CONSENSUS_WARN; "aura.unable_fetching_authorities";
"slot" => ?chain_head.hash(), "err" => ?e
);
return Box::new(future::ok(()));
}
};
if self.sync_oracle.is_offline() && authorities.len() > 1 {
debug!(target: "aura", "Skipping proposal slot. Waiting for the netork.");
debug!(target: "aura", "Skipping proposal slot. Waiting for the network.");
telemetry!(CONSENSUS_DEBUG; "aura.skipping_proposal_slot";
"authorities_len" => authorities.len()
);
return Box::new(future::ok(()));
}
@@ -282,12 +289,18 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
slot_num,
timestamp
);
telemetry!(CONSENSUS_DEBUG; "aura.starting_authorship";
"slot_num" => slot_num, "timestamp" => timestamp
);
// we are the slot author. make a block and sign it.
let proposer = match env.init(&chain_head, &authorities) {
Ok(p) => p,
Err(e) => {
warn!("Unable to author block in slot {:?}: {:?}", slot_num, e);
telemetry!(CONSENSUS_WARN; "aura.unable_authoring_block";
"slot" => slot_num, "err" => ?e
);
return Box::new(future::ok(()))
}
};
@@ -315,6 +328,9 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
"Discarding proposal for slot {}; block production took too long",
slot_num
);
telemetry!(CONSENSUS_INFO; "aura.discarding_proposal_took_too_long";
"slot" => slot_num
);
return
}
@@ -348,10 +364,18 @@ impl<B: Block, C, E, I, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, SO> whe
import_block.post_header().hash(),
pre_hash
);
telemetry!(CONSENSUS_INFO; "aura.pre_sealed_block";
"header_num" => ?header_num,
"hash_now" => ?import_block.post_header().hash(),
"hash_previously" => ?pre_hash
);
if let Err(e) = block_import.import_block(import_block, None) {
warn!(target: "aura", "Error with block built on {:?}: {:?}",
parent_hash, e);
telemetry!(CONSENSUS_WARN; "aura.err_with_block_built_on";
"hash" => ?parent_hash, "err" => ?e
);
}
})
.map_err(|e| consensus_common::ErrorKind::ClientImport(format!("{:?}", e)).into())
@@ -456,6 +480,9 @@ impl<C, E> AuraVerifier<C, E>
"halting for block {} seconds in the future",
diff
);
telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block";
"diff" => ?diff
);
thread::sleep(Duration::from_secs(diff));
Ok(())
},
@@ -504,6 +531,7 @@ impl<C, E> AuraVerifier<C, E>
"halting for block {} seconds in the future",
diff
);
telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; "diff" => ?diff);
thread::sleep(Duration::from_secs(diff));
Ok(())
},
@@ -589,6 +617,7 @@ impl<B: Block, C, E> Verifier<B> for AuraVerifier<C, E> where
}
trace!(target: "aura", "Checked {:?}; importing.", pre_header);
telemetry!(CONSENSUS_TRACE; "aura.checked_and_importing"; "pre_header" => ?pre_header);
extra_verification.into_future().wait()?;
@@ -608,6 +637,9 @@ impl<B: Block, C, E> Verifier<B> for AuraVerifier<C, E> where
}
CheckedHeader::Deferred(a, b) => {
debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
telemetry!(CONSENSUS_DEBUG; "aura.header_too_far_in_future";
"hash" => ?hash, "a" => ?a, "b" => ?b
);
Err(format!("Header {:?} rejected: too far in the future", hash))
}
}
@@ -16,6 +16,7 @@ parity-codec-derive = "3.0"
runtime_primitives = { package = "sr-primitives", path = "../sr-primitives" }
consensus_common = { package = "substrate-consensus-common", path = "../consensus/common" }
substrate-primitives = { path = "../primitives" }
substrate-telemetry = { path = "../telemetry" }
client = { package = "substrate-client", path = "../client" }
network = { package = "substrate-network", path = "../network" }
service = { package = "substrate-service", path = "../service", optional = true }
+34 -1
View File
@@ -69,6 +69,7 @@ use runtime_primitives::traits::{
use fg_primitives::GrandpaApi;
use runtime_primitives::generic::BlockId;
use substrate_primitives::{ed25519, H256, Ed25519AuthorityId, Blake2Hasher};
use substrate_telemetry::*;
use grandpa::Error as GrandpaError;
use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet};
@@ -296,20 +297,32 @@ impl<Block: BlockT> GossipValidator<Block> {
let rounds = self.rounds.read();
if set_id < rounds.set_id {
trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_set_id";
"set_id" => ?set_id, "ours" => ?rounds.set_id
);
return true;
} else if set_id == rounds.set_id + 1 {
// allow a few first rounds of future set.
if round > MESSAGE_ROUND_TOLERANCE {
trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_msg_too_far_in_future_set";
"round" => ?round, "ours" => ?rounds.set_id
);
return true;
}
} else if set_id == rounds.set_id {
if round < rounds.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) {
trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, rounds.min_live_round, rounds.max_round);
telemetry!(CONSENSUS_TRACE; "afg.msg_round_oob";
"round" => ?round, "our_min_live_round" => ?rounds.min_live_round, "our_max_round" => ?rounds.max_round
);
return true;
}
} else {
trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_msg_in_invalid_future_set";
"set_id" => ?set_id, "ours" => ?rounds.set_id
);
return true;
}
false
@@ -330,6 +343,7 @@ impl<Block: BlockT> GossipValidator<Block> {
full.set_id
) {
debug!(target: "afg", "Bad message signature {}", full.message.id);
telemetry!(CONSENSUS_DEBUG; "afg.bad_msg_signature"; "signature" => ?full.message.id);
return network_gossip::ValidationResult::Invalid;
}
@@ -347,6 +361,11 @@ impl<Block: BlockT> GossipValidator<Block> {
if full.message.precommits.len() != full.message.auth_data.len() || full.message.precommits.is_empty() {
debug!(target: "afg", "Malformed compact commit");
telemetry!(CONSENSUS_DEBUG; "afg.malformed_compact_commit";
"precommits_len" => ?full.message.precommits.len(),
"auth_data_len" => ?full.message.auth_data.len(),
"precommits_is_empty" => ?full.message.precommits.is_empty(),
);
return network_gossip::ValidationResult::Invalid;
}
@@ -360,6 +379,7 @@ impl<Block: BlockT> GossipValidator<Block> {
full.set_id,
) {
debug!(target: "afg", "Bad commit message signature {}", id);
telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id);
return network_gossip::ValidationResult::Invalid;
}
}
@@ -376,6 +396,7 @@ impl<Block: BlockT> network_gossip::Validator<Block::Hash> for GossipValidator<B
Some(GossipMessage::Commit(message)) => self.validate_commit_message(message),
None => {
debug!(target: "afg", "Error decoding message");
telemetry!(CONSENSUS_DEBUG; "afg.err_decoding_msg"; "" => "");
network_gossip::ValidationResult::Invalid
}
}
@@ -493,6 +514,9 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B
fn announce(&self, round: u64, _set_id: u64, block: B::Hash) {
debug!(target: "afg", "Announcing block {} to peers which we voted on in round {}", block, round);
telemetry!(CONSENSUS_DEBUG; "afg.announcing_blocks_to_voted_peers";
"block" => ?block, "round" => ?round
);
self.service.announce_block(block)
}
}
@@ -550,6 +574,9 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>(
// are unsupported for following GRANDPA directly.
let genesis_authorities = api.runtime_api()
.grandpa_authorities(&BlockId::number(Zero::zero()))?;
telemetry!(CONSENSUS_DEBUG; "afg.loading_authorities";
"authorities_len" => ?genesis_authorities.len()
);
let authority_set = SharedAuthoritySet::genesis(genesis_authorities);
let encoded = authority_set.inner().read().encode();
@@ -703,6 +730,9 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
let voter_work = future::loop_fn(initial_state, move |params| {
let (env, last_round_number, last_state, authority_set_change) = params;
debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id);
telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter";
"name" => ?config.name(), "set_id" => ?env.set_id
);
let chain_info = match client.info() {
Ok(i) => i,
@@ -790,7 +820,10 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
let voter_work = voter_work
.join(broadcast_worker)
.map(|((), ())| ())
.map_err(|e| warn!("GRANDPA Voter failed: {:?}", e));
.map_err(|e| {
warn!("GRANDPA Voter failed: {:?}", e);
telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e);
});
Ok(voter_work.select(on_exit).then(|_| Ok(())))
}
+6 -5
View File
@@ -25,6 +25,7 @@ use runtime_primitives::{BuildStorage, StorageOverlay, ChildrenStorageOverlay};
use serde_json as json;
use crate::components::RuntimeGenesis;
use network::Multiaddr;
use tel::TelemetryEndpoints;
enum GenesisSource<G> {
File(PathBuf),
@@ -87,7 +88,7 @@ struct ChainSpecFile {
pub name: String,
pub id: String,
pub boot_nodes: Vec<String>,
pub telemetry_url: Option<String>,
pub telemetry_endpoints: Option<TelemetryEndpoints>,
pub protocol_id: Option<String>,
pub consensus_engine: Option<String>,
pub properties: Option<Properties>,
@@ -124,8 +125,8 @@ impl<G: RuntimeGenesis> ChainSpec<G> {
&self.spec.id
}
pub fn telemetry_url(&self) -> Option<&str> {
self.spec.telemetry_url.as_ref().map(String::as_str)
pub fn telemetry_endpoints(&self) -> &Option<TelemetryEndpoints> {
&self.spec.telemetry_endpoints
}
pub fn protocol_id(&self) -> Option<&str> {
@@ -170,7 +171,7 @@ impl<G: RuntimeGenesis> ChainSpec<G> {
id: &str,
constructor: fn() -> G,
boot_nodes: Vec<String>,
telemetry_url: Option<&str>,
telemetry_endpoints: Option<TelemetryEndpoints>,
protocol_id: Option<&str>,
consensus_engine: Option<&str>,
properties: Option<Properties>,
@@ -180,7 +181,7 @@ impl<G: RuntimeGenesis> ChainSpec<G> {
name: name.to_owned(),
id: id.to_owned(),
boot_nodes: boot_nodes,
telemetry_url: telemetry_url.map(str::to_owned),
telemetry_endpoints,
protocol_id: protocol_id.map(str::to_owned),
consensus_engine: consensus_engine.map(str::to_owned),
properties,
+6 -3
View File
@@ -25,6 +25,7 @@ pub use network::config::{NetworkConfiguration, Roles};
use runtime_primitives::BuildStorage;
use serde::{Serialize, de::DeserializeOwned};
use target_info::Target;
use tel::TelemetryEndpoints;
/// Service configuration.
#[derive(Clone)]
@@ -64,7 +65,7 @@ pub struct Configuration<C, G: Serialize + DeserializeOwned + BuildStorage> {
/// RPC over Websockets binding address. `None` if disabled.
pub rpc_ws: Option<SocketAddr>,
/// Telemetry service URL. `None` if disabled.
pub telemetry_url: Option<String>,
pub telemetry_endpoints: Option<TelemetryEndpoints>,
/// The default number of 64KB pages to allocate for Wasm execution
pub default_heap_pages: Option<u64>,
}
@@ -90,11 +91,13 @@ impl<C: Default, G: Serialize + DeserializeOwned + BuildStorage> Configuration<C
execution_strategies: Default::default(),
rpc_http: None,
rpc_ws: None,
telemetry_url: None,
telemetry_endpoints: None,
default_heap_pages: None,
};
configuration.network.boot_nodes = configuration.chain_spec.boot_nodes().to_vec();
configuration.telemetry_url = configuration.chain_spec.telemetry_url().map(str::to_owned);
configuration.telemetry_endpoints = configuration.chain_spec.telemetry_endpoints().clone();
configuration
}
+5 -5
View File
@@ -41,7 +41,7 @@ use exit_future::Signal;
pub use tokio::runtime::TaskExecutor;
use substrate_executor::NativeExecutor;
use parity_codec::{Encode, Decode};
use tel::telemetry;
use tel::*;
pub use self::error::{ErrorKind, Error};
pub use config::{Configuration, Roles, PruningMode};
@@ -128,7 +128,7 @@ impl<Components: components::Components> Service<Components> {
let version = config.full_version();
info!("Best block: #{}", best_header.number());
telemetry!("node.start"; "height" => best_header.number().as_(), "best" => ?best_header.hash());
telemetry!(SUBSTRATE_INFO; "node.start"; "height" => best_header.number().as_(), "best" => ?best_header.hash());
let network_protocol = <Components::Factory>::build_network_protocol(&config)?;
let transaction_pool = Arc::new(
@@ -269,7 +269,7 @@ impl<Components: components::Components> Service<Components> {
)?;
// Telemetry
let telemetry = config.telemetry_url.clone().map(|url| {
let telemetry = config.telemetry_endpoints.clone().map(|endpoints| {
let is_authority = config.roles == Roles::AUTHORITY;
let network_id = network.local_peer_id().to_base58();
let pubkey = format!("{}", public_key);
@@ -278,9 +278,9 @@ impl<Components: components::Components> Service<Components> {
let version = version.clone();
let chain_name = config.chain_spec.name().to_owned();
Arc::new(tel::init_telemetry(tel::TelemetryConfig {
url: url,
endpoints,
on_connect: Box::new(move || {
telemetry!("system.connected";
telemetry!(SUBSTRATE_INFO; "system.connected";
"name" => name.clone(),
"implementation" => impl_name.clone(),
"version" => version.clone(),
+1 -1
View File
@@ -120,7 +120,7 @@ fn node_config<F: ServiceFactory> (
execution_strategies: Default::default(),
rpc_http: None,
rpc_ws: None,
telemetry_url: None,
telemetry_endpoints: None,
default_heap_pages: None,
}
}
+4 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "substrate-telemetry"
version = "0.3.0"
version = "0.3.1"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Telemetry utils"
edition = "2018"
@@ -9,6 +9,9 @@ edition = "2018"
parking_lot = "0.7.1"
lazy_static = "1.0"
log = "0.4"
rand = "0.6"
serde = "1.0.81"
serde_derive = "1.0"
slog = "^2"
slog-json = "^2"
slog-async = "^2"
+148 -32
View File
@@ -26,15 +26,21 @@ use std::sync::Arc;
use parking_lot::Mutex;
use slog::{Drain, o};
use log::trace;
use rand::{thread_rng, Rng};
pub use slog_scope::with_logger;
pub use slog;
use serde_derive::{Serialize, Deserialize};
use slog::OwnedKVList;
use slog::Record;
use core::result;
/// Configuration for telemetry.
pub struct TelemetryConfig {
/// URL of the telemetry WebSocket server.
pub url: String,
/// What do do when we connect to the server.
pub on_connect: Box<Fn() + Send + 'static>,
/// Collection of telemetry WebSocket servers with a corresponding verbosity level.
pub endpoints: TelemetryEndpoints,
/// What do do when we connect to the servers.
/// Note that this closure is executed each time we connect to a telemetry endpoint.
pub on_connect: Box<Fn() + Send + Sync + 'static>,
}
/// Telemetry service guard.
@@ -43,66 +49,165 @@ pub type Telemetry = slog_scope::GlobalLoggerGuard;
/// Size of the channel for passing messages to telemetry thread.
const CHANNEL_SIZE: usize = 262144;
/// Log levels.
pub const SUBSTRATE_DEBUG: &str = "9";
pub const SUBSTRATE_INFO: &str = "0";
pub const CONSENSUS_TRACE: &str = "9";
pub const CONSENSUS_DEBUG: &str = "5";
pub const CONSENSUS_WARN: &str = "4";
pub const CONSENSUS_INFO: &str = "3";
/// Multiply logging to all drains. This is similar to `slog::Duplicate`, which is
/// limited to two drains though and doesn't support dynamic nesting at runtime.
#[derive(Debug, Clone)]
pub struct Multiply<D: Drain> (pub Vec<Box<D>>);
impl<D: Drain> Multiply<D> {
pub fn new(v: Vec<Box<D>>) -> Self {
Multiply(v)
}
}
impl<D: Drain> Drain for Multiply<D> {
type Ok = Vec<D::Ok>;
type Err = Vec<D::Err>;
fn log(&self, record: &Record, logger_values: &OwnedKVList) -> result::Result<Self::Ok, Self::Err> {
let mut oks = Vec::new();
let mut errs = Vec::new();
self.0.iter().for_each(|l| {
let res: Result<<D as Drain>::Ok, <D as Drain>::Err> = (*l).log(record, logger_values);
match res {
Ok(o) => oks.push(o),
Err(e) => errs.push(e),
}
});
if !errs.is_empty() {
result::Result::Err(errs)
} else {
result::Result::Ok(oks)
}
}
}
/// Initialise telemetry.
pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard {
let writer = TelemetryWriter::new();
let out_sync = writer.out.clone();
let log = slog::Logger::root(
slog_async::Async::new(
slog_json::Json::default(writer).fuse()
).chan_size(CHANNEL_SIZE)
let mut endpoint_drains: Vec<Box<slog::Filter<_, _>>> = Vec::new();
let mut out_syncs = Vec::new();
// Set up a filter/drain for each endpoint
config.endpoints.0.iter().for_each(|(url, verbosity)| {
let writer = TelemetryWriter::new(Arc::new(url.to_owned()));
let out_sync = writer.out.clone();
out_syncs.push(out_sync);
let until_verbosity = *verbosity;
let filter = slog::Filter(
slog_json::Json::default(writer).fuse(),
move |rec| {
let tag = rec.tag().parse::<u8>()
.expect("`telemetry!` macro requires tag.");
tag <= until_verbosity
});
let filter = Box::new(filter) as Box<slog::Filter<_, _>>;
endpoint_drains.push(filter);
});
// Set up logging to all endpoints
let drain = slog_async::Async::new(Multiply::new(endpoint_drains).fuse());
let root = slog::Logger::root(drain.chan_size(CHANNEL_SIZE)
.overflow_strategy(slog_async::OverflowStrategy::DropAndReport)
.build().fuse(), o!()
);
let logger_guard = slog_scope::set_global_logger(log);
let logger_guard = slog_scope::set_global_logger(root);
thread::spawn(move || {
loop {
trace!(target: "telemetry", "Connecting to Telemetry... {:?}", config.url);
let _ = ws::connect(config.url.as_str(), |out| Connection::new(out, &*out_sync, &config));
// Spawn a thread for each endpoint
let on_connect = Arc::new(config.on_connect);
config.endpoints.0.into_iter().for_each(|(url, verbosity)| {
let inner_verbosity = Arc::new(verbosity.to_owned());
let inner_on_connect = Arc::clone(&on_connect);
thread::sleep(time::Duration::from_millis(5000));
}
let out_sync = out_syncs.remove(0);
let out_sync = Arc::clone(&out_sync);
thread::spawn(move || {
loop {
let on_connect = Arc::clone(&inner_on_connect);
let out_sync = Arc::clone(&out_sync);
let verbosity = Arc::clone(&inner_verbosity);
trace!(target: "telemetry",
"Connecting to Telemetry at {} with verbosity {}", url, Arc::clone(&verbosity));
let _ = ws::connect(url.to_owned(),
|out| {
Connection::new(out, Arc::clone(&out_sync), Arc::clone(&on_connect), url.clone())
});
// Sleep for a random time between 5-10 secs. If there are general connection
// issues not all threads should be synchronized in their re-connection time.
let random_sleep = thread_rng().gen_range(0, 5);
thread::sleep(time::Duration::from_secs(5) + time::Duration::from_secs(random_sleep));
}
});
});
return logger_guard;
}
/// Exactly equivalent to `slog_scope::info`, provided as a convenience.
/// Translates to `slog_scope::info`, but contains an additional verbosity
/// parameter which the log record is tagged with. Additionally the verbosity
/// parameter is added to the record as a key-value pair.
#[macro_export]
macro_rules! telemetry {
( $($t:tt)* ) => { $crate::with_logger(|l| $crate::slog::slog_info!(l, $($t)* )) }
( $a:expr; $b:expr; $( $t:tt )* ) => {
$crate::with_logger(|l| {
$crate::slog::slog_info!(l, #$a, $b; "verbosity" => stringify!($a), $($t)* )
})
}
}
struct Connection<'a> {
struct Connection {
out: ws::Sender,
out_sync: &'a Mutex<Option<ws::Sender>>,
config: &'a TelemetryConfig,
out_sync: Arc<Mutex<Option<ws::Sender>>>,
on_connect: Arc<Box<Fn() + Send + Sync + 'static>>,
url: String,
}
impl<'a> Connection<'a> {
fn new(out: ws::Sender, out_sync: &'a Mutex<Option<ws::Sender>>, config: &'a TelemetryConfig) -> Self {
impl Connection {
fn new(
out: ws::Sender,
out_sync: Arc<Mutex<Option<ws::Sender>>>,
on_connect: Arc<Box<Fn() + Send + Sync + 'static>>,
url: String
) -> Self {
Connection {
out,
out_sync,
config,
on_connect,
url,
}
}
}
impl<'a> ws::Handler for Connection<'a> {
impl ws::Handler for Connection {
fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
trace!(target: "telemetry", "Connected!");
trace!(target: "telemetry", "Connected to {}!", self.url);
*self.out_sync.lock() = Some(self.out.clone());
(self.config.on_connect)();
(self.on_connect)();
Ok(())
}
fn on_close(&mut self, code: ws::CloseCode, reason: &str) {
*self.out_sync.lock() = None;
trace!(target: "telemetry", "Connection closing due to ({:?}) {}", code, reason);
trace!(target: "telemetry", "Connection to {} closing due to ({:?}) {}",
self.url, code, reason);
}
fn on_error(&mut self, _: ws::Error) {
@@ -117,15 +222,17 @@ impl<'a> ws::Handler for Connection<'a> {
struct TelemetryWriter {
buffer: Vec<u8>,
out: Arc<Mutex<Option<ws::Sender>>>,
url: Arc<String>,
}
impl TelemetryWriter {
fn new() -> Self {
fn new(url: Arc<String>) -> Self {
let out = Arc::new(Mutex::new(None));
TelemetryWriter {
buffer: Vec::new(),
out,
url,
}
}
}
@@ -155,11 +262,11 @@ impl io::Write for TelemetryWriter {
let error = if let Some(ref mut o) = *out {
let r = o.send(s);
trace!(target: "telemetry", "Sent to telemetry: {} -> {:?}", s, r);
trace!(target: "telemetry", "Sent to telemetry {}: {} -> {:?}", self.url, s, r);
r.is_err()
} else {
trace!(target: "telemetry", "Telemetry socket closed, failed to send: {}", s);
trace!(target: "telemetry", "Telemetry socket closed to {}, failed to send: {}", self.url, s);
false
};
@@ -171,3 +278,12 @@ impl io::Write for TelemetryWriter {
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryEndpoints (Vec<(String, u8)>);
impl TelemetryEndpoints {
pub fn new(endpoints: Vec<(String, u8)>) -> Self {
TelemetryEndpoints(endpoints)
}
}
+1
View File
@@ -30,6 +30,7 @@ grandpa = { package = "substrate-finality-grandpa", path = "../../core/finality-
sr-primitives = { path = "../../core/sr-primitives" }
node-executor = { path = "../executor" }
substrate-keystore = { path = "../../core/keystore" }
substrate-telemetry = { package = "substrate-telemetry", path = "../../core/telemetry" }
[dev-dependencies]
service-test = { package = "substrate-service-test", path = "../../core/service/test" }
+3 -1
View File
@@ -11,7 +11,9 @@
"/ip4/104.211.48.247/tcp/30333/p2p/QmV2zjgFRfxbgYZQC9qFr4aHsQt7tDBJRAdgqqxqTq1Kta",
"/ip4/40.114.120.164/tcp/30333/p2p/QmQbPCeurXuKhzCw6Ar6ovizNKATMTnkkqFJKgZzbF2MJs"
],
"telemetryUrl": "wss://telemetry.polkadot.io/submit/",
"telemetryEndpoints": [
["wss://telemetry.polkadot.io/submit/", 0]
],
"protocolId": null,
"consensusEngine": null,
"genesis": {
+2 -1
View File
@@ -26,6 +26,7 @@ use substrate_service;
use hex_literal::{hex, hex_impl};
use substrate_keystore::pad_seed;
use substrate_telemetry::TelemetryEndpoints;
const STAGING_TELEMETRY_URL: &str = "wss://telemetry.polkadot.io/submit/";
@@ -153,7 +154,7 @@ pub fn staging_testnet_config() -> ChainSpec {
"staging_testnet",
staging_testnet_config_genesis,
boot_nodes,
Some(STAGING_TELEMETRY_URL.into()),
Some(TelemetryEndpoints::new(vec![(STAGING_TELEMETRY_URL.to_string(), 0)])),
None,
None,
None,