improved gossip topology (#3270)

* gossip-support: gossip topology

* some fixes

* handle view update for newly added gossip peers

* fix neighbors calculation

* fix test

* resolve TODOs

* typo

* guide updates

* spaces in the guide

* sneaky spaces

* hash randomness

* address some review nits

* use unbounded in bridge for subsystem msg
This commit is contained in:
Andronik Ordian
2021-06-18 21:30:35 +02:00
committed by GitHub
parent ae5b355754
commit ad9c02886d
21 changed files with 720 additions and 287 deletions
+181 -175
View File
File diff suppressed because it is too large Load Diff
@@ -81,6 +81,10 @@ struct State {
/// Peer view data is partially stored here, and partially inline within the [`BlockEntry`]s
peer_views: HashMap<PeerId, View>,
/// Track all our neighbors in the current gossip topology.
/// We're not necessarily connected to all of them.
gossip_peers: HashSet<PeerId>,
}
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
@@ -209,6 +213,15 @@ impl State {
entry.known_by.remove(&peer_id);
})
}
NetworkBridgeEvent::NewGossipTopology(peers) => {
let newly_added: Vec<PeerId> = peers.difference(&self.gossip_peers).cloned().collect();
self.gossip_peers = peers;
for peer_id in newly_added {
if let Some(view) = self.peer_views.remove(&peer_id) {
self.handle_peer_view_change(ctx, metrics, peer_id, view).await;
}
}
}
NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
self.handle_peer_view_change(ctx, metrics, peer_id, view).await;
}
@@ -336,6 +349,7 @@ impl State {
);
Self::unify_with_peer(
ctx,
&self.gossip_peers,
metrics,
&mut self.blocks,
peer_id.clone(),
@@ -439,11 +453,9 @@ impl State {
peer_id: PeerId,
view: View,
) {
let lucky = util::gen_ratio_sqrt_subset(self.peer_views.len(), util::MIN_GOSSIP_PEERS);
tracing::trace!(
target: LOG_TARGET,
?view,
?lucky,
"Peer view change",
);
let finalized_number = view.finalized_number;
@@ -468,12 +480,9 @@ impl State {
});
}
if !lucky {
return;
}
Self::unify_with_peer(
ctx,
&self.gossip_peers,
metrics,
&mut self.blocks,
peer_id.clone(),
@@ -704,7 +713,12 @@ impl State {
.collect::<Vec<_>>();
let assignments = vec![(assignment, claimed_candidate_index)];
let peers = util::choose_random_sqrt_subset(peers, MIN_GOSSIP_PEERS);
let gossip_peers = &self.gossip_peers;
let peers = util::choose_random_subset(
|e| gossip_peers.contains(e),
peers,
MIN_GOSSIP_PEERS,
);
// Add the fingerprint of the assignment to the knowledge of each peer.
for peer in peers.iter() {
@@ -943,7 +957,13 @@ impl State {
.cloned()
.filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key))
.collect::<Vec<_>>();
let peers = util::choose_random_sqrt_subset(peers, MIN_GOSSIP_PEERS);
let gossip_peers = &self.gossip_peers;
let peers = util::choose_random_subset(
|e| gossip_peers.contains(e),
peers,
MIN_GOSSIP_PEERS,
);
// Add the fingerprint of the assignment to the knowledge of each peer.
for peer in peers.iter() {
@@ -975,11 +995,27 @@ impl State {
async fn unify_with_peer(
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
gossip_peers: &HashSet<PeerId>,
metrics: &Metrics,
entries: &mut HashMap<Hash, BlockEntry>,
peer_id: PeerId,
view: View,
) {
let is_gossip_peer = gossip_peers.contains(&peer_id);
let lucky = is_gossip_peer || util::gen_ratio(
util::MIN_GOSSIP_PEERS.saturating_sub(gossip_peers.len()),
util::MIN_GOSSIP_PEERS,
);
if !lucky {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
"Unlucky peer",
);
return;
}
metrics.on_unify_with_peer();
let _timer = metrics.time_unify_with_peer();
let mut to_send: Vec<Hash> = Vec::new();
@@ -80,10 +80,14 @@ impl BitfieldGossipMessage {
/// overseer ordered us to work on.
#[derive(Default, Debug)]
struct ProtocolState {
/// track all active peers and their views
/// Track all active peers and their views
/// to determine what is relevant to them.
peer_views: HashMap<PeerId, View>,
/// Track all our neighbors in the current gossip topology.
/// We're not necessarily connected to all of them.
gossip_peers: HashSet<PeerId>,
/// Our current view.
view: OurView,
@@ -294,13 +298,14 @@ where
return;
};
let peer_views = &mut state.peer_views;
let msg = BitfieldGossipMessage {
relay_parent,
signed_availability,
};
relay_message(ctx, job_data, peer_views, validator, msg).await;
let gossip_peers = &state.gossip_peers;
let peer_views = &mut state.peer_views;
relay_message(ctx, job_data, gossip_peers, peer_views, validator, msg).await;
metrics.on_own_bitfield_gossipped();
}
@@ -311,6 +316,7 @@ where
async fn relay_message<Context>(
ctx: &mut Context,
job_data: &mut PerRelayParentData,
gossip_peers: &HashSet<PeerId>,
peer_views: &mut HashMap<PeerId, View>,
validator: ValidatorId,
message: BitfieldGossipMessage,
@@ -353,7 +359,11 @@ where
}
})
.collect::<Vec<PeerId>>();
let interested_peers = util::choose_random_sqrt_subset(interested_peers, MIN_GOSSIP_PEERS);
let interested_peers = util::choose_random_subset(
|e| gossip_peers.contains(e),
interested_peers,
MIN_GOSSIP_PEERS,
);
interested_peers.iter()
.for_each(|peer|{
// track the message as sent for this peer
@@ -497,7 +507,7 @@ where
metrics.on_bitfield_received();
one_per_validator.insert(validator.clone(), message.clone());
relay_message(ctx, job_data, &mut state.peer_views, validator, message).await;
relay_message(ctx, job_data, &state.gossip_peers, &mut state.peer_views, validator, message).await;
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await
}
@@ -535,6 +545,15 @@ where
// get rid of superfluous data
state.peer_views.remove(&peerid);
}
NetworkBridgeEvent::NewGossipTopology(peers) => {
let newly_added: Vec<PeerId> = peers.difference(&state.gossip_peers).cloned().collect();
state.gossip_peers = peers;
for peer in newly_added {
if let Some(view) = state.peer_views.remove(&peer) {
handle_peer_view_change(ctx, state, peer, view).await;
}
}
}
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
tracing::trace!(
target: LOG_TARGET,
@@ -590,7 +609,13 @@ where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
let added = state.peer_views.entry(origin.clone()).or_default().replace_difference(view).cloned().collect::<Vec<_>>();
let lucky = util::gen_ratio_sqrt_subset(state.peer_views.len(), util::MIN_GOSSIP_PEERS);
let is_gossip_peer = state.gossip_peers.contains(&origin);
let lucky = is_gossip_peer || util::gen_ratio(
util::MIN_GOSSIP_PEERS.saturating_sub(state.gossip_peers.len()),
util::MIN_GOSSIP_PEERS,
);
if !lucky {
tracing::trace!(
target: LOG_TARGET,
@@ -599,9 +624,9 @@ where
);
return;
}
// Send all messages we've seen before and the peer is now interested
// in to that peer.
let delta_set: Vec<(ValidatorId, BitfieldGossipMessage)> = added
.into_iter()
.filter_map(|new_relay_parent_interest| {
@@ -812,3 +837,4 @@ impl metrics::Metrics for Metrics {
Ok(Metrics(Some(metrics)))
}
}
@@ -26,6 +26,7 @@ use sp_application_crypto::AppKey;
use sp_keystore::testing::KeyStore;
use std::sync::Arc;
use std::time::Duration;
use std::iter::FromIterator as _;
use assert_matches::assert_matches;
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
use polkadot_subsystem::jaeger;
@@ -64,9 +65,11 @@ fn prewarmed_state(
},
},
peer_views: peers
.into_iter()
.iter()
.cloned()
.map(|peer| (peer, view!(relay_parent)))
.collect(),
gossip_peers: peers.into_iter().collect(),
view: our_view!(relay_parent),
}
}
@@ -425,9 +428,13 @@ fn do_not_relay_message_twice() {
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);
executor::block_on(async move {
let gossip_peers = HashSet::from_iter(vec![
peer_a.clone(), peer_b.clone(),
].into_iter());
relay_message(
&mut ctx,
state.per_relay_parent.get_mut(&hash).unwrap(),
&gossip_peers,
&mut state.peer_views,
validator.clone(),
msg.clone(),
@@ -460,6 +467,7 @@ fn do_not_relay_message_twice() {
relay_message(
&mut ctx,
state.per_relay_parent.get_mut(&hash).unwrap(),
&gossip_peers,
&mut state.peer_views,
validator.clone(),
msg.clone(),
+30 -2
View File
@@ -48,7 +48,7 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
/// To be added to [`NetworkConfiguration::extra_sets`].
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use std::collections::{HashMap, hash_map};
use std::collections::{HashMap, hash_map, HashSet};
use std::iter::ExactSizeIterator;
use std::sync::Arc;
@@ -58,7 +58,7 @@ mod validator_discovery;
///
/// Defines the `Network` trait with an implementation for an `Arc<NetworkService>`.
mod network;
use network::{Network, send_message};
use network::{Network, send_message, get_peer_id_by_authority_id};
/// Request multiplexer for combining the multiple request sources into a single `Stream` of `AllMessages`.
mod multiplexer;
@@ -557,6 +557,34 @@ where
network_service = ns;
authority_discovery_service = ads;
}
NetworkBridgeMessage::NewGossipTopology {
our_neighbors,
} => {
tracing::debug!(
target: LOG_TARGET,
action = "NewGossipTopology",
neighbors = our_neighbors.len(),
"Gossip topology has changed",
);
let ads = &mut authority_discovery_service;
let mut gossip_peers = HashSet::with_capacity(our_neighbors.len());
for authority in our_neighbors {
let addr = get_peer_id_by_authority_id(
ads,
authority.clone(),
).await;
if let Some(peer_id) = addr {
gossip_peers.insert(peer_id);
}
}
dispatch_validation_event_to_all_unbounded(
NetworkBridgeEvent::NewGossipTopology(gossip_peers),
ctx.sender(),
);
}
}
Err(e) => return Err(e.into()),
},
+15 -1
View File
@@ -35,7 +35,7 @@ use polkadot_node_network_protocol::{
request_response::{OutgoingRequest, Requests, Recipient},
PeerId, UnifiedReputationChange as Rep,
};
use polkadot_primitives::v1::{Block, Hash};
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
use polkadot_subsystem::{SubsystemError, SubsystemResult};
use crate::validator_discovery::AuthorityDiscovery;
@@ -303,3 +303,17 @@ impl Network for Arc<NetworkService<Block, Hash>> {
);
}
}
/// We assume one peer_id per authority_id.
pub async fn get_peer_id_by_authority_id<AD: AuthorityDiscovery>(
authority_discovery: &mut AD,
authority: AuthorityDiscoveryId,
) -> Option<PeerId> {
// Note: `get_addresses_by_authority_id` searched in a cache, and it thus expected
// to be very quick.
authority_discovery
.get_addresses_by_authority_id(authority).await
.into_iter()
.flat_map(|list| list.into_iter())
.find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p))
}
@@ -795,6 +795,9 @@ async fn handle_network_msg(
PeerMessage(remote, msg) => {
handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
}
NewGossipTopology(..) => {
// impossibru!
}
}
Ok(())
@@ -899,6 +899,9 @@ where
state.peer_data.remove(&peer_id);
state.metrics.note_collator_peer_count(state.peer_data.len());
},
NewGossipTopology(..) => {
// impossibru!
}
PeerViewChange(peer_id, view) => {
handle_peer_view_change(state, peer_id, view).await?;
},
@@ -7,6 +7,7 @@ edition = "2018"
[dependencies]
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-subsystem = { path = "../../subsystem" }
@@ -14,12 +15,15 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-primitives = { path = "../../../primitives" }
futures = "0.3.15"
rand = { version = "0.8.3", default-features = false }
rand_chacha = { version = "0.3.0", default-features = false }
tracing = "0.1.26"
[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
+151 -41
View File
@@ -15,14 +15,23 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! This subsystem is responsible for keeping track of session changes
//! and issuing a connection request to the validators relevant to
//! the gossiping subsystems on every new session.
//! and issuing a connection request to the relevant validators
//! on every new session.
//!
//! In addition to that, it creates a gossip overlay topology
//! which limits the amount of messages sent and received
//! to be an order of sqrt of the validators. Our neighbors
//! in this graph will be forwarded to the network bridge with
//! the `NetworkBridgeMessage::NewGossipTopology` message.
use std::time::{Duration, Instant};
use futures::{channel::oneshot, FutureExt as _};
use rand::{SeedableRng, seq::SliceRandom as _};
use rand_chacha::ChaCha20Rng;
use polkadot_node_subsystem::{
messages::{
AllMessages, GossipSupportMessage, NetworkBridgeMessage,
RuntimeApiMessage, RuntimeApiRequest,
},
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
Subsystem, SpawnedSubsystem, SubsystemContext,
@@ -39,8 +48,8 @@ use sp_application_crypto::{Public, AppKey};
mod tests;
const LOG_TARGET: &str = "parachain::gossip-support";
// How much time should we wait since the last
// authority discovery resolution failure.
// How much time should we wait to reissue a connection request
// since the last authority discovery resolution failure.
const BACKOFF_DURATION: Duration = Duration::from_secs(5);
/// The Gossip Support subsystem.
@@ -85,7 +94,7 @@ impl GossipSupport {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
"Failed to receive a message from Overseer, exiting"
"Failed to receive a message from Overseer, exiting",
);
return;
},
@@ -120,28 +129,30 @@ async fn determine_relevant_authorities(
tracing::debug!(
target: LOG_TARGET,
authority_count = ?authorities.len(),
"Determined relevant authorities"
"Determined relevant authorities",
);
Ok(authorities)
}
/// Return an error if we're not a validator in the given set (do not have keys).
/// Otherwise, returns the index of our keys in `authorities`.
async fn ensure_i_am_an_authority(
keystore: &SyncCryptoStorePtr,
authorities: &[AuthorityDiscoveryId],
) -> Result<(), util::Error> {
for v in authorities {
if CryptoStore::has_keys(&**keystore, &[(v.to_raw_vec(), AuthorityDiscoveryId::ID)])
.await
{
return Ok(());
) -> Result<usize, util::Error> {
for (i, v) in authorities.iter().enumerate() {
if CryptoStore::has_keys(
&**keystore,
&[(v.to_raw_vec(), AuthorityDiscoveryId::ID)]
).await {
return Ok(i);
}
}
Err(util::Error::NotAValidator)
}
/// A helper function for making a `ConnectToValidators` request.
pub async fn connect_to_authorities(
async fn connect_to_authorities(
ctx: &mut impl SubsystemContext,
validator_ids: Vec<AuthorityDiscoveryId>,
peer_set: PeerSet,
@@ -157,6 +168,79 @@ pub async fn connect_to_authorities(
failed_rx
}
/// We partition the list of all sorted `authorities` into sqrt(len) groups of sqrt(len) size
/// and form a matrix where each validator is connected to all validators in its row and column.
/// This is similar to [web3] research proposed topology, except for the groups are not parachain
/// groups (because not all validators are parachain validators and the group size is small),
/// but formed randomly via BABE randomness from two epochs ago.
/// This limits the amount of gossip peers to 2 * sqrt(len) and ensures the diameter of 2.
///
/// [web3]: https://research.web3.foundation/en/latest/polkadot/networking/3-avail-valid.html#topology
async fn update_gossip_topology(
ctx: &mut impl SubsystemContext,
our_index: usize,
authorities: Vec<AuthorityDiscoveryId>,
relay_parent: Hash,
) -> Result<(), util::Error> {
// retrieve BABE randomness
let random_seed = {
let (tx, rx) = oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CurrentBabeEpoch(tx),
).into()).await;
let randomness = rx.await??.randomness;
let mut subject = [0u8; 40];
subject[..8].copy_from_slice(b"gossipsu");
subject[8..].copy_from_slice(&randomness);
sp_core::blake2_256(&subject)
};
// shuffle the indices
let mut rng: ChaCha20Rng = SeedableRng::from_seed(random_seed);
let len = authorities.len();
let mut indices: Vec<usize> = (0..len).collect();
indices.shuffle(&mut rng);
let our_shuffled_position = indices.iter()
.position(|i| *i == our_index)
.expect("our_index < len; indices contains it; qed");
let neighbors = matrix_neighbors(our_shuffled_position, len);
let our_neighbors = neighbors.map(|i| authorities[indices[i]].clone()).collect();
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::NewGossipTopology {
our_neighbors,
}
)).await;
Ok(())
}
/// Compute our row and column neighbors in a matrix
fn matrix_neighbors(our_index: usize, len: usize) -> impl Iterator<Item=usize> {
assert!(our_index < len, "our_index is computed using `enumerate`; qed");
// e.g. for size 11 the matrix would be
//
// 0 1 2
// 3 4 5
// 6 7 8
// 9 10
//
// and for index 10, the neighbors would be 1, 4, 7, 9
let sqrt = (len as f64).sqrt() as usize;
let our_row = our_index / sqrt;
let our_column = our_index % sqrt;
let row_neighbors = our_row * sqrt..std::cmp::min(our_row * sqrt + sqrt, len);
let column_neighbors = (our_column..len).step_by(sqrt);
row_neighbors.chain(column_neighbors).filter(move |i| *i != our_index)
}
impl State {
/// 1. Determine if the current session index has changed.
/// 2. If it has, determine relevant validators
@@ -171,46 +255,72 @@ impl State {
let current_index = util::request_session_index_for_child(leaf, ctx.sender()).await.await??;
let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
let force_request = since_failure >= BACKOFF_DURATION;
let leaf_session = Some((current_index, leaf));
let maybe_new_session = match self.last_session_index {
Some(i) if current_index <= i && !force_request => None,
_ => Some((current_index, leaf)),
Some(i) if current_index <= i => None,
_ => leaf_session,
};
if let Some((new_session, relay_parent)) = maybe_new_session {
tracing::debug!(
target: LOG_TARGET,
%new_session,
%force_request,
"New session detected",
);
let maybe_issue_connection = if force_request {
leaf_session
} else {
maybe_new_session
};
if let Some((session_index, relay_parent)) = maybe_issue_connection {
let is_new_session = maybe_new_session.is_some();
if is_new_session {
tracing::debug!(
target: LOG_TARGET,
%session_index,
"New session detected",
);
}
let authorities = determine_relevant_authorities(ctx, relay_parent).await?;
ensure_i_am_an_authority(keystore, &authorities).await?;
let num = authorities.len();
tracing::debug!(target: LOG_TARGET, %num, "Issuing a connection request");
let our_index = ensure_i_am_an_authority(keystore, &authorities).await?;
let failures = connect_to_authorities(
ctx,
authorities,
PeerSet::Validation,
).await;
self.issue_connection_request(ctx, authorities.clone()).await?;
// we await for the request to be processed
// this is fine, it should take much less time than one session
let failures = failures.await.unwrap_or(num);
self.last_session_index = Some(new_session);
// issue another request for the same session
// if at least a third of the authorities were not resolved
self.last_failure = if failures >= num / 3 {
Some(Instant::now())
} else {
None
if is_new_session {
self.last_session_index = Some(session_index);
update_gossip_topology(ctx, our_index, authorities, relay_parent).await?;
}
}
}
Ok(())
}
async fn issue_connection_request(
&mut self,
ctx: &mut impl SubsystemContext,
authorities: Vec<AuthorityDiscoveryId>,
) -> Result<(), util::Error> {
let num = authorities.len();
tracing::debug!(target: LOG_TARGET, %num, "Issuing a connection request");
let failures = connect_to_authorities(
ctx,
authorities,
PeerSet::Validation,
).await;
// we await for the request to be processed
// this is fine, it should take much less time than one session
let failures = failures.await.unwrap_or(num);
// issue another request for the same session
// if at least a third of the authorities were not resolved
self.last_failure = if failures >= num / 3 {
Some(Instant::now())
} else {
None
};
Ok(())
}
}
impl<Context> Subsystem<Context> for GossipSupport
@@ -26,6 +26,9 @@ use polkadot_node_subsystem_util::TimeoutExt as _;
use sc_keystore::LocalKeystore;
use sp_keyring::Sr25519Keyring;
use sp_keystore::SyncCryptoStore;
use sp_consensus_babe::{
Epoch as BabeEpoch, BabeEpochConfiguration, AllowedSlots,
};
use std::sync::Arc;
use std::time::Duration;
@@ -117,6 +120,47 @@ fn authorities() -> Vec<AuthorityDiscoveryId> {
]
}
fn neighbors() -> Vec<AuthorityDiscoveryId> {
vec![
Sr25519Keyring::One.public().into(),
Sr25519Keyring::Alice.public().into(),
Sr25519Keyring::Eve.public().into(),
]
}
async fn test_neighbors(overseer: &mut VirtualOverseer) {
assert_matches!(
overseer_recv(overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_,
RuntimeApiRequest::CurrentBabeEpoch(tx),
)) => {
let _ = tx.send(Ok(BabeEpoch {
epoch_index: 2 as _,
start_slot: 0.into(),
duration: 200,
authorities: vec![(Sr25519Keyring::Alice.public().into(), 1)],
randomness: [0u8; 32],
config: BabeEpochConfiguration {
c: (1, 4),
allowed_slots: AllowedSlots::PrimarySlots,
},
})).unwrap();
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::NewGossipTopology {
our_neighbors,
}) => {
let mut got: Vec<_> = our_neighbors.into_iter().collect();
got.sort();
assert_eq!(got, neighbors());
}
);
}
#[test]
fn issues_a_connection_request_on_new_session() {
let hash = Hash::repeat_byte(0xAA);
@@ -157,6 +201,8 @@ fn issues_a_connection_request_on_new_session() {
}
);
test_neighbors(overseer).await;
virtual_overseer
});
@@ -223,6 +269,8 @@ fn issues_a_connection_request_on_new_session() {
}
);
test_neighbors(overseer).await;
virtual_overseer
});
assert_eq!(state.last_session_index, Some(2));
@@ -268,6 +316,9 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() {
failed.send(2).unwrap();
}
);
test_neighbors(overseer).await;
virtual_overseer
});
@@ -312,6 +363,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() {
failed.send(1).unwrap();
}
);
virtual_overseer
});
@@ -319,3 +371,18 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() {
assert!(state.last_failure.is_none());
}
#[test]
fn test_matrix_neighbors() {
for (our_index, len, expected) in vec![
(0usize, 1usize, vec![]),
(1, 2, vec![0usize]),
(0, 9, vec![1, 2, 3, 6]),
(9, 10, vec![0, 3, 6]),
(10, 11, vec![1, 4, 7, 9]),
(7, 11, vec![1, 4, 6, 8, 10]),
].into_iter() {
let mut result: Vec<_> = matrix_neighbors(our_index, len).collect();
result.sort();
assert_eq!(result, expected);
}
}
@@ -843,6 +843,7 @@ fn check_statement_signature(
/// sends all statements dependent on that statement to peers who could previously not receive
/// them but now can.
async fn circulate_statement_and_dependents(
gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext,
@@ -868,7 +869,14 @@ async fn circulate_statement_and_dependents(
{
Some((
*stored.compact().candidate_hash(),
circulate_statement(peers, ctx, relay_parent, stored, priority_peers).await,
circulate_statement(
gossip_peers,
peers,
ctx,
relay_parent,
stored,
priority_peers,
).await,
))
},
_ => None,
@@ -943,6 +951,7 @@ fn is_statement_large(statement: &SignedFullStatement) -> bool {
/// Circulates a statement to all peers who have not seen it yet, and returns
/// an iterator over peers who need to have dependent statements sent.
async fn circulate_statement<'a>(
gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>,
ctx: &mut impl SubsystemContext,
relay_parent: Hash,
@@ -968,7 +977,11 @@ async fn circulate_statement<'a>(
peers_to_send.retain(|p| !priority_set.contains(p));
let mut peers_to_send =
util::choose_random_sqrt_subset(peers_to_send, MIN_GOSSIP_PEERS);
util::choose_random_subset(
|e| gossip_peers.contains(e),
peers_to_send,
MIN_GOSSIP_PEERS,
);
// We don't want to use less peers, than we would without any priority peers:
let min_size = std::cmp::max(peers_to_send.len(), MIN_GOSSIP_PEERS);
// Make set full:
@@ -1248,6 +1261,7 @@ async fn launch_request(
///
async fn handle_incoming_message_and_circulate<'a>(
peer: PeerId,
gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext,
@@ -1280,6 +1294,7 @@ async fn handle_incoming_message_and_circulate<'a>(
// that require dependents. Thus, if this is a `Seconded` statement for a candidate we
// were not aware of before, we cannot have any dependent statements from the candidate.
let _ = circulate_statement(
gossip_peers,
peers,
ctx,
relay_parent,
@@ -1444,8 +1459,9 @@ async fn handle_incoming_message<'a>(
}
/// Update a peer's view. Sends all newly unlocked statements based on the previous
async fn update_peer_view_and_send_unlocked(
async fn update_peer_view_and_maybe_send_unlocked(
peer: PeerId,
gossip_peers: &HashSet<PeerId>,
peer_data: &mut PeerData,
ctx: &mut impl SubsystemContext,
active_heads: &HashMap<Hash, ActiveHeadData>,
@@ -1459,12 +1475,20 @@ async fn update_peer_view_and_send_unlocked(
let _ = peer_data.view_knowledge.remove(removed);
}
let is_gossip_peer = gossip_peers.contains(&peer);
let lucky = is_gossip_peer || util::gen_ratio(
util::MIN_GOSSIP_PEERS.saturating_sub(gossip_peers.len()),
util::MIN_GOSSIP_PEERS,
);
// Add entries for all relay-parents in the new view but not the old.
// Furthermore, send all statements we have for those relay parents.
let new_view = peer_data.view.difference(&old_view).copied().collect::<Vec<_>>();
for new in new_view.iter().copied() {
peer_data.view_knowledge.insert(new, Default::default());
if !lucky {
continue;
}
if let Some(active_head) = active_heads.get(&new) {
send_statements(
peer.clone(),
@@ -1480,6 +1504,7 @@ async fn update_peer_view_and_send_unlocked(
async fn handle_network_update(
peers: &mut HashMap<PeerId, PeerData>,
gossip_peers: &mut HashSet<PeerId>,
authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext,
@@ -1514,9 +1539,28 @@ async fn handle_network_update(
authorities.remove(&auth_id);
}
}
NetworkBridgeEvent::NewGossipTopology(new_peers) => {
let newly_added: Vec<PeerId> = new_peers.difference(gossip_peers).cloned().collect();
*gossip_peers = new_peers;
for peer in newly_added {
if let Some(data) = peers.get_mut(&peer) {
let view = std::mem::take(&mut data.view);
update_peer_view_and_maybe_send_unlocked(
peer,
gossip_peers,
data,
ctx,
&*active_heads,
view,
metrics,
).await
}
}
}
NetworkBridgeEvent::PeerMessage(peer, message) => {
handle_incoming_message_and_circulate(
peer,
gossip_peers,
peers,
active_heads,
ctx,
@@ -1534,8 +1578,9 @@ async fn handle_network_update(
);
match peers.get_mut(&peer) {
Some(data) => {
update_peer_view_and_send_unlocked(
update_peer_view_and_maybe_send_unlocked(
peer,
gossip_peers,
data,
ctx,
&*active_heads,
@@ -1558,6 +1603,7 @@ impl StatementDistribution {
mut ctx: impl SubsystemContext<Message = StatementDistributionMessage>,
) -> std::result::Result<(), Fatal> {
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut gossip_peers: HashSet<PeerId> = HashSet::new();
let mut authorities: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
@@ -1576,6 +1622,7 @@ impl StatementDistribution {
&mut ctx,
&mut runtime,
&mut peers,
&mut gossip_peers,
&mut authorities,
&mut active_heads,
&req_sender,
@@ -1594,6 +1641,7 @@ impl StatementDistribution {
Message::Requester(result) => {
let result = self.handle_requester_message(
&mut ctx,
&gossip_peers,
&mut peers,
&mut active_heads,
&req_sender,
@@ -1663,6 +1711,7 @@ impl StatementDistribution {
async fn handle_requester_message(
&self,
ctx: &mut impl SubsystemContext,
gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
req_sender: &mpsc::Sender<RequesterMessage>,
@@ -1712,6 +1761,7 @@ impl StatementDistribution {
for message in messages {
handle_incoming_message_and_circulate(
peer,
gossip_peers,
peers,
active_heads,
ctx,
@@ -1785,6 +1835,7 @@ impl StatementDistribution {
ctx: &mut impl SubsystemContext,
runtime: &mut RuntimeInfo,
peers: &mut HashMap<PeerId, PeerData>,
gossip_peers: &mut HashSet<PeerId>,
authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
req_sender: &mpsc::Sender<RequesterMessage>,
@@ -1873,6 +1924,7 @@ impl StatementDistribution {
}
};
circulate_statement_and_dependents(
gossip_peers,
peers,
active_heads,
ctx,
@@ -1887,6 +1939,7 @@ impl StatementDistribution {
handle_network_update(
peers,
gossip_peers,
authorities,
active_heads,
ctx,
@@ -15,9 +15,10 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::time::Duration;
use std::sync::Arc;
use std::iter::FromIterator as _;
use parity_scale_codec::{Decode, Encode};
use super::*;
use std::sync::Arc;
use sp_keyring::Sr25519Keyring;
use sp_application_crypto::{AppKey, sr25519::Pair, Pair as TraitPair};
use polkadot_node_primitives::Statement;
@@ -441,8 +442,10 @@ fn peer_view_update_sends_messages() {
let peer = PeerId::random();
executor::block_on(async move {
update_peer_view_and_send_unlocked(
let gossip_peers = HashSet::from_iter(vec![peer.clone()].into_iter());
update_peer_view_and_maybe_send_unlocked(
peer.clone(),
&gossip_peers,
&mut peer_data,
&mut ctx,
&active_heads,
@@ -562,7 +565,11 @@ fn circulated_statement_goes_to_all_peers_with_view() {
statement: &statement,
};
let gossip_peers = HashSet::from_iter(vec![
peer_a.clone(), peer_b.clone(), peer_c.clone(),
].into_iter());
let needs_dependents = circulate_statement(
&gossip_peers,
&mut peer_data,
&mut ctx,
hash_b,
+1
View File
@@ -9,6 +9,7 @@ description = "Subsystem traits and message definitions"
async-trait = "0.1.42"
futures = "0.3.15"
futures-timer = "3.0.2"
itertools = "0.10"
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
parking_lot = { version = "0.11.1", optional = true }
pin-project = "1.0.7"
+19 -17
View File
@@ -224,30 +224,32 @@ pub fn find_validator_group(groups: &[Vec<ValidatorIndex>], index: ValidatorInde
})
}
/// Chooses a random subset of sqrt(v.len()), but at least `min` elements.
pub fn choose_random_sqrt_subset<T>(mut v: Vec<T>, min: usize) -> Vec<T> {
/// Choose a random subset of `min` elements.
/// But always include `is_priority` elements.
pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, mut v: Vec<T>, min: usize) -> Vec<T> {
use rand::seq::SliceRandom as _;
let mut rng = rand::thread_rng();
v.shuffle(&mut rng);
let len = max_of_min_and_sqrt_len(v.len(), min);
v.truncate(len);
// partition the elements into priority first
// the returned index is when non_priority elements start
let i = itertools::partition(&mut v, is_priority);
if i >= min || v.len() <= i {
v.truncate(i);
return v;
}
let mut rng = rand::thread_rng();
v[i..].shuffle(&mut rng);
v.truncate(min);
v
}
/// Returns bool with a probability of `max(len.sqrt(), min) / len`
/// being true.
pub fn gen_ratio_sqrt_subset(len: usize, min: usize) -> bool {
/// Returns a bool with a probability of `a / b` of being true.
pub fn gen_ratio(a: usize, b: usize) -> bool {
use rand::Rng as _;
let mut rng = rand::thread_rng();
let threshold = max_of_min_and_sqrt_len(len, min);
let n = rng.gen_range(0..len);
n < threshold
}
fn max_of_min_and_sqrt_len(len: usize, min: usize) -> usize {
let len_sqrt = (len as f64).sqrt() as usize;
std::cmp::max(min, len_sqrt)
rng.gen_ratio(a as u32, b as u32)
}
/// Local validator information
+9 -1
View File
@@ -22,7 +22,7 @@
//!
//! Subsystems' APIs are defined separately from their implementation, leading to easier mocking.
use std::{collections::btree_map::BTreeMap, sync::Arc};
use std::{collections::{BTreeMap, HashSet}, sync::Arc};
use futures::channel::{mpsc, oneshot};
use thiserror::Error;
@@ -304,6 +304,13 @@ pub enum NetworkBridgeMessage {
/// authority discovery has failed to resolve.
failed: oneshot::Sender<usize>,
},
/// Inform the distribution subsystems about the new
/// gossip network topology formed.
NewGossipTopology {
/// Ids of our neighbors in the new gossip topology.
/// We're not necessarily connected to all of them, but we should.
our_neighbors: HashSet<AuthorityDiscoveryId>,
}
}
impl NetworkBridgeMessage {
@@ -318,6 +325,7 @@ impl NetworkBridgeMessage {
Self::SendCollationMessages(_) => None,
Self::ConnectToValidators { .. } => None,
Self::SendRequests { .. } => None,
Self::NewGossipTopology { .. } => None,
}
}
}
@@ -15,6 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::convert::TryFrom;
use std::collections::HashSet;
pub use sc_network::{ReputationChange, PeerId};
@@ -30,6 +31,15 @@ pub enum NetworkBridgeEvent<M> {
/// A peer has disconnected.
PeerDisconnected(PeerId),
/// Our neighbors in the new gossip topology.
/// We're not necessarily connected to all of them.
///
/// This message is issued only on the validation peer set.
///
/// Note, that the distribution subsystems need to handle the last
/// view update of the newly added gossip peers manually.
NewGossipTopology(HashSet<PeerId>),
/// Peer has sent a message.
PeerMessage(PeerId, M),
@@ -64,6 +74,8 @@ impl<M> NetworkBridgeEvent<M> {
=> NetworkBridgeEvent::PeerConnected(peer.clone(), role.clone(), authority_id.clone()),
NetworkBridgeEvent::PeerDisconnected(ref peer)
=> NetworkBridgeEvent::PeerDisconnected(peer.clone()),
NetworkBridgeEvent::NewGossipTopology(ref peers)
=> NetworkBridgeEvent::NewGossipTopology(peers.clone()),
NetworkBridgeEvent::PeerMessage(ref peer, ref msg)
=> NetworkBridgeEvent::PeerMessage(peer.clone(), <&'a T>::try_from(msg)?.clone()),
NetworkBridgeEvent::PeerViewChange(ref peer, ref view)
@@ -1,15 +1,19 @@
# Gossip Support
The Gossip Support Subsystem is responsible for keeping track of session changes
and issuing a connection request to all validators in the next, current and a few past sessions
if we are a validator in these sessions.
The request will add all validators to a reserved PeerSet, meaning we will not reject a connection request
from any validator in that set.
and issuing a connection request to all validators in the next, current and
a few past sessions if we are a validator in these sessions.
The request will add all validators to a reserved PeerSet, meaning we will not
reject a connection request from any validator in that set.
Gossiping subsystems will be notified when a new peer connects or disconnects by network bridge.
It is their responsibility to limit the amount of outgoing gossip messages.
At the moment we enforce a cap of `max(sqrt(peers.len()), 25)` message recipients at a time in each gossiping subsystem.
In addition to that, it creates a gossip overlay topology per session which
limits the amount of messages sent and received to be an order of sqrt of the
validators. Our neighbors in this graph will be forwarded to the network bridge
with the `NetworkBridgeMessage::NewGossipTopology` message.
We also flip a coin with the same probability when handling peer view updates in the distribution subsystems.
Over time the probability of not handling a peer view update converges to zero, so it shouldn't be cause much trouble.
This should be considered as a temporary measure until we implement a more robust solution for gossiping.
See https://github.com/paritytech/polkadot/issues/3239 for more details.
The gossip topology is used by parachain distribution subsystems,
such as Bitfield Distrubution, (small) Statement Distributuion and
Approval Distibution to limit the amount of peers we send messages to
and handle view updates.
@@ -21,11 +21,10 @@ Input: [`NetworkBridgeMessage`][NBM]
Output:
- [`AvailabilityDistributionMessage`][AvD]`::NetworkBridgeUpdateV1`
- [`ApprovalDistributionMessage`][AppD]`::NetworkBridgeUpdateV1`
- [`BitfieldDistributionMessage`][BitD]`::NetworkBridgeUpdateV1`
- [`PoVDistributionMessage`][PoVD]`::NetworkBridgeUpdateV1`
- [`StatementDistributionMessage`][StmtD]`::NetworkBridgeUpdateV1`
- [`CollatorProtocolMessage`][CollP]`::NetworkBridgeUpdateV1`
- [`StatementDistributionMessage`][StmtD]`::NetworkBridgeUpdateV1`
## Functionality
@@ -59,7 +58,7 @@ Each network event is associated with a particular peer-set.
The `activated` and `deactivated` lists determine the evolution of our local view over time. A `ProtocolMessage::ViewUpdate` is issued to each connected peer on each peer-set, and a `NetworkBridgeEvent::OurViewChange` is issued to each event handler for each protocol.
We only send view updates if the node has indicated that it has finished major blockchain synchronization.
We only send view updates if the node has indicated that it has finished major blockchain synchronization.
If we are connected to the same peer on both peer-sets, we will send the peer two view updates as a result.
@@ -107,25 +106,28 @@ Map the message onto the corresponding [Event Handler](#event-handlers) based on
- Send all `(ValidatorId, PeerId)` pairs on the response channel.
- Feed all Peer IDs to peer set manager the underlying network provides.
### NewGossipTopology
- Map all `AuthorityDiscoveryId`s to `PeerId`s and issue a corresponding `NetworkBridgeUpdateV1`
to all validation subsystems.
## Event Handlers
Network bridge event handlers are the intended recipients of particular network protocol messages. These are each a variant of a message to be sent via the overseer.
### Validation V1
* `StatementDistributionV1Message -> StatementDistributionMessage::NetworkBridgeUpdateV1`
* `PoVDistributionV1Message -> PoVDistributionMessage::NetworkBridgeUpdateV1`
* `AvailabilityDistributionV1Message -> AvailabilityDistributionMessage::NetworkBridgeUpdateV1`
* `ApprovalDistributionV1Message -> ApprovalDistributionMessage::NetworkBridgeUpdateV1`
* `BitfieldDistributionV1Message -> BitfieldDistributionMessage::NetworkBridgeUpdateV1`
* `StatementDistributionV1Message -> StatementDistributionMessage::NetworkBridgeUpdateV1`
### Collation V1
* `CollatorProtocolV1Message -> CollatorProtocolMessage::NetworkBridgeUpdateV1`
[NBM]: ../../types/overseer-protocol.md#network-bridge-message
[AvD]: ../../types/overseer-protocol.md#availability-distribution-message
[AppD]: ../../types/overseer-protocol.md#approval-distribution-message
[BitD]: ../../types/overseer-protocol.md#bitfield-distribution-message
[PoVD]: ../../types/overseer-protocol.md#pov-distribution-message
[StmtD]: ../../types/overseer-protocol.md#statement-distribution-message
[CollP]: ../../types/overseer-protocol.md#collator-protocol-message
@@ -147,6 +147,14 @@ enum NetworkBridgeEvent<M> {
PeerConnected(PeerId, ObservedRole),
/// A peer with given ID is now disconnected.
PeerDisconnected(PeerId),
/// Our neighbors in the new gossip topology.
/// We're not necessarily connected to all of them.
///
/// This message is issued only on the validation peer set.
///
/// Note, that the distribution subsystems need to handle the last
/// view update of the newly added gossip peers manually.
NewGossipTopology(HashSet<PeerId>),
/// We received a message from the given peer.
PeerMessage(PeerId, M),
/// The given peer has updated its description of its view.
@@ -45,7 +45,35 @@ struct ActiveLeavesUpdate {
}
```
## Approval Voting
## All Messages
A message type tying together all message types that are used across Subsystems.
```rust
enum AllMessages {
CandidateValidation(CandidateValidationMessage),
CandidateBacking(CandidateBackingMessage),
ChainApi(ChainApiMessage),
CollatorProtocol(CollatorProtocolMessage),
StatementDistribution(StatementDistributionMessage),
AvailabilityDistribution(AvailabilityDistributionMessage),
AvailabilityRecovery(AvailabilityRecoveryMessage),
BitfieldDistribution(BitfieldDistributionMessage),
BitfieldSigning(BitfieldSigningMessage),
Provisioner(ProvisionerMessage),
RuntimeApi(RuntimeApiMessage),
AvailabilityStore(AvailabilityStoreMessage),
NetworkBridge(NetworkBridgeMessage),
CollationGeneration(CollationGenerationMessage),
ApprovalVoting(ApprovalVotingMessage),
ApprovalDistribution(ApprovalDistributionMessage),
GossipSupport(GossipSupportMessage),
DisputeCoordinator(DisputeCoordinatorMessage),
DisputeParticipation(DisputeParticipationMessage),
}
```
## Approval Voting Message
Messages received by the approval voting subsystem.
@@ -127,9 +155,9 @@ enum ApprovalVotingMessage {
}
```
## Approval Distribution
## Approval Distribution Message
Messages received by the approval Distribution subsystem.
Messages received by the approval distribution subsystem.
```rust
/// Metadata about a block which is now live in the approval protocol.
@@ -166,10 +194,6 @@ enum ApprovalDistributionMessage {
}
```
## All Messages
> TODO (now)
## Availability Distribution Message
Messages received by the availability distribution subsystem.
@@ -334,7 +358,7 @@ enum ChainSelectionMessage {
/// Request the best leaf containing the given block in its ancestry. Return `None` if
/// there is no such leaf.
BestLeafContaining(Hash, ResponseChannel<Option<Hash>>),
}
```
@@ -495,6 +519,13 @@ enum NetworkBridgeMessage {
/// authority discovery has failed to resolve.
failed: oneshot::Sender<usize>,
},
/// Inform the distribution subsystems about the new
/// gossip network topology formed.
NewGossipTopology {
/// Ids of our neighbors in the new gossip topology.
/// We're not necessarily connected to all of them, but we should.
our_neighbors: HashSet<AuthorityDiscoveryId>,
}
}
```