Add reputation system to network crate (#2454)

This commit is contained in:
Pierre Krieger
2019-05-04 13:51:06 +02:00
committed by Gavin Wood
parent 18ec2d14d0
commit 8ca343ca8c
10 changed files with 166 additions and 168 deletions
+53 -35
View File
@@ -17,7 +17,7 @@
use crossbeam_channel::{self as channel, Receiver, Sender, select};
use futures::sync::mpsc;
use parking_lot::Mutex;
use network_libp2p::{PeerId, Severity};
use network_libp2p::PeerId;
use primitives::storage::StorageKey;
use runtime_primitives::{generic::BlockId, ConsensusEngineId};
use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor, Zero};
@@ -36,7 +36,7 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::{cmp, num::NonZeroUsize, thread, time};
use log::{trace, debug, warn};
use log::{trace, debug, warn, error};
use crate::chain::Client;
use client::light::fetcher::ChangesProof;
use crate::{error, util::LruHashSet};
@@ -61,6 +61,24 @@ const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
/// and disconnect to free connection slot.
const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192;
/// Reputation change when a peer is "clogged", meaning that it's not fast enough to process our
/// messages.
const CLOGGED_PEER_REPUTATION_CHANGE: i32 = -(1 << 12);
/// Reputation change when a peer doesn't respond in time to our messages.
const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 10);
/// Reputation change when a peer sends us a status message while we already received one.
const UNEXPECTED_STATUS_REPUTATION_CHANGE: i32 = -(1 << 20);
/// Reputation change when we are a light client and a peer is behind us.
const PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE: i32 = -(1 << 8);
/// Reputation change when a peer sends us an extrinsic that we didn't know about.
const NEW_EXTRINSIC_REPUTATION_CHANGE: i32 = 1 << 7;
/// Reputation change when a peer sends us a block. We don't know whether this block is valid or
/// already known to us. Since this has a small cost, we decrease the reputation of the node, and
/// will increase it back later if the import is successful.
const BLOCK_ANNOUNCE_REPUTATION_CHANGE: i32 = -(1 << 2);
/// We sent an RPC query to the given node, but it failed.
const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12);
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
@@ -139,8 +157,12 @@ pub trait Context<B: BlockT> {
/// Get a reference to the client.
fn client(&self) -> &crate::chain::Client<B>;
/// Point out that a peer has been malign or irresponsible or appeared lazy.
fn report_peer(&mut self, who: PeerId, reason: Severity);
/// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or
/// irresponsible or appeared lazy.
fn report_peer(&mut self, who: PeerId, reputation: i32);
/// Force disconnecting from a peer. Use this when a peer misbehaved.
fn disconnect_peer(&mut self, who: PeerId);
/// Get peer info.
fn peer_info(&self, peer: &PeerId) -> Option<PeerInfo<B>>;
@@ -168,8 +190,12 @@ impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> {
}
impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, H> {
fn report_peer(&mut self, who: PeerId, reason: Severity) {
self.network_chan.send(NetworkMsg::ReportPeer(who, reason))
fn report_peer(&mut self, who: PeerId, reputation: i32) {
self.network_chan.send(NetworkMsg::ReportPeer(who, reputation))
}
fn disconnect_peer(&mut self, who: PeerId) {
self.network_chan.send(NetworkMsg::DisconnectPeer(who))
}
fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<B>> {
@@ -468,9 +494,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
if request.as_ref().map_or(false, |(_, r)| r.id == response.id) {
return request.map(|(_, r)| r)
}
trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id,);
let severity = Severity::Bad("Unexpected response packet received from peer".to_string());
self.network_chan.send(NetworkMsg::ReportPeer(who, severity))
trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id);
self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value()));
self.network_chan.send(NetworkMsg::DisconnectPeer(who));
}
None
}
@@ -602,7 +628,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Called as a back-pressure mechanism if the networking detects that the peer cannot process
/// our messaging rate fast enough.
pub fn on_clogged_peer(&self, who: PeerId, _msg: Option<Message<B>>) {
// We don't do anything but print some diagnostics for now.
self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), CLOGGED_PEER_REPUTATION_CHANGE));
// Print some diagnostics.
if let Some(peer) = self.context_data.peers.get(&who) {
debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \
known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})",
@@ -739,9 +767,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan));
for p in aborting {
let _ = self
.network_chan
.send(NetworkMsg::ReportPeer(p, Severity::Timeout));
let _ = self.network_chan.send(NetworkMsg::DisconnectPeer(p.clone()));
let _ = self.network_chan.send(NetworkMsg::ReportPeer(p, TIMEOUT_REPUTATION_CHANGE));
}
}
@@ -751,31 +778,23 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let protocol_version = {
if self.context_data.peers.contains_key(&who) {
debug!("Unexpected status packet from {}", who);
self.network_chan.send(NetworkMsg::ReportPeer(who, UNEXPECTED_STATUS_REPUTATION_CHANGE));
return;
}
if status.genesis_hash != self.genesis_hash {
let reason = format!(
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
trace!(
target: "protocol",
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
self.network_chan.send(NetworkMsg::ReportPeer(
who,
Severity::Bad(reason),
));
self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value()));
self.network_chan.send(NetworkMsg::DisconnectPeer(who));
return;
}
if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version {
let reason = format!("Peer using unsupported protocol version {}", status.version);
trace!(target: "protocol", "Peer {:?} using unsupported protocol version {}", who, status.version);
self.network_chan.send(NetworkMsg::ReportPeer(
who,
Severity::Bad(reason),
));
self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value()));
self.network_chan.send(NetworkMsg::DisconnectPeer(who));
return;
}
if self.config.roles & Roles::LIGHT == Roles::LIGHT {
@@ -791,13 +810,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
.checked_sub(status.best_number.as_())
.unwrap_or(0);
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
self.network_chan.send(NetworkMsg::ReportPeer(
who,
Severity::Useless(
"Peer is far behind us and will unable to serve light requests"
.to_string(),
),
));
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE));
self.network_chan.send(NetworkMsg::DisconnectPeer(who));
return;
}
}
@@ -818,7 +833,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
peer_info
},
None => {
debug!(target: "sync", "Received status from previously unconnected node {}", who);
error!(target: "sync", "Received status from previously unconnected node {}", who);
return;
},
};
@@ -859,6 +874,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
for t in extrinsics {
if let Some(hash) = self.transaction_pool.import(&t) {
self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE));
peer.known_extrinsics.insert(hash);
} else {
trace!(target: "sync", "Extrinsic rejected");
@@ -971,10 +987,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
.map(|s| s.on_block_announce(who.clone(), *header.number()));
self.sync.on_block_announce(
&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
who,
who.clone(),
hash,
&header,
);
self.network_chan.send(NetworkMsg::ReportPeer(who, BLOCK_ANNOUNCE_REPUTATION_CHANGE));
}
fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) {
@@ -1025,6 +1042,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
Err(error) => {
trace!(target: "sync", "Remote call request {} from {} ({} at {}) failed with: {}",
request.id, who, request.method, request.block, error);
self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), RPC_FAILED_REPUTATION_CHANGE));
Default::default()
}
};