From e1e6c19f64a1c2ce8a9615dcad012178741eb3da Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Sat, 29 Sep 2018 14:08:22 +0200 Subject: [PATCH] Limit gossip for non-authorities (#838) * Limit gossip for non-authorities * Random shuffle --- substrate/Cargo.lock | 1 + substrate/core/network/Cargo.toml | 1 + .../core/network/src/consensus_gossip.rs | 33 +++++++++++++++++-- substrate/core/network/src/lib.rs | 1 + substrate/core/network/src/sync.rs | 7 ++-- 5 files changed, 37 insertions(+), 6 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index bae34ae367..a11aa8d28b 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2991,6 +2991,7 @@ dependencies = [ "parity-codec 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec-derive 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0", "substrate-client 0.1.0", diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index 8d602184d9..ecb79fccc1 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -15,6 +15,7 @@ bitflags = "1.0" futures = "0.1.17" linked-hash-map = "0.5" rustc-hex = "1.0" +rand = "0.5" substrate-primitives = { path = "../../core/primitives" } substrate-client = { path = "../../core/client" } sr-primitives = { path = "../../core/sr-primitives" } diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs index 7b2f72bd66..449aab26ee 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/consensus_gossip.rs @@ -20,6 +20,7 @@ use std::collections::{HashMap, HashSet}; use futures::sync::mpsc; use std::time::{Instant, Duration}; +use rand::{self, Rng}; use network_libp2p::NodeIndex; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; use runtime_primitives::generic::BlockId; @@ -32,6 +33,7 @@ const MESSAGE_LIFETIME: Duration = Duration::from_secs(600); struct PeerConsensus { known_messages: HashSet, + is_authority: bool, } /// Consensus messages. @@ -75,9 +77,9 @@ impl ConsensusGossip where B::Header: HeaderT { /// Handle new connected peer. pub fn new_peer(&mut self, protocol: &mut Context, who: NodeIndex, roles: Roles) { - if roles.intersects(Roles::AUTHORITY | Roles::FULL) { + if roles.intersects(Roles::AUTHORITY) { trace!(target:"gossip", "Registering {:?} {}", roles, who); - // Send out all known messages. + // Send out all known messages to authorities. // TODO: limit by size let mut known_messages = HashSet::new(); for entry in self.messages.iter() { @@ -91,14 +93,39 @@ impl ConsensusGossip where B::Header: HeaderT { } self.peers.insert(who, PeerConsensus { known_messages, + is_authority: true, + }); + } + else if roles.intersects(Roles::FULL) { + self.peers.insert(who, PeerConsensus { + known_messages: HashSet::new(), + is_authority: false, }); } } fn propagate(&mut self, protocol: &mut Context, message: message::Message, hash: B::Hash) { + let mut non_authorities: Vec<_> = self.peers.iter() + .filter_map(|(id, ref peer)| if !peer.is_authority && !peer.known_messages.contains(&hash) { Some(*id) } else { None }) + .collect(); + + rand::thread_rng().shuffle(&mut non_authorities); + let non_authorities: HashSet<_> = if non_authorities.is_empty() { + HashSet::new() + } else { + non_authorities[0..non_authorities.len().min(((non_authorities.len() as f64).sqrt() as usize).max(3))].iter().collect() + }; + for (id, ref mut peer) in self.peers.iter_mut() { - if peer.known_messages.insert(hash.clone()) { + if peer.is_authority { + if peer.known_messages.insert(hash.clone()) { + trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message); + protocol.send_message(*id, message.clone()); + } + } + else if non_authorities.contains(&id) { trace!(target:"gossip", "Propagating to {}: {:?}", id, message); + peer.known_messages.insert(hash.clone()); protocol.send_message(*id, message.clone()); } } diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index 149ad868c6..dfaf101482 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -31,6 +31,7 @@ extern crate substrate_network_libp2p as network_libp2p; extern crate parity_codec as codec; extern crate futures; extern crate rustc_hex; +extern crate rand; #[macro_use] extern crate log; #[macro_use] extern crate bitflags; #[macro_use] extern crate error_chain; diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index 33c3e57d78..25da0dea88 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -296,11 +296,11 @@ impl ChainSync { if !self.is_known_or_already_downloading(protocol, header.parent_hash()) { trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", who, hash, header); } else { - trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", who, hash, header); + trace!(target: "sync", "Considering new stale block announced from {}: {} {:?}", who, hash, header); self.download_stale(protocol, who, &hash); } } else { - trace!(target: "sync", "Downloading new block announced from {}: {} {:?}", who, hash, header); + trace!(target: "sync", "Considering new block announced from {}: {} {:?}", who, hash, header); self.download_new(protocol, who); } } else { @@ -371,6 +371,7 @@ impl ChainSync { let import_status = self.import_queue.status(); // when there are too many blocks in the queue => do not try to download new blocks if import_status.importing_count > MAX_IMPORTING_BLOCKS { + trace!(target: "sync", "Too many blocks in the queue."); return; } // we should not download already queued blocks @@ -395,7 +396,7 @@ impl ChainSync { trace!(target: "sync", "Nothing to request"); } }, - _ => (), + _ => trace!(target: "sync", "Peer {} is busy", who), } } }