Reduce consensus spam (#1658)

* core: fix predicate for dropping grandpa round messages

* core: grandpa: drop commits topic on authority set change

* core: gossip: only drop known messages based on expiration time

* core: grandpa: don't broadcast commit messages

* core: gossip: don't assume topics are header hashes

* core: gossip: expire messages more agressively

* core: grandpa: fix test environment

* core: gossip: fix tests

* core: gossip: track dead topics (and ignore messages)

* core: gossip: test dead topic pruning
This commit is contained in:
André Silva
2019-02-01 17:25:07 +00:00
committed by Robert Habermeier
parent 641bb7cb46
commit 4983f113e6
7 changed files with 131 additions and 60 deletions
+1
View File
@@ -14,6 +14,7 @@ error-chain = "0.12"
bitflags = "1.0"
futures = "0.1.17"
linked-hash-map = "0.5"
lru-cache = "0.1.1"
rustc-hex = "2.0"
rand = "0.6"
substrate-primitives = { path = "../../core/primitives" }
+75 -41
View File
@@ -18,18 +18,20 @@
//! Handles chain-specific and standard BFT messages.
use std::collections::{HashMap, HashSet};
use futures::sync::mpsc;
use std::time::{Instant, Duration};
use futures::sync::mpsc;
use rand::{self, seq::SliceRandom};
use lru_cache::LruCache;
use network_libp2p::NodeIndex;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor};
use runtime_primitives::traits::{Block as BlockT, Hash, HashFor};
use runtime_primitives::generic::BlockId;
pub use message::generic::{Message, ConsensusMessage};
use protocol::Context;
use config::Roles;
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
const MESSAGE_LIFETIME: Duration = Duration::from_secs(600);
const MESSAGE_LIFETIME: Duration = Duration::from_secs(120);
const DEAD_TOPICS_CACHE_SIZE: usize = 4096;
struct PeerConsensus<H> {
known_messages: HashSet<H>,
@@ -49,6 +51,7 @@ pub struct ConsensusGossip<B: BlockT> {
live_message_sinks: HashMap<B::Hash, Vec<mpsc::UnboundedSender<ConsensusMessage>>>,
messages: Vec<MessageEntry<B>>,
known_messages: HashSet<(B::Hash, B::Hash)>,
known_dead_topics: LruCache<B::Hash, ()>,
message_times: HashMap<(B::Hash, B::Hash), Instant>,
session_start: Option<B::Hash>,
}
@@ -61,6 +64,7 @@ impl<B: BlockT> ConsensusGossip<B> {
live_message_sinks: HashMap::new(),
messages: Default::default(),
known_messages: Default::default(),
known_dead_topics: LruCache::new(DEAD_TOPICS_CACHE_SIZE),
message_times: Default::default(),
session_start: None
}
@@ -150,7 +154,9 @@ impl<B: BlockT> ConsensusGossip<B> {
fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, broadcast: bool, get_message: F)
where F: Fn() -> ConsensusMessage
{
if self.known_messages.insert((topic, message_hash)) {
if !self.known_dead_topics.contains_key(&topic) &&
self.known_messages.insert((topic, message_hash))
{
self.messages.push(MessageEntry {
topic,
message_hash,
@@ -167,6 +173,11 @@ impl<B: BlockT> ConsensusGossip<B> {
self.peers.remove(&who);
}
pub fn collect_garbage_for_topic(&mut self, topic: B::Hash) {
self.known_dead_topics.insert(topic, ());
self.collect_garbage(|_| true);
}
/// Prune old or no longer relevant consensus messages. Provide a predicate
/// for pruning, which returns `false` when the items with a given topic should be pruned.
pub fn collect_garbage<P: Fn(&B::Hash) -> bool>(&mut self, predicate: P) {
@@ -177,22 +188,24 @@ impl<B: BlockT> ConsensusGossip<B> {
let message_times = &mut self.message_times;
let known_messages = &mut self.known_messages;
let known_dead_topics = &mut self.known_dead_topics;
let before = self.messages.len();
let now = Instant::now();
self.messages.retain(|entry| {
message_times.get(&(entry.topic, entry.message_hash))
.map(|instant| *instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic))
.unwrap_or(false)
!known_dead_topics.contains_key(&entry.topic) &&
message_times.get(&(entry.topic, entry.message_hash))
.map(|instant| *instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic))
.unwrap_or(false)
});
known_messages.retain(|(topic, message_hash)| {
message_times.get(&(*topic, *message_hash))
.map(|instant| *instant + (2 * MESSAGE_LIFETIME) >= now && predicate(topic))
.map(|instant| *instant + (5 * MESSAGE_LIFETIME) >= now)
.unwrap_or(false)
});
trace!(target:"gossip", "Cleaned up {} stale messages, {} left ({} known)",
trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)",
before - self.messages.len(),
self.messages.len(),
known_messages.len(),
@@ -230,26 +243,16 @@ impl<B: BlockT> ConsensusGossip<B> {
) -> Option<(B::Hash, ConsensusMessage)> {
let message_hash = HashFor::<B>::hash(&message[..]);
if self.known_dead_topics.contains_key(&topic) {
trace!(target:"gossip", "Ignored message from {} in dead topic {}", who, topic);
return None;
}
if self.known_messages.contains(&(topic, message_hash)) {
trace!(target:"gossip", "Ignored already known message from {} in {}", who, topic);
return None;
}
match (protocol.client().info(), protocol.client().header(&BlockId::Hash(topic))) {
(_, Err(e)) | (Err(e), _) => {
debug!(target:"gossip", "Error reading blockchain: {:?}", e);
return None;
},
(Ok(info), Ok(Some(header))) => {
if header.number() < &info.chain.best_number {
trace!(target:"gossip", "Ignored ancient message from {}, hash={}", who, topic);
return None;
}
},
(Ok(_), Ok(None)) => {},
}
if let Some(ref mut peer) = self.peers.get_mut(&who) {
use std::collections::hash_map::Entry;
peer.known_messages.insert((topic, message_hash));
@@ -316,6 +319,20 @@ mod tests {
type Block = RawBlock<ExtrinsicWrapper<u64>>;
macro_rules! push_msg {
($consensus:expr, $topic:expr, $hash: expr, $now: expr, $m:expr) => {
if $consensus.known_messages.insert(($topic, $hash)) {
$consensus.messages.push(MessageEntry {
topic: $topic,
message_hash: $hash,
message: $m,
broadcast: false,
});
$consensus.message_times.insert(($topic, $hash), $now);
}
}
}
#[test]
fn collects_garbage() {
let prev_hash = H256::random();
@@ -327,20 +344,8 @@ mod tests {
let m1 = vec![1, 2, 3];
let m2 = vec![4, 5, 6];
macro_rules! push_msg {
($topic:expr, $hash: expr, $now: expr, $m:expr) => {
consensus.messages.push(MessageEntry {
topic: $topic,
message_hash: $hash,
message: $m,
broadcast: false,
});
consensus.message_times.insert(($topic, $hash), $now);
}
}
push_msg!(prev_hash, m1_hash, now, m1);
push_msg!(best_hash, m2_hash, now, m2.clone());
push_msg!(consensus, prev_hash, m1_hash, now, m1);
push_msg!(consensus, best_hash, m2_hash, now, m2.clone());
consensus.known_messages.insert((prev_hash, m1_hash));
consensus.known_messages.insert((best_hash, m2_hash));
@@ -357,23 +362,52 @@ mod tests {
// topic that was used in one message.
consensus.collect_garbage(|topic| topic != &prev_hash);
assert_eq!(consensus.messages.len(), 1);
assert_eq!(consensus.known_messages.len(), 1);
// known messages are only pruned based on expiration time
assert_eq!(consensus.known_messages.len(), 2);
assert!(consensus.known_messages.contains(&(best_hash, m2_hash)));
// make timestamp expired, but the message is still kept as known
consensus.messages.clear();
push_msg!(best_hash, m2_hash, now - MESSAGE_LIFETIME, m2.clone());
consensus.known_messages.clear();
push_msg!(consensus, best_hash, m2_hash, now - MESSAGE_LIFETIME, m2.clone());
consensus.collect_garbage(|_topic| true);
assert!(consensus.messages.is_empty());
assert_eq!(consensus.known_messages.len(), 1);
// make timestamp expired past the known message lifetime
push_msg!(best_hash, m2_hash, now - (2 * MESSAGE_LIFETIME), m2);
consensus.known_messages.clear();
push_msg!(consensus, best_hash, m2_hash, now - (5 * MESSAGE_LIFETIME), m2);
consensus.collect_garbage(|_topic| true);
assert!(consensus.messages.is_empty());
assert!(consensus.known_messages.is_empty());
}
#[test]
fn collects_garbage_for_topic() {
let topic = H256::random();
let dead_topic = H256::random();
let message = Vec::new();
let now = Instant::now();
let mut consensus = ConsensusGossip::<Block>::new();
let message_hash = H256::random();
push_msg!(consensus, topic, message_hash, now, message.clone());
push_msg!(consensus, dead_topic, message_hash, now, message.clone());
assert_eq!(consensus.messages.len(), 2);
consensus.collect_garbage_for_topic(topic);
// removes all messages for the topic and marks the topic as dead
assert_eq!(consensus.messages.len(), 1);
assert_eq!(consensus.known_messages.len(), 2);
assert_eq!(consensus.known_dead_topics.len(), 1);
// new messages for dead topics are ignored
consensus.register_message(HashFor::<Block>::hash(&message), topic, false, || message.clone());
assert_eq!(consensus.messages.len(), 1);
assert_eq!(consensus.known_messages.len(), 2);
}
#[test]
fn message_stream_include_those_sent_before_asking_for_stream() {
use futures::Stream;
+1
View File
@@ -21,6 +21,7 @@
//! Allows attachment of an optional subprotocol for chain-specific requests.
extern crate linked_hash_map;
extern crate lru_cache;
extern crate parking_lot;
extern crate substrate_primitives as primitives;
extern crate substrate_client as client;