grandpa: progressively increase target gossip peers (#4050)

* grandpa: stricter gossip message filtering

* gossip: remove filtered message on send_message

* gossip: add test for tracking of broadcast attempts

* grandpa: only restrict gossip if we're connected to more than 5 authorities

* grandpa: add test for progressive gossip

* grandpa: add test for gossip filtering on local non-authority node

* grandpa: fix doc

* gossip, grandpa: tabify

* grandpa: relax filtering logic for global messages
This commit is contained in:
André Silva
2019-11-08 20:08:14 +01:00
committed by Gavin Wood
parent 77407e1694
commit a8ce80b72d
2 changed files with 469 additions and 18 deletions
@@ -73,6 +73,7 @@ const UNREGISTERED_TOPIC_REPUTATION_CHANGE: i32 = -(1 << 10);
struct PeerConsensus<H> {
known_messages: HashSet<H>,
filtered_messages: HashMap<H, usize>,
roles: Roles,
}
@@ -104,9 +105,14 @@ pub enum MessageRecipient {
/// The reason for sending out the message.
#[derive(Eq, PartialEq, Copy, Clone)]
#[cfg_attr(test, derive(Debug))]
pub enum MessageIntent {
/// Requested broadcast
Broadcast,
/// Requested broadcast.
Broadcast {
/// How many times this message was previously filtered by the gossip
/// validator when trying to propagate to a given peer.
previous_attempts: usize
},
/// Requested broadcast to all peers.
ForcedBroadcast,
/// Periodic rebroadcast of all messages to all peers.
@@ -123,6 +129,12 @@ pub enum ValidationResult<H> {
Discard,
}
impl MessageIntent {
fn broadcast() -> MessageIntent {
MessageIntent::Broadcast { previous_attempts: 0 }
}
}
/// Validation context. Allows reacting to incoming messages by sending out further messages.
pub trait ValidatorContext<B: BlockT> {
/// Broadcast all messages with given topic to peers that do not have it yet.
@@ -196,12 +208,17 @@ fn propagate<'a, B: BlockT, I>(
for (message_hash, topic, message) in messages {
for (id, ref mut peer) in peers.iter_mut() {
let previous_attempts = peer.filtered_messages
.get(&message_hash)
.cloned()
.unwrap_or(0);
let intent = match intent {
MessageIntent::Broadcast =>
MessageIntent::Broadcast { .. } =>
if peer.known_messages.contains(&message_hash) {
continue
continue;
} else {
MessageIntent::Broadcast
MessageIntent::Broadcast { previous_attempts }
},
MessageIntent::PeriodicRebroadcast =>
if peer.known_messages.contains(&message_hash) {
@@ -209,15 +226,24 @@ fn propagate<'a, B: BlockT, I>(
} else {
// peer doesn't know message, so the logic should treat it as an
// initial broadcast.
MessageIntent::Broadcast
MessageIntent::Broadcast { previous_attempts }
},
other => other,
};
if !message_allowed(id, intent, &topic, &message) {
continue
let count = peer.filtered_messages
.entry(message_hash.clone())
.or_insert(0);
*count += 1;
continue;
}
peer.filtered_messages.remove(message_hash);
peer.known_messages.insert(message_hash.clone());
trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
protocol.send_consensus(id.clone(), message.clone());
}
@@ -310,6 +336,7 @@ impl<B: BlockT> ConsensusGossip<B> {
trace!(target:"gossip", "Registering {:?} {}", roles, who);
self.peers.insert(who.clone(), PeerConsensus {
known_messages: HashSet::new(),
filtered_messages: HashMap::new(),
roles,
});
for (engine_id, v) in self.validators.clone() {
@@ -379,7 +406,7 @@ impl<B: BlockT> ConsensusGossip<B> {
.filter_map(|entry|
if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None }
);
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() };
propagate(protocol, messages, intent, &mut self.peers, &self.validators);
}
@@ -527,17 +554,36 @@ impl<B: BlockT> ConsensusGossip<B> {
Some(validator) => validator.message_allowed(),
};
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
if let Some(ref mut peer) = self.peers.get_mut(who) {
for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) {
let intent = if force {
MessageIntent::ForcedBroadcast
} else {
let previous_attempts = peer.filtered_messages
.get(&entry.message_hash)
.cloned()
.unwrap_or(0);
MessageIntent::Broadcast { previous_attempts }
};
if !force && peer.known_messages.contains(&entry.message_hash) {
continue
continue;
}
if !message_allowed(who, intent, &entry.topic, &entry.message.data) {
continue
let count = peer.filtered_messages
.entry(entry.message_hash)
.or_insert(0);
*count += 1;
continue;
}
peer.filtered_messages.remove(&entry.message_hash);
peer.known_messages.insert(entry.message_hash.clone());
trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
protocol.send_consensus(who.clone(), ConsensusMessage {
engine_id: engine_id.clone(),
@@ -557,7 +603,7 @@ impl<B: BlockT> ConsensusGossip<B> {
) {
let message_hash = HashFor::<B>::hash(&message.data);
self.register_message_hashed(message_hash, topic, message.clone(), None);
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() };
propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
}
@@ -578,7 +624,9 @@ impl<B: BlockT> ConsensusGossip<B> {
trace!(target: "gossip", "Sending direct to {}: {:?}", who, message);
peer.filtered_messages.remove(&message_hash);
peer.known_messages.insert(message_hash);
protocol.send_consensus(who.clone(), message.clone());
}
}
@@ -607,6 +655,8 @@ impl<B: BlockT> Validator<B> for DiscardAll {
#[cfg(test)]
mod tests {
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use parking_lot::Mutex;
use sr_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
use futures03::executor::block_on_stream;
@@ -657,7 +707,7 @@ mod tests {
}
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> {
Box::new(move |_topic, data| data[0] != 1 )
Box::new(move |_topic, data| data[0] != 1)
}
}
@@ -755,4 +805,109 @@ mod tests {
let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic));
assert_eq!(stream.next(), None);
}
#[test]
fn keeps_track_of_broadcast_attempts() {
struct DummyNetworkContext;
impl<B: BlockT> Context<B> for DummyNetworkContext {
fn report_peer(&mut self, _who: PeerId, _reputation: i32) {}
fn disconnect_peer(&mut self, _who: PeerId) {}
fn send_consensus(&mut self, _who: PeerId, _consensus: ConsensusMessage) {}
fn send_chain_specific(&mut self, _who: PeerId, _message: Vec<u8>) {}
}
// A mock gossip validator that never expires any message, allows
// setting whether messages should be allowed and keeps track of any
// messages passed to `message_allowed`.
struct MockValidator {
allow: AtomicBool,
messages: Arc<Mutex<Vec<(Vec<u8>, MessageIntent)>>>,
}
impl MockValidator {
fn new() -> MockValidator {
MockValidator {
allow: AtomicBool::new(false),
messages: Arc::new(Mutex::new(Vec::new())),
}
}
}
impl Validator<Block> for MockValidator {
fn validate(
&self,
_context: &mut dyn ValidatorContext<Block>,
_sender: &PeerId,
_data: &[u8],
) -> ValidationResult<H256> {
ValidationResult::ProcessAndKeep(H256::default())
}
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> {
Box::new(move |_topic, _data| false)
}
fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &H256, &[u8]) -> bool + 'a> {
let messages = self.messages.clone();
Box::new(move |_, intent, _, data| {
messages.lock().push((data.to_vec(), intent));
self.allow.load(Ordering::SeqCst)
})
}
}
// we setup an instance of the mock gossip validator, add a new peer to
// it and register a message.
let mut consensus = ConsensusGossip::<Block>::new();
let validator = Arc::new(MockValidator::new());
consensus.register_validator_internal([0, 0, 0, 0], validator.clone());
consensus.new_peer(
&mut DummyNetworkContext,
PeerId::random(),
Roles::AUTHORITY,
);
let data = vec![1, 2, 3];
let msg = ConsensusMessage { data: data.clone(), engine_id: [0, 0, 0, 0] };
consensus.register_message(H256::default(), msg);
// tick the gossip handler and make sure it triggers a message rebroadcast
let mut tick = || {
consensus.next_broadcast = std::time::Instant::now();
consensus.tick(&mut DummyNetworkContext);
};
// by default we won't allow the message we registered, so everytime we
// tick the gossip handler, the message intent should be kept as
// `Broadcast` but the previous attempts should be incremented.
tick();
assert_eq!(
validator.messages.lock().pop().unwrap(),
(data.clone(), MessageIntent::Broadcast { previous_attempts: 0 }),
);
tick();
assert_eq!(
validator.messages.lock().pop().unwrap(),
(data.clone(), MessageIntent::Broadcast { previous_attempts: 1 }),
);
// we set the validator to allow the message to go through
validator.allow.store(true, Ordering::SeqCst);
// we still get the same message intent but it should be delivered now
tick();
assert_eq!(
validator.messages.lock().pop().unwrap(),
(data.clone(), MessageIntent::Broadcast { previous_attempts: 2 }),
);
// ticking the gossip handler again the message intent should change to
// `PeriodicRebroadcast` since it was sent.
tick();
assert_eq!(
validator.messages.lock().pop().unwrap(),
(data.clone(), MessageIntent::PeriodicRebroadcast),
);
}
}