gossip: choose a random subset on send instead of limiting connections (#2776)

* gossip: choose random subset on send

* naming bikeshed
This commit is contained in:
Andronik Ordian
2021-03-30 20:59:53 +02:00
committed by GitHub
parent a3115401c3
commit 9ac35d9f2b
10 changed files with 41 additions and 27 deletions
+1 -1
View File
@@ -5497,7 +5497,6 @@ dependencies = [
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"rand 0.8.3",
"tracing",
]
@@ -5877,6 +5876,7 @@ dependencies = [
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-primitives",
"rand 0.8.3",
"sc-network",
"sp-application-crypto",
"sp-core",
@@ -39,10 +39,12 @@ use polkadot_node_subsystem::{
},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
self as util, MIN_GOSSIP_PEERS,
};
use polkadot_node_network_protocol::{
PeerId, View, v1 as protocol_v1, UnifiedReputationChange as Rep,
};
const LOG_TARGET: &str = "parachain::approval-distribution";
@@ -653,6 +655,7 @@ impl State {
.collect::<Vec<_>>();
let assignments = vec![(assignment, claimed_candidate_index)];
let peers = util::choose_random_sqrt_subset(peers, MIN_GOSSIP_PEERS);
// Add the fingerprint of the assignment to the knowledge of each peer.
for peer in peers.iter() {
@@ -886,6 +889,7 @@ impl State {
.cloned()
.filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key))
.collect::<Vec<_>>();
let peers = util::choose_random_sqrt_subset(peers, MIN_GOSSIP_PEERS);
// Add the fingerprint of the assignment to the knowledge of each peer.
for peer in peers.iter() {
@@ -31,7 +31,10 @@ use polkadot_subsystem::{
SubsystemContext, SubsystemResult,
jaeger,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
self as util, MIN_GOSSIP_PEERS,
};
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, View, UnifiedReputationChange as Rep, OurView};
use std::collections::{HashMap, HashSet};
@@ -358,6 +361,7 @@ where
}
})
.collect::<Vec<PeerId>>();
let interested_peers = util::choose_random_sqrt_subset(interested_peers, MIN_GOSSIP_PEERS);
drop(_span);
if interested_peers.is_empty() {
@@ -12,4 +12,3 @@ polkadot-primitives = { path = "../../../primitives" }
futures = "0.3.8"
tracing = "0.1.25"
rand = "0.8.3"
@@ -19,7 +19,6 @@
//! the gossiping subsystems on every new session.
use futures::FutureExt as _;
use rand::seq::SliceRandom as _;
use polkadot_node_subsystem::{
messages::{
GossipSupportMessage,
@@ -103,17 +102,6 @@ async fn determine_relevant_validators(
Ok(validators)
}
// chooses a random subset of sqrt(v.len()), but at least 25 elements
fn choose_random_subset<T>(mut v: Vec<T>) -> Vec<T> {
let mut rng = rand::thread_rng();
v.shuffle(&mut rng);
let sqrt = (v.len() as f64).sqrt() as usize;
let len = std::cmp::max(25, sqrt);
v.truncate(len);
v
}
impl State {
/// 1. Determine if the current session index has changed.
/// 2. If it has, determine relevant validators
@@ -133,8 +121,7 @@ impl State {
if let Some((new_session, relay_parent)) = maybe_new_session {
tracing::debug!(target: LOG_TARGET, %new_session, "New session detected");
let validators = determine_relevant_validators(ctx, relay_parent, new_session).await?;
let validators = choose_random_subset(validators);
tracing::debug!(target: LOG_TARGET, targets = ?validators, "Issuing a connection request");
tracing::debug!(target: LOG_TARGET, num = ?validators.len(), "Issuing a connection request");
let request = validator_discovery::connect_to_validators_in_session(
ctx,
@@ -40,6 +40,9 @@ pub mod request_response;
/// A version of the protocol.
pub type ProtocolVersion = u32;
/// The minimum amount of peers to send gossip messages to.
pub const MIN_GOSSIP_PEERS: usize = 25;
/// An error indicating that this the over-arching message type had the wrong variant
#[derive(Debug, Clone, Copy, PartialEq)]
@@ -57,7 +57,8 @@ impl PeerSet {
notifications_protocol: protocol,
max_notification_size,
set_config: sc_network::config::SetConfig {
in_peers: 25,
// we want our gossip subset to always include reserved peers
in_peers: super::MIN_GOSSIP_PEERS as u32 / 2,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept,
@@ -31,7 +31,10 @@ use polkadot_subsystem::{
RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent,
},
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
MIN_GOSSIP_PEERS,
};
use polkadot_node_primitives::{SignedFullStatement};
use polkadot_primitives::v1::{
Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, CandidateHash,
@@ -620,13 +623,12 @@ async fn circulate_statement(
) -> Vec<PeerId> {
let fingerprint = stored.fingerprint();
let mut peers_to_send = HashMap::new();
let len_sqrt = (peers.len() as f64).sqrt() as usize;
let cap = std::cmp::max(MIN_GOSSIP_PEERS, len_sqrt);
for (peer, data) in peers.iter_mut() {
if let Some(new_known) = data.send(&relay_parent, &fingerprint) {
peers_to_send.insert(peer.clone(), new_known);
}
}
let peers_to_send: HashMap<PeerId, bool> = peers.iter_mut().filter_map(|(peer, data)| {
data.send(&relay_parent, &fingerprint).map(|new| (peer.clone(), new))
}).take(cap).collect();
// Send all these peers the initial statement.
if !peers_to_send.is_empty() {
+1
View File
@@ -12,6 +12,7 @@ futures-timer = "3.0.2"
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
parking_lot = { version = "0.11.1", optional = true }
pin-project = "1.0.4"
rand = "0.8.3"
streamunordered = "0.5.1"
thiserror = "1.0.23"
tracing = "0.1.25"
+13
View File
@@ -52,6 +52,7 @@ use thiserror::Error;
pub mod validator_discovery;
pub use metered_channel as metered;
pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
/// These reexports are required so that external crates can use the `delegated_subsystem` macro properly.
pub mod reexports {
@@ -267,6 +268,18 @@ pub async fn signing_key(validators: &[ValidatorId], keystore: SyncCryptoStorePt
None
}
/// Chooses a random subset of sqrt(v.len()), but at least `min` elements.
pub fn choose_random_sqrt_subset<T>(mut v: Vec<T>, min: usize) -> Vec<T> {
use rand::seq::SliceRandom as _;
let mut rng = rand::thread_rng();
v.shuffle(&mut rng);
let len_sqrt = (v.len() as f64).sqrt() as usize;
let len = std::cmp::max(min, len_sqrt);
v.truncate(len);
v
}
/// Local validator information
///
/// It can be created if the local node is a validator in the context of a particular