Some gossip improvements (#1892)

* queue messages in future

* use new gossip API in GRANDPA

* implement message_expired for grandpa

* fix indent
This commit is contained in:
Robert Habermeier
2019-02-28 14:19:42 -05:00
committed by Gav Wood
parent c21d7436cc
commit 4c8fec17fc
4 changed files with 200 additions and 81 deletions
+139 -44
View File
@@ -40,17 +40,26 @@ struct PeerConsensus<H> {
is_authority: bool,
}
#[derive(Clone, Copy)]
enum Status {
Live,
Future,
}
struct MessageEntry<B: BlockT> {
message_hash: B::Hash,
topic: B::Hash,
message: ConsensusMessage,
timestamp: Instant,
status: Status,
}
/// Message validation result.
pub enum ValidationResult<H> {
/// Message is valid with this topic.
Valid(H),
/// Message is future with this topic.
Future(H),
/// Invalid message.
Invalid,
/// Obsolete message.
@@ -61,12 +70,20 @@ pub enum ValidationResult<H> {
pub trait Validator<H> {
/// Validate consensus message.
fn validate(&self, data: &[u8]) -> ValidationResult<H>;
/// Produce a closure for validating messages on a given topic.
fn message_expired<'a>(&'a self) -> Box<FnMut(H, &[u8]) -> bool + 'a> {
Box::new(move |_topic, data| match self.validate(data) {
ValidationResult::Valid(_) | ValidationResult::Future(_) => false,
ValidationResult::Invalid | ValidationResult::Expired => true,
})
}
}
/// Consensus network protocol handler. Manages statements and candidate requests.
pub struct ConsensusGossip<B: BlockT> {
peers: HashMap<NodeIndex, PeerConsensus<B::Hash>>,
live_message_sinks: HashMap<B::Hash, Vec<mpsc::UnboundedSender<Vec<u8>>>>,
live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec<mpsc::UnboundedSender<Vec<u8>>>>,
messages: Vec<MessageEntry<B>>,
known_messages: LruCache<B::Hash, ()>,
validators: HashMap<ConsensusEngineId, Arc<Validator<B::Hash>>>,
@@ -102,7 +119,9 @@ impl<B: BlockT> ConsensusGossip<B> {
// Send out all known messages to authorities.
let mut known_messages = HashSet::new();
for entry in self.messages.iter() {
if entry.timestamp + MESSAGE_LIFETIME < now { continue };
if entry.timestamp + MESSAGE_LIFETIME < now { continue }
if let Status::Future = entry.status { continue }
known_messages.insert(entry.message_hash);
protocol.send_message(who, Message::Consensus(entry.message.clone()));
}
@@ -161,18 +180,23 @@ impl<B: BlockT> ConsensusGossip<B> {
}
}
fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, get_message: F)
fn register_message<F>(
&mut self,
message_hash: B::Hash,
topic: B::Hash,
status: Status,
get_message: F,
)
where F: Fn() -> ConsensusMessage
{
if self.known_messages.insert(message_hash, ()).is_none()
{
if self.known_messages.insert(message_hash, ()).is_none() {
self.messages.push(MessageEntry {
topic,
message_hash,
message: get_message(),
timestamp: Instant::now(),
status,
});
}
}
@@ -184,6 +208,8 @@ impl<B: BlockT> ConsensusGossip<B> {
/// Prune old or no longer relevant consensus messages. Provide a predicate
/// for pruning, which returns `false` when the items with a given topic should be pruned.
pub fn collect_garbage(&mut self) {
use std::collections::hash_map::Entry;
self.live_message_sinks.retain(|_, sinks| {
sinks.retain(|sink| !sink.is_closed());
!sinks.is_empty()
@@ -194,15 +220,23 @@ impl<B: BlockT> ConsensusGossip<B> {
let validators = &self.validators;
let now = Instant::now();
self.messages.retain(|entry| {
entry.timestamp + MESSAGE_LIFETIME >= now
&& match validators.get(&entry.message.engine_id)
.map(|v| v.validate(&entry.message.data))
{
Some(ValidationResult::Valid(_)) => true,
_ => false,
}
});
let mut check_fns = HashMap::new();
let mut message_expired = move |entry: &MessageEntry<B>| {
let engine_id = entry.message.engine_id;
let check_fn = match check_fns.entry(engine_id) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(vacant) => match validators.get(&engine_id) {
None => return true, // treat all messages with no validator as expired
Some(validator) => vacant.insert(validator.message_expired()),
}
};
(check_fn)(entry.topic, &entry.message.data)
};
self.messages.retain(|entry|
entry.timestamp + MESSAGE_LIFETIME >= now && !message_expired(entry)
);
trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)",
before - self.messages.len(),
@@ -216,12 +250,46 @@ impl<B: BlockT> ConsensusGossip<B> {
}
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile)
pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver<Vec<u8>> {
pub fn messages_for(&mut self, engine_id: ConsensusEngineId, topic: B::Hash)
-> mpsc::UnboundedReceiver<Vec<u8>>
{
let (tx, rx) = mpsc::unbounded();
for entry in self.messages.iter().filter(|e| e.topic == topic) {
tx.unbounded_send(entry.message.data.clone()).expect("receiver known to be live; qed");
let validator = match self.validators.get(&engine_id) {
None => {
self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx);
return rx;
}
Some(v) => v,
};
for entry in self.messages.iter_mut()
.filter(|e| e.topic == topic && e.message.engine_id == engine_id)
{
let live = match entry.status {
Status::Live => true,
Status::Future => match validator.validate(&entry.message.data) {
ValidationResult::Valid(_) => {
entry.status = Status::Live;
true
}
_ => {
// don't send messages considered to be future still.
// if messages are considered expired they'll be cleaned up when we
// collect garbage.
false
}
}
};
if live {
entry.status = Status::Live;
tx.unbounded_send(entry.message.data.clone())
.expect("receiver known to be live; qed");
}
}
self.live_message_sinks.entry(topic).or_default().push(tx);
self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx);
rx
}
@@ -247,11 +315,13 @@ impl<B: BlockT> ConsensusGossip<B> {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
use std::collections::hash_map::Entry;
let engine_id = message.engine_id;
//validate the message
let topic = match self.validators.get(&message.engine_id)
let (topic, status) = match self.validators.get(&engine_id)
.map(|v| v.validate(&message.data))
{
Some(ValidationResult::Valid(topic)) => topic,
Some(ValidationResult::Valid(topic)) => (topic, Status::Live),
Some(ValidationResult::Future(topic)) => (topic, Status::Future),
Some(ValidationResult::Invalid) => {
trace!(target:"gossip", "Invalid message from {}", who);
protocol.report_peer(
@@ -275,13 +345,14 @@ impl<B: BlockT> ConsensusGossip<B> {
who,
Severity::Useless(format!("Sent unknown consensus engine id")),
);
trace!(target:"gossip", "Unknown message engine id {:?} from {}", message.engine_id, who);
trace!(target:"gossip", "Unknown message engine id {:?} from {}",
engine_id, who);
return None;
}
};
peer.known_messages.insert(message_hash);
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) {
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
entry.get_mut().retain(|sink| {
if let Err(e) = sink.unbounded_send(message.data.clone()) {
@@ -293,7 +364,7 @@ impl<B: BlockT> ConsensusGossip<B> {
entry.remove_entry();
}
}
self.multicast_inner(protocol, message_hash, topic, || message.clone());
self.multicast_inner(protocol, message_hash, topic, status, || message.clone());
Some((topic, message))
} else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
@@ -309,7 +380,7 @@ impl<B: BlockT> ConsensusGossip<B> {
message: ConsensusMessage,
) {
let message_hash = HashFor::<B>::hash(&message.data);
self.multicast_inner(protocol, message_hash, topic, || message.clone());
self.multicast_inner(protocol, message_hash, topic, Status::Live, || message.clone());
}
fn multicast_inner<F>(
@@ -317,12 +388,15 @@ impl<B: BlockT> ConsensusGossip<B> {
protocol: &mut Context<B>,
message_hash: B::Hash,
topic: B::Hash,
status: Status,
get_message: F,
)
where F: Fn() -> ConsensusMessage
{
self.register_message(message_hash, topic, &get_message);
self.propagate(protocol, message_hash, get_message);
self.register_message(message_hash, topic, status, &get_message);
if let Status::Live = status {
self.propagate(protocol, message_hash, get_message);
}
}
/// Note new consensus session.
@@ -335,6 +409,8 @@ impl<B: BlockT> ConsensusGossip<B> {
mod tests {
use runtime_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
use std::time::Instant;
use futures::Stream;
use super::*;
type Block = RawBlock<ExtrinsicWrapper<u64>>;
@@ -347,21 +423,21 @@ mod tests {
message_hash: $hash,
message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]},
timestamp: $now,
status: Status::Live,
});
}
}
}
struct AllowAll;
impl Validator<H256> for AllowAll {
fn validate(&self, _data: &[u8]) -> ValidationResult<H256> {
ValidationResult::Valid(H256::default())
}
}
#[test]
fn collects_garbage() {
struct AllowAll;
impl Validator<H256> for AllowAll {
fn validate(&self, _data: &[u8]) -> ValidationResult<H256> {
ValidationResult::Valid(H256::default())
}
}
struct AllowOne;
impl Validator<H256> for AllowOne {
fn validate(&self, data: &[u8]) -> ValidationResult<H256> {
@@ -417,14 +493,15 @@ mod tests {
use futures::Stream;
let mut consensus = ConsensusGossip::<Block>::new();
consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll));
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
let message_hash = HashFor::<Block>::hash(&message.data);
let topic = HashFor::<Block>::hash(&[1,2,3]);
consensus.register_message(message_hash, topic, || message.clone());
let stream = consensus.messages_for(topic);
consensus.register_message(message_hash, topic, Status::Live, || message.clone());
let stream = consensus.messages_for([0, 0, 0, 0], topic);
assert_eq!(stream.wait().next(), Some(Ok(message.data)));
}
@@ -437,29 +514,47 @@ mod tests {
let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, || msg_a.clone());
consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, || msg_b.clone());
consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, Status::Live, || msg_a.clone());
consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, Status::Live, || msg_b.clone());
assert_eq!(consensus.messages.len(), 2);
}
#[test]
fn can_keep_multiple_subscribers_per_topic() {
use futures::Stream;
let mut consensus = ConsensusGossip::<Block>::new();
consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll));
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
let message_hash = HashFor::<Block>::hash(&message.data);
let topic = HashFor::<Block>::hash(&[1,2,3]);
consensus.register_message(message_hash, topic, || message.clone());
consensus.register_message(message_hash, topic, Status::Live, || message.clone());
let stream1 = consensus.messages_for(topic);
let stream2 = consensus.messages_for(topic);
let stream1 = consensus.messages_for([0, 0, 0, 0], topic);
let stream2 = consensus.messages_for([0, 0, 0, 0], topic);
assert_eq!(stream1.wait().next(), Some(Ok(message.data.clone())));
assert_eq!(stream2.wait().next(), Some(Ok(message.data)));
}
#[test]
fn topics_are_localized_to_engine_id() {
let mut consensus = ConsensusGossip::<Block>::new();
consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll));
let topic = [1; 32].into();
let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] };
consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, Status::Live, || msg_a.clone());
consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, Status::Live, || msg_b.clone());
let mut stream = consensus.messages_for([0, 0, 0, 0], topic).wait();
assert_eq!(stream.next(), Some(Ok(vec![1, 2, 3])));
let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic));
assert_eq!(stream.next(), None);
}
}
+2 -1
View File
@@ -369,11 +369,12 @@ impl<D, S: NetworkSpecialization<Block> + Clone> Peer<D, S> {
/// access the underlying consensus gossip handler
pub fn consensus_gossip_messages_for(
&self,
engine_id: ConsensusEngineId,
topic: <Block as BlockT>::Hash,
) -> mpsc::UnboundedReceiver<Vec<u8>> {
let (tx, rx) = oneshot::channel();
self.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(topic);
let inner_rx = gossip.messages_for(engine_id, topic);
let _ = tx.send(inner_rx);
});
rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully")