mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-07 02:38:02 +00:00
87776e63bb
* Send high-level consensus telemetry by default
* Notify telemetry on finalized
* Send used authority set to telemetry
* Do not send commit message telemetry by default
* Fix typo
* Allow for notifications on telemetry connect
  ...and send the current authority set on each connect.
* Send authority set to telemetry on change
* Revert "Send used authority set to telemetry"
  This reverts commit 1deceead52bb7443a02879ac8138afad9a6ca5ff.
* Merge branch 'master' into 'cmichi-send-high-level-consensus-telemetry-by-default'
  Squashed commit of the following:

  commit 19d77cbc23
  Author: Xiliang Chen <xlchen1291@gmail.com>
  Date: Wed Apr 10 20:26:29 2019 +1200
      update authers for rest of the node-template cargo.toml files (#2242)

  commit 0afc357a97
  Author: Bastian Köcher <bkchr@users.noreply.github.com>
  Date: Tue Apr 9 10:31:18 2019 +0200
      Throw a compile error for `on_finalise` and `on_initialise` (#2236)

  commit e57e54ab9c
  Author: Pierre Krieger <pierre.krieger1708@gmail.com>
  Date: Tue Apr 9 05:30:43 2019 -0300
      Add warning when using default protocol ID (#2234)
      * Add warning when using default protocol ID
      * Update core/service/src/lib.rs

  commit cb766e5f5d
  Author: Xiliang Chen <xlchen1291@gmail.com>
  Date: Tue Apr 9 17:22:20 2019 +1200
      update name and authors to placeholder text for node-template (#2222)
      * update name and authors to placeholder text
      * revert package name change

  commit a1e15ae55a
  Author: André Silva <andre.beat@gmail.com>
  Date: Mon Apr 8 12:50:34 2019 +0100
      grandpa: Voter persistence and upgrade to finality-grandpa v0.7 (#2139)
      * core: grandpa: migrate to grandpa 0.7
      * core: grandpa: store current round votes and load them on startup
      * core: grandpa: resend old persisted votes for the current round
      * core: grandpa: store base and votes for last completed round
      * core: grandpa: fix latest grandpa 0.7 changes
      * core: grandpa: update to grandpa 0.7.1
      * core: grandpa: persist votes for last two completed rounds
      * core: grandpa: simplify VoterSetState usage
      * core: grandpa: use Environment::update_voter_set_state
      * core: grandpa: fix aux_schema test
      * core: grandpa: add docs
      * core: grandpa: add note about environment assumption
      * core: grandpa: don't update voter set state on ignored votes
      * core: grandpa: add test for v1 -> v2 aux_schema migration
      * core: grandpa: add test for voter vote persistence
      * core: grandpa: use grandpa 0.7.1 from crates.io
      * core: grandpa: use try_init in test
      * core: grandpa: add comment about block_import in test
      * core: grandpa: avoid cloning HasVoted
      * core: grandpa: add missing docs
      * core: grandpa: cleanup up can_propose/prevote/precommit

  commit ed3ae4ac39
  Author: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
  Date: Mon Apr 8 13:17:00 2019 +0200
      remove clone bound on specialization in testnet factory (#2157)

  commit 03f3fb1442
  Author: Andrew Jones <ascjones@gmail.com>
  Date: Sat Apr 6 12:23:56 2019 +0100
      Contract import/export validation (#2203)
      * Reject validation of contract with unknown exports
      * Validate imports eagerly
      * Increment spec version

  commit decddaab0f
  Author: Pierre Krieger <pierre.krieger1708@gmail.com>
  Date: Fri Apr 5 14:07:09 2019 -0300
      Fix state inconsistency between handler and behaviour (#2220)
      * Fix state inconsistency between handler and behaviour
      * Fix the error! being in the wrong place

  commit dce0b4ea49
  Author: Bastian Köcher <bkchr@users.noreply.github.com>
  Date: Fri Apr 5 18:50:38 2019 +0200
      Use `storage_root` of newly calculated header (#2216)
      Instead of calculating the `storage_root` a second time, we just can take the `storage_root` from the new header.

  commit b01136c90d
  Author: Marek Kotewicz <marek.kotewicz@gmail.com>
  Date: Fri Apr 5 14:44:46 2019 +0200
      Peerset::discovered accepts many peer ids (#2213)
      * Peerset::discovered accepts many peer ids
      * Improve tracing in peerset

  commit 1142bcde97
  Author: Marek Kotewicz <marek.kotewicz@gmail.com>
  Date: Thu Apr 4 19:40:40 2019 +0200
      simplification of peerset api (#2123)
      * Introduction of PeersetHandle
      * integrate PeersetHandle with the rest of the codebase
      * fix compilation errors
      * more tests for peerset, fixed overwriting bug in add_reserved_peer
      * Slots data structure and bugfixes for peerset
      * bend to pressure
      * updated lru-cache to 0.1.2 and updated linked-hash-map to 0.5.2
      * peerset discovered list is now a LinkedHashMap
      * fix review suggestions
      * split back Peerset and PeersetHandle
      * test for Peerset::discovered
      * applied review suggestions
      * fixes to peerset::incoming
      * peerset disconnects are all instantaneous
      * instantaneous drop in peerset finished
      * Peerset::set_reserved_only can also reconnect nodes
      * Peerset scores cache uses lru-cache
      * remove redundant function call and comment from Peerset::on_set_reserved_only
      * add_peer returns SlotState enum
      * apply review suggestions
      * is_reserved -> is_connected_and_reserved

  commit 301844dd56
  Author: Arkadiy Paronyan <arkady.paronyan@gmail.com>
  Date: Thu Apr 4 18:01:28 2019 +0200
      Disconnect on protocol timeout (#2212)

  commit cb3c912b1a
  Author: André Silva <andre.beat@gmail.com>
  Date: Thu Apr 4 15:56:49 2019 +0100
      core: grandpa: verify commit target in justification (#2201)

  commit 6920b169cd
  Author: Bastian Köcher <bkchr@users.noreply.github.com>
  Date: Thu Apr 4 16:56:16 2019 +0200
      Introduce `original_storage` and `original_storage_hash` (#2211)
      Both functions will ignore any overlayed changes and access the backend directly.

  commit cb7a8161f5
  Author: Xiliang Chen <xlchen1291@gmail.com>
  Date: Fri Apr 5 03:55:55 2019 +1300
      code cleanup (#2206)

  commit acaf1fe625
  Author: Arkadiy Paronyan <arkady.paronyan@gmail.com>
  Date: Wed Apr 3 15:52:46 2019 +0200
      Emberic elm testnet (#2197)

* Make telemetry onconnect hoook optional
* Merge branch 'master' into 'cmichi-send-high-level-consensus-telemetry-by-default'
* Introduce GrandpaParams struct to condense parameters
* Remove debug statement
* Fix tests
* Rename parameter
* Fix tests
* Rename struct
* Do not send verbosity level
* Combine imports
* Implement comments
* Run cargo build --all
* Remove noisy telemetry
* Add docs for public items
* Unbox and support Clone trait
* Fix merge
* Fix merge
* Update core/finality-grandpa/src/lib.rs

Co-Authored-By: cmichi <mich@elmueller.net>
288 lines
8.0 KiB
Rust
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

//! Telemetry utils.
//!
//! The `telemetry!` macro may be used anywhere in the Substrate codebase
//! in order to send real-time logging information to the telemetry
//! server (if there is one). We use the async drain adapter of `slog`
//! so that the logging thread doesn't get held up at all.

use std::{io, time, thread};
use std::sync::Arc;
use parking_lot::Mutex;
use slog::{Drain, o, OwnedKVList, Record};
use log::trace;
use rand::{thread_rng, Rng};
pub use slog_scope::with_logger;
pub use slog;
use serde_derive::{Serialize, Deserialize};
use core::result;

/// Configuration for telemetry.
pub struct TelemetryConfig {
    /// Collection of telemetry WebSocket servers with a corresponding verbosity level.
    pub endpoints: TelemetryEndpoints,
    /// What to do when we connect to the servers.
    /// Note that this closure is executed each time we connect to a telemetry endpoint.
    pub on_connect: Box<dyn Fn() + Send + Sync + 'static>,
}

/// Telemetry service guard.
pub type Telemetry = slog_scope::GlobalLoggerGuard;

/// Size of the channel for passing messages to telemetry thread.
const CHANNEL_SIZE: usize = 262144;

/// Log levels.
pub const SUBSTRATE_DEBUG: &str = "9";
pub const SUBSTRATE_INFO: &str = "0";

pub const CONSENSUS_TRACE: &str = "9";
pub const CONSENSUS_DEBUG: &str = "5";
pub const CONSENSUS_WARN: &str = "4";
pub const CONSENSUS_INFO: &str = "0";

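/*
Illustration only, not part of the crate: a minimal sketch of how the level
constants above relate to an endpoint's verbosity. It assumes the rule used by
the filter in `init_telemetry`, where a record passes when its numeric tag is
at most the endpoint's verbosity. `passes_filter` is a hypothetical helper, and
unlike the real filter (which panics on a tag that is not a `u8`) it simply
drops such records.

```rust
// Copies of two of the level constants so the sketch is self-contained.
const CONSENSUS_INFO: &str = "0";
const CONSENSUS_DEBUG: &str = "5";

/// Hypothetical helper: returns `true` if a record tagged `tag` would be
/// forwarded to an endpoint configured with `endpoint_verbosity`.
fn passes_filter(tag: &str, endpoint_verbosity: u8) -> bool {
    match tag.parse::<u8>() {
        // Lower numbers are more important; pass anything up to the limit.
        Ok(level) => level <= endpoint_verbosity,
        // Assumption of this sketch: unparsable tags are dropped, not a panic.
        Err(_) => false,
    }
}
```

Under this rule an endpoint with verbosity 0 receives only the `*_INFO`
records, while an endpoint with verbosity 9 receives everything.
*/
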
/// Multiply logging to all drains. This is similar to `slog::Duplicate`, which,
/// however, is limited to two drains and doesn't support dynamic nesting at runtime.
#[derive(Debug, Clone)]
pub struct Multiply<D: Drain> (pub Vec<Box<D>>);

impl<D: Drain> Multiply<D> {
    /// Creates a `Multiply` that forwards every record to all of the given drains.
    pub fn new(v: Vec<Box<D>>) -> Self {
        Multiply(v)
    }
}

impl<D: Drain> Drain for Multiply<D> {
    type Ok = Vec<D::Ok>;
    type Err = Vec<D::Err>;

    fn log(&self, record: &Record, logger_values: &OwnedKVList) -> result::Result<Self::Ok, Self::Err> {
        let mut oks = Vec::new();
        let mut errs = Vec::new();

        // Forward the record to every drain, collecting successes and failures separately.
        self.0.iter().for_each(|l| {
            let res: Result<<D as Drain>::Ok, <D as Drain>::Err> = (*l).log(record, logger_values);
            match res {
                Ok(o) => oks.push(o),
                Err(e) => errs.push(e),
            }
        });

        // A single failing drain makes the whole call an error.
        if !errs.is_empty() {
            result::Result::Err(errs)
        } else {
            result::Result::Ok(oks)
        }
    }
}

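/*
Illustration only, not part of the crate: a dependency-free sketch of the
fan-out pattern that `Multiply` implements. The `Sink` trait is a hypothetical
stand-in for `slog::Drain`, reduced to a single method, and `LenLimited` and
`FanOut` are made-up names for this example.

```rust
/// Hypothetical stand-in for `slog::Drain`.
trait Sink {
    type Ok;
    type Err;
    fn log(&self, msg: &str) -> Result<Self::Ok, Self::Err>;
}

/// Accepts messages of at most `max_len` bytes and reports the accepted length.
struct LenLimited {
    max_len: usize,
}

impl Sink for LenLimited {
    type Ok = usize;
    type Err = String;

    fn log(&self, msg: &str) -> Result<usize, String> {
        if msg.len() <= self.max_len {
            Ok(msg.len())
        } else {
            Err(format!("message longer than {} bytes", self.max_len))
        }
    }
}

/// Forwards each message to every inner sink.
struct FanOut<S: Sink>(Vec<S>);

impl<S: Sink> Sink for FanOut<S> {
    type Ok = Vec<S::Ok>;
    type Err = Vec<S::Err>;

    fn log(&self, msg: &str) -> Result<Self::Ok, Self::Err> {
        let mut oks = Vec::new();
        let mut errs = Vec::new();
        // Every sink sees the message, even if an earlier one failed.
        for sink in &self.0 {
            match sink.log(msg) {
                Ok(o) => oks.push(o),
                Err(e) => errs.push(e),
            }
        }
        // Any failure turns the whole call into an error.
        if errs.is_empty() { Ok(oks) } else { Err(errs) }
    }
}
```

As in `Multiply::log`, the results of the sinks that succeeded are discarded
as soon as one sink reports an error.
*/
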
/// Initialize telemetry.
pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard {
    let mut endpoint_drains: Vec<Box<slog::Filter<_, _>>> = Vec::new();
    let mut out_syncs = Vec::new();

    // Set up a filter/drain for each endpoint
    config.endpoints.0.iter().for_each(|(url, verbosity)| {
        let writer = TelemetryWriter::new(Arc::new(url.to_owned()));
        let out_sync = writer.out.clone();
        out_syncs.push(out_sync);

        // Only records whose tag is at most the endpoint's verbosity are forwarded.
        let until_verbosity = *verbosity;
        let filter = slog::Filter(
            slog_json::Json::default(writer).fuse(),
            move |rec| {
                let tag = rec.tag().parse::<u8>()
                    .expect("`telemetry!` macro requires tag.");
                tag <= until_verbosity
            });

        let filter = Box::new(filter) as Box<slog::Filter<_, _>>;
        endpoint_drains.push(filter);
    });

    // Set up logging to all endpoints
    let drain = slog_async::Async::new(Multiply::new(endpoint_drains).fuse());
    let root = slog::Logger::root(drain.chan_size(CHANNEL_SIZE)
        .overflow_strategy(slog_async::OverflowStrategy::DropAndReport)
        .build().fuse(), o!()
    );
    let logger_guard = slog_scope::set_global_logger(root);

    // Spawn a thread for each endpoint
    let on_connect = Arc::new(config.on_connect);
    config.endpoints.0.into_iter().for_each(|(url, verbosity)| {
        let inner_verbosity = Arc::new(verbosity.to_owned());
        let inner_on_connect = Arc::clone(&on_connect);

        let out_sync = out_syncs.remove(0);
        let out_sync = Arc::clone(&out_sync);

        thread::spawn(move || {
            loop {
                let on_connect = Arc::clone(&inner_on_connect);
                let out_sync = Arc::clone(&out_sync);
                let verbosity = Arc::clone(&inner_verbosity);

                trace!(target: "telemetry",
                    "Connecting to Telemetry at {} with verbosity {}", url, Arc::clone(&verbosity));

                let _ = ws::connect(url.to_owned(),
                    |out| {
                        Connection::new(out, Arc::clone(&out_sync), Arc::clone(&on_connect), url.clone())
                    });

                // Sleep for a random whole number of seconds in the range [5, 10).
                // If there are general connection issues, the threads should not
                // all retry at the same moment.
                let random_sleep = thread_rng().gen_range(0, 5);
                thread::sleep(time::Duration::from_secs(5) + time::Duration::from_secs(random_sleep));
            }
        });
    });

    logger_guard
}

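/*
Illustration only, not part of the crate: a sketch of the reconnection delay
used in the loop above, a fixed base plus a random number of whole seconds.
To stay dependency-free it does not use the `rand` crate; as an assumption of
this sketch, the randomly seeded hasher from the standard library stands in as
a cheap source of randomness. `cheap_random` and `reconnect_delay` are
hypothetical names.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::time::Duration;

/// Returns a pseudo-random `u64` derived from std's randomly keyed hasher.
/// Good enough for spreading out retries, not for anything security related.
fn cheap_random() -> u64 {
    RandomState::new().build_hasher().finish()
}

/// Delay before the next connection attempt: `base_secs` plus a jitter of
/// `0..jitter_secs` whole seconds (no jitter if `jitter_secs` is zero).
fn reconnect_delay(base_secs: u64, jitter_secs: u64) -> Duration {
    let jitter = if jitter_secs == 0 {
        0
    } else {
        cheap_random() % jitter_secs
    };
    Duration::from_secs(base_secs + jitter)
}
```

Calling `reconnect_delay(5, 5)` corresponds to the values used in
`init_telemetry` and yields between 5 and 9 seconds, so endpoints that dropped
at the same time are unlikely to all reconnect in the same second.
*/
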
/// Translates to `slog_scope::info`, but contains an additional verbosity
/// parameter which the log record is tagged with. Additionally the verbosity
/// parameter is added to the record as a key-value pair.
#[macro_export]
macro_rules! telemetry {
    ( $a:expr; $b:expr; $( $t:tt )* ) => {
        $crate::with_logger(|l| {
            $crate::slog::slog_info!(l, #$a, $b; $($t)* )
        })
    }
}

struct Connection {
    out: ws::Sender,
    out_sync: Arc<Mutex<Option<ws::Sender>>>,
    on_connect: Arc<Box<dyn Fn() + Send + Sync + 'static>>,
    url: String,
}

impl Connection {
    fn new(
        out: ws::Sender,
        out_sync: Arc<Mutex<Option<ws::Sender>>>,
        on_connect: Arc<Box<dyn Fn() + Send + Sync + 'static>>,
        url: String
    ) -> Self {
        Connection {
            out,
            out_sync,
            on_connect,
            url,
        }
    }
}

impl ws::Handler for Connection {
    fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
        trace!(target: "telemetry", "Connected to {}!", self.url);

        // Publish the sender so that the `TelemetryWriter` can start sending.
        *self.out_sync.lock() = Some(self.out.clone());
        (self.on_connect)();
        Ok(())
    }

    fn on_close(&mut self, code: ws::CloseCode, reason: &str) {
        *self.out_sync.lock() = None;

        trace!(target: "telemetry", "Connection to {} closing due to ({:?}) {}",
            self.url, code, reason);
    }

    fn on_error(&mut self, _: ws::Error) {
        *self.out_sync.lock() = None;

        // Sleep to ensure that reconnecting isn't spamming logs.
        // This happens in its own thread so it won't block anything.
        thread::sleep(time::Duration::from_millis(1000));
    }
}

struct TelemetryWriter {
    buffer: Vec<u8>,
    out: Arc<Mutex<Option<ws::Sender>>>,
    url: Arc<String>,
}

impl TelemetryWriter {
    fn new(url: Arc<String>) -> Self {
        let out = Arc::new(Mutex::new(None));

        TelemetryWriter {
            buffer: Vec::new(),
            out,
            url,
        }
    }
}

impl io::Write for TelemetryWriter {
    fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
        let mut iter = msg.split(|x| *x == b'\n');
        let first = iter.next().expect("Split iterator always has at least one element; qed");

        self.buffer.extend_from_slice(first);

        // Flush for each occurrence of new line character
        for continued in iter {
            let _ = self.flush();
            self.buffer.extend_from_slice(continued);
        }

        Ok(msg.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        if let Ok(s) = ::std::str::from_utf8(&self.buffer[..]) {
            let mut out = self.out.lock();

            let error = if let Some(ref mut o) = *out {
                let r = o.send(s);
                trace!(target: "telemetry", "Sent to telemetry {}: {} -> {:?}", self.url, s, r);

                r.is_err()
            } else {
                trace!(target: "telemetry", "Telemetry socket closed to {}, failed to send: {}", self.url, s);
                false
            };

            if error {
                *out = None;
            }
        }
        self.buffer.clear();
        Ok(())
    }
}

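/*
Illustration only, not part of the crate: a dependency-free sketch of the
newline-delimited buffering done by `TelemetryWriter`. Instead of sending each
complete line over a WebSocket, the hypothetical `LineCollector` below stores
it in a `Vec<String>`, which makes the splitting behaviour easy to observe.

```rust
use std::io::{self, Write};

/// Hypothetical writer that collects complete lines.
struct LineCollector {
    buffer: Vec<u8>,
    lines: Vec<String>,
}

impl LineCollector {
    fn new() -> Self {
        LineCollector { buffer: Vec::new(), lines: Vec::new() }
    }
}

impl Write for LineCollector {
    fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
        let mut parts = msg.split(|b| *b == b'\n');
        // `split` always yields at least one (possibly empty) slice.
        if let Some(first) = parts.next() {
            self.buffer.extend_from_slice(first);
        }
        // Each further part follows a newline: emit the buffered line first.
        for rest in parts {
            self.flush()?;
            self.buffer.extend_from_slice(rest);
        }
        Ok(msg.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        // Invalid UTF-8 is dropped silently, as in `TelemetryWriter::flush`.
        if let Ok(s) = std::str::from_utf8(&self.buffer) {
            self.lines.push(s.to_owned());
        }
        self.buffer.clear();
        Ok(())
    }
}
```

A JSON record that arrives in several `write` calls is therefore emitted once,
when its terminating newline is seen, and a partial record stays in the buffer
until then.
*/
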
/// Telemetry endpoints, each a WebSocket URL paired with a verbosity level.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryEndpoints (Vec<(String, u8)>);

impl TelemetryEndpoints {
    /// Creates the endpoint collection from `(url, verbosity)` pairs.
    pub fn new(endpoints: Vec<(String, u8)>) -> Self {
        TelemetryEndpoints(endpoints)
    }
}