Time-based gradual gossip (#4176)

This commit is contained in:
Arkadiy Paronyan
2019-11-26 18:17:14 +01:00
committed by Gavin Wood
parent afaf66a853
commit 53a482146b
2 changed files with 152 additions and 223 deletions
@@ -92,12 +92,12 @@ use substrate_telemetry::{telemetry, CONSENSUS_DEBUG};
use log::{trace, debug, warn};
use futures::prelude::*;
use futures::sync::mpsc;
use rand::Rng;
use rand::seq::SliceRandom;
use crate::{environment, CatchUp, CompactCommit, SignedMessage};
use super::{cost, benefit, Round, SetId};
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, VecDeque, HashSet};
use std::time::{Duration, Instant};
const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5);
@@ -107,6 +107,13 @@ const CATCH_UP_PROCESS_TIMEOUT: Duration = Duration::from_secs(30);
/// catch up request.
const CATCH_UP_THRESHOLD: u64 = 2;
const PROPAGATION_ALL: u32 = 4; //in rounds;
const PROPAGATION_ALL_AUTHORITIES: u32 = 2; //in rounds;
const PROPAGATION_SOME_NON_AUTHORITIES: u32 = 3; //in rounds;
const ROUND_DURATION: u32 = 4; // measured in gossip durations
const MIN_LUCKY: usize = 5;
type Report = (PeerId, i32);
/// An outcome of examining a message.
@@ -417,21 +424,37 @@ impl<N> PeerInfo<N> {
/// The peers we're connected do in gossip.
struct Peers<N> {
inner: HashMap<PeerId, PeerInfo<N>>,
lucky_peers: HashSet<PeerId>,
lucky_authorities: HashSet<PeerId>,
}
impl<N> Default for Peers<N> {
fn default() -> Self {
Peers { inner: HashMap::new() }
Peers {
inner: HashMap::new(),
lucky_peers: HashSet::new(),
lucky_authorities: HashSet::new(),
}
}
}
impl<N: Ord> Peers<N> {
fn new_peer(&mut self, who: PeerId, roles: Roles) {
if roles.is_authority() && self.lucky_authorities.len() < MIN_LUCKY {
self.lucky_authorities.insert(who.clone());
}
if !roles.is_authority() && self.lucky_peers.len() < MIN_LUCKY {
self.lucky_peers.insert(who.clone());
}
self.inner.insert(who, PeerInfo::new(roles));
}
fn peer_disconnected(&mut self, who: &PeerId) {
self.inner.remove(who);
// This does not happen often enough compared to round duration,
// so we don't reshuffle.
self.lucky_peers.remove(who);
self.lucky_authorities.remove(who);
}
// returns a reference to the new view, if the peer is known.
@@ -492,6 +515,37 @@ impl<N: Ord> Peers<N> {
fn non_authorities(&self) -> usize {
self.inner.iter().filter(|(_, info)| !info.roles.is_authority()).count()
}
fn reshuffle(&mut self) {
let mut lucky_peers : Vec<_> = self.inner
.iter()
.filter_map(|(id, info)| if !info.roles.is_authority() { Some(id.clone()) } else { None })
.collect();
let mut lucky_authorities: Vec<_> = self.inner
.iter()
.filter_map(|(id, info)| if info.roles.is_authority() { Some(id.clone()) } else { None })
.collect();
let num_non_authorities = ((lucky_peers.len() as f32).sqrt() as usize)
.max(MIN_LUCKY)
.min(lucky_peers.len());
let num_authorities = ((lucky_authorities.len() as f32).sqrt() as usize)
.max(MIN_LUCKY)
.min(lucky_authorities.len());
lucky_peers.partial_shuffle(&mut rand::thread_rng(), num_non_authorities);
lucky_peers.truncate(num_non_authorities);
lucky_authorities.partial_shuffle(&mut rand::thread_rng(), num_authorities);
lucky_authorities.truncate(num_authorities);
self.lucky_peers.clear();
self.lucky_peers.extend(lucky_peers.into_iter());
self.lucky_authorities.clear();
self.lucky_authorities.extend(lucky_authorities.into_iter());
}
}
#[derive(Debug, PartialEq)]
@@ -559,6 +613,7 @@ struct Inner<Block: BlockT> {
local_view: Option<View<NumberFor<Block>>>,
peers: Peers<NumberFor<Block>>,
live_topics: KeepTopics<Block>,
round_start: Instant,
authorities: Vec<AuthorityId>,
config: crate::Config,
next_rebroadcast: Instant,
@@ -591,6 +646,7 @@ impl<Block: BlockT> Inner<Block> {
local_view: None,
peers: Peers::default(),
live_topics: KeepTopics::new(),
round_start: Instant::now(),
next_rebroadcast: Instant::now() + REBROADCAST_AFTER,
authorities: Vec::new(),
pending_catch_up: PendingCatchUp::None,
@@ -619,6 +675,8 @@ impl<Block: BlockT> Inner<Block> {
local_view.round = round;
self.live_topics.push(round, set_id);
self.round_start = Instant::now();
self.peers.reshuffle();
}
self.multicast_neighbor_packet()
}
@@ -1001,10 +1059,14 @@ impl<Block: BlockT> Inner<Block> {
///
/// Transitions will be triggered on repropagation attempts by the
/// underlying gossip layer, which should happen every 30 seconds.
fn round_message_allowed<N>(&self, peer: &PeerInfo<N>, mut previous_attempts: usize) -> bool {
const MIN_AUTHORITIES: usize = 5;
fn round_message_allowed<N>(&self, who: &PeerId, peer: &PeerInfo<N>) -> bool {
let round_duration = self.config.gossip_duration * ROUND_DURATION;
let round_elapsed = self.round_start.elapsed();
if !self.config.is_authority && previous_attempts == 0 {
if !self.config.is_authority
&& round_elapsed < round_duration * PROPAGATION_ALL
{
// non-authority nodes don't gossip any messages right away. we
// assume that authorities (and sentries) are strongly connected, so
// it should be unnecessary for non-authorities to gossip all
@@ -1012,24 +1074,16 @@ impl<Block: BlockT> Inner<Block> {
return false;
}
if !self.config.is_authority {
// since the node is not an authority we skipped the initial attempt
// to gossip the message, therefore we decrement `previous_attempts`
// so that the state machine below works the same way it does for
// authority nodes.
previous_attempts -= 1;
}
if peer.roles.is_authority() {
let authorities = self.peers.authorities();
// the target node is an authority, on the first attempt we start by
// the target node is an authority, on the first round duration we start by
// sending the message to only `sqrt(authorities)` (if we're
// connected to at least `MIN_AUTHORITIES`).
if previous_attempts == 0 && authorities > MIN_AUTHORITIES {
let authorities = authorities as f64;
let p = (authorities.sqrt()).max(MIN_AUTHORITIES as f64) / authorities;
rand::thread_rng().gen_bool(p)
// connected to at least `MIN_LUCKY`).
if round_elapsed < round_duration * PROPAGATION_ALL_AUTHORITIES
&& authorities > MIN_LUCKY
{
self.peers.lucky_authorities.contains(who)
} else {
// otherwise we already went through the step above, so
// we won't filter the message and send it to all
@@ -1038,15 +1092,13 @@ impl<Block: BlockT> Inner<Block> {
}
} else {
// the node is not an authority so we apply stricter filters
if previous_attempts >= 3 {
// if we previously tried to send this message 3 (or more)
// times, then it is allowed to be sent to all peers.
if round_elapsed >= round_duration * PROPAGATION_ALL {
// if we waited for 3 (or more) rounds
// then it is allowed to be sent to all peers.
true
} else if previous_attempts == 2 {
} else if round_elapsed >= round_duration * PROPAGATION_SOME_NON_AUTHORITIES {
// otherwise we only send it to `sqrt(non-authorities)`.
let non_authorities = self.peers.non_authorities() as f64;
let p = non_authorities.sqrt() / non_authorities;
rand::thread_rng().gen_bool(p)
self.peers.lucky_peers.contains(who)
} else {
false
}
@@ -1067,19 +1119,20 @@ impl<Block: BlockT> Inner<Block> {
///
/// Transitions will be triggered on repropagation attempts by the
/// underlying gossip layer, which should happen every 30 seconds.
fn global_message_allowed<N>(&self, peer: &PeerInfo<N>, previous_attempts: usize) -> bool {
const MIN_PEERS: usize = 5;
fn global_message_allowed<N>(&self, who: &PeerId, peer: &PeerInfo<N>) -> bool {
let round_duration = self.config.gossip_duration * ROUND_DURATION;
let round_elapsed = self.round_start.elapsed();
if peer.roles.is_authority() {
let authorities = self.peers.authorities();
// the target node is an authority, on the first attempt we start by
// the target node is an authority, on the first round duration we start by
// sending the message to only `sqrt(authorities)` (if we're
// connected to at least `MIN_PEERS`).
if previous_attempts == 0 && authorities > MIN_PEERS {
let authorities = authorities as f64;
let p = (authorities.sqrt()).max(MIN_PEERS as f64) / authorities;
rand::thread_rng().gen_bool(p)
// connected to at least `MIN_LUCKY`).
if round_elapsed < round_duration * PROPAGATION_ALL_AUTHORITIES
&& authorities > MIN_LUCKY
{
self.peers.lucky_authorities.contains(who)
} else {
// otherwise we already went through the step above, so
// we won't filter the message and send it to all
@@ -1090,13 +1143,13 @@ impl<Block: BlockT> Inner<Block> {
let non_authorities = self.peers.non_authorities();
// the target node is not an authority, on the first and second
// attempt we start by sending the message to only
// round duration we start by sending the message to only
// `sqrt(non_authorities)` (if we're connected to at least
// `MIN_PEERS`).
if previous_attempts <= 1 && non_authorities > MIN_PEERS {
let non_authorities = non_authorities as f64;
let p = (non_authorities.sqrt()).max(MIN_PEERS as f64) / non_authorities ;
rand::thread_rng().gen_bool(p)
// `MIN_LUCKY`).
if round_elapsed < round_duration * PROPAGATION_SOME_NON_AUTHORITIES
&& non_authorities > MIN_LUCKY
{
self.peers.lucky_peers.contains(who)
} else {
// otherwise we already went through the step above, so
// we won't filter the message and send it to all
@@ -1315,14 +1368,14 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
Some(x) => x,
};
if let MessageIntent::Broadcast { previous_attempts } = intent {
if let MessageIntent::Broadcast = intent {
if maybe_round.is_some() {
if !inner.round_message_allowed(peer, previous_attempts) {
if !inner.round_message_allowed(who, peer) {
// early return if the vote message isn't allowed at this stage.
return false;
}
} else {
if !inner.global_message_allowed(peer, previous_attempts) {
if !inner.global_message_allowed(who, peer) {
// early return if the global message isn't allowed at this stage.
return false;
}
@@ -2132,8 +2185,12 @@ mod tests {
#[test]
fn progressively_gossips_to_more_peers() {
let mut config = config();
config.gossip_duration = Duration::from_secs(300); // Set to high value to prevent test race
let round_duration = config.gossip_duration * ROUND_DURATION;
let (val, _) = GossipValidator::<Block>::new(
config(),
config,
voter_set_state(),
);
@@ -2152,7 +2209,9 @@ mod tests {
val.inner.write().peers.new_peer(full_nodes[i].clone(), Roles::FULL);
}
let test = |previous_attempts, peers| {
let test = |num_round, peers| {
// rewind n round durations
val.inner.write().round_start = Instant::now() - round_duration * num_round;
let mut message_allowed = val.message_allowed();
move || {
@@ -2160,7 +2219,7 @@ mod tests {
for peer in peers {
if message_allowed(
peer,
MessageIntent::Broadcast { previous_attempts },
MessageIntent::Broadcast,
&crate::communication::round_topic::<Block>(1, 0),
&[],
) {
@@ -2187,22 +2246,22 @@ mod tests {
// on the first attempt we will only gossip to `sqrt(authorities)`,
// which should average out to 5 peers after a couple of trials
assert_eq!(trial(test(0, &authorities)), 5);
assert_eq!(trial(test(1, &authorities)), 5);
// on the second (and subsequent attempts) we should gossip to all
// authorities we're connected to.
assert_eq!(trial(test(1, &authorities)), 30);
assert_eq!(trial(test(2, &authorities)), 30);
assert_eq!(trial(test(3, &authorities)), 30);
// we should only gossip to non-authorities after the third attempt
assert_eq!(trial(test(0, &full_nodes)), 0);
assert_eq!(trial(test(1, &full_nodes)), 0);
assert_eq!(trial(test(2, &full_nodes)), 0);
// and only to `sqrt(non-authorities)`
assert_eq!(trial(test(2, &full_nodes)), 5);
assert_eq!(trial(test(3, &full_nodes)), 5);
// only on the fourth attempt should we gossip to all non-authorities
assert_eq!(trial(test(3, &full_nodes)), 30);
assert_eq!(trial(test(4, &full_nodes)), 30);
}
#[test]
@@ -2231,7 +2290,7 @@ mod tests {
assert!(
message_allowed(
authority,
MessageIntent::Broadcast { previous_attempts: 0 },
MessageIntent::Broadcast,
&crate::communication::round_topic::<Block>(1, 0),
&[],
)
@@ -2240,9 +2299,11 @@ mod tests {
}
#[test]
fn non_authorities_never_gossip_messages_on_first_attempt() {
fn non_authorities_never_gossip_messages_on_first_round_duration() {
let mut config = config();
config.gossip_duration = Duration::from_secs(300); // Set to high value to prevent test race
config.is_authority = false;
let round_duration = config.gossip_duration * ROUND_DURATION;
let (val, _) = GossipValidator::<Block>::new(
config,
@@ -2259,32 +2320,37 @@ mod tests {
authorities.push(peer_id);
}
let mut message_allowed = val.message_allowed();
// since our node is not an authority we should **never** gossip any
// messages on the first attempt.
for authority in &authorities {
assert!(
!message_allowed(
authority,
MessageIntent::Broadcast { previous_attempts: 0 },
&crate::communication::round_topic::<Block>(1, 0),
&[],
)
);
{
let mut message_allowed = val.message_allowed();
// since our node is not an authority we should **never** gossip any
// messages on the first attempt.
for authority in &authorities {
assert!(
!message_allowed(
authority,
MessageIntent::Broadcast,
&crate::communication::round_topic::<Block>(1, 0),
&[],
)
);
}
}
// on the third attempt we should allow messages to authorities
// (on the second attempt we would do `sqrt(authorities)`)
for authority in &authorities {
assert!(
message_allowed(
authority,
MessageIntent::Broadcast { previous_attempts: 2 },
&crate::communication::round_topic::<Block>(1, 0),
&[],
)
);
{
val.inner.write().round_start = Instant::now() - round_duration * 4;
let mut message_allowed = val.message_allowed();
// on the fourth round duration we should allow messages to authorities
// (on the second we would do `sqrt(authorities)`)
for authority in &authorities {
assert!(
message_allowed(
authority,
MessageIntent::Broadcast,
&crate::communication::round_topic::<Block>(1, 0),
&[],
)
);
}
}
}
}
@@ -73,7 +73,6 @@ const UNREGISTERED_TOPIC_REPUTATION_CHANGE: i32 = -(1 << 10);
struct PeerConsensus<H> {
known_messages: HashSet<H>,
filtered_messages: HashMap<H, usize>,
roles: Roles,
}
@@ -108,11 +107,7 @@ pub enum MessageRecipient {
#[cfg_attr(test, derive(Debug))]
pub enum MessageIntent {
/// Requested broadcast.
Broadcast {
/// How many times this message was previously filtered by the gossip
/// validator when trying to propagate to a given peer.
previous_attempts: usize
},
Broadcast,
/// Requested broadcast to all peers.
ForcedBroadcast,
/// Periodic rebroadcast of all messages to all peers.
@@ -131,7 +126,7 @@ pub enum ValidationResult<H> {
impl MessageIntent {
fn broadcast() -> MessageIntent {
MessageIntent::Broadcast { previous_attempts: 0 }
MessageIntent::Broadcast
}
}
@@ -190,7 +185,8 @@ fn propagate<'a, B: BlockT, I>(
peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
validators: &HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>,
)
where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>, // (msg_hash, topic, message)
// (msg_hash, topic, message)
where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>,
{
let mut check_fns = HashMap::new();
let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| {
@@ -209,17 +205,12 @@ fn propagate<'a, B: BlockT, I>(
for (id, ref mut peer) in peers.iter_mut() {
let mut batch = Vec::new();
for (message_hash, topic, message) in messages.clone() {
let previous_attempts = peer.filtered_messages
.get(&message_hash)
.cloned()
.unwrap_or(0);
let intent = match intent {
MessageIntent::Broadcast { .. } =>
if peer.known_messages.contains(&message_hash) {
continue;
} else {
MessageIntent::Broadcast { previous_attempts }
MessageIntent::Broadcast
},
MessageIntent::PeriodicRebroadcast =>
if peer.known_messages.contains(&message_hash) {
@@ -227,22 +218,15 @@ fn propagate<'a, B: BlockT, I>(
} else {
// peer doesn't know message, so the logic should treat it as an
// initial broadcast.
MessageIntent::Broadcast { previous_attempts }
MessageIntent::Broadcast
},
other => other,
};
if !message_allowed(id, intent, &topic, &message) {
let count = peer.filtered_messages
.entry(message_hash.clone())
.or_insert(0);
*count += 1;
continue;
}
peer.filtered_messages.remove(message_hash);
peer.known_messages.insert(message_hash.clone());
trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
@@ -338,7 +322,6 @@ impl<B: BlockT> ConsensusGossip<B> {
trace!(target:"gossip", "Registering {:?} {}", roles, who);
self.peers.insert(who.clone(), PeerConsensus {
known_messages: HashSet::new(),
filtered_messages: HashMap::new(),
roles,
});
for (engine_id, v) in self.validators.clone() {
@@ -448,7 +431,6 @@ impl<B: BlockT> ConsensusGossip<B> {
for (_, ref mut peer) in self.peers.iter_mut() {
peer.known_messages.retain(|h| known_messages.contains(h));
peer.filtered_messages.retain(|h, _| known_messages.contains(h));
}
}
@@ -566,12 +548,7 @@ impl<B: BlockT> ConsensusGossip<B> {
let intent = if force {
MessageIntent::ForcedBroadcast
} else {
let previous_attempts = peer.filtered_messages
.get(&entry.message_hash)
.cloned()
.unwrap_or(0);
MessageIntent::Broadcast { previous_attempts }
MessageIntent::Broadcast
};
if !force && peer.known_messages.contains(&entry.message_hash) {
@@ -579,16 +556,9 @@ impl<B: BlockT> ConsensusGossip<B> {
}
if !message_allowed(who, intent, &entry.topic, &entry.message.data) {
let count = peer.filtered_messages
.entry(entry.message_hash)
.or_insert(0);
*count += 1;
continue;
}
peer.filtered_messages.remove(&entry.message_hash);
peer.known_messages.insert(entry.message_hash.clone());
trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
@@ -632,7 +602,6 @@ impl<B: BlockT> ConsensusGossip<B> {
trace!(target: "gossip", "Sending direct to {}: {:?}", who, message);
peer.filtered_messages.remove(&message_hash);
peer.known_messages.insert(message_hash);
protocol.send_consensus(who.clone(), vec![message.clone()]);
}
@@ -662,8 +631,7 @@ impl<B: BlockT> Validator<B> for DiscardAll {
#[cfg(test)]
mod tests {
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use parking_lot::Mutex;
use std::sync::Arc;
use sr_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
use futures03::executor::block_on_stream;
@@ -812,109 +780,4 @@ mod tests {
let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic));
assert_eq!(stream.next(), None);
}
#[test]
fn keeps_track_of_broadcast_attempts() {
struct DummyNetworkContext;
impl<B: BlockT> Context<B> for DummyNetworkContext {
fn report_peer(&mut self, _who: PeerId, _reputation: i32) {}
fn disconnect_peer(&mut self, _who: PeerId) {}
fn send_consensus(&mut self, _who: PeerId, _consensus: Vec<ConsensusMessage>) {}
fn send_chain_specific(&mut self, _who: PeerId, _message: Vec<u8>) {}
}
// A mock gossip validator that never expires any message, allows
// setting whether messages should be allowed and keeps track of any
// messages passed to `message_allowed`.
struct MockValidator {
allow: AtomicBool,
messages: Arc<Mutex<Vec<(Vec<u8>, MessageIntent)>>>,
}
impl MockValidator {
fn new() -> MockValidator {
MockValidator {
allow: AtomicBool::new(false),
messages: Arc::new(Mutex::new(Vec::new())),
}
}
}
impl Validator<Block> for MockValidator {
fn validate(
&self,
_context: &mut dyn ValidatorContext<Block>,
_sender: &PeerId,
_data: &[u8],
) -> ValidationResult<H256> {
ValidationResult::ProcessAndKeep(H256::default())
}
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> {
Box::new(move |_topic, _data| false)
}
fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &H256, &[u8]) -> bool + 'a> {
let messages = self.messages.clone();
Box::new(move |_, intent, _, data| {
messages.lock().push((data.to_vec(), intent));
self.allow.load(Ordering::SeqCst)
})
}
}
// we setup an instance of the mock gossip validator, add a new peer to
// it and register a message.
let mut consensus = ConsensusGossip::<Block>::new();
let validator = Arc::new(MockValidator::new());
consensus.register_validator_internal([0, 0, 0, 0], validator.clone());
consensus.new_peer(
&mut DummyNetworkContext,
PeerId::random(),
Roles::AUTHORITY,
);
let data = vec![1, 2, 3];
let msg = ConsensusMessage { data: data.clone(), engine_id: [0, 0, 0, 0] };
consensus.register_message(H256::default(), msg);
// tick the gossip handler and make sure it triggers a message rebroadcast
let mut tick = || {
consensus.next_broadcast = std::time::Instant::now();
consensus.tick(&mut DummyNetworkContext);
};
// by default we won't allow the message we registered, so everytime we
// tick the gossip handler, the message intent should be kept as
// `Broadcast` but the previous attempts should be incremented.
tick();
assert_eq!(
validator.messages.lock().pop().unwrap(),
(data.clone(), MessageIntent::Broadcast { previous_attempts: 0 }),
);
tick();
assert_eq!(
validator.messages.lock().pop().unwrap(),
(data.clone(), MessageIntent::Broadcast { previous_attempts: 1 }),
);
// we set the validator to allow the message to go through
validator.allow.store(true, Ordering::SeqCst);
// we still get the same message intent but it should be delivered now
tick();
assert_eq!(
validator.messages.lock().pop().unwrap(),
(data.clone(), MessageIntent::Broadcast { previous_attempts: 2 }),
);
// ticking the gossip handler again the message intent should change to
// `PeriodicRebroadcast` since it was sent.
tick();
assert_eq!(
validator.messages.lock().pop().unwrap(),
(data.clone(), MessageIntent::PeriodicRebroadcast),
);
}
}