network-gossip: add metric for number of local messages (#7871)

* network-gossip: add metric for number of local messages

* grandpa: fix GossipEngine missing metrics registry parameter

* network-gossip: increase known messages cache size

* network-gossip: fix tests

* grandpa: remove unnecessary clone

Co-authored-by: Max Inden <mail@max-inden.de>

* network-gossip: count registered and expired messages separately

* network-gossip: add comment on known messages cache size

* network-gossip: extend comment with cache size in memory

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
André Silva
2021-01-12 15:04:48 +00:00
committed by GitHub
parent da9c73125f
commit 2d1b9a8d38
5 changed files with 101 additions and 34 deletions
+1
View File
@@ -7218,6 +7218,7 @@ dependencies = [
"rand 0.7.3", "rand 0.7.3",
"sc-network", "sc-network",
"sp-runtime", "sp-runtime",
"substrate-prometheus-endpoint",
"substrate-test-runtime-client", "substrate-test-runtime-client",
"wasm-timer", "wasm-timer",
] ]
@@ -217,7 +217,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
let gossip_engine = Arc::new(Mutex::new(GossipEngine::new( let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
service.clone(), service.clone(),
GRANDPA_PROTOCOL_NAME, GRANDPA_PROTOCOL_NAME,
validator.clone() validator.clone(),
prometheus_registry,
))); )));
{ {
@@ -20,6 +20,7 @@ futures-timer = "3.0.1"
libp2p = { version = "0.33.0", default-features = false } libp2p = { version = "0.33.0", default-features = false }
log = "0.4.8" log = "0.4.8"
lru = "0.6.1" lru = "0.6.1"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.8.0", path = "../../utils/prometheus" }
sc-network = { version = "0.8.0", path = "../network" } sc-network = { version = "0.8.0", path = "../network" }
sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" } sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" }
wasm-timer = "0.2" wasm-timer = "0.2"
+12 -5
View File
@@ -25,6 +25,7 @@ use futures::prelude::*;
use futures::channel::mpsc::{channel, Sender, Receiver}; use futures::channel::mpsc::{channel, Sender, Receiver};
use libp2p::PeerId; use libp2p::PeerId;
use log::trace; use log::trace;
use prometheus_endpoint::Registry;
use sp_runtime::traits::Block as BlockT; use sp_runtime::traits::Block as BlockT;
use std::{ use std::{
borrow::Cow, borrow::Cow,
@@ -72,12 +73,13 @@ impl<B: BlockT> GossipEngine<B> {
network: N, network: N,
protocol: impl Into<Cow<'static, str>>, protocol: impl Into<Cow<'static, str>>,
validator: Arc<dyn Validator<B>>, validator: Arc<dyn Validator<B>>,
metrics_registry: Option<&Registry>,
) -> Self where B: 'static { ) -> Self where B: 'static {
let protocol = protocol.into(); let protocol = protocol.into();
let network_event_stream = network.event_stream(); let network_event_stream = network.event_stream();
GossipEngine { GossipEngine {
state_machine: ConsensusGossip::new(validator, protocol.clone()), state_machine: ConsensusGossip::new(validator, protocol.clone(), metrics_registry),
network: Box::new(network), network: Box::new(network),
periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL), periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
protocol, protocol,
@@ -372,7 +374,8 @@ mod tests {
let mut gossip_engine = GossipEngine::<Block>::new( let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(), network.clone(),
"/my_protocol", "/my_protocol",
Arc::new(AllowAll{}), Arc::new(AllowAll {}),
None,
); );
// Drop network event stream sender side. // Drop network event stream sender side.
@@ -399,7 +402,8 @@ mod tests {
let mut gossip_engine = GossipEngine::<Block>::new( let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(), network.clone(),
protocol.clone(), protocol.clone(),
Arc::new(AllowAll{}), Arc::new(AllowAll {}),
None,
); );
let mut event_sender = network.inner.lock() let mut event_sender = network.inner.lock()
@@ -533,7 +537,8 @@ mod tests {
let mut gossip_engine = GossipEngine::<Block>::new( let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(), network.clone(),
protocol.clone(), protocol.clone(),
Arc::new(TestValidator{}), Arc::new(TestValidator {}),
None,
); );
// Create channels. // Create channels.
@@ -550,7 +555,9 @@ mod tests {
for (topic, tx) in txs { for (topic, tx) in txs {
match gossip_engine.message_sinks.get_mut(&topic) { match gossip_engine.message_sinks.get_mut(&topic) {
Some(entry) => entry.push(tx), Some(entry) => entry.push(tx),
None => {gossip_engine.message_sinks.insert(topic, vec![tx]);}, None => {
gossip_engine.message_sinks.insert(topic, vec![tx]);
}
} }
} }
@@ -23,15 +23,24 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
use std::iter; use std::iter;
use std::time; use std::time;
use log::{error, trace}; use log::{debug, error, trace};
use lru::LruCache; use lru::LruCache;
use libp2p::PeerId; use libp2p::PeerId;
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
use sc_network::ObservedRole; use sc_network::ObservedRole;
use wasm_timer::Instant; use wasm_timer::Instant;
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115 // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096; // NOTE: The current value is adjusted based on largest production network deployment (Kusama) and
// the current main gossip user (GRANDPA). Currently there are ~800 validators on Kusama; as such,
// each GRANDPA round should generate ~1600 messages, and we currently keep track of the last 2
// completed rounds and the current live one. As a result, at any point we will be holding ~4800
// live messages.
//
// Assuming that each known message is tracked with a 32-byte hash (common for `Block::Hash`), this
// cache should take about 256 KB of memory (8192 entries * 32 bytes).
const KNOWN_MESSAGES_CACHE_SIZE: usize = 8192;
const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_secs(30); const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_secs(30);
@@ -151,11 +160,25 @@ pub struct ConsensusGossip<B: BlockT> {
protocol: Cow<'static, str>, protocol: Cow<'static, str>,
validator: Arc<dyn Validator<B>>, validator: Arc<dyn Validator<B>>,
next_broadcast: Instant, next_broadcast: Instant,
metrics: Option<Metrics>,
} }
impl<B: BlockT> ConsensusGossip<B> { impl<B: BlockT> ConsensusGossip<B> {
/// Create a new instance using the given validator. /// Create a new instance using the given validator.
pub fn new(validator: Arc<dyn Validator<B>>, protocol: Cow<'static, str>) -> Self { pub fn new(
validator: Arc<dyn Validator<B>>,
protocol: Cow<'static, str>,
metrics_registry: Option<&Registry>,
) -> Self {
let metrics = match metrics_registry.map(Metrics::register) {
Some(Ok(metrics)) => Some(metrics),
Some(Err(e)) => {
debug!(target: "gossip", "Failed to register metrics: {:?}", e);
None
}
None => None,
};
ConsensusGossip { ConsensusGossip {
peers: HashMap::new(), peers: HashMap::new(),
messages: Default::default(), messages: Default::default(),
@@ -163,6 +186,7 @@ impl<B: BlockT> ConsensusGossip<B> {
protocol, protocol,
validator, validator,
next_broadcast: Instant::now() + REBROADCAST_INTERVAL, next_broadcast: Instant::now() + REBROADCAST_INTERVAL,
metrics,
} }
} }
@@ -197,6 +221,10 @@ impl<B: BlockT> ConsensusGossip<B> {
message, message,
sender, sender,
}); });
if let Some(ref metrics) = self.metrics {
metrics.registered_messages.inc();
}
} }
} }
@@ -264,10 +292,17 @@ impl<B: BlockT> ConsensusGossip<B> {
let before = self.messages.len(); let before = self.messages.len();
let mut message_expired = self.validator.message_expired(); let mut message_expired = self.validator.message_expired();
self.messages.retain(|entry| !message_expired(entry.topic, &entry.message)); self.messages
.retain(|entry| !message_expired(entry.topic, &entry.message));
let expired_messages = before - self.messages.len();
if let Some(ref metrics) = self.metrics {
metrics.expired_messages.inc_by(expired_messages as u64)
}
trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)", trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)",
before - self.messages.len(), expired_messages,
self.messages.len(), self.messages.len(),
known_messages.len(), known_messages.len(),
); );
@@ -429,6 +464,32 @@ impl<B: BlockT> ConsensusGossip<B> {
} }
} }
/// Prometheus counters maintained by the gossip state machine.
///
/// Held as `Option<Metrics>` by `ConsensusGossip`; it is `None` when no registry was supplied
/// or when registration failed.
struct Metrics {
	/// Incremented by one each time a message is pushed onto the tracked message list.
	/// Exported as `network_gossip_registered_messages_total`.
	registered_messages: Counter<U64>,
	/// Incremented by the number of messages dropped by the validator's `message_expired`
	/// filter during message collection.
	/// Exported as `network_gossip_expired_messages_total`.
	expired_messages: Counter<U64>,
}
impl Metrics {
	/// Builds the gossip counters and registers each of them with `registry`.
	///
	/// The "registered messages" counter is created and registered first, followed by the
	/// "expired messages" counter. Any error raised while constructing or registering a
	/// counter is returned to the caller immediately via `?`.
	fn register(registry: &Registry) -> Result<Self, PrometheusError> {
		let registered_messages = register(
			Counter::new(
				"network_gossip_registered_messages_total",
				"Number of registered messages by the gossip service.",
			)?,
			registry,
		)?;

		let expired_messages = register(
			Counter::new(
				"network_gossip_expired_messages_total",
				"Number of expired messages by the gossip service.",
			)?,
			registry,
		)?;

		Ok(Self {
			registered_messages,
			expired_messages,
		})
	}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use futures::prelude::*; use futures::prelude::*;
@@ -538,7 +599,7 @@ mod tests {
let prev_hash = H256::random(); let prev_hash = H256::random();
let best_hash = H256::random(); let best_hash = H256::random();
let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into()); let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
let m1_hash = H256::random(); let m1_hash = H256::random();
let m2_hash = H256::random(); let m2_hash = H256::random();
let m1 = vec![1, 2, 3]; let m1 = vec![1, 2, 3];
@@ -565,11 +626,11 @@ mod tests {
#[test] #[test]
fn message_stream_include_those_sent_before_asking() { fn message_stream_include_those_sent_before_asking() {
let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into()); let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
// Register message. // Register message.
let message = vec![4, 5, 6]; let message = vec![4, 5, 6];
let topic = HashFor::<Block>::hash(&[1,2,3]); let topic = HashFor::<Block>::hash(&[1, 2, 3]);
consensus.register_message(topic, message.clone()); consensus.register_message(topic, message.clone());
assert_eq!( assert_eq!(
@@ -580,7 +641,7 @@ mod tests {
#[test] #[test]
fn can_keep_multiple_messages_per_topic() { fn can_keep_multiple_messages_per_topic() {
let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into()); let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
let topic = [1; 32].into(); let topic = [1; 32].into();
let msg_a = vec![1, 2, 3]; let msg_a = vec![1, 2, 3];
@@ -594,7 +655,7 @@ mod tests {
#[test] #[test]
fn peer_is_removed_on_disconnect() { fn peer_is_removed_on_disconnect() {
let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into()); let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
let mut network = NoOpNetwork::default(); let mut network = NoOpNetwork::default();
@@ -608,10 +669,8 @@ mod tests {
#[test] #[test]
fn on_incoming_ignores_discarded_messages() { fn on_incoming_ignores_discarded_messages() {
let to_forward = ConsensusGossip::<Block>::new( let to_forward = ConsensusGossip::<Block>::new(Arc::new(DiscardAll), "/foo".into(), None)
Arc::new(DiscardAll), .on_incoming(
"/foo".into(),
).on_incoming(
&mut NoOpNetwork::default(), &mut NoOpNetwork::default(),
PeerId::random(), PeerId::random(),
vec![vec![1, 2, 3]], vec![vec![1, 2, 3]],
@@ -628,10 +687,8 @@ mod tests {
let mut network = NoOpNetwork::default(); let mut network = NoOpNetwork::default();
let remote = PeerId::random(); let remote = PeerId::random();
let to_forward = ConsensusGossip::<Block>::new( let to_forward = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None)
Arc::new(AllowAll), .on_incoming(
"/foo".into(),
).on_incoming(
&mut network, &mut network,
// Unregistered peer. // Unregistered peer.
remote.clone(), remote.clone(),