Minor gossip changes (#2038)

* core: gossip: don't expire messages based on time

* core: gossip: allow forcing resend of gossip messages

* core: grandpa: fix tests
This commit is contained in:
André Silva
2019-03-19 12:13:05 +01:00
committed by Robert Habermeier
parent 002143d0a2
commit e56a5cd00b
7 changed files with 70 additions and 70 deletions
+18 -43
View File
@@ -19,7 +19,6 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Instant, Duration};
use log::{trace, debug};
use futures::sync::mpsc;
use rand::{self, seq::SliceRandom};
@@ -32,7 +31,6 @@ use crate::config::Roles;
use crate::ConsensusEngineId;
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
const MESSAGE_LIFETIME: Duration = Duration::from_secs(120);
const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096;
struct PeerConsensus<H> {
@@ -50,7 +48,6 @@ struct MessageEntry<B: BlockT> {
message_hash: B::Hash,
topic: B::Hash,
message: ConsensusMessage,
timestamp: Instant,
status: Status,
}
@@ -115,11 +112,9 @@ impl<B: BlockT> ConsensusGossip<B> {
pub fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex, roles: Roles) {
if roles.intersects(Roles::AUTHORITY) {
trace!(target:"gossip", "Registering {:?} {}", roles, who);
let now = Instant::now();
// Send out all known messages to authorities.
let mut known_messages = HashSet::new();
for entry in self.messages.iter() {
if entry.timestamp + MESSAGE_LIFETIME < now { continue }
if let Status::Future = entry.status { continue }
known_messages.insert(entry.message_hash);
@@ -143,12 +138,13 @@ impl<B: BlockT> ConsensusGossip<B> {
protocol: &mut Context<B>,
message_hash: B::Hash,
get_message: F,
force: bool,
)
where F: Fn() -> ConsensusMessage,
{
let mut non_authorities: Vec<_> = self.peers.iter()
.filter_map(|(id, ref peer)|
if !peer.is_authority && !peer.known_messages.contains(&message_hash) {
if !peer.is_authority && (!peer.known_messages.contains(&message_hash) || force) {
Some(*id)
} else {
None
@@ -165,17 +161,15 @@ impl<B: BlockT> ConsensusGossip<B> {
for (id, ref mut peer) in self.peers.iter_mut() {
if peer.is_authority {
if peer.known_messages.insert(message_hash.clone()) {
if peer.known_messages.insert(message_hash.clone()) || force {
let message = get_message();
trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message);
protocol.send_message(*id, Message::Consensus(message));
}
} else if non_authorities.contains(&id) {
if peer.known_messages.insert(message_hash.clone()) {
let message = get_message();
trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
protocol.send_message(*id, Message::Consensus(message));
}
let message = get_message();
trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
protocol.send_message(*id, Message::Consensus(message));
}
}
}
@@ -194,7 +188,6 @@ impl<B: BlockT> ConsensusGossip<B> {
topic,
message_hash,
message: get_message(),
timestamp: Instant::now(),
status,
});
}
@@ -218,7 +211,6 @@ impl<B: BlockT> ConsensusGossip<B> {
let known_messages = &mut self.known_messages;
let before = self.messages.len();
let validators = &self.validators;
let now = Instant::now();
let mut check_fns = HashMap::new();
let mut message_expired = move |entry: &MessageEntry<B>| {
@@ -234,9 +226,7 @@ impl<B: BlockT> ConsensusGossip<B> {
(check_fn)(entry.topic, &entry.message.data)
};
self.messages.retain(|entry|
entry.timestamp + MESSAGE_LIFETIME >= now && !message_expired(entry)
);
self.messages.retain(|entry| !message_expired(entry));
trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)",
before - self.messages.len(),
@@ -332,7 +322,7 @@ impl<B: BlockT> ConsensusGossip<B> {
Some(ValidationResult::Expired) => {
trace!(target:"gossip", "Ignored expired message from {}", who);
return None;
}
},
None => {
protocol.report_peer(
who,
@@ -341,7 +331,7 @@ impl<B: BlockT> ConsensusGossip<B> {
trace!(target:"gossip", "Unknown message engine id {:?} from {}",
engine_id, who);
return None;
}
},
};
peer.known_messages.insert(message_hash);
@@ -357,7 +347,7 @@ impl<B: BlockT> ConsensusGossip<B> {
entry.remove_entry();
}
}
self.multicast_inner(protocol, message_hash, topic, status, || message.clone());
self.multicast_inner(protocol, message_hash, topic, status, || message.clone(), false);
Some((topic, message))
} else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
@@ -371,9 +361,10 @@ impl<B: BlockT> ConsensusGossip<B> {
protocol: &mut Context<B>,
topic: B::Hash,
message: ConsensusMessage,
force: bool,
) {
let message_hash = HashFor::<B>::hash(&message.data);
self.multicast_inner(protocol, message_hash, topic, Status::Live, || message.clone());
self.multicast_inner(protocol, message_hash, topic, Status::Live, || message.clone(), force);
}
fn multicast_inner<F>(
@@ -383,25 +374,20 @@ impl<B: BlockT> ConsensusGossip<B> {
topic: B::Hash,
status: Status,
get_message: F,
force: bool,
)
where F: Fn() -> ConsensusMessage
{
self.register_message(message_hash, topic, status, &get_message);
if let Status::Live = status {
self.propagate(protocol, message_hash, get_message);
self.propagate(protocol, message_hash, get_message, force);
}
}
/// Note new consensus session.
pub fn new_session(&mut self, _parent_hash: B::Hash) {
self.collect_garbage();
}
}
#[cfg(test)]
mod tests {
use runtime_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
use std::time::Instant;
use futures::Stream;
use super::*;
@@ -409,13 +395,12 @@ mod tests {
type Block = RawBlock<ExtrinsicWrapper<u64>>;
macro_rules! push_msg {
($consensus:expr, $topic:expr, $hash: expr, $now: expr, $m:expr) => {
($consensus:expr, $topic:expr, $hash: expr, $m:expr) => {
if $consensus.known_messages.insert($hash, ()).is_none() {
$consensus.messages.push(MessageEntry {
topic: $topic,
message_hash: $hash,
message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]},
timestamp: $now,
message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0] },
status: Status::Live,
});
}
@@ -450,9 +435,8 @@ mod tests {
let m1 = vec![1, 2, 3];
let m2 = vec![4, 5, 6];
let now = Instant::now();
push_msg!(consensus, prev_hash, m1_hash, now, m1);
push_msg!(consensus, best_hash, m2_hash, now, m2.clone());
push_msg!(consensus, prev_hash, m1_hash, m1);
push_msg!(consensus, best_hash, m2_hash, m2.clone());
consensus.known_messages.insert(m1_hash, ());
consensus.known_messages.insert(m2_hash, ());
@@ -470,15 +454,6 @@ mod tests {
// known messages are only pruned based on size.
assert_eq!(consensus.known_messages.len(), 2);
assert!(consensus.known_messages.contains_key(&m2_hash));
// make timestamp expired, but the message is still kept as known
consensus.messages.clear();
consensus.known_messages.clear();
consensus.register_validator(test_engine_id, Arc::new(AllowAll));
push_msg!(consensus, best_hash, m2_hash, now - MESSAGE_LIFETIME, m2.clone());
consensus.collect_garbage();
assert!(consensus.messages.is_empty());
assert_eq!(consensus.known_messages.len(), 1);
}
#[test]
+11 -4
View File
@@ -240,7 +240,7 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
/// Execute a closure with the consensus gossip.
ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>),
/// Incoming gossip consensus message.
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>),
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>, bool),
/// Tell protocol to abort sync (does not stop protocol).
/// Only used in tests.
#[cfg(any(test, feature = "test-helpers"))]
@@ -380,8 +380,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
ProtocolContext::new(&mut self.context_data, &self.network_chan);
task.call_box(&mut self.consensus_gossip, &mut context);
}
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message) => {
self.gossip_consensus_message(topic, engine_id, message)
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, force) => {
self.gossip_consensus_message(topic, engine_id, message, force)
}
ProtocolMsg::BlocksProcessed(hashes, has_error) => {
self.sync.blocks_processed(hashes, has_error);
@@ -503,11 +503,18 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
);
}
fn gossip_consensus_message(&mut self, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec<u8>) {
fn gossip_consensus_message(
&mut self,
topic: B::Hash,
engine_id: ConsensusEngineId,
message: Vec<u8>,
force: bool,
) {
self.consensus_gossip.multicast(
&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
topic,
ConsensusMessage{ data: message, engine_id },
force,
);
}
+8 -2
View File
@@ -247,11 +247,17 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
}
/// Send a consensus message through the gossip
pub fn gossip_consensus_message(&self, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec<u8>) {
pub fn gossip_consensus_message(
&self,
topic: B::Hash,
engine_id: ConsensusEngineId,
message: Vec<u8>,
force: bool,
) {
let _ = self
.protocol_sender
.send(ProtocolMsg::GossipConsensusMessage(
topic, engine_id, message,
topic, engine_id, message, force,
));
}
+8 -2
View File
@@ -355,10 +355,16 @@ impl<D, S: NetworkSpecialization<Block> + Clone> Peer<D, S> {
/// Push a message into the gossip network and relay to peers.
/// `TestNet::sync_step` needs to be called to ensure it's propagated.
pub fn gossip_message(&self, topic: <Block as BlockT>::Hash, engine_id: ConsensusEngineId, data: Vec<u8>) {
pub fn gossip_message(
&self,
topic: <Block as BlockT>::Hash,
engine_id: ConsensusEngineId,
data: Vec<u8>,
force: bool,
) {
let _ = self
.protocol_sender
.send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data));
.send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data, force));
}
pub fn consensus_gossip_collect_garbage_for_topic(&self, _topic: <Block as BlockT>::Hash) {