mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 06:57:58 +00:00
Initial version of the polite-grandpa networking protocol (#2110)
* Consensus status packet * Allow for repropagation after status * More generic gossip * add a basic view struct and gossip module * move gossip stuff to the gossip module * integrate view into gossip * some reshuffling * alter rules for keeping one commit at a time in view * Allow sending addressed messages * don't cast outgoing votes if we know that we voted before * Handle one hop messages * initial run at polite grandpa * build WASM * handle neighbor messages * refactor validator's internals into an Inner struct * gossip only knows to keep or discard messages. optimize should_send_to * Periodic rebroadcast * implement `should_send_to` and message_expired * track peers' best received commit height * Pass peer id to topic steam * kill rebroadcasting network * Notify about existing peers * clean up network APIs a bunch * implement gossip::send_message for direct messages * refactor network trait * implement gossip::send_message for direct messages * get all non set-change tests passing * treat unknown rebroadcasts as broadcasts * get all other main tests passing * remove unimplemented test * everything compiles * treat unknown rebroadcasts as broadcasts * Rebradcast interval * Apply suggestions from code review Style Co-Authored-By: arkpar <arkady.paronyan@gmail.com> * Style * some module docs * address some grumbles + docs * allow rebroadcast every few minutes * send_topic && generic context * some tests for view change * more grumbles & tests * use send_peer
This commit is contained in:
committed by
GitHub
parent
b7eeb28de9
commit
bb95e7d6a2
@@ -17,11 +17,12 @@
|
||||
//! Utility for gossip of network messages between authorities.
|
||||
//! Handles chain-specific and standard BFT messages.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::{HashMap, HashSet, hash_map::Entry};
|
||||
use std::sync::Arc;
|
||||
use std::iter;
|
||||
use std::time;
|
||||
use log::{trace, debug};
|
||||
use futures::sync::mpsc;
|
||||
use rand::{self, seq::SliceRandom};
|
||||
use lru_cache::LruCache;
|
||||
use network_libp2p::{Severity, PeerId};
|
||||
use runtime_primitives::traits::{Block as BlockT, Hash, HashFor};
|
||||
@@ -33,57 +34,192 @@ use crate::config::Roles;
|
||||
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
|
||||
const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096;
|
||||
|
||||
const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_secs(30);
|
||||
|
||||
struct PeerConsensus<H> {
|
||||
known_messages: HashSet<H>,
|
||||
is_authority: bool,
|
||||
roles: Roles,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum Status {
|
||||
Live,
|
||||
Future,
|
||||
/// Topic stream message with sender.
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
pub struct TopicNotification {
|
||||
/// Message data.
|
||||
pub message: Vec<u8>,
|
||||
/// Sender if available.
|
||||
pub sender: Option<PeerId>,
|
||||
}
|
||||
|
||||
struct MessageEntry<B: BlockT> {
|
||||
message_hash: B::Hash,
|
||||
topic: B::Hash,
|
||||
message: ConsensusMessage,
|
||||
status: Status,
|
||||
}
|
||||
|
||||
/// Consensus message destination.
|
||||
pub enum MessageRecipient {
|
||||
/// Send to all peers.
|
||||
BroadcastToAll,
|
||||
/// Send to peers that don't have that message already.
|
||||
BroadcastNew,
|
||||
/// Send to specific peer.
|
||||
Peer(PeerId),
|
||||
}
|
||||
|
||||
/// The reason for sending out the message.
|
||||
#[derive(Eq, PartialEq, Copy, Clone)]
|
||||
pub enum MessageIntent {
|
||||
/// Requested broadcast
|
||||
Broadcast,
|
||||
/// Requested broadcast to all peers.
|
||||
ForcedBroadcast,
|
||||
/// Periodic rebroadcast of all messages to all peers.
|
||||
PeriodicRebroadcast,
|
||||
}
|
||||
|
||||
/// Message validation result.
|
||||
pub enum ValidationResult<H> {
|
||||
/// Message is valid with this topic.
|
||||
Valid(H),
|
||||
/// Message is future with this topic.
|
||||
Future(H),
|
||||
/// Invalid message.
|
||||
Invalid,
|
||||
/// Obsolete message.
|
||||
Expired,
|
||||
/// Message should be stored and propagated under given topic.
|
||||
ProcessAndKeep(H),
|
||||
/// Message should be processed, but not propagated.
|
||||
ProcessAndDiscard(H),
|
||||
/// Message should be ignored.
|
||||
Discard,
|
||||
}
|
||||
|
||||
/// Validation context. Allows reacting to incoming messages by sending out further messages.
|
||||
pub trait ValidatorContext<B: BlockT> {
|
||||
/// Broadcast all messages with given topic to peers that do not have it yet.
|
||||
fn broadcast_topic(&mut self, topic: B::Hash, force: bool);
|
||||
/// Broadcast a message to all peers that have not received it previously.
|
||||
fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool);
|
||||
/// Send addressed message to a peer.
|
||||
fn send_message(&mut self, who: &PeerId, message: Vec<u8>);
|
||||
/// Send all messages with given topic to a peer.
|
||||
fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool);
|
||||
}
|
||||
|
||||
struct NetworkContext<'g, 'p, B: BlockT> {
|
||||
gossip: &'g mut ConsensusGossip<B>,
|
||||
protocol: &'p mut Context<B>,
|
||||
engine_id: ConsensusEngineId,
|
||||
}
|
||||
|
||||
impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
|
||||
/// Broadcast all messages with given topic to peers that do not have it yet.
|
||||
fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
|
||||
self.gossip.broadcast_topic(self.protocol, topic, force);
|
||||
}
|
||||
|
||||
/// Broadcast a message to all peers that have not received it previously.
|
||||
fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
|
||||
self.gossip.multicast(
|
||||
self.protocol,
|
||||
topic,
|
||||
ConsensusMessage{ data: message, engine_id: self.engine_id.clone() },
|
||||
force,
|
||||
);
|
||||
}
|
||||
|
||||
/// Send addressed message to a peer.
|
||||
fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
|
||||
self.protocol.send_message(who.clone(), Message::Consensus(ConsensusMessage {
|
||||
engine_id: self.engine_id,
|
||||
data: message,
|
||||
}));
|
||||
}
|
||||
|
||||
/// Send all messages with given topic to a peer.
|
||||
fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
|
||||
self.gossip.send_topic(self.protocol, who, topic, self.engine_id, force);
|
||||
}
|
||||
}
|
||||
|
||||
fn propagate<'a, B: BlockT, I>(
|
||||
protocol: &mut Context<B>,
|
||||
messages: I,
|
||||
intent: MessageIntent,
|
||||
peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
|
||||
validators: &HashMap<ConsensusEngineId, Arc<Validator<B>>>,
|
||||
)
|
||||
where I: IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>, // (msg_hash, topic, message)
|
||||
{
|
||||
let mut check_fns = HashMap::new();
|
||||
let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| {
|
||||
let engine_id = message.engine_id;
|
||||
let check_fn = match check_fns.entry(engine_id) {
|
||||
Entry::Occupied(entry) => entry.into_mut(),
|
||||
Entry::Vacant(vacant) => match validators.get(&engine_id) {
|
||||
None => return false, // treat all messages with no validator as not allowed
|
||||
Some(validator) => vacant.insert(validator.message_allowed()),
|
||||
}
|
||||
};
|
||||
|
||||
(check_fn)(who, intent, topic, &message.data)
|
||||
};
|
||||
|
||||
for (message_hash, topic, message) in messages {
|
||||
for (id, ref mut peer) in peers.iter_mut() {
|
||||
let intent = match intent {
|
||||
MessageIntent::Broadcast =>
|
||||
if peer.known_messages.contains(&message_hash) {
|
||||
continue
|
||||
} else {
|
||||
MessageIntent::Broadcast
|
||||
},
|
||||
MessageIntent::PeriodicRebroadcast =>
|
||||
if peer.known_messages.contains(&message_hash) {
|
||||
MessageIntent::PeriodicRebroadcast
|
||||
} else {
|
||||
// peer doesn't know message, so the logic should treat it as an
|
||||
// initial broadcast.
|
||||
MessageIntent::Broadcast
|
||||
},
|
||||
other => other,
|
||||
};
|
||||
|
||||
if !message_allowed(id, intent, &topic, &message) {
|
||||
continue
|
||||
}
|
||||
peer.known_messages.insert(message_hash.clone());
|
||||
trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
|
||||
protocol.send_message(id.clone(), Message::Consensus(message.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Validates consensus messages.
|
||||
pub trait Validator<H> {
|
||||
pub trait Validator<B: BlockT>: Send + Sync {
|
||||
/// New peer is connected.
|
||||
fn new_peer(&self, _context: &mut ValidatorContext<B>, _who: &PeerId, _roles: Roles) {
|
||||
}
|
||||
|
||||
/// New connection is dropped.
|
||||
fn peer_disconnected(&self, _context: &mut ValidatorContext<B>, _who: &PeerId) {
|
||||
}
|
||||
|
||||
/// Validate consensus message.
|
||||
fn validate(&self, data: &[u8]) -> ValidationResult<H>;
|
||||
fn validate(&self, context: &mut ValidatorContext<B>, sender: &PeerId, data: &[u8]) -> ValidationResult<B::Hash>;
|
||||
|
||||
/// Produce a closure for validating messages on a given topic.
|
||||
fn message_expired<'a>(&'a self) -> Box<FnMut(H, &[u8]) -> bool + 'a> {
|
||||
Box::new(move |_topic, data| match self.validate(data) {
|
||||
ValidationResult::Valid(_) | ValidationResult::Future(_) => false,
|
||||
ValidationResult::Invalid | ValidationResult::Expired => true,
|
||||
})
|
||||
fn message_expired<'a>(&'a self) -> Box<FnMut(B::Hash, &[u8]) -> bool + 'a> {
|
||||
Box::new(move |_topic, _data| false)
|
||||
}
|
||||
|
||||
/// Produce a closure for filtering egress messages.
|
||||
fn message_allowed<'a>(&'a self) -> Box<FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
|
||||
Box::new(move |_who, _intent, _topic, _data| true)
|
||||
}
|
||||
}
|
||||
|
||||
/// Consensus network protocol handler. Manages statements and candidate requests.
|
||||
pub struct ConsensusGossip<B: BlockT> {
|
||||
peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
|
||||
live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec<mpsc::UnboundedSender<Vec<u8>>>>,
|
||||
live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec<mpsc::UnboundedSender<TopicNotification>>>,
|
||||
messages: Vec<MessageEntry<B>>,
|
||||
known_messages: LruCache<B::Hash, ()>,
|
||||
validators: HashMap<ConsensusEngineId, Arc<Validator<B::Hash>>>,
|
||||
validators: HashMap<ConsensusEngineId, Arc<Validator<B>>>,
|
||||
next_broadcast: time::Instant,
|
||||
}
|
||||
|
||||
impl<B: BlockT> ConsensusGossip<B> {
|
||||
@@ -95,6 +231,7 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
messages: Default::default(),
|
||||
known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE),
|
||||
validators: Default::default(),
|
||||
next_broadcast: time::Instant::now() + REBROADCAST_INTERVAL,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,105 +241,84 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
}
|
||||
|
||||
/// Register message validator for a message type.
|
||||
pub fn register_validator(&mut self, engine_id: ConsensusEngineId, validator: Arc<Validator<B::Hash>>) {
|
||||
self.validators.insert(engine_id, validator);
|
||||
pub fn register_validator(&mut self, protocol: &mut Context<B>, engine_id: ConsensusEngineId, validator: Arc<Validator<B>>) {
|
||||
self.register_validator_internal(engine_id, validator.clone());
|
||||
let peers: Vec<_> = self.peers.iter().map(|(id, peer)| (id.clone(), peer.roles)).collect();
|
||||
for (id, roles) in peers {
|
||||
let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
|
||||
validator.new_peer(&mut context, &id, roles);
|
||||
}
|
||||
}
|
||||
|
||||
fn register_validator_internal(&mut self, engine_id: ConsensusEngineId, validator: Arc<Validator<B>>) {
|
||||
self.validators.insert(engine_id, validator.clone());
|
||||
}
|
||||
|
||||
/// Handle new connected peer.
|
||||
pub fn new_peer(&mut self, protocol: &mut Context<B>, who: PeerId, roles: Roles) {
|
||||
if roles.intersects(Roles::AUTHORITY) {
|
||||
trace!(target:"gossip", "Registering {:?} {}", roles, who);
|
||||
// Send out all known messages to authorities.
|
||||
let mut known_messages = HashSet::new();
|
||||
for entry in self.messages.iter() {
|
||||
if let Status::Future = entry.status { continue }
|
||||
|
||||
known_messages.insert(entry.message_hash);
|
||||
protocol.send_message(who.clone(), Message::Consensus(entry.message.clone()));
|
||||
}
|
||||
self.peers.insert(who, PeerConsensus {
|
||||
known_messages,
|
||||
is_authority: true,
|
||||
});
|
||||
}
|
||||
else if roles.intersects(Roles::FULL) {
|
||||
self.peers.insert(who, PeerConsensus {
|
||||
known_messages: HashSet::new(),
|
||||
is_authority: false,
|
||||
});
|
||||
trace!(target:"gossip", "Registering {:?} {}", roles, who);
|
||||
self.peers.insert(who.clone(), PeerConsensus {
|
||||
known_messages: HashSet::new(),
|
||||
roles,
|
||||
});
|
||||
for (engine_id, v) in self.validators.clone() {
|
||||
let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
|
||||
v.new_peer(&mut context, &who, roles);
|
||||
}
|
||||
}
|
||||
|
||||
fn propagate<F>(
|
||||
&mut self,
|
||||
protocol: &mut Context<B>,
|
||||
message_hash: B::Hash,
|
||||
get_message: F,
|
||||
force: bool,
|
||||
)
|
||||
where F: Fn() -> ConsensusMessage,
|
||||
{
|
||||
let mut non_authorities: Vec<_> = self.peers.iter()
|
||||
.filter_map(|(id, ref peer)|
|
||||
if !peer.is_authority && (!peer.known_messages.contains(&message_hash) || force) {
|
||||
Some(id.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
)
|
||||
.collect();
|
||||
|
||||
non_authorities.shuffle(&mut rand::thread_rng());
|
||||
let non_authorities: HashSet<_> = if non_authorities.is_empty() {
|
||||
HashSet::new()
|
||||
} else {
|
||||
non_authorities[0..non_authorities.len().min(((non_authorities.len() as f64).sqrt() as usize).max(3))].iter().collect()
|
||||
};
|
||||
|
||||
for (id, ref mut peer) in self.peers.iter_mut() {
|
||||
if peer.is_authority {
|
||||
if peer.known_messages.insert(message_hash.clone()) || force {
|
||||
let message = get_message();
|
||||
trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message);
|
||||
protocol.send_message(id.clone(), Message::Consensus(message));
|
||||
}
|
||||
} else if non_authorities.contains(&id) {
|
||||
let message = get_message();
|
||||
trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
|
||||
protocol.send_message(id.clone(), Message::Consensus(message));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn register_message<F>(
|
||||
fn register_message(
|
||||
&mut self,
|
||||
message_hash: B::Hash,
|
||||
topic: B::Hash,
|
||||
status: Status,
|
||||
get_message: F,
|
||||
)
|
||||
where F: Fn() -> ConsensusMessage
|
||||
{
|
||||
if self.known_messages.insert(message_hash, ()).is_none() {
|
||||
message: ConsensusMessage,
|
||||
) {
|
||||
if self.known_messages.insert(message_hash.clone(), ()).is_none() {
|
||||
self.messages.push(MessageEntry {
|
||||
topic,
|
||||
message_hash,
|
||||
message: get_message(),
|
||||
status,
|
||||
topic,
|
||||
message,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Call when a peer has been disconnected to stop tracking gossip status.
|
||||
pub fn peer_disconnected(&mut self, _protocol: &mut Context<B>, who: PeerId) {
|
||||
self.peers.remove(&who);
|
||||
pub fn peer_disconnected(&mut self, protocol: &mut Context<B>, who: PeerId) {
|
||||
for (engine_id, v) in self.validators.clone() {
|
||||
let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
|
||||
v.peer_disconnected(&mut context, &who);
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform periodic maintenance
|
||||
pub fn tick(&mut self, protocol: &mut Context<B>) {
|
||||
self.collect_garbage();
|
||||
if time::Instant::now() >= self.next_broadcast {
|
||||
self.rebroadcast(protocol);
|
||||
self.next_broadcast = time::Instant::now() + REBROADCAST_INTERVAL;
|
||||
}
|
||||
}
|
||||
|
||||
/// Rebroadcast all messages to all peers.
|
||||
fn rebroadcast(&mut self, protocol: &mut Context<B>) {
|
||||
let messages = self.messages.iter()
|
||||
.map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
|
||||
propagate(protocol, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators);
|
||||
}
|
||||
|
||||
/// Broadcast all messages with given topic.
|
||||
pub fn broadcast_topic(&mut self, protocol: &mut Context<B>, topic: B::Hash, force: bool) {
|
||||
let messages = self.messages.iter()
|
||||
.filter_map(|entry|
|
||||
if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None }
|
||||
);
|
||||
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
|
||||
propagate(protocol, messages, intent, &mut self.peers, &self.validators);
|
||||
}
|
||||
|
||||
/// Prune old or no longer relevant consensus messages. Provide a predicate
|
||||
/// for pruning, which returns `false` when the items with a given topic should be pruned.
|
||||
pub fn collect_garbage(&mut self) {
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
self.live_message_sinks.retain(|_, sinks| {
|
||||
sinks.retain(|sink| !sink.is_closed());
|
||||
!sinks.is_empty()
|
||||
@@ -241,42 +357,17 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
|
||||
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile)
|
||||
pub fn messages_for(&mut self, engine_id: ConsensusEngineId, topic: B::Hash)
|
||||
-> mpsc::UnboundedReceiver<Vec<u8>>
|
||||
-> mpsc::UnboundedReceiver<TopicNotification>
|
||||
{
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
|
||||
let validator = match self.validators.get(&engine_id) {
|
||||
None => {
|
||||
self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx);
|
||||
return rx;
|
||||
}
|
||||
Some(v) => v,
|
||||
};
|
||||
|
||||
for entry in self.messages.iter_mut()
|
||||
.filter(|e| e.topic == topic && e.message.engine_id == engine_id)
|
||||
{
|
||||
let live = match entry.status {
|
||||
Status::Live => true,
|
||||
Status::Future => match validator.validate(&entry.message.data) {
|
||||
ValidationResult::Valid(_) => {
|
||||
entry.status = Status::Live;
|
||||
true
|
||||
}
|
||||
_ => {
|
||||
// don't send messages considered to be future still.
|
||||
// if messages are considered expired they'll be cleaned up when we
|
||||
// collect garbage.
|
||||
false
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if live {
|
||||
entry.status = Status::Live;
|
||||
tx.unbounded_send(entry.message.data.clone())
|
||||
.expect("receiver known to be live; qed");
|
||||
}
|
||||
tx.unbounded_send(TopicNotification {
|
||||
message: entry.message.data.clone(),
|
||||
sender: None,
|
||||
})
|
||||
.expect("receiver known to be live; qed");
|
||||
}
|
||||
|
||||
self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx);
|
||||
@@ -293,65 +384,80 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
protocol: &mut Context<B>,
|
||||
who: PeerId,
|
||||
message: ConsensusMessage,
|
||||
) -> Option<(B::Hash, ConsensusMessage)> {
|
||||
) {
|
||||
let message_hash = HashFor::<B>::hash(&message.data[..]);
|
||||
|
||||
if self.known_messages.contains_key(&message_hash) {
|
||||
trace!(target:"gossip", "Ignored already known message from {}", who);
|
||||
return None;
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&who) {
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
let engine_id = message.engine_id;
|
||||
// validate the message
|
||||
let (topic, status) = match self.validators.get(&engine_id)
|
||||
.map(|v| v.validate(&message.data))
|
||||
{
|
||||
Some(ValidationResult::Valid(topic)) => (topic, Status::Live),
|
||||
Some(ValidationResult::Future(topic)) => (topic, Status::Future),
|
||||
Some(ValidationResult::Invalid) => {
|
||||
trace!(target:"gossip", "Invalid message from {}", who);
|
||||
protocol.report_peer(
|
||||
who,
|
||||
Severity::Bad(format!("Sent invalid consensus message")),
|
||||
);
|
||||
return None;
|
||||
},
|
||||
Some(ValidationResult::Expired) => {
|
||||
trace!(target:"gossip", "Ignored expired message from {}", who);
|
||||
return None;
|
||||
},
|
||||
None => {
|
||||
protocol.report_peer(
|
||||
who.clone(),
|
||||
Severity::Useless(format!("Sent unknown consensus engine id")),
|
||||
);
|
||||
trace!(target:"gossip", "Unknown message engine id {:?} from {}",
|
||||
engine_id, who);
|
||||
return None;
|
||||
},
|
||||
};
|
||||
|
||||
peer.known_messages.insert(message_hash);
|
||||
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
|
||||
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
|
||||
entry.get_mut().retain(|sink| {
|
||||
if let Err(e) = sink.unbounded_send(message.data.clone()) {
|
||||
trace!(target:"gossip", "Error broadcasting message notification: {:?}", e);
|
||||
}
|
||||
!sink.is_closed()
|
||||
});
|
||||
if entry.get().is_empty() {
|
||||
entry.remove_entry();
|
||||
}
|
||||
let engine_id = message.engine_id;
|
||||
//validate the message
|
||||
let validation = self.validators.get(&engine_id)
|
||||
.cloned()
|
||||
.map(|v| {
|
||||
let mut context = NetworkContext { gossip: self, protocol, engine_id };
|
||||
v.validate(&mut context, &who, &message.data)
|
||||
});
|
||||
let validation_result = match validation {
|
||||
Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)),
|
||||
Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)),
|
||||
Some(ValidationResult::Discard) => None,
|
||||
None => {
|
||||
trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
|
||||
protocol.report_peer(
|
||||
who,
|
||||
Severity::Useless(format!("Sent unknown consensus engine id")),
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((topic, keep)) = validation_result {
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&who) {
|
||||
use std::collections::hash_map::Entry;
|
||||
peer.known_messages.insert(message_hash);
|
||||
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
|
||||
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
|
||||
entry.get_mut().retain(|sink| {
|
||||
if let Err(e) = sink.unbounded_send(TopicNotification {
|
||||
message: message.data.clone(),
|
||||
sender: Some(who.clone())
|
||||
}) {
|
||||
trace!(target: "gossip", "Error broadcasting message notification: {:?}", e);
|
||||
}
|
||||
!sink.is_closed()
|
||||
});
|
||||
if entry.get().is_empty() {
|
||||
entry.remove_entry();
|
||||
}
|
||||
}
|
||||
if keep {
|
||||
self.register_message(message_hash, topic, message);
|
||||
}
|
||||
} else {
|
||||
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
|
||||
}
|
||||
self.multicast_inner(protocol, message_hash, topic, status, || message.clone(), false);
|
||||
Some((topic, message))
|
||||
} else {
|
||||
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
|
||||
None
|
||||
trace!(target:"gossip", "Handled valid one hop message from peer {}", who);
|
||||
}
|
||||
}
|
||||
|
||||
/// Send all messages with given topic to a peer.
|
||||
pub fn send_topic(&mut self, protocol: &mut Context<B>, who: &PeerId, topic: B::Hash, engine_id: ConsensusEngineId, force: bool) {
|
||||
if let Some(ref mut peer) = self.peers.get_mut(who) {
|
||||
for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) {
|
||||
if !force && peer.known_messages.contains(&entry.message_hash) {
|
||||
continue
|
||||
}
|
||||
peer.known_messages.insert(entry.message_hash.clone());
|
||||
trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
|
||||
protocol.send_message(who.clone(), Message::Consensus(ConsensusMessage {
|
||||
engine_id: engine_id.clone(),
|
||||
data: entry.message.data.clone(),
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -364,24 +470,30 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
force: bool,
|
||||
) {
|
||||
let message_hash = HashFor::<B>::hash(&message.data);
|
||||
self.multicast_inner(protocol, message_hash, topic, Status::Live, || message.clone(), force);
|
||||
self.register_message(message_hash, topic, message.clone());
|
||||
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
|
||||
propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
|
||||
}
|
||||
|
||||
fn multicast_inner<F>(
|
||||
/// Send addressed message to a peer. The message is not kept or multicast
|
||||
/// later on.
|
||||
pub fn send_message(
|
||||
&mut self,
|
||||
protocol: &mut Context<B>,
|
||||
message_hash: B::Hash,
|
||||
topic: B::Hash,
|
||||
status: Status,
|
||||
get_message: F,
|
||||
force: bool,
|
||||
)
|
||||
where F: Fn() -> ConsensusMessage
|
||||
{
|
||||
self.register_message(message_hash, topic, status, &get_message);
|
||||
if let Status::Live = status {
|
||||
self.propagate(protocol, message_hash, get_message, force);
|
||||
}
|
||||
who: &PeerId,
|
||||
message: ConsensusMessage,
|
||||
) {
|
||||
let peer = match self.peers.get_mut(who) {
|
||||
None => return,
|
||||
Some(peer) => peer,
|
||||
};
|
||||
|
||||
let message_hash = HashFor::<B>::hash(&message.data);
|
||||
|
||||
trace!(target: "gossip", "Sending direct to {}: {:?}", who, message);
|
||||
|
||||
peer.known_messages.insert(message_hash);
|
||||
protocol.send_message(who.clone(), Message::Consensus(message.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -398,33 +510,36 @@ mod tests {
|
||||
($consensus:expr, $topic:expr, $hash: expr, $m:expr) => {
|
||||
if $consensus.known_messages.insert($hash, ()).is_none() {
|
||||
$consensus.messages.push(MessageEntry {
|
||||
topic: $topic,
|
||||
message_hash: $hash,
|
||||
message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0] },
|
||||
status: Status::Live,
|
||||
topic: $topic,
|
||||
message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]},
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct AllowAll;
|
||||
impl Validator<H256> for AllowAll {
|
||||
fn validate(&self, _data: &[u8]) -> ValidationResult<H256> {
|
||||
ValidationResult::Valid(H256::default())
|
||||
impl Validator<Block> for AllowAll {
|
||||
fn validate(&self, _context: &mut ValidatorContext<Block>, _sender: &PeerId, _data: &[u8]) -> ValidationResult<H256> {
|
||||
ValidationResult::ProcessAndKeep(H256::default())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collects_garbage() {
|
||||
struct AllowOne;
|
||||
impl Validator<H256> for AllowOne {
|
||||
fn validate(&self, data: &[u8]) -> ValidationResult<H256> {
|
||||
impl Validator<Block> for AllowOne {
|
||||
fn validate(&self, _context: &mut ValidatorContext<Block>, _sender: &PeerId, data: &[u8]) -> ValidationResult<H256> {
|
||||
if data[0] == 1 {
|
||||
ValidationResult::Valid(H256::default())
|
||||
ValidationResult::ProcessAndKeep(H256::default())
|
||||
} else {
|
||||
ValidationResult::Expired
|
||||
ValidationResult::Discard
|
||||
}
|
||||
}
|
||||
|
||||
fn message_expired<'a>(&'a self) -> Box<FnMut(H256, &[u8]) -> bool + 'a> {
|
||||
Box::new(move |_topic, data| data[0] != 1 )
|
||||
}
|
||||
}
|
||||
|
||||
let prev_hash = H256::random();
|
||||
@@ -441,12 +556,12 @@ mod tests {
|
||||
consensus.known_messages.insert(m2_hash, ());
|
||||
|
||||
let test_engine_id = Default::default();
|
||||
consensus.register_validator(test_engine_id, Arc::new(AllowAll));
|
||||
consensus.register_validator_internal(test_engine_id, Arc::new(AllowAll));
|
||||
consensus.collect_garbage();
|
||||
assert_eq!(consensus.messages.len(), 2);
|
||||
assert_eq!(consensus.known_messages.len(), 2);
|
||||
|
||||
consensus.register_validator(test_engine_id, Arc::new(AllowOne));
|
||||
consensus.register_validator_internal(test_engine_id, Arc::new(AllowOne));
|
||||
|
||||
// m2 is expired
|
||||
consensus.collect_garbage();
|
||||
@@ -461,17 +576,17 @@ mod tests {
|
||||
use futures::Stream;
|
||||
|
||||
let mut consensus = ConsensusGossip::<Block>::new();
|
||||
consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll));
|
||||
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
|
||||
|
||||
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
|
||||
|
||||
let message_hash = HashFor::<Block>::hash(&message.data);
|
||||
let topic = HashFor::<Block>::hash(&[1,2,3]);
|
||||
|
||||
consensus.register_message(message_hash, topic, Status::Live, || message.clone());
|
||||
consensus.register_message(message_hash, topic, message.clone());
|
||||
let stream = consensus.messages_for([0, 0, 0, 0], topic);
|
||||
|
||||
assert_eq!(stream.wait().next(), Some(Ok(message.data)));
|
||||
assert_eq!(stream.wait().next(), Some(Ok(TopicNotification { message: message.data, sender: None })));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -482,8 +597,8 @@ mod tests {
|
||||
let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
|
||||
let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
|
||||
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, Status::Live, || msg_a.clone());
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, Status::Live, || msg_b.clone());
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic,msg_a);
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic,msg_b);
|
||||
|
||||
assert_eq!(consensus.messages.len(), 2);
|
||||
}
|
||||
@@ -491,37 +606,37 @@ mod tests {
|
||||
#[test]
|
||||
fn can_keep_multiple_subscribers_per_topic() {
|
||||
let mut consensus = ConsensusGossip::<Block>::new();
|
||||
consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll));
|
||||
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
|
||||
|
||||
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
|
||||
|
||||
let message_hash = HashFor::<Block>::hash(&message.data);
|
||||
let topic = HashFor::<Block>::hash(&[1,2,3]);
|
||||
|
||||
consensus.register_message(message_hash, topic, Status::Live, || message.clone());
|
||||
consensus.register_message(message_hash, topic, message.clone());
|
||||
|
||||
let stream1 = consensus.messages_for([0, 0, 0, 0], topic);
|
||||
let stream2 = consensus.messages_for([0, 0, 0, 0], topic);
|
||||
|
||||
assert_eq!(stream1.wait().next(), Some(Ok(message.data.clone())));
|
||||
assert_eq!(stream2.wait().next(), Some(Ok(message.data)));
|
||||
assert_eq!(stream1.wait().next(), Some(Ok(TopicNotification { message: message.data.clone(), sender: None })));
|
||||
assert_eq!(stream2.wait().next(), Some(Ok(TopicNotification { message: message.data, sender: None })));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn topics_are_localized_to_engine_id() {
|
||||
let mut consensus = ConsensusGossip::<Block>::new();
|
||||
consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll));
|
||||
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
|
||||
|
||||
let topic = [1; 32].into();
|
||||
let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
|
||||
let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] };
|
||||
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, Status::Live, || msg_a.clone());
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, Status::Live, || msg_b.clone());
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, msg_a);
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, msg_b);
|
||||
|
||||
let mut stream = consensus.messages_for([0, 0, 0, 0], topic).wait();
|
||||
|
||||
assert_eq!(stream.next(), Some(Ok(vec![1, 2, 3])));
|
||||
assert_eq!(stream.next(), Some(Ok(TopicNotification { message: vec![1, 2, 3], sender: None })));
|
||||
let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic));
|
||||
assert_eq!(stream.next(), None);
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberF
|
||||
use consensus::import_queue::ImportQueue;
|
||||
use crate::message::{self, Message};
|
||||
use crate::message::generic::{Message as GenericMessage, ConsensusMessage};
|
||||
use crate::consensus_gossip::ConsensusGossip;
|
||||
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
|
||||
use crate::on_demand::OnDemandService;
|
||||
use crate::specialization::NetworkSpecialization;
|
||||
use crate::sync::{ChainSync, Status as SyncStatus, SyncState};
|
||||
@@ -237,7 +237,7 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
|
||||
/// Execute a closure with the consensus gossip.
|
||||
ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>),
|
||||
/// Incoming gossip consensus message.
|
||||
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>, bool),
|
||||
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>, GossipMessageRecipient),
|
||||
/// Tell protocol to abort sync (does not stop protocol).
|
||||
/// Only used in tests.
|
||||
#[cfg(any(test, feature = "test-helpers"))]
|
||||
@@ -377,8 +377,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
ProtocolContext::new(&mut self.context_data, &self.network_chan);
|
||||
task.call_box(&mut self.consensus_gossip, &mut context);
|
||||
}
|
||||
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, force) => {
|
||||
self.gossip_consensus_message(topic, engine_id, message, force)
|
||||
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => {
|
||||
self.gossip_consensus_message(topic, engine_id, message, recipient)
|
||||
}
|
||||
ProtocolMsg::BlocksProcessed(hashes, has_error) => {
|
||||
self.sync.blocks_processed(hashes, has_error);
|
||||
@@ -523,14 +523,18 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
topic: B::Hash,
|
||||
engine_id: ConsensusEngineId,
|
||||
message: Vec<u8>,
|
||||
force: bool,
|
||||
recipient: GossipMessageRecipient,
|
||||
) {
|
||||
self.consensus_gossip.multicast(
|
||||
&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
|
||||
topic,
|
||||
ConsensusMessage{ data: message, engine_id },
|
||||
force,
|
||||
);
|
||||
let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan);
|
||||
let message = ConsensusMessage { data: message, engine_id };
|
||||
match recipient {
|
||||
GossipMessageRecipient::BroadcastToAll =>
|
||||
self.consensus_gossip.multicast(&mut context, topic, message, true),
|
||||
GossipMessageRecipient::BroadcastNew =>
|
||||
self.consensus_gossip.multicast(&mut context, topic, message, false),
|
||||
GossipMessageRecipient::Peer(who) =>
|
||||
self.send_message(who, GenericMessage::Consensus(message)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Called when a new peer is connected
|
||||
@@ -669,7 +673,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
|
||||
/// Perform time based maintenance.
|
||||
fn tick(&mut self) {
|
||||
self.consensus_gossip.collect_garbage();
|
||||
self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan));
|
||||
self.maintain_peers();
|
||||
self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan));
|
||||
self.on_demand
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::{io, thread};
|
||||
|
||||
use log::{warn, debug, error, trace, info};
|
||||
use futures::{Async, Future, Stream, stream, sync::oneshot, sync::mpsc};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
@@ -26,15 +27,16 @@ use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, S
|
||||
use network_libp2p::{multiaddr, RegisteredProtocol, NetworkState};
|
||||
use peerset::Peerset;
|
||||
use consensus::import_queue::{ImportQueue, Link};
|
||||
use crate::consensus_gossip::ConsensusGossip;
|
||||
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
|
||||
|
||||
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
|
||||
use crate::message::Message;
|
||||
use crate::protocol::{self, Context, FromNetworkMsg, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo};
|
||||
use crate::config::Params;
|
||||
use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError};
|
||||
use crate::error::Error;
|
||||
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
|
||||
use crate::specialization::NetworkSpecialization;
|
||||
|
||||
use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError};
|
||||
use tokio::prelude::task::AtomicTask;
|
||||
use tokio::runtime::Builder as RuntimeBuilder;
|
||||
|
||||
@@ -257,12 +259,12 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
topic: B::Hash,
|
||||
engine_id: ConsensusEngineId,
|
||||
message: Vec<u8>,
|
||||
force: bool,
|
||||
recipient: GossipMessageRecipient,
|
||||
) {
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::GossipConsensusMessage(
|
||||
topic, engine_id, message, force,
|
||||
topic, engine_id, message, recipient,
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ use consensus::import_queue::{BasicQueue, ImportQueue, IncomingBlock};
|
||||
use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier};
|
||||
use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind};
|
||||
use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport};
|
||||
use crate::consensus_gossip::ConsensusGossip;
|
||||
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient, TopicNotification};
|
||||
use crossbeam_channel::{self as channel, Sender, select};
|
||||
use futures::Future;
|
||||
use futures::sync::{mpsc, oneshot};
|
||||
@@ -260,14 +260,12 @@ impl<D, S: NetworkSpecialization<Block> + Clone> Peer<D, S> {
|
||||
}
|
||||
|
||||
// SyncOracle: are we connected to any peer?
|
||||
#[cfg(test)]
|
||||
fn is_offline(&self) -> bool {
|
||||
pub fn is_offline(&self) -> bool {
|
||||
self.is_offline.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
// SyncOracle: are we in the process of catching-up with the chain?
|
||||
#[cfg(test)]
|
||||
fn is_major_syncing(&self) -> bool {
|
||||
pub fn is_major_syncing(&self) -> bool {
|
||||
self.is_major_syncing.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
@@ -366,9 +364,10 @@ impl<D, S: NetworkSpecialization<Block> + Clone> Peer<D, S> {
|
||||
data: Vec<u8>,
|
||||
force: bool,
|
||||
) {
|
||||
let recipient = if force { GossipMessageRecipient::BroadcastToAll } else { GossipMessageRecipient::BroadcastNew };
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data, force));
|
||||
.send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data, recipient));
|
||||
}
|
||||
|
||||
pub fn consensus_gossip_collect_garbage_for_topic(&self, _topic: <Block as BlockT>::Hash) {
|
||||
@@ -380,7 +379,7 @@ impl<D, S: NetworkSpecialization<Block> + Clone> Peer<D, S> {
|
||||
&self,
|
||||
engine_id: ConsensusEngineId,
|
||||
topic: <Block as BlockT>::Hash,
|
||||
) -> mpsc::UnboundedReceiver<Vec<u8>> {
|
||||
) -> mpsc::UnboundedReceiver<TopicNotification> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.with_gossip(move |gossip, _| {
|
||||
let inner_rx = gossip.messages_for(engine_id, topic);
|
||||
|
||||
Reference in New Issue
Block a user