Reduce CPU overhead of gossip (#10859)

This commit is contained in:
Koute
2022-02-16 13:14:00 +09:00
committed by GitHub
parent 9a78839696
commit ee6223327c
5 changed files with 28 additions and 29 deletions
+4 -2
View File
@@ -64,9 +64,9 @@ dependencies = [
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.7.4" version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [ dependencies = [
"getrandom 0.2.3", "getrandom 0.2.3",
"once_cell", "once_cell",
@@ -8333,6 +8333,7 @@ dependencies = [
name = "sc-finality-grandpa" name = "sc-finality-grandpa"
version = "0.10.0-dev" version = "0.10.0-dev"
dependencies = [ dependencies = [
"ahash",
"assert_matches", "assert_matches",
"async-trait", "async-trait",
"dyn-clone", "dyn-clone",
@@ -8491,6 +8492,7 @@ dependencies = [
name = "sc-network-gossip" name = "sc-network-gossip"
version = "0.10.0-dev" version = "0.10.0-dev"
dependencies = [ dependencies = [
"ahash",
"async-std", "async-std",
"futures 0.3.16", "futures 0.3.16",
"futures-timer", "futures-timer",
@@ -22,6 +22,7 @@ futures-timer = "3.0.1"
log = "0.4.8" log = "0.4.8"
parking_lot = "0.11.2" parking_lot = "0.11.2"
rand = "0.8.4" rand = "0.8.4"
ahash = "0.7.6"
parity-scale-codec = { version = "2.3.1", features = ["derive"] } parity-scale-codec = { version = "2.3.1", features = ["derive"] }
sp-application-crypto = { version = "5.0.0", path = "../../primitives/application-crypto" } sp-application-crypto = { version = "5.0.0", path = "../../primitives/application-crypto" }
sp-arithmetic = { version = "4.0.0", path = "../../primitives/arithmetic" } sp-arithmetic = { version = "4.0.0", path = "../../primitives/arithmetic" }
@@ -84,23 +84,23 @@
//! //!
//! We only send polite messages to peers, //! We only send polite messages to peers,
use parity_scale_codec::{Decode, Encode}; use ahash::{AHashMap, AHashSet};
use sc_network::{ObservedRole, PeerId, ReputationChange};
use sc_network_gossip::{MessageIntent, ValidatorContext};
use sp_finality_grandpa::AuthorityId;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use log::{debug, trace}; use log::{debug, trace};
use parity_scale_codec::{Decode, Encode};
use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64}; use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use sc_network::{ObservedRole, PeerId, ReputationChange};
use sc_network_gossip::{MessageIntent, ValidatorContext};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_finality_grandpa::AuthorityId;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use super::{benefit, cost, Round, SetId}; use super::{benefit, cost, Round, SetId};
use crate::{environment, CatchUp, CompactCommit, SignedMessage}; use crate::{environment, CatchUp, CompactCommit, SignedMessage};
use std::{ use std::{
collections::{HashMap, HashSet, VecDeque}, collections::{HashSet, VecDeque},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@@ -260,7 +260,7 @@ const KEEP_RECENT_ROUNDS: usize = 3;
struct KeepTopics<B: BlockT> { struct KeepTopics<B: BlockT> {
current_set: SetId, current_set: SetId,
rounds: VecDeque<(Round, SetId)>, rounds: VecDeque<(Round, SetId)>,
reverse_map: HashMap<B::Hash, (Option<Round>, SetId)>, reverse_map: AHashMap<B::Hash, (Option<Round>, SetId)>,
} }
impl<B: BlockT> KeepTopics<B> { impl<B: BlockT> KeepTopics<B> {
@@ -268,7 +268,7 @@ impl<B: BlockT> KeepTopics<B> {
KeepTopics { KeepTopics {
current_set: SetId(0), current_set: SetId(0),
rounds: VecDeque::with_capacity(KEEP_RECENT_ROUNDS + 2), rounds: VecDeque::with_capacity(KEEP_RECENT_ROUNDS + 2),
reverse_map: HashMap::new(), reverse_map: Default::default(),
} }
} }
@@ -290,7 +290,7 @@ impl<B: BlockT> KeepTopics<B> {
let _ = self.rounds.pop_front(); let _ = self.rounds.pop_front();
} }
let mut map = HashMap::with_capacity(KEEP_RECENT_ROUNDS + 3); let mut map = AHashMap::with_capacity(KEEP_RECENT_ROUNDS + 3);
map.insert(super::global_topic::<B>(self.current_set.0), (None, self.current_set)); map.insert(super::global_topic::<B>(self.current_set.0), (None, self.current_set));
for &(round, set) in &self.rounds { for &(round, set) in &self.rounds {
@@ -477,10 +477,10 @@ impl<N> PeerInfo<N> {
/// The peers we're connected to in gossip. /// The peers we're connected to in gossip.
struct Peers<N> { struct Peers<N> {
inner: HashMap<PeerId, PeerInfo<N>>, inner: AHashMap<PeerId, PeerInfo<N>>,
/// The randomly picked set of `LUCKY_PEERS` we'll gossip to in the first stage of round /// The randomly picked set of `LUCKY_PEERS` we'll gossip to in the first stage of round
/// gossiping. /// gossiping.
first_stage_peers: HashSet<PeerId>, first_stage_peers: AHashSet<PeerId>,
/// The randomly picked set of peers we'll gossip to in the second stage of gossiping if the /// The randomly picked set of peers we'll gossip to in the second stage of gossiping if the
/// first stage didn't allow us to spread the voting data enough to conclude the round. This /// first stage didn't allow us to spread the voting data enough to conclude the round. This
/// set should have size `sqrt(connected_peers)`. /// set should have size `sqrt(connected_peers)`.
@@ -492,10 +492,10 @@ struct Peers<N> {
impl<N> Default for Peers<N> { impl<N> Default for Peers<N> {
fn default() -> Self { fn default() -> Self {
Peers { Peers {
inner: HashMap::new(), inner: Default::default(),
first_stage_peers: HashSet::new(), first_stage_peers: Default::default(),
second_stage_peers: HashSet::new(), second_stage_peers: Default::default(),
lucky_light_peers: HashSet::new(), lucky_light_peers: Default::default(),
} }
} }
} }
@@ -608,7 +608,7 @@ impl<N: Ord> Peers<N> {
} }
}); });
let mut first_stage_peers = HashSet::new(); let mut first_stage_peers = AHashSet::new();
let mut second_stage_peers = HashSet::new(); let mut second_stage_peers = HashSet::new();
// we start by allocating authorities to the first stage set and when the minimum of // we start by allocating authorities to the first stage set and when the minimum of
@@ -20,6 +20,7 @@ futures-timer = "3.0.1"
libp2p = { version = "0.40.0", default-features = false } libp2p = { version = "0.40.0", default-features = false }
log = "0.4.8" log = "0.4.8"
lru = "0.7.0" lru = "0.7.0"
ahash = "0.7.6"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" }
sc-network = { version = "0.10.0-dev", path = "../network" } sc-network = { version = "0.10.0-dev", path = "../network" }
sp-runtime = { version = "5.0.0", path = "../../primitives/runtime" } sp-runtime = { version = "5.0.0", path = "../../primitives/runtime" }
@@ -18,19 +18,13 @@
use crate::{MessageIntent, Network, ValidationResult, Validator, ValidatorContext}; use crate::{MessageIntent, Network, ValidationResult, Validator, ValidatorContext};
use ahash::AHashSet;
use libp2p::PeerId; use libp2p::PeerId;
use lru::LruCache; use lru::LruCache;
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network::ObservedRole; use sc_network::ObservedRole;
use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
use std::{ use std::{borrow::Cow, collections::HashMap, iter, sync::Arc, time, time::Instant};
borrow::Cow,
collections::{HashMap, HashSet},
iter,
sync::Arc,
time,
time::Instant,
};
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115 // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
// NOTE: The current value is adjusted based on largest production network deployment (Kusama) and // NOTE: The current value is adjusted based on largest production network deployment (Kusama) and
@@ -56,7 +50,7 @@ mod rep {
} }
struct PeerConsensus<H> { struct PeerConsensus<H> {
known_messages: HashSet<H>, known_messages: AHashSet<H>,
} }
/// Topic stream message with sender. /// Topic stream message with sender.
@@ -204,7 +198,8 @@ impl<B: BlockT> ConsensusGossip<B> {
?role, ?role,
"Registering peer", "Registering peer",
); );
self.peers.insert(who.clone(), PeerConsensus { known_messages: HashSet::new() }); self.peers
.insert(who.clone(), PeerConsensus { known_messages: Default::default() });
let validator = self.validator.clone(); let validator = self.validator.clone();
let mut context = NetworkContext { gossip: self, network }; let mut context = NetworkContext { gossip: self, network };