replace lru with schnellru (#14539)

This commit is contained in:
Qinxuan Chen
2023-07-10 05:24:23 +08:00
committed by GitHub
parent 6dd625d568
commit 6d2c1ed719
11 changed files with 58 additions and 71 deletions
@@ -20,12 +20,13 @@ use crate::{MessageIntent, Network, ValidationResult, Validator, ValidatorContex
use ahash::AHashSet;
use libp2p::PeerId;
use lru::LruCache;
use schnellru::{ByLength, LruMap};
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network::types::ProtocolName;
use sc_network_common::role::ObservedRole;
use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
use std::{collections::HashMap, iter, num::NonZeroUsize, sync::Arc, time, time::Instant};
use std::{collections::HashMap, iter, sync::Arc, time, time::Instant};
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
// NOTE: The current value is adjusted based on largest production network deployment (Kusama) and
@@ -36,7 +37,7 @@ use std::{collections::HashMap, iter, num::NonZeroUsize, sync::Arc, time, time::
//
// Assuming that each known message is tracked with a 32 byte hash (common for `Block::Hash`), then
// this cache should take about 256 KB of memory.
const KNOWN_MESSAGES_CACHE_SIZE: usize = 8192;
const KNOWN_MESSAGES_CACHE_SIZE: u32 = 8192;
const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_millis(750);
@@ -155,7 +156,7 @@ where
pub struct ConsensusGossip<B: BlockT> {
peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
messages: Vec<MessageEntry<B>>,
known_messages: LruCache<B::Hash, ()>,
known_messages: LruMap<B::Hash, ()>,
protocol: ProtocolName,
validator: Arc<dyn Validator<B>>,
next_broadcast: Instant,
@@ -181,11 +182,7 @@ impl<B: BlockT> ConsensusGossip<B> {
ConsensusGossip {
peers: HashMap::new(),
messages: Default::default(),
known_messages: {
let cap = NonZeroUsize::new(KNOWN_MESSAGES_CACHE_SIZE)
.expect("cache capacity is not zero");
LruCache::new(cap)
},
known_messages: { LruMap::new(ByLength::new(KNOWN_MESSAGES_CACHE_SIZE)) },
protocol,
validator,
next_broadcast: Instant::now() + REBROADCAST_INTERVAL,
@@ -216,7 +213,7 @@ impl<B: BlockT> ConsensusGossip<B> {
message: Vec<u8>,
sender: Option<PeerId>,
) {
if self.known_messages.put(message_hash, ()).is_none() {
if self.known_messages.insert(message_hash, ()) {
self.messages.push(MessageEntry { message_hash, topic, message, sender });
if let Some(ref metrics) = self.metrics {
@@ -313,7 +310,7 @@ impl<B: BlockT> ConsensusGossip<B> {
);
for (_, ref mut peer) in self.peers.iter_mut() {
peer.known_messages.retain(|h| known_messages.contains(h));
peer.known_messages.retain(|h| known_messages.get(h).is_some());
}
}
@@ -348,7 +345,7 @@ impl<B: BlockT> ConsensusGossip<B> {
for message in messages {
let message_hash = HashFor::<B>::hash(&message[..]);
if self.known_messages.contains(&message_hash) {
if self.known_messages.get(&message_hash).is_some() {
tracing::trace!(
target: "gossip",
%who,
@@ -545,7 +542,7 @@ mod tests {
macro_rules! push_msg {
($consensus:expr, $topic:expr, $hash: expr, $m:expr) => {
if $consensus.known_messages.put($hash, ()).is_none() {
if $consensus.known_messages.insert($hash, ()) {
$consensus.messages.push(MessageEntry {
message_hash: $hash,
topic: $topic,
@@ -720,8 +717,8 @@ mod tests {
push_msg!(consensus, prev_hash, m1_hash, m1);
push_msg!(consensus, best_hash, m2_hash, m2);
consensus.known_messages.put(m1_hash, ());
consensus.known_messages.put(m2_hash, ());
consensus.known_messages.insert(m1_hash, ());
consensus.known_messages.insert(m2_hash, ());
consensus.collect_garbage();
assert_eq!(consensus.messages.len(), 2);
@@ -734,7 +731,7 @@ mod tests {
assert_eq!(consensus.messages.len(), 1);
// known messages are only pruned based on size.
assert_eq!(consensus.known_messages.len(), 2);
assert!(consensus.known_messages.contains(&m2_hash));
assert!(consensus.known_messages.get(&m2_hash).is_some());
}
#[test]