From 4983f113e605b942e39f00f090c41968d341c99f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Fri, 1 Feb 2019 17:25:07 +0000 Subject: [PATCH] Reduce consensus spam (#1658) * core: fix predicate for dropping grandpa round messages * core: grandpa: drop commits topic on authority set change * core: gossip: only drop known messages based on expiration time * core: grandpa: don't broadcast commit messages * core: gossip: don't assume topics are header hashes * core: gossip: expire messages more agressively * core: grandpa: fix test environment * core: gossip: fix tests * core: gossip: track dead topics (and ignore messages) * core: gossip: test dead topic pruning --- substrate/Cargo.lock | 1 + .../finality-grandpa/src/communication.rs | 31 +++-- substrate/core/finality-grandpa/src/lib.rs | 16 ++- substrate/core/finality-grandpa/src/tests.rs | 25 ++-- substrate/core/network/Cargo.toml | 1 + .../core/network/src/consensus_gossip.rs | 116 +++++++++++------- substrate/core/network/src/lib.rs | 1 + 7 files changed, 131 insertions(+), 60 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 6771530327..7caa98e8dc 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -3708,6 +3708,7 @@ dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec-derive 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/finality-grandpa/src/communication.rs b/substrate/core/finality-grandpa/src/communication.rs index b1378944e4..51994e7d38 100644 --- a/substrate/core/finality-grandpa/src/communication.rs +++ b/substrate/core/finality-grandpa/src/communication.rs @@ -18,6 +18,8 @@ //! that sign or re-shape. use std::collections::HashMap; +use std::sync::Arc; + use grandpa::VoterSet; use futures::prelude::*; use futures::sync::mpsc; @@ -27,8 +29,6 @@ use runtime_primitives::traits::Block as BlockT; use tokio::timer::Interval; use {Error, Network, Message, SignedMessage, Commit, CompactCommit}; -use std::sync::Arc; - fn localized_payload(round: u64, set_id: u64, message: &E) -> Vec { (message, round, set_id).encode() } @@ -47,6 +47,8 @@ enum Broadcast { Announcement(Round, SetId, Block::Hash), // round, set id being dropped. DropRound(Round, SetId), + // set_id being dropped. + DropSet(SetId), } impl Broadcast { @@ -56,6 +58,7 @@ impl Broadcast { Broadcast::Message(_, s, _) => s, Broadcast::Announcement(_, s, _) => s, Broadcast::DropRound(_, s) => s, + Broadcast::DropSet(s) => s, } } } @@ -187,7 +190,11 @@ impl> Future for BroadcastWorker { Broadcast::DropRound(round, set_id) => { // stop making announcements for any dead rounds. self.announcements.retain(|_, &mut r| r > round); - self.network.drop_messages(round.0, set_id.0); + self.network.drop_round_messages(round.0, set_id.0); + } + Broadcast::DropSet(set_id) => { + // stop making announcements for any dead rounds. + self.network.drop_set_messages(set_id.0); } } } @@ -207,10 +214,14 @@ impl> Network for BroadcastHandle { let _ = self.relay.unbounded_send(Broadcast::Message(Round(round), SetId(set_id), message)); } - fn drop_messages(&self, round: u64, set_id: u64) { + fn drop_round_messages(&self, round: u64, set_id: u64) { let _ = self.relay.unbounded_send(Broadcast::DropRound(Round(round), SetId(set_id))); } + fn drop_set_messages(&self, set_id: u64) { + let _ = self.relay.unbounded_send(Broadcast::DropSet(SetId(set_id))); + } + fn commit_messages(&self, set_id: u64) -> Self::In { self.network.commit_messages(set_id) } @@ -332,7 +343,7 @@ impl> Sink for OutgoingMessages impl> Drop for OutgoingMessages { fn drop(&mut self) { - self.network.drop_messages(self.round, self.set_id); + self.network.drop_round_messages(self.round, self.set_id); } } @@ -439,14 +450,14 @@ pub(crate) fn checked_commit_stream( } /// An output sink for commit messages. -pub(crate) struct CommitsOut { +pub(crate) struct CommitsOut> { network: N, set_id: u64, _marker: ::std::marker::PhantomData, is_voter: bool, } -impl CommitsOut { +impl> CommitsOut { /// Create a new commit output stream. pub(crate) fn new(network: N, set_id: u64, is_voter: bool) -> Self { CommitsOut { @@ -487,3 +498,9 @@ impl> Sink for CommitsOut { fn close(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } } + +impl> Drop for CommitsOut { + fn drop(&mut self) { + self.network.drop_set_messages(self.set_id); + } +} diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index fd2828b9bb..08e95a2955 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -231,7 +231,10 @@ pub trait Network: Clone { fn send_message(&self, round: u64, set_id: u64, message: Vec); /// Clean up messages for a round. - fn drop_messages(&self, round: u64, set_id: u64); + fn drop_round_messages(&self, round: u64, set_id: u64); + + /// Clean up messages for a given authority set id (e.g. commit messages). + fn drop_set_messages(&self, set_id: u64); /// Get a stream of commit messages for a specific set-id. This stream /// should never logically conclude. @@ -283,9 +286,14 @@ impl, H: ExHashT self.service.gossip_consensus_message(topic, message, false); } - fn drop_messages(&self, round: u64, set_id: u64) { + fn drop_round_messages(&self, round: u64, set_id: u64) { let topic = message_topic::(round, set_id); - self.service.consensus_gossip().write().collect_garbage(|t| t == &topic); + self.service.consensus_gossip().write().collect_garbage_for_topic(topic); + } + + fn drop_set_messages(&self, set_id: u64) { + let topic = commit_topic::(set_id); + self.service.consensus_gossip().write().collect_garbage_for_topic(topic); } fn commit_messages(&self, set_id: u64) -> Self::In { @@ -294,7 +302,7 @@ impl, H: ExHashT fn send_commit(&self, _round: u64, set_id: u64, message: Vec) { let topic = commit_topic::(set_id); - self.service.gossip_consensus_message(topic, message, true); + self.service.gossip_consensus_message(topic, message, false); } fn announce(&self, round: u64, _set_id: u64, block: B::Hash) { diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 1e2b25ad4b..a44bd3f32e 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -145,6 +145,15 @@ impl MessageRouting { peer_id, } } + + fn drop_messages(&self, topic: Hash) { + let inner = self.inner.lock(); + let peer = inner.peer(self.peer_id); + let mut gossip = peer.consensus_gossip().write(); + peer.with_spec(move |_, _| { + gossip.collect_garbage_for_topic(topic); + }); + } } fn make_topic(round: u64, set_id: u64) -> Hash { @@ -199,14 +208,14 @@ impl Network for MessageRouting { inner.route_until_complete(); } - fn drop_messages(&self, round: u64, set_id: u64) { + fn drop_round_messages(&self, round: u64, set_id: u64) { let topic = make_topic(round, set_id); - let inner = self.inner.lock(); - let peer = inner.peer(self.peer_id); - let mut gossip = peer.consensus_gossip().write(); - peer.with_spec(move |_, _| { - gossip.collect_garbage(|t| t == &topic) - }); + self.drop_messages(topic); + } + + fn drop_set_messages(&self, set_id: u64) { + let topic = make_commit_topic(set_id); + self.drop_messages(topic); } fn commit_messages(&self, set_id: u64) -> Self::In { @@ -226,7 +235,7 @@ impl Network for MessageRouting { fn send_commit(&self, _round: u64, set_id: u64, message: Vec) { let mut inner = self.inner.lock(); - inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message, true); + inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message, false); inner.route_until_complete(); } diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index 87052b7e41..1a3a230778 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -14,6 +14,7 @@ error-chain = "0.12" bitflags = "1.0" futures = "0.1.17" linked-hash-map = "0.5" +lru-cache = "0.1.1" rustc-hex = "2.0" rand = "0.6" substrate-primitives = { path = "../../core/primitives" } diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs index 186919e509..a5a67b9507 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/consensus_gossip.rs @@ -18,18 +18,20 @@ //! Handles chain-specific and standard BFT messages. use std::collections::{HashMap, HashSet}; -use futures::sync::mpsc; use std::time::{Instant, Duration}; +use futures::sync::mpsc; use rand::{self, seq::SliceRandom}; +use lru_cache::LruCache; use network_libp2p::NodeIndex; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor}; +use runtime_primitives::traits::{Block as BlockT, Hash, HashFor}; use runtime_primitives::generic::BlockId; pub use message::generic::{Message, ConsensusMessage}; use protocol::Context; use config::Roles; // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115 -const MESSAGE_LIFETIME: Duration = Duration::from_secs(600); +const MESSAGE_LIFETIME: Duration = Duration::from_secs(120); +const DEAD_TOPICS_CACHE_SIZE: usize = 4096; struct PeerConsensus { known_messages: HashSet, @@ -49,6 +51,7 @@ pub struct ConsensusGossip { live_message_sinks: HashMap>>, messages: Vec>, known_messages: HashSet<(B::Hash, B::Hash)>, + known_dead_topics: LruCache, message_times: HashMap<(B::Hash, B::Hash), Instant>, session_start: Option, } @@ -61,6 +64,7 @@ impl ConsensusGossip { live_message_sinks: HashMap::new(), messages: Default::default(), known_messages: Default::default(), + known_dead_topics: LruCache::new(DEAD_TOPICS_CACHE_SIZE), message_times: Default::default(), session_start: None } @@ -150,7 +154,9 @@ impl ConsensusGossip { fn register_message(&mut self, message_hash: B::Hash, topic: B::Hash, broadcast: bool, get_message: F) where F: Fn() -> ConsensusMessage { - if self.known_messages.insert((topic, message_hash)) { + if !self.known_dead_topics.contains_key(&topic) && + self.known_messages.insert((topic, message_hash)) + { self.messages.push(MessageEntry { topic, message_hash, @@ -167,6 +173,11 @@ impl ConsensusGossip { self.peers.remove(&who); } + pub fn collect_garbage_for_topic(&mut self, topic: B::Hash) { + self.known_dead_topics.insert(topic, ()); + self.collect_garbage(|_| true); + } + /// Prune old or no longer relevant consensus messages. Provide a predicate /// for pruning, which returns `false` when the items with a given topic should be pruned. pub fn collect_garbage bool>(&mut self, predicate: P) { @@ -177,22 +188,24 @@ impl ConsensusGossip { let message_times = &mut self.message_times; let known_messages = &mut self.known_messages; + let known_dead_topics = &mut self.known_dead_topics; let before = self.messages.len(); let now = Instant::now(); self.messages.retain(|entry| { - message_times.get(&(entry.topic, entry.message_hash)) - .map(|instant| *instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic)) - .unwrap_or(false) + !known_dead_topics.contains_key(&entry.topic) && + message_times.get(&(entry.topic, entry.message_hash)) + .map(|instant| *instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic)) + .unwrap_or(false) }); known_messages.retain(|(topic, message_hash)| { message_times.get(&(*topic, *message_hash)) - .map(|instant| *instant + (2 * MESSAGE_LIFETIME) >= now && predicate(topic)) + .map(|instant| *instant + (5 * MESSAGE_LIFETIME) >= now) .unwrap_or(false) }); - trace!(target:"gossip", "Cleaned up {} stale messages, {} left ({} known)", + trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)", before - self.messages.len(), self.messages.len(), known_messages.len(), @@ -230,26 +243,16 @@ impl ConsensusGossip { ) -> Option<(B::Hash, ConsensusMessage)> { let message_hash = HashFor::::hash(&message[..]); + if self.known_dead_topics.contains_key(&topic) { + trace!(target:"gossip", "Ignored message from {} in dead topic {}", who, topic); + return None; + } + if self.known_messages.contains(&(topic, message_hash)) { trace!(target:"gossip", "Ignored already known message from {} in {}", who, topic); return None; } - match (protocol.client().info(), protocol.client().header(&BlockId::Hash(topic))) { - (_, Err(e)) | (Err(e), _) => { - debug!(target:"gossip", "Error reading blockchain: {:?}", e); - return None; - }, - (Ok(info), Ok(Some(header))) => { - if header.number() < &info.chain.best_number { - trace!(target:"gossip", "Ignored ancient message from {}, hash={}", who, topic); - return None; - } - }, - (Ok(_), Ok(None)) => {}, - } - - if let Some(ref mut peer) = self.peers.get_mut(&who) { use std::collections::hash_map::Entry; peer.known_messages.insert((topic, message_hash)); @@ -316,6 +319,20 @@ mod tests { type Block = RawBlock>; + macro_rules! push_msg { + ($consensus:expr, $topic:expr, $hash: expr, $now: expr, $m:expr) => { + if $consensus.known_messages.insert(($topic, $hash)) { + $consensus.messages.push(MessageEntry { + topic: $topic, + message_hash: $hash, + message: $m, + broadcast: false, + }); + $consensus.message_times.insert(($topic, $hash), $now); + } + } + } + #[test] fn collects_garbage() { let prev_hash = H256::random(); @@ -327,20 +344,8 @@ mod tests { let m1 = vec![1, 2, 3]; let m2 = vec![4, 5, 6]; - macro_rules! push_msg { - ($topic:expr, $hash: expr, $now: expr, $m:expr) => { - consensus.messages.push(MessageEntry { - topic: $topic, - message_hash: $hash, - message: $m, - broadcast: false, - }); - consensus.message_times.insert(($topic, $hash), $now); - } - } - - push_msg!(prev_hash, m1_hash, now, m1); - push_msg!(best_hash, m2_hash, now, m2.clone()); + push_msg!(consensus, prev_hash, m1_hash, now, m1); + push_msg!(consensus, best_hash, m2_hash, now, m2.clone()); consensus.known_messages.insert((prev_hash, m1_hash)); consensus.known_messages.insert((best_hash, m2_hash)); @@ -357,23 +362,52 @@ mod tests { // topic that was used in one message. consensus.collect_garbage(|topic| topic != &prev_hash); assert_eq!(consensus.messages.len(), 1); - assert_eq!(consensus.known_messages.len(), 1); + // known messages are only pruned based on expiration time + assert_eq!(consensus.known_messages.len(), 2); assert!(consensus.known_messages.contains(&(best_hash, m2_hash))); // make timestamp expired, but the message is still kept as known consensus.messages.clear(); - push_msg!(best_hash, m2_hash, now - MESSAGE_LIFETIME, m2.clone()); + consensus.known_messages.clear(); + push_msg!(consensus, best_hash, m2_hash, now - MESSAGE_LIFETIME, m2.clone()); consensus.collect_garbage(|_topic| true); assert!(consensus.messages.is_empty()); assert_eq!(consensus.known_messages.len(), 1); // make timestamp expired past the known message lifetime - push_msg!(best_hash, m2_hash, now - (2 * MESSAGE_LIFETIME), m2); + consensus.known_messages.clear(); + push_msg!(consensus, best_hash, m2_hash, now - (5 * MESSAGE_LIFETIME), m2); consensus.collect_garbage(|_topic| true); assert!(consensus.messages.is_empty()); assert!(consensus.known_messages.is_empty()); } + #[test] + fn collects_garbage_for_topic() { + let topic = H256::random(); + let dead_topic = H256::random(); + let message = Vec::new(); + let now = Instant::now(); + let mut consensus = ConsensusGossip::::new(); + + let message_hash = H256::random(); + push_msg!(consensus, topic, message_hash, now, message.clone()); + push_msg!(consensus, dead_topic, message_hash, now, message.clone()); + assert_eq!(consensus.messages.len(), 2); + + consensus.collect_garbage_for_topic(topic); + + // removes all messages for the topic and marks the topic as dead + assert_eq!(consensus.messages.len(), 1); + assert_eq!(consensus.known_messages.len(), 2); + assert_eq!(consensus.known_dead_topics.len(), 1); + + // new messages for dead topics are ignored + consensus.register_message(HashFor::::hash(&message), topic, false, || message.clone()); + assert_eq!(consensus.messages.len(), 1); + assert_eq!(consensus.known_messages.len(), 2); + } + #[test] fn message_stream_include_those_sent_before_asking_for_stream() { use futures::Stream; diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index 95b6d68113..d2868f1a5c 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -21,6 +21,7 @@ //! Allows attachment of an optional subprotocol for chain-specific requests. extern crate linked_hash_map; +extern crate lru_cache; extern crate parking_lot; extern crate substrate_primitives as primitives; extern crate substrate_client as client;