diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 959869e595..16f49c7082 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5497,7 +5497,6 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-util", "polkadot-primitives", - "rand 0.8.3", "tracing", ] @@ -5877,6 +5876,7 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-primitives", + "rand 0.8.3", "sc-network", "sp-application-crypto", "sp-core", diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 944fba0904..0b1e274465 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -39,10 +39,12 @@ use polkadot_node_subsystem::{ }, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, }; -use polkadot_node_subsystem_util::metrics::{self, prometheus}; +use polkadot_node_subsystem_util::{ + metrics::{self, prometheus}, + self as util, MIN_GOSSIP_PEERS, +}; use polkadot_node_network_protocol::{ PeerId, View, v1 as protocol_v1, UnifiedReputationChange as Rep, - }; const LOG_TARGET: &str = "parachain::approval-distribution"; @@ -653,6 +655,7 @@ impl State { .collect::>(); let assignments = vec![(assignment, claimed_candidate_index)]; + let peers = util::choose_random_sqrt_subset(peers, MIN_GOSSIP_PEERS); // Add the fingerprint of the assignment to the knowledge of each peer. for peer in peers.iter() { @@ -886,6 +889,7 @@ impl State { .cloned() .filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key)) .collect::>(); + let peers = util::choose_random_sqrt_subset(peers, MIN_GOSSIP_PEERS); // Add the fingerprint of the assignment to the knowledge of each peer. for peer in peers.iter() { diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 546a086a08..03409eb3c8 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -31,7 +31,10 @@ use polkadot_subsystem::{ SubsystemContext, SubsystemResult, jaeger, }; -use polkadot_node_subsystem_util::metrics::{self, prometheus}; +use polkadot_node_subsystem_util::{ + metrics::{self, prometheus}, + self as util, MIN_GOSSIP_PEERS, +}; use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId}; use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, View, UnifiedReputationChange as Rep, OurView}; use std::collections::{HashMap, HashSet}; @@ -358,6 +361,7 @@ where } }) .collect::>(); + let interested_peers = util::choose_random_sqrt_subset(interested_peers, MIN_GOSSIP_PEERS); drop(_span); if interested_peers.is_empty() { diff --git a/polkadot/node/network/gossip-support/Cargo.toml b/polkadot/node/network/gossip-support/Cargo.toml index b8d66f05d7..56d239f121 100644 --- a/polkadot/node/network/gossip-support/Cargo.toml +++ b/polkadot/node/network/gossip-support/Cargo.toml @@ -12,4 +12,3 @@ polkadot-primitives = { path = "../../../primitives" } futures = "0.3.8" tracing = "0.1.25" -rand = "0.8.3" diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 0d9fb55abc..2e3e7e9bc3 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -19,7 +19,6 @@ //! the gossiping subsystems on every new session. use futures::FutureExt as _; -use rand::seq::SliceRandom as _; use polkadot_node_subsystem::{ messages::{ GossipSupportMessage, @@ -103,17 +102,6 @@ async fn determine_relevant_validators( Ok(validators) } -// chooses a random subset of sqrt(v.len()), but at least 25 elements -fn choose_random_subset(mut v: Vec) -> Vec { - let mut rng = rand::thread_rng(); - v.shuffle(&mut rng); - - let sqrt = (v.len() as f64).sqrt() as usize; - let len = std::cmp::max(25, sqrt); - v.truncate(len); - v -} - impl State { /// 1. Determine if the current session index has changed. /// 2. If it has, determine relevant validators @@ -133,8 +121,7 @@ impl State { if let Some((new_session, relay_parent)) = maybe_new_session { tracing::debug!(target: LOG_TARGET, %new_session, "New session detected"); let validators = determine_relevant_validators(ctx, relay_parent, new_session).await?; - let validators = choose_random_subset(validators); - tracing::debug!(target: LOG_TARGET, targets = ?validators, "Issuing a connection request"); + tracing::debug!(target: LOG_TARGET, num = ?validators.len(), "Issuing a connection request"); let request = validator_discovery::connect_to_validators_in_session( ctx, diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index 211f2dcf37..d6918df09f 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -40,6 +40,9 @@ pub mod request_response; /// A version of the protocol. pub type ProtocolVersion = u32; +/// The minimum amount of peers to send gossip messages to. +pub const MIN_GOSSIP_PEERS: usize = 25; + /// An error indicating that this the over-arching message type had the wrong variant #[derive(Debug, Clone, Copy, PartialEq)] diff --git a/polkadot/node/network/protocol/src/peer_set.rs b/polkadot/node/network/protocol/src/peer_set.rs index 222869937b..ee85da54a5 100644 --- a/polkadot/node/network/protocol/src/peer_set.rs +++ b/polkadot/node/network/protocol/src/peer_set.rs @@ -57,7 +57,8 @@ impl PeerSet { notifications_protocol: protocol, max_notification_size, set_config: sc_network::config::SetConfig { - in_peers: 25, + // we want our gossip subset to always include reserved peers + in_peers: super::MIN_GOSSIP_PEERS as u32 / 2, out_peers: 0, reserved_nodes: Vec::new(), non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept, diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 22e82b33ca..4a4d62523a 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -31,7 +31,10 @@ use polkadot_subsystem::{ RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent, }, }; -use polkadot_node_subsystem_util::metrics::{self, prometheus}; +use polkadot_node_subsystem_util::{ + metrics::{self, prometheus}, + MIN_GOSSIP_PEERS, +}; use polkadot_node_primitives::{SignedFullStatement}; use polkadot_primitives::v1::{ Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, CandidateHash, @@ -620,13 +623,12 @@ async fn circulate_statement( ) -> Vec { let fingerprint = stored.fingerprint(); - let mut peers_to_send = HashMap::new(); + let len_sqrt = (peers.len() as f64).sqrt() as usize; + let cap = std::cmp::max(MIN_GOSSIP_PEERS, len_sqrt); - for (peer, data) in peers.iter_mut() { - if let Some(new_known) = data.send(&relay_parent, &fingerprint) { - peers_to_send.insert(peer.clone(), new_known); - } - } + let peers_to_send: HashMap = peers.iter_mut().filter_map(|(peer, data)| { + data.send(&relay_parent, &fingerprint).map(|new| (peer.clone(), new)) + }).take(cap).collect(); // Send all these peers the initial statement. if !peers_to_send.is_empty() { diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml index abd04d3cc4..15db5e9c1a 100644 --- a/polkadot/node/subsystem-util/Cargo.toml +++ b/polkadot/node/subsystem-util/Cargo.toml @@ -12,6 +12,7 @@ futures-timer = "3.0.2" parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] } parking_lot = { version = "0.11.1", optional = true } pin-project = "1.0.4" +rand = "0.8.3" streamunordered = "0.5.1" thiserror = "1.0.23" tracing = "0.1.25" diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 0358cf7f9b..47c4406a95 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -52,6 +52,7 @@ use thiserror::Error; pub mod validator_discovery; pub use metered_channel as metered; +pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS; /// These reexports are required so that external crates can use the `delegated_subsystem` macro properly. pub mod reexports { @@ -267,6 +268,18 @@ pub async fn signing_key(validators: &[ValidatorId], keystore: SyncCryptoStorePt None } +/// Chooses a random subset of sqrt(v.len()), but at least `min` elements. +pub fn choose_random_sqrt_subset(mut v: Vec, min: usize) -> Vec { + use rand::seq::SliceRandom as _; + let mut rng = rand::thread_rng(); + v.shuffle(&mut rng); + + let len_sqrt = (v.len() as f64).sqrt() as usize; + let len = std::cmp::max(min, len_sqrt); + v.truncate(len); + v +} + /// Local validator information /// /// It can be created if the local node is a validator in the context of a particular