mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 21:11:07 +00:00
Gossip refactoring (#1811)
* First part of gossip protocol refactoring * Message validation in GRANDPA * Reverted to time-based expiration for future round messages * Removed collect_garbage_for_topic * Use consensus engine id instead of kind * Use try_init Co-Authored-By: arkpar <arkady.paronyan@gmail.com> * Comment Co-Authored-By: arkpar <arkady.paronyan@gmail.com> * Added expiration check on broadcast Co-Authored-By: arkpar <arkady.paronyan@gmail.com> * Apply suggestions from code review Co-Authored-By: arkpar <arkady.paronyan@gmail.com> * Style * Style
This commit is contained in:
committed by
Gav Wood
parent
21779b8cf2
commit
b3eae17f65
@@ -18,20 +18,22 @@
|
||||
//! Handles chain-specific and standard BFT messages.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Instant, Duration};
|
||||
use log::{trace, debug};
|
||||
use futures::sync::mpsc;
|
||||
use rand::{self, seq::SliceRandom};
|
||||
use lru_cache::LruCache;
|
||||
use network_libp2p::NodeIndex;
|
||||
use network_libp2p::{Severity, NodeIndex};
|
||||
use runtime_primitives::traits::{Block as BlockT, Hash, HashFor};
|
||||
pub use crate::message::generic::{Message, ConsensusMessage};
|
||||
use crate::protocol::Context;
|
||||
use crate::config::Roles;
|
||||
use crate::ConsensusEngineId;
|
||||
|
||||
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
|
||||
const MESSAGE_LIFETIME: Duration = Duration::from_secs(120);
|
||||
const DEAD_TOPICS_CACHE_SIZE: usize = 4096;
|
||||
const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096;
|
||||
|
||||
struct PeerConsensus<H> {
|
||||
known_messages: HashSet<H>,
|
||||
@@ -39,21 +41,35 @@ struct PeerConsensus<H> {
|
||||
}
|
||||
|
||||
struct MessageEntry<B: BlockT> {
|
||||
topic: B::Hash,
|
||||
message_hash: B::Hash,
|
||||
topic: B::Hash,
|
||||
message: ConsensusMessage,
|
||||
broadcast: bool,
|
||||
timestamp: Instant,
|
||||
}
|
||||
|
||||
/// Message validation result.
|
||||
pub enum ValidationResult<H> {
|
||||
/// Message is valid with this topic.
|
||||
Valid(H),
|
||||
/// Invalid message.
|
||||
Invalid,
|
||||
/// Obsolete message.
|
||||
Expired,
|
||||
}
|
||||
|
||||
/// Validates consensus messages.
|
||||
pub trait Validator<H> {
|
||||
/// Validate consensus message.
|
||||
fn validate(&self, data: &[u8]) -> ValidationResult<H>;
|
||||
}
|
||||
|
||||
/// Consensus network protocol handler. Manages statements and candidate requests.
|
||||
pub struct ConsensusGossip<B: BlockT> {
|
||||
peers: HashMap<NodeIndex, PeerConsensus<(B::Hash, B::Hash)>>,
|
||||
live_message_sinks: HashMap<B::Hash, Vec<mpsc::UnboundedSender<ConsensusMessage>>>,
|
||||
peers: HashMap<NodeIndex, PeerConsensus<B::Hash>>,
|
||||
live_message_sinks: HashMap<B::Hash, Vec<mpsc::UnboundedSender<Vec<u8>>>>,
|
||||
messages: Vec<MessageEntry<B>>,
|
||||
known_messages: HashSet<(B::Hash, B::Hash)>,
|
||||
known_dead_topics: LruCache<B::Hash, ()>,
|
||||
message_times: HashMap<(B::Hash, B::Hash), Instant>,
|
||||
session_start: Option<B::Hash>,
|
||||
known_messages: LruCache<B::Hash, ()>,
|
||||
validators: HashMap<ConsensusEngineId, Arc<Validator<B::Hash>>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT> ConsensusGossip<B> {
|
||||
@@ -63,10 +79,8 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
peers: HashMap::new(),
|
||||
live_message_sinks: HashMap::new(),
|
||||
messages: Default::default(),
|
||||
known_messages: Default::default(),
|
||||
known_dead_topics: LruCache::new(DEAD_TOPICS_CACHE_SIZE),
|
||||
message_times: Default::default(),
|
||||
session_start: None
|
||||
known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE),
|
||||
validators: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,15 +89,22 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
self.live_message_sinks.clear();
|
||||
}
|
||||
|
||||
/// Register message validator for a message type.
|
||||
pub fn register_validator(&mut self, engine_id: ConsensusEngineId, validator: Arc<Validator<B::Hash>>) {
|
||||
self.validators.insert(engine_id, validator);
|
||||
}
|
||||
|
||||
/// Handle new connected peer.
|
||||
pub fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex, roles: Roles) {
|
||||
if roles.intersects(Roles::AUTHORITY) {
|
||||
trace!(target:"gossip", "Registering {:?} {}", roles, who);
|
||||
let now = Instant::now();
|
||||
// Send out all known messages to authorities.
|
||||
let mut known_messages = HashSet::new();
|
||||
for entry in self.messages.iter() {
|
||||
known_messages.insert((entry.topic, entry.message_hash));
|
||||
protocol.send_message(who, Message::Consensus(entry.topic.clone(), entry.message.clone(), entry.broadcast));
|
||||
if entry.timestamp + MESSAGE_LIFETIME < now { continue };
|
||||
known_messages.insert(entry.message_hash);
|
||||
protocol.send_message(who, Message::Consensus(entry.message.clone()));
|
||||
}
|
||||
self.peers.insert(who, PeerConsensus {
|
||||
known_messages,
|
||||
@@ -102,30 +123,18 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
&mut self,
|
||||
protocol: &mut Context<B>,
|
||||
message_hash: B::Hash,
|
||||
topic: B::Hash,
|
||||
broadcast: bool,
|
||||
get_message: F,
|
||||
)
|
||||
where F: Fn() -> ConsensusMessage,
|
||||
{
|
||||
if broadcast {
|
||||
for (id, ref mut peer) in self.peers.iter_mut() {
|
||||
if peer.known_messages.insert((topic.clone(), message_hash.clone())) {
|
||||
let message = get_message();
|
||||
if peer.is_authority {
|
||||
trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message);
|
||||
} else {
|
||||
trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
|
||||
}
|
||||
protocol.send_message(*id, Message::Consensus(topic, message, broadcast));
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
let mut non_authorities: Vec<_> = self.peers.iter()
|
||||
.filter_map(|(id, ref peer)| if !peer.is_authority && !peer.known_messages.contains(&(topic, message_hash)) { Some(*id) } else { None })
|
||||
.filter_map(|(id, ref peer)|
|
||||
if !peer.is_authority && !peer.known_messages.contains(&message_hash) {
|
||||
Some(*id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
)
|
||||
.collect();
|
||||
|
||||
non_authorities.shuffle(&mut rand::thread_rng());
|
||||
@@ -137,34 +146,33 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
|
||||
for (id, ref mut peer) in self.peers.iter_mut() {
|
||||
if peer.is_authority {
|
||||
if peer.known_messages.insert((topic.clone(), message_hash.clone())) {
|
||||
if peer.known_messages.insert(message_hash.clone()) {
|
||||
let message = get_message();
|
||||
trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message);
|
||||
protocol.send_message(*id, Message::Consensus(topic, message, broadcast));
|
||||
protocol.send_message(*id, Message::Consensus(message));
|
||||
}
|
||||
} else if non_authorities.contains(&id) {
|
||||
let message = get_message();
|
||||
trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
|
||||
peer.known_messages.insert((topic.clone(), message_hash.clone()));
|
||||
protocol.send_message(*id, Message::Consensus(topic, message, broadcast));
|
||||
if peer.known_messages.insert(message_hash.clone()) {
|
||||
let message = get_message();
|
||||
trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
|
||||
protocol.send_message(*id, Message::Consensus(message));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, broadcast: bool, get_message: F)
|
||||
fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, get_message: F)
|
||||
where F: Fn() -> ConsensusMessage
|
||||
{
|
||||
if !self.known_dead_topics.contains_key(&topic) &&
|
||||
self.known_messages.insert((topic, message_hash))
|
||||
if self.known_messages.insert(message_hash, ()).is_none()
|
||||
{
|
||||
self.messages.push(MessageEntry {
|
||||
topic,
|
||||
message_hash,
|
||||
broadcast,
|
||||
message: get_message(),
|
||||
timestamp: Instant::now(),
|
||||
});
|
||||
|
||||
self.message_times.insert((topic, message_hash), Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,38 +181,27 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
self.peers.remove(&who);
|
||||
}
|
||||
|
||||
/// Prune all existing messages for the given topic and mark it as dead, all
|
||||
/// new messages for the given topic are ignored.
|
||||
pub fn collect_garbage_for_topic(&mut self, topic: B::Hash) {
|
||||
self.known_dead_topics.insert(topic, ());
|
||||
self.collect_garbage(|_| true);
|
||||
}
|
||||
|
||||
/// Prune old or no longer relevant consensus messages. Provide a predicate
|
||||
/// for pruning, which returns `false` when the items with a given topic should be pruned.
|
||||
pub fn collect_garbage<P: Fn(&B::Hash) -> bool>(&mut self, predicate: P) {
|
||||
pub fn collect_garbage(&mut self) {
|
||||
self.live_message_sinks.retain(|_, sinks| {
|
||||
sinks.retain(|sink| !sink.is_closed());
|
||||
!sinks.is_empty()
|
||||
});
|
||||
|
||||
let message_times = &mut self.message_times;
|
||||
let known_messages = &mut self.known_messages;
|
||||
let known_dead_topics = &mut self.known_dead_topics;
|
||||
let before = self.messages.len();
|
||||
let validators = &self.validators;
|
||||
let now = Instant::now();
|
||||
|
||||
self.messages.retain(|entry| {
|
||||
!known_dead_topics.contains_key(&entry.topic) &&
|
||||
message_times.get(&(entry.topic, entry.message_hash))
|
||||
.map(|instant| *instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic))
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
known_messages.retain(|(topic, message_hash)| {
|
||||
message_times.get(&(*topic, *message_hash))
|
||||
.map(|instant| *instant + (5 * MESSAGE_LIFETIME) >= now)
|
||||
.unwrap_or(false)
|
||||
entry.timestamp + MESSAGE_LIFETIME >= now
|
||||
&& match validators.get(&entry.message.engine_id)
|
||||
.map(|v| v.validate(&entry.message.data))
|
||||
{
|
||||
Some(ValidationResult::Valid(_)) => true,
|
||||
_ => false,
|
||||
}
|
||||
});
|
||||
|
||||
trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)",
|
||||
@@ -213,18 +210,16 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
known_messages.len(),
|
||||
);
|
||||
|
||||
message_times.retain(|h, _| known_messages.contains(h));
|
||||
|
||||
for (_, ref mut peer) in self.peers.iter_mut() {
|
||||
peer.known_messages.retain(|h| known_messages.contains(h));
|
||||
peer.known_messages.retain(|h| known_messages.contains_key(h));
|
||||
}
|
||||
}
|
||||
|
||||
/// Get all incoming messages for a topic.
|
||||
pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver<ConsensusMessage> {
|
||||
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile)
|
||||
pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver<Vec<u8>> {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
for entry in self.messages.iter().filter(|e| e.topic == topic) {
|
||||
tx.unbounded_send(entry.message.clone()).expect("receiver known to be live; qed");
|
||||
tx.unbounded_send(entry.message.data.clone()).expect("receiver known to be live; qed");
|
||||
}
|
||||
self.live_message_sinks.entry(topic).or_default().push(tx);
|
||||
|
||||
@@ -239,29 +234,57 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
&mut self,
|
||||
protocol: &mut Context<B>,
|
||||
who: NodeIndex,
|
||||
topic: B::Hash,
|
||||
message: ConsensusMessage,
|
||||
broadcast: bool,
|
||||
is_syncing: bool,
|
||||
) -> Option<(B::Hash, ConsensusMessage)> {
|
||||
let message_hash = HashFor::<B>::hash(&message[..]);
|
||||
let message_hash = HashFor::<B>::hash(&message.data[..]);
|
||||
|
||||
if self.known_dead_topics.contains_key(&topic) {
|
||||
trace!(target:"gossip", "Ignored message from {} in dead topic {}", who, topic);
|
||||
return None;
|
||||
}
|
||||
|
||||
if self.known_messages.contains(&(topic, message_hash)) {
|
||||
trace!(target:"gossip", "Ignored already known message from {} in {}", who, topic);
|
||||
if self.known_messages.contains_key(&message_hash) {
|
||||
trace!(target:"gossip", "Ignored already known message from {}", who);
|
||||
return None;
|
||||
}
|
||||
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&who) {
|
||||
use std::collections::hash_map::Entry;
|
||||
peer.known_messages.insert((topic, message_hash));
|
||||
|
||||
//validate the message
|
||||
let topic = match self.validators.get(&message.engine_id)
|
||||
.map(|v| v.validate(&message.data))
|
||||
{
|
||||
Some(ValidationResult::Valid(topic)) => topic,
|
||||
Some(ValidationResult::Invalid) => {
|
||||
trace!(target:"gossip", "Invalid message from {}", who);
|
||||
protocol.report_peer(
|
||||
who,
|
||||
Severity::Bad(format!("Sent invalid consensus message")),
|
||||
);
|
||||
return None;
|
||||
},
|
||||
Some(ValidationResult::Expired) => {
|
||||
trace!(target:"gossip", "Ignored expired message from {}", who);
|
||||
if !is_syncing {
|
||||
protocol.report_peer(
|
||||
who,
|
||||
Severity::Useless(format!("Sent expired consensus message")),
|
||||
);
|
||||
}
|
||||
return None;
|
||||
}
|
||||
None => {
|
||||
protocol.report_peer(
|
||||
who,
|
||||
Severity::Useless(format!("Sent unknown consensus engine id")),
|
||||
);
|
||||
trace!(target:"gossip", "Unknown message engine id {:?} from {}", message.engine_id, who);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
peer.known_messages.insert(message_hash);
|
||||
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) {
|
||||
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
|
||||
entry.get_mut().retain(|sink| {
|
||||
if let Err(e) = sink.unbounded_send(message.clone()) {
|
||||
if let Err(e) = sink.unbounded_send(message.data.clone()) {
|
||||
trace!(target:"gossip", "Error broadcasting message notification: {:?}", e);
|
||||
}
|
||||
!sink.is_closed()
|
||||
@@ -270,13 +293,12 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
entry.remove_entry();
|
||||
}
|
||||
}
|
||||
self.multicast_inner(protocol, message_hash, topic, || message.clone());
|
||||
Some((topic, message))
|
||||
} else {
|
||||
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
|
||||
return None;
|
||||
None
|
||||
}
|
||||
|
||||
self.multicast_inner(protocol, message_hash, topic, broadcast, || message.clone());
|
||||
Some((topic, message))
|
||||
}
|
||||
|
||||
/// Multicast a message to all peers.
|
||||
@@ -285,10 +307,9 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
protocol: &mut Context<B>,
|
||||
topic: B::Hash,
|
||||
message: ConsensusMessage,
|
||||
broadcast: bool,
|
||||
) {
|
||||
let message_hash = HashFor::<B>::hash(&message);
|
||||
self.multicast_inner(protocol, message_hash, topic, broadcast, || message.clone());
|
||||
let message_hash = HashFor::<B>::hash(&message.data);
|
||||
self.multicast_inner(protocol, message_hash, topic, || message.clone());
|
||||
}
|
||||
|
||||
fn multicast_inner<F>(
|
||||
@@ -296,20 +317,17 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
protocol: &mut Context<B>,
|
||||
message_hash: B::Hash,
|
||||
topic: B::Hash,
|
||||
broadcast: bool,
|
||||
get_message: F,
|
||||
)
|
||||
where F: Fn() -> ConsensusMessage
|
||||
{
|
||||
self.register_message(message_hash, topic, broadcast, &get_message);
|
||||
self.propagate(protocol, message_hash, topic, broadcast, get_message);
|
||||
self.register_message(message_hash, topic, &get_message);
|
||||
self.propagate(protocol, message_hash, get_message);
|
||||
}
|
||||
|
||||
/// Note new consensus session.
|
||||
pub fn new_session(&mut self, parent_hash: B::Hash) {
|
||||
let old_session = self.session_start.take();
|
||||
self.session_start = Some(parent_hash);
|
||||
self.collect_garbage(|topic| old_session.as_ref().map_or(true, |h| topic != h));
|
||||
pub fn new_session(&mut self, _parent_hash: B::Hash) {
|
||||
self.collect_garbage();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -323,91 +341,75 @@ mod tests {
|
||||
|
||||
macro_rules! push_msg {
|
||||
($consensus:expr, $topic:expr, $hash: expr, $now: expr, $m:expr) => {
|
||||
if $consensus.known_messages.insert(($topic, $hash)) {
|
||||
if $consensus.known_messages.insert($hash, ()).is_none() {
|
||||
$consensus.messages.push(MessageEntry {
|
||||
topic: $topic,
|
||||
message_hash: $hash,
|
||||
message: $m,
|
||||
broadcast: false,
|
||||
message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]},
|
||||
timestamp: $now,
|
||||
});
|
||||
$consensus.message_times.insert(($topic, $hash), $now);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collects_garbage() {
|
||||
|
||||
struct AllowAll;
|
||||
impl Validator<H256> for AllowAll {
|
||||
fn validate(&self, _data: &[u8]) -> ValidationResult<H256> {
|
||||
ValidationResult::Valid(H256::default())
|
||||
}
|
||||
}
|
||||
|
||||
struct AllowOne;
|
||||
impl Validator<H256> for AllowOne {
|
||||
fn validate(&self, data: &[u8]) -> ValidationResult<H256> {
|
||||
if data[0] == 1 {
|
||||
ValidationResult::Valid(H256::default())
|
||||
} else {
|
||||
ValidationResult::Expired
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let prev_hash = H256::random();
|
||||
let best_hash = H256::random();
|
||||
let mut consensus = ConsensusGossip::<Block>::new();
|
||||
let now = Instant::now();
|
||||
let m1_hash = H256::random();
|
||||
let m2_hash = H256::random();
|
||||
let m1 = vec![1, 2, 3];
|
||||
let m2 = vec![4, 5, 6];
|
||||
|
||||
let now = Instant::now();
|
||||
push_msg!(consensus, prev_hash, m1_hash, now, m1);
|
||||
push_msg!(consensus, best_hash, m2_hash, now, m2.clone());
|
||||
consensus.known_messages.insert((prev_hash, m1_hash));
|
||||
consensus.known_messages.insert((best_hash, m2_hash));
|
||||
consensus.known_messages.insert(m1_hash, ());
|
||||
consensus.known_messages.insert(m2_hash, ());
|
||||
|
||||
// nothing to collect
|
||||
consensus.collect_garbage(|_t| true);
|
||||
let test_engine_id = Default::default();
|
||||
consensus.register_validator(test_engine_id, Arc::new(AllowAll));
|
||||
consensus.collect_garbage();
|
||||
assert_eq!(consensus.messages.len(), 2);
|
||||
assert_eq!(consensus.known_messages.len(), 2);
|
||||
|
||||
// nothing to collect with default.
|
||||
consensus.collect_garbage(|&topic| topic != Default::default());
|
||||
assert_eq!(consensus.messages.len(), 2);
|
||||
assert_eq!(consensus.known_messages.len(), 2);
|
||||
consensus.register_validator(test_engine_id, Arc::new(AllowOne));
|
||||
|
||||
// topic that was used in one message.
|
||||
consensus.collect_garbage(|topic| topic != &prev_hash);
|
||||
// m2 is expired
|
||||
consensus.collect_garbage();
|
||||
assert_eq!(consensus.messages.len(), 1);
|
||||
// known messages are only pruned based on expiration time
|
||||
// known messages are only pruned based on size.
|
||||
assert_eq!(consensus.known_messages.len(), 2);
|
||||
assert!(consensus.known_messages.contains(&(best_hash, m2_hash)));
|
||||
assert!(consensus.known_messages.contains_key(&m2_hash));
|
||||
|
||||
// make timestamp expired, but the message is still kept as known
|
||||
consensus.messages.clear();
|
||||
consensus.known_messages.clear();
|
||||
consensus.register_validator(test_engine_id, Arc::new(AllowAll));
|
||||
push_msg!(consensus, best_hash, m2_hash, now - MESSAGE_LIFETIME, m2.clone());
|
||||
consensus.collect_garbage(|_topic| true);
|
||||
consensus.collect_garbage();
|
||||
assert!(consensus.messages.is_empty());
|
||||
assert_eq!(consensus.known_messages.len(), 1);
|
||||
|
||||
// make timestamp expired past the known message lifetime
|
||||
consensus.known_messages.clear();
|
||||
push_msg!(consensus, best_hash, m2_hash, now - (5 * MESSAGE_LIFETIME), m2);
|
||||
consensus.collect_garbage(|_topic| true);
|
||||
assert!(consensus.messages.is_empty());
|
||||
assert!(consensus.known_messages.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collects_garbage_for_topic() {
|
||||
let topic = H256::random();
|
||||
let dead_topic = H256::random();
|
||||
let message = Vec::new();
|
||||
let now = Instant::now();
|
||||
let mut consensus = ConsensusGossip::<Block>::new();
|
||||
|
||||
let message_hash = H256::random();
|
||||
push_msg!(consensus, topic, message_hash, now, message.clone());
|
||||
push_msg!(consensus, dead_topic, message_hash, now, message.clone());
|
||||
assert_eq!(consensus.messages.len(), 2);
|
||||
|
||||
consensus.collect_garbage_for_topic(topic);
|
||||
|
||||
// removes all messages for the topic and marks the topic as dead
|
||||
assert_eq!(consensus.messages.len(), 1);
|
||||
assert_eq!(consensus.known_messages.len(), 2);
|
||||
assert_eq!(consensus.known_dead_topics.len(), 1);
|
||||
|
||||
// new messages for dead topics are ignored
|
||||
consensus.register_message(HashFor::<Block>::hash(&message), topic, false, || message.clone());
|
||||
assert_eq!(consensus.messages.len(), 1);
|
||||
assert_eq!(consensus.known_messages.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -416,15 +418,15 @@ mod tests {
|
||||
|
||||
let mut consensus = ConsensusGossip::<Block>::new();
|
||||
|
||||
let message = vec![1, 2, 3];
|
||||
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
|
||||
|
||||
let message_hash = HashFor::<Block>::hash(&message);
|
||||
let message_hash = HashFor::<Block>::hash(&message.data);
|
||||
let topic = HashFor::<Block>::hash(&[1,2,3]);
|
||||
|
||||
consensus.register_message(message_hash, topic, false, || message.clone());
|
||||
consensus.register_message(message_hash, topic, || message.clone());
|
||||
let stream = consensus.messages_for(topic);
|
||||
|
||||
assert_eq!(stream.wait().next(), Some(Ok(message)));
|
||||
assert_eq!(stream.wait().next(), Some(Ok(message.data)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -432,11 +434,11 @@ mod tests {
|
||||
let mut consensus = ConsensusGossip::<Block>::new();
|
||||
|
||||
let topic = [1; 32].into();
|
||||
let msg_a = vec![1, 2, 3];
|
||||
let msg_b = vec![4, 5, 6];
|
||||
let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
|
||||
let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
|
||||
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_a), topic, false, || msg_a.clone());
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_b), topic, false, || msg_b.clone());
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, || msg_a.clone());
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, || msg_b.clone());
|
||||
|
||||
assert_eq!(consensus.messages.len(), 2);
|
||||
}
|
||||
@@ -447,17 +449,17 @@ mod tests {
|
||||
|
||||
let mut consensus = ConsensusGossip::<Block>::new();
|
||||
|
||||
let message = vec![1, 2, 3];
|
||||
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
|
||||
|
||||
let message_hash = HashFor::<Block>::hash(&message);
|
||||
let message_hash = HashFor::<Block>::hash(&message.data);
|
||||
let topic = HashFor::<Block>::hash(&[1,2,3]);
|
||||
|
||||
consensus.register_message(message_hash, topic, false, || message.clone());
|
||||
consensus.register_message(message_hash, topic, || message.clone());
|
||||
|
||||
let stream1 = consensus.messages_for(topic);
|
||||
let stream2 = consensus.messages_for(topic);
|
||||
|
||||
assert_eq!(stream1.wait().next(), Some(Ok(message.clone())));
|
||||
assert_eq!(stream2.wait().next(), Some(Ok(message)));
|
||||
assert_eq!(stream1.wait().next(), Some(Ok(message.data.clone())));
|
||||
assert_eq!(stream2.wait().next(), Some(Ok(message.data)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ pub use network_libp2p::{
|
||||
NodeIndex, ProtocolId, Severity, Protocol, Multiaddr,
|
||||
obtain_private_key, multiaddr, PeerId, PublicKey
|
||||
};
|
||||
pub use message::{generic as generic_message, RequestId, Status as StatusMessage};
|
||||
pub use message::{generic as generic_message, RequestId, Status as StatusMessage, ConsensusEngineId};
|
||||
pub use error::Error;
|
||||
pub use on_demand::{OnDemand, OnDemandService, RemoteResponse};
|
||||
#[doc(hidden)]
|
||||
|
||||
@@ -30,6 +30,9 @@ pub use self::generic::{
|
||||
/// A unique ID of a request.
|
||||
pub type RequestId = u64;
|
||||
|
||||
/// Consensus engine unique ID.
|
||||
pub type ConsensusEngineId = [u8; 4];
|
||||
|
||||
/// Type alias for using the message type using block type parameters.
|
||||
pub type Message<B> = generic::Message<
|
||||
<B as BlockT>::Header,
|
||||
@@ -132,10 +135,16 @@ pub mod generic {
|
||||
use crate::config::Roles;
|
||||
use super::{
|
||||
BlockAttributes, RemoteCallResponse, RemoteReadResponse,
|
||||
RequestId, Transactions, Direction
|
||||
RequestId, Transactions, Direction, ConsensusEngineId,
|
||||
};
|
||||
/// Consensus is opaque to us
|
||||
pub type ConsensusMessage = Vec<u8>;
|
||||
/// Consensus is mostly opaque to us
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
|
||||
pub struct ConsensusMessage {
|
||||
/// Identifies consensus engine.
|
||||
pub engine_id: ConsensusEngineId,
|
||||
/// Message payload.
|
||||
pub data: Vec<u8>,
|
||||
}
|
||||
|
||||
/// Block data sent in the response.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
|
||||
@@ -177,7 +186,7 @@ pub mod generic {
|
||||
/// Transactions.
|
||||
Transactions(Transactions<Extrinsic>),
|
||||
/// Consensus protocol message.
|
||||
Consensus(Hash, ConsensusMessage, bool), // topic, opaque Vec<u8>, broadcast
|
||||
Consensus(ConsensusMessage),
|
||||
/// Remote method call request.
|
||||
RemoteCallRequest(RemoteCallRequest<Hash>),
|
||||
/// Remote method call response.
|
||||
@@ -216,6 +225,8 @@ pub mod generic {
|
||||
pub struct Status<Hash, Number> {
|
||||
/// Protocol version.
|
||||
pub version: u32,
|
||||
/// Minimum supported version.
|
||||
pub min_supported_version: u32,
|
||||
/// Supported roles.
|
||||
pub roles: Roles,
|
||||
/// Best block number.
|
||||
|
||||
@@ -20,8 +20,8 @@ use primitives::storage::StorageKey;
|
||||
use runtime_primitives::generic::BlockId;
|
||||
use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor, Zero};
|
||||
use consensus::import_queue::ImportQueue;
|
||||
use crate::message::{self, Message};
|
||||
use crate::message::generic::Message as GenericMessage;
|
||||
use crate::message::{self, Message, ConsensusEngineId};
|
||||
use crate::message::generic::{Message as GenericMessage, ConsensusMessage};
|
||||
use crate::consensus_gossip::ConsensusGossip;
|
||||
use crate::on_demand::OnDemandService;
|
||||
use crate::specialization::NetworkSpecialization;
|
||||
@@ -44,7 +44,9 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1000);
|
||||
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(5000);
|
||||
|
||||
/// Current protocol version.
|
||||
pub(crate) const CURRENT_VERSION: u32 = 1;
|
||||
pub(crate) const CURRENT_VERSION: u32 = 2;
|
||||
/// Lowest version we support
|
||||
const MIN_VERSION: u32 = 2;
|
||||
|
||||
// Maximum allowed entries in `BlockResponse`
|
||||
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
|
||||
@@ -199,7 +201,7 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> {
|
||||
/// Execute a closure with the consensus gossip.
|
||||
ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>),
|
||||
/// Incoming gossip consensus message.
|
||||
GossipConsensusMessage(B::Hash, Vec<u8>, bool),
|
||||
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>),
|
||||
/// Is protocol currently major-syncing?
|
||||
IsMajorSyncing(Sender<bool>),
|
||||
/// Is protocol currently offline?
|
||||
@@ -327,8 +329,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
ProtocolContext::new(&mut self.context_data, &self.network_chan);
|
||||
task.call_box(&mut self.consensus_gossip, &mut context);
|
||||
}
|
||||
ProtocolMsg::GossipConsensusMessage(topic, message, broadcast) => {
|
||||
self.gossip_consensus_message(topic, message, broadcast)
|
||||
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message) => {
|
||||
self.gossip_consensus_message(topic, engine_id, message)
|
||||
}
|
||||
ProtocolMsg::IsMajorSyncing(sender) => {
|
||||
let is_syncing = self.sync.status().is_major_syncing();
|
||||
@@ -420,13 +422,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(who, response),
|
||||
GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(who, request),
|
||||
GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(who, response),
|
||||
GenericMessage::Consensus(topic, msg, broadcast) => {
|
||||
GenericMessage::Consensus(msg) => {
|
||||
self.consensus_gossip.on_incoming(
|
||||
&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
|
||||
who,
|
||||
topic,
|
||||
msg,
|
||||
broadcast,
|
||||
self.sync.status().is_major_syncing(),
|
||||
);
|
||||
}
|
||||
other => self.specialization.on_message(
|
||||
@@ -446,12 +447,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
);
|
||||
}
|
||||
|
||||
fn gossip_consensus_message(&mut self, topic: B::Hash, message: Vec<u8>, broadcast: bool) {
|
||||
fn gossip_consensus_message(&mut self, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec<u8>) {
|
||||
self.consensus_gossip.multicast(
|
||||
&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
|
||||
topic,
|
||||
message,
|
||||
broadcast,
|
||||
ConsensusMessage{ data: message, engine_id },
|
||||
);
|
||||
}
|
||||
|
||||
@@ -599,7 +599,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
|
||||
/// Perform time based maintenance.
|
||||
fn tick(&mut self) {
|
||||
self.consensus_gossip.collect_garbage(|_| true);
|
||||
self.consensus_gossip.collect_garbage();
|
||||
self.maintain_peers();
|
||||
self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan));
|
||||
self.on_demand
|
||||
@@ -653,7 +653,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
));
|
||||
return;
|
||||
}
|
||||
if status.version != CURRENT_VERSION {
|
||||
if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version {
|
||||
let reason = format!("Peer using unsupported protocol version {}", status.version);
|
||||
self.network_chan.send(NetworkMsg::ReportPeer(
|
||||
who,
|
||||
@@ -803,6 +803,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
if let Ok(info) = self.context_data.chain.info() {
|
||||
let status = message::generic::Status {
|
||||
version: CURRENT_VERSION,
|
||||
min_supported_version: MIN_VERSION,
|
||||
genesis_hash: info.chain.genesis_hash,
|
||||
roles: self.config.roles.into(),
|
||||
best_number: info.chain.best_number,
|
||||
|
||||
@@ -25,7 +25,7 @@ use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, S
|
||||
use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol};
|
||||
use consensus::import_queue::{ImportQueue, Link};
|
||||
use crate::consensus_gossip::ConsensusGossip;
|
||||
use crate::message::Message;
|
||||
use crate::message::{Message, ConsensusEngineId};
|
||||
use crate::protocol::{self, Context, Protocol, ProtocolMsg, ProtocolStatus, PeerInfo};
|
||||
use crate::config::Params;
|
||||
use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError};
|
||||
@@ -208,11 +208,11 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
||||
}
|
||||
|
||||
/// Send a consensus message through the gossip
|
||||
pub fn gossip_consensus_message(&self, topic: B::Hash, message: Vec<u8>, broadcast: bool) {
|
||||
pub fn gossip_consensus_message(&self, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec<u8>) {
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::GossipConsensusMessage(
|
||||
topic, message, broadcast,
|
||||
topic, engine_id, message,
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@@ -34,12 +34,12 @@ use consensus::import_queue::{BasicQueue, ImportQueue, IncomingBlock};
|
||||
use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier};
|
||||
use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind};
|
||||
use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport};
|
||||
use crate::consensus_gossip::{ConsensusGossip, ConsensusMessage};
|
||||
use crate::consensus_gossip::ConsensusGossip;
|
||||
use crossbeam_channel::{self as channel, Sender, select};
|
||||
use futures::Future;
|
||||
use futures::sync::{mpsc, oneshot};
|
||||
use keyring::Keyring;
|
||||
use crate::message::Message;
|
||||
use crate::message::{Message, ConsensusEngineId};
|
||||
use network_libp2p::{NodeIndex, ProtocolId};
|
||||
use parity_codec::Encode;
|
||||
use parking_lot::Mutex;
|
||||
@@ -274,21 +274,21 @@ impl<D> Peer<D> {
|
||||
|
||||
/// Push a message into the gossip network and relay to peers.
|
||||
/// `TestNet::sync_step` needs to be called to ensure it's propagated.
|
||||
pub fn gossip_message(&self, topic: <Block as BlockT>::Hash, data: Vec<u8>, broadcast: bool) {
|
||||
pub fn gossip_message(&self, topic: <Block as BlockT>::Hash, engine_id: ConsensusEngineId, data: Vec<u8>) {
|
||||
let _ = self
|
||||
.protocol_sender
|
||||
.send(ProtocolMsg::GossipConsensusMessage(topic, data, broadcast));
|
||||
.send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data));
|
||||
}
|
||||
|
||||
pub fn consensus_gossip_collect_garbage_for_topic(&self, topic: <Block as BlockT>::Hash) {
|
||||
self.with_gossip(move |gossip, _| gossip.collect_garbage_for_topic(topic))
|
||||
pub fn consensus_gossip_collect_garbage_for_topic(&self, _topic: <Block as BlockT>::Hash) {
|
||||
self.with_gossip(move |gossip, _| gossip.collect_garbage())
|
||||
}
|
||||
|
||||
/// access the underlying consensus gossip handler
|
||||
pub fn consensus_gossip_messages_for(
|
||||
&self,
|
||||
topic: <Block as BlockT>::Hash,
|
||||
) -> mpsc::UnboundedReceiver<ConsensusMessage> {
|
||||
) -> mpsc::UnboundedReceiver<Vec<u8>> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.with_gossip(move |gossip, _| {
|
||||
let inner_rx = gossip.messages_for(topic);
|
||||
|
||||
Reference in New Issue
Block a user