Adjust consensus telemetry (#2198)

* Send high-level consensus telemetry by default

* Notify telemetry on finalized

* Send used authority set to telemetry

* Do not send commit message telemetry by default

* Fix typo

* Allow for notifications on telemetry connect

...and send the current authority set on each connect.
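The shape of that on-connect notification can be sketched as follows. This is a minimal illustration of the sink-registration pattern (register a sender per interested party, notify all on each telemetry connection, prune dead sinks), using `std::sync::mpsc` in place of the futures-based unbounded channels the service code in the diff actually uses; the type and method names here are illustrative, not the real API.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Illustrative stand-in for the service's shared list of
/// telemetry-connection sinks.
#[derive(Clone, Default)]
pub struct OnConnectSinks(Arc<Mutex<Vec<Sender<()>>>>);

impl OnConnectSinks {
    /// Register interest and return a stream that yields `()` on every
    /// (re)connection to a telemetry server.
    pub fn on_connect_stream(&self) -> Receiver<()> {
        let (sink, stream) = channel();
        self.0.lock().unwrap().push(sink);
        stream
    }

    /// Called from the telemetry `on_connect` callback: notify every
    /// registered sink, dropping any whose receiver has gone away
    /// (the `retain` mirrors the pruning done in the service diff).
    pub fn notify(&self) {
        self.0.lock().unwrap().retain(|sink| sink.send(()).is_ok());
    }
}
```

A consumer (e.g. the GRANDPA voter) calls `on_connect_stream()` once and then re-sends its current authority set each time the stream yields an event.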

* Send authority set to telemetry on change

* Revert "Send used authority set to telemetry"

This reverts commit 1deceead52bb7443a02879ac8138afad9a6ca5ff.

* Merge branch 'master' into 'cmichi-send-high-level-consensus-telemetry-by-default'

Squashed commit of the following:

commit 19d77cbc23
Author: Xiliang Chen <xlchen1291@gmail.com>
Date:   Wed Apr 10 20:26:29 2019 +1200

    update authers for rest of the node-template cargo.toml files (#2242)

commit 0afc357a97
Author: Bastian Köcher <bkchr@users.noreply.github.com>
Date:   Tue Apr 9 10:31:18 2019 +0200

    Throw a compile error for `on_finalise` and `on_initialise` (#2236)

commit e57e54ab9c
Author: Pierre Krieger <pierre.krieger1708@gmail.com>
Date:   Tue Apr 9 05:30:43 2019 -0300

    Add warning when using default protocol ID (#2234)

    * Add warning when using default protocol ID

    * Update core/service/src/lib.rs

commit cb766e5f5d
Author: Xiliang Chen <xlchen1291@gmail.com>
Date:   Tue Apr 9 17:22:20 2019 +1200

    update name and authors to placeholder text for node-template (#2222)

    * update name and authors to placeholder text

    * revert package name change

commit a1e15ae55a
Author: André Silva <andre.beat@gmail.com>
Date:   Mon Apr 8 12:50:34 2019 +0100

    grandpa: Voter persistence and upgrade to finality-grandpa v0.7 (#2139)

    * core: grandpa: migrate to grandpa 0.7

    * core: grandpa: store current round votes and load them on startup

    * core: grandpa: resend old persisted votes for the current round

    * core: grandpa: store base and votes for last completed round

    * core: grandpa: fix latest grandpa 0.7 changes

    * core: grandpa: update to grandpa 0.7.1

    * core: grandpa: persist votes for last two completed rounds

    * core: grandpa: simplify VoterSetState usage

    * core: grandpa: use Environment::update_voter_set_state

    * core: grandpa: fix aux_schema test

    * core: grandpa: add docs

    * core: grandpa: add note about environment assumption

    * core: grandpa: don't update voter set state on ignored votes

    * core: grandpa: add test for v1 -> v2 aux_schema migration

    * core: grandpa: add test for voter vote persistence

    * core: grandpa: use grandpa 0.7.1 from crates.io

    * core: grandpa: use try_init in test

    * core: grandpa: add comment about block_import in test

    * core: grandpa: avoid cloning HasVoted

    * core: grandpa: add missing docs

    * core: grandpa: cleanup up can_propose/prevote/precommit

commit ed3ae4ac39
Author: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
Date:   Mon Apr 8 13:17:00 2019 +0200

    remove clone bound on specialization in testnet factory (#2157)

commit 03f3fb1442
Author: Andrew Jones <ascjones@gmail.com>
Date:   Sat Apr 6 12:23:56 2019 +0100

    Contract import/export validation (#2203)

    * Reject validation of contract with unknown exports

    * Validate imports eagerly

    * Increment spec version

commit decddaab0f
Author: Pierre Krieger <pierre.krieger1708@gmail.com>
Date:   Fri Apr 5 14:07:09 2019 -0300

    Fix state inconsistency between handler and behaviour (#2220)

    * Fix state inconsistency between handler and behaviour

    * Fix the error! being in the wrong place

commit dce0b4ea49
Author: Bastian Köcher <bkchr@users.noreply.github.com>
Date:   Fri Apr 5 18:50:38 2019 +0200

    Use `storage_root` of newly calculated header (#2216)

    Instead of calculating the `storage_root` a second time, we just can
    take the `storage_root` from the new header.

commit b01136c90d
Author: Marek Kotewicz <marek.kotewicz@gmail.com>
Date:   Fri Apr 5 14:44:46 2019 +0200

    Peerset::discovered accepts many peer ids (#2213)

    * Peerset::discovered accepts many peer ids

    * Improve tracing in peerset

commit 1142bcde97
Author: Marek Kotewicz <marek.kotewicz@gmail.com>
Date:   Thu Apr 4 19:40:40 2019 +0200

    simplification of peerset api (#2123)

    * Introduction of PeersetHandle

    * integrate PeersetHandle with the rest of the codebase

    * fix compilation errors

    * more tests for peerset, fixed overwriting bug in add_reserved_peer

    * Slots data structure and bugfixes for peerset

    * bend to pressure

    * updated lru-cache to 0.1.2 and updated linked-hash-map to 0.5.2

    * peerset discovered list is now a LinkedHashMap

    * fix review suggestions

    * split back Peerset and PeersetHandle

    * test for Peerset::discovered

    * applied review suggestions

    * fixes to peerset::incoming

    * peerset disconnects are all instantaneous

    * instantaneous drop in peerset finished

    * Peerset::set_reserved_only can also reconnect nodes

    * Peerset scores cache uses lru-cache

    * remove redundant function call and comment from Peerset::on_set_reserved_only

    * add_peer returns SlotState enum

    * apply review suggestions

    * is_reserved -> is_connected_and_reserved

commit 301844dd56
Author: Arkadiy Paronyan <arkady.paronyan@gmail.com>
Date:   Thu Apr 4 18:01:28 2019 +0200

    Disconnect on protocol timeout (#2212)

commit cb3c912b1a
Author: André Silva <andre.beat@gmail.com>
Date:   Thu Apr 4 15:56:49 2019 +0100

    core: grandpa: verify commit target in justification (#2201)

commit 6920b169cd
Author: Bastian Köcher <bkchr@users.noreply.github.com>
Date:   Thu Apr 4 16:56:16 2019 +0200

    Introduce `original_storage` and `original_storage_hash` (#2211)

    Both functions will ignore any overlayed changes and access the backend
    directly.

commit cb7a8161f5
Author: Xiliang Chen <xlchen1291@gmail.com>
Date:   Fri Apr 5 03:55:55 2019 +1300

    code cleanup (#2206)

commit acaf1fe625
Author: Arkadiy Paronyan <arkady.paronyan@gmail.com>
Date:   Wed Apr 3 15:52:46 2019 +0200

    Emberic elm testnet (#2197)

* Make telemetry onconnect hook optional

* Merge branch 'master' into 'cmichi-send-high-level-consensus-telemetry-by-default'

* Introduce GrandpaParams struct to condense parameters

* Remove debug statement

* Fix tests

* Rename parameter

* Fix tests

* Rename struct

* Do not send verbosity level

* Combine imports

* Implement comments

* Run cargo build --all

* Remove noisy telemetry

* Add docs for public items

* Unbox and support Clone trait

* Fix merge

* Fix merge

* Update core/finality-grandpa/src/lib.rs

Co-Authored-By: cmichi <mich@elmueller.net>
This commit is contained in:
Michael Müller
2019-04-23 15:49:28 +02:00
committed by Robert Habermeier
parent beadb4b1bd
commit 87776e63bb
12 changed files with 166 additions and 55 deletions
@@ -956,6 +956,11 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
let header = self.header(&BlockId::Hash(finalized_hash))?
.expect("header already known to exist in DB because it is indicated in the tree route; qed");
telemetry!(SUBSTRATE_INFO; "notify.finalized";
"height" => format!("{}", header.number()),
"best" => ?finalized_hash,
);
let notification = FinalityNotification {
header,
hash: finalized_hash,
@@ -16,6 +16,7 @@ runtime_primitives = { package = "sr-primitives", path = "../sr-primitives" }
consensus_common = { package = "substrate-consensus-common", path = "../consensus/common" }
substrate-primitives = { path = "../primitives" }
substrate-telemetry = { path = "../telemetry" }
serde_json = "1.0"
client = { package = "substrate-client", path = "../client" }
inherents = { package = "substrate-inherents", path = "../../core/inherents" }
network = { package = "substrate-network", path = "../network" }
@@ -25,6 +25,7 @@ use fork_tree::ForkTree;
use grandpa::round::State as RoundState;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use log::{info, warn};
use substrate_telemetry::{telemetry, CONSENSUS_INFO};
use crate::authorities::{AuthoritySet, SharedAuthoritySet, PendingChange, DelayKind};
use crate::consensus_changes::{SharedConsensusChanges, ConsensusChanges};
@@ -365,6 +366,17 @@ pub(crate) fn update_authority_set<Block: BlockT, F, R>(
let encoded_set = set.encode();
if let Some(new_set) = new_set {
telemetry!(CONSENSUS_INFO; "afg.authority_set";
"hash" => ?new_set.canon_hash,
"number" => ?new_set.canon_number,
"authority_set_id" => ?new_set.set_id,
"authorities" => {
let authorities: Vec<String> =
new_set.authorities.iter().map(|(id, _)| format!("{}", id)).collect();
format!("{:?}", authorities)
}
);
// we also overwrite the "last completed round" entry with a blank slate
// because from the perspective of the finality gadget, the chain has
// reset.
@@ -681,7 +681,7 @@ impl<Block: BlockT, N: Network<Block>> Sink for CommitsOut<Block, N> {
let (round, commit) = input;
let round = Round(round);
-telemetry!(CONSENSUS_INFO; "afg.commit_issued";
+telemetry!(CONSENSUS_DEBUG; "afg.commit_issued";
"target_number" => ?commit.target_number, "target_hash" => ?commit.target_hash,
);
let (precommits, auth_data) = commit.precommits.into_iter()
@@ -69,6 +69,7 @@ use inherents::InherentDataProviders;
use runtime_primitives::generic::BlockId;
use substrate_primitives::{ed25519, H256, Pair, Blake2Hasher};
use substrate_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG, CONSENSUS_WARN};
use serde_json;
use srml_finality_tracker;
@@ -105,6 +106,7 @@ use environment::{CompletedRound, CompletedRounds, Environment, HasVoted, Shared
use import::GrandpaBlockImport;
use until_imported::UntilCommitBlocksImported;
use communication::NetworkBridge;
use service::TelemetryOnConnect;
use ed25519::{Public as AuthorityId, Signature as AuthoritySignature};
@@ -433,14 +435,26 @@ fn register_finality_tracker_inherent_data_provider<B, E, Block: BlockT<Hash=H25
}
}
/// Parameters used to run Grandpa.
pub struct GrandpaParams<'a, B, E, Block: BlockT<Hash=H256>, N, RA, X> {
/// Configuration for the GRANDPA service.
pub config: Config,
/// A link to the block import worker.
pub link: LinkHalf<B, E, Block, RA>,
/// The Network instance.
pub network: N,
/// The inherent data providers.
pub inherent_data_providers: InherentDataProviders,
/// Handle to a future that will resolve on exit.
pub on_exit: X,
/// If supplied, can be used to hook on telemetry connection established events.
pub telemetry_on_connect: Option<TelemetryOnConnect<'a>>,
}
/// Run a GRANDPA voter as a task. Provide configuration and a link to a
/// block import worker that has already been instantiated with `block_import`.
-pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA>(
-config: Config,
-link: LinkHalf<B, E, Block, RA>,
-network: N,
-inherent_data_providers: InherentDataProviders,
-on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
+pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, X>(
+grandpa_params: GrandpaParams<B, E, Block, N, RA, X>,
) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block, Blake2Hasher> + 'static,
@@ -451,7 +465,17 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA>(
DigestFor<Block>: Encode,
DigestItemFor<Block>: DigestItem<AuthorityId=AuthorityId>,
RA: Send + Sync + 'static,
X: Future<Item=(),Error=()> + Clone + Send + 'static,
{
let GrandpaParams {
config,
link,
network,
inherent_data_providers,
on_exit,
telemetry_on_connect,
} = grandpa_params;
use futures::future::{self, Loop as FutureLoop};
let (network, network_startup) = NetworkBridge::new(network, config.clone(), on_exit.clone());
@@ -465,6 +489,28 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA>(
register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
if let Some(telemetry_on_connect) = telemetry_on_connect {
let authorities = authority_set.clone();
let events = telemetry_on_connect.telemetry_connection_sinks
.for_each(move |_| {
telemetry!(CONSENSUS_INFO; "afg.authority_set";
"authority_set_id" => ?authorities.set_id(),
"authorities" => {
let curr = authorities.current_authorities();
let voters = curr.voters();
let authorities: Vec<String> =
voters.iter().map(|(id, _)| id.to_string()).collect();
serde_json::to_string(&authorities)
.expect("authorities is always at least an empty vector; elements are always of type string")
}
);
Ok(())
})
.then(|_| Ok(()));
let events = events.select(telemetry_on_connect.on_exit).then(|_| Ok(()));
telemetry_on_connect.executor.spawn(events);
}
let voters = authority_set.current_authorities();
let initial_environment = Arc::new(Environment {
inner: client.clone(),
@@ -660,12 +706,8 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA>(
}
#[deprecated(since = "1.1", note = "Please switch to run_grandpa_voter.")]
-pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
-config: Config,
-link: LinkHalf<B, E, Block, RA>,
-network: N,
-inherent_data_providers: InherentDataProviders,
-on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
+pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA, X>(
+grandpa_params: GrandpaParams<B, E, Block, N, RA, X>,
) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block, Blake2Hasher> + 'static,
@@ -676,6 +718,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
DigestFor<Block>: Encode,
DigestItemFor<Block>: DigestItem<AuthorityId=AuthorityId>,
RA: Send + Sync + 'static,
X: Future<Item=(),Error=()> + Clone + Send + 'static,
{
-run_grandpa_voter(config, link, network, inherent_data_providers, on_exit)
+run_grandpa_voter(grandpa_params)
}
@@ -419,18 +419,20 @@ fn run_to_completion_with<F>(
fn assert_send<T: Send>(_: &T) { }
-let voter = run_grandpa_voter(
-Config {
+let grandpa_params = GrandpaParams {
+config: Config {
gossip_duration: TEST_GOSSIP_DURATION,
justification_period: 32,
local_key: Some(Arc::new(key.clone().into())),
name: Some(format!("peer#{}", peer_id)),
},
-link,
-MessageRouting::new(net.clone(), peer_id),
-InherentDataProviders::new(),
-Exit,
-).expect("all in order with client and network");
+link: link,
+network: MessageRouting::new(net.clone(), peer_id),
+inherent_data_providers: InherentDataProviders::new(),
+on_exit: Exit,
+telemetry_on_connect: None,
+};
+let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network");
assert_send(&voter);
@@ -517,18 +519,21 @@ fn finalize_3_voters_1_full_observer() {
.take_while(|n| Ok(n.header.number() < &20))
.for_each(move |_| Ok(()))
);
-let voter = run_grandpa_voter(
-Config {
+let grandpa_params = GrandpaParams {
+config: Config {
gossip_duration: TEST_GOSSIP_DURATION,
justification_period: 32,
local_key,
name: Some(format!("peer#{}", peer_id)),
},
-link,
-MessageRouting::new(net.clone(), peer_id),
-InherentDataProviders::new(),
-Exit,
-).expect("all in order with client and network");
+link: link,
+network: MessageRouting::new(net.clone(), peer_id),
+inherent_data_providers: InherentDataProviders::new(),
+on_exit: Exit,
+telemetry_on_connect: None,
+};
+let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network");
runtime.spawn(voter);
}
@@ -679,18 +684,20 @@ fn transition_3_voters_twice_1_full_observer() {
assert_eq!(set.pending_changes().count(), 0);
})
);
-let voter = run_grandpa_voter(
-Config {
+let grandpa_params = GrandpaParams {
+config: Config {
gossip_duration: TEST_GOSSIP_DURATION,
justification_period: 32,
local_key,
name: Some(format!("peer#{}", peer_id)),
},
-link,
-MessageRouting::new(net.clone(), peer_id),
-InherentDataProviders::new(),
-Exit,
-).expect("all in order with client and network");
+link: link,
+network: MessageRouting::new(net.clone(), peer_id),
+inherent_data_providers: InherentDataProviders::new(),
+on_exit: Exit,
+telemetry_on_connect: None,
+};
+let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network");
runtime.spawn(voter);
}
@@ -1081,18 +1088,20 @@ fn voter_persists_its_votes() {
let (_block_import, _, link) = net.lock().make_block_import(client.clone());
let link = link.lock().take().unwrap();
-let mut voter = run_grandpa_voter(
-Config {
+let grandpa_params = GrandpaParams {
+config: Config {
gossip_duration: TEST_GOSSIP_DURATION,
justification_period: 32,
local_key: Some(Arc::new(peers[0].clone().into())),
name: Some(format!("peer#{}", 0)),
},
-link,
-MessageRouting::new(net.clone(), 0),
-InherentDataProviders::new(),
-Exit,
-).expect("all in order with client and network");
+link: link,
+network: MessageRouting::new(net.clone(), 0),
+inherent_data_providers: InherentDataProviders::new(),
+on_exit: Exit,
+telemetry_on_connect: None,
+};
+let mut voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network");
let voter = future::poll_fn(move || {
// we need to keep the block_import alive since it owns the
@@ -22,6 +22,7 @@ mod slots;
use std::collections::VecDeque;
use futures::{prelude::*, sync::mpsc, try_ready};
use libp2p::PeerId;
use linked_hash_map::LinkedHashMap;
use log::trace;
use lru_cache::LruCache;
use slots::{SlotType, SlotState, Slots};
@@ -28,6 +28,8 @@ pub mod chain_ops;
use std::io;
use std::net::SocketAddr;
use std::collections::HashMap;
use futures::sync::mpsc;
use parking_lot::Mutex;
use client::BlockchainEvents;
use exit_future::Signal;
@@ -82,6 +84,7 @@ pub struct Service<Components: components::Components> {
_rpc: Box<::std::any::Any + Send + Sync>,
_telemetry: Option<Arc<tel::Telemetry>>,
_offchain_workers: Option<Arc<offchain::OffchainWorkers<ComponentClient<Components>, ComponentBlock<Components>>>>,
_telemetry_on_connect_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>>,
}
/// Creates bare client without any networking.
@@ -96,7 +99,27 @@ pub fn new_client<Factory: components::ServiceFactory>(config: &FactoryFullConfi
Ok(client)
}
/// Stream of events for connection established to a telemetry server.
pub type TelemetryOnConnectNotifications = mpsc::UnboundedReceiver<()>;
/// Used to hook on telemetry connection established events.
pub struct TelemetryOnConnect<'a> {
/// Handle to a future that will resolve on exit.
pub on_exit: Box<Future<Item=(), Error=()> + Send + 'static>,
/// Event stream.
pub telemetry_connection_sinks: TelemetryOnConnectNotifications,
/// Executor to which the hook is spawned.
pub executor: &'a TaskExecutor,
}
impl<Components: components::Components> Service<Components> {
/// Get event stream for telemetry connection established events.
pub fn telemetry_on_connect_stream(&self) -> TelemetryOnConnectNotifications {
let (sink, stream) = mpsc::unbounded();
self._telemetry_on_connect_sinks.lock().push(sink);
stream
}
/// Creates a new service.
pub fn new(
mut config: FactoryFullConfiguration<Components::Factory>,
@@ -304,6 +327,8 @@ impl<Components: components::Components> Service<Components> {
config.rpc_ws, config.rpc_cors.clone(), task_executor.clone(), transaction_pool.clone(),
)?;
let telemetry_connection_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default();
// Telemetry
let telemetry = config.telemetry_endpoints.clone().map(|endpoints| {
let is_authority = config.roles == Roles::AUTHORITY;
@@ -313,6 +338,7 @@ impl<Components: components::Components> Service<Components> {
let impl_name = config.impl_name.to_owned();
let version = version.clone();
let chain_name = config.chain_spec.name().to_owned();
let telemetry_connection_sinks_ = telemetry_connection_sinks.clone();
Arc::new(tel::init_telemetry(tel::TelemetryConfig {
endpoints,
on_connect: Box::new(move || {
@@ -326,6 +352,10 @@ impl<Components: components::Components> Service<Components> {
"authority" => is_authority,
"network_id" => network_id.clone()
);
telemetry_connection_sinks_.lock().retain(|sink| {
sink.unbounded_send(()).is_ok()
});
}),
}))
});
@@ -342,6 +372,7 @@ impl<Components: components::Components> Service<Components> {
_rpc: Box::new(rpc),
_telemetry: telemetry,
_offchain_workers: offchain_workers,
_telemetry_on_connect_sinks: telemetry_connection_sinks.clone(),
})
}
@@ -358,7 +389,7 @@ impl<Components: components::Components> Service<Components> {
}
}
-/// return a shared instance of Telemtry (if enabled)
+/// return a shared instance of Telemetry (if enabled)
pub fn telemetry(&self) -> Option<Arc<tel::Telemetry>> {
self._telemetry.as_ref().map(|t| t.clone())
}
@@ -12,6 +12,7 @@ log = "0.4"
rand = "0.6"
serde = "1.0.81"
serde_derive = "1.0"
serde_json = "1.0"
slog = { version = "^2", features = ["nested-values"] }
slog-json = { version = "^2", features = ["nested-values"] }
slog-async = { version = "^2", features = ["nested-values"] }
@@ -24,14 +24,12 @@
use std::{io, time, thread};
use std::sync::Arc;
use parking_lot::Mutex;
-use slog::{Drain, o};
+use slog::{Drain, o, OwnedKVList, Record};
use log::trace;
use rand::{thread_rng, Rng};
pub use slog_scope::with_logger;
pub use slog;
use serde_derive::{Serialize, Deserialize};
-use slog::OwnedKVList;
-use slog::Record;
use core::result;
/// Configuration for telemetry.
@@ -56,7 +54,7 @@ pub const SUBSTRATE_INFO: &str = "0";
pub const CONSENSUS_TRACE: &str = "9";
pub const CONSENSUS_DEBUG: &str = "5";
pub const CONSENSUS_WARN: &str = "4";
-pub const CONSENSUS_INFO: &str = "3";
+pub const CONSENSUS_INFO: &str = "0";
/// Multiply logging to all drains. This is similar to `slog::Duplicate`, which is
/// limited to two drains though and doesn't support dynamic nesting at runtime.
@@ -166,7 +164,7 @@ pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard
macro_rules! telemetry {
( $a:expr; $b:expr; $( $t:tt )* ) => {
$crate::with_logger(|l| {
-$crate::slog::slog_info!(l, #$a, $b; "verbosity" => stringify!($a), $($t)* )
+$crate::slog::slog_info!(l, #$a, $b; $($t)* )
})
}
}