Traffic statistics (#4017)

* Network stats

* Fixed tests
This commit is contained in:
Arkadiy Paronyan
2019-11-05 12:15:59 +01:00
committed by Gavin Wood
parent 4ca6f8d1b2
commit b92674d88a
6 changed files with 283 additions and 171 deletions
+163 -54
View File
@@ -16,6 +16,7 @@
use crate::{DiscoveryNetBehaviour, config::ProtocolId};
use crate::legacy_proto::{LegacyProto, LegacyProtoOut};
use bytes::BytesMut;
use futures::prelude::*;
use futures03::{StreamExt as _, TryStreamExt as _};
use libp2p::{Multiaddr, PeerId};
@@ -28,6 +29,7 @@ use consensus::{
block_validation::BlockAnnounceValidator,
import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin}
};
use codec::{Decode, Encode};
use sr_primitives::{generic::BlockId, ConsensusEngineId, Justification};
use sr_primitives::traits::{
Block as BlockT, Header as HeaderT, NumberFor, One, Zero,
@@ -44,6 +46,7 @@ use crate::config::{BoxFinalityProofRequestBuilder, Roles};
use rustc_hex::ToHex;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::fmt::Write;
use std::{cmp, num::NonZeroUsize, time};
use log::{trace, debug, warn, error};
use crate::chain::{Client, FinalityProofProvider};
@@ -90,6 +93,8 @@ const PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE: i32 = -(1 << 8);
const NEW_EXTRINSIC_REPUTATION_CHANGE: i32 = 1 << 7;
/// We sent an RPC query to the given node, but it failed.
const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12);
/// We received a message that failed to decode.
const BAD_MESSAGE_REPUTATION_CHANGE: i32 = -(1 << 12);
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
@@ -113,7 +118,15 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
/// When asked for a proof of finality, we use this struct to build one.
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
/// Handles opening the unique substream and sending and receiving raw messages.
behaviour: LegacyProto<B, Substream<StreamMuxerBox>>,
behaviour: LegacyProto<Substream<StreamMuxerBox>>,
}
#[derive(Default)]
struct PacketStats {
bytes_in: u64,
bytes_out: u64,
count_in: u64,
count_out: u64,
}
/// A peer that we are connected to
@@ -151,12 +164,12 @@ pub struct PeerInfo<B: BlockT> {
pub best_number: <B::Header as HeaderT>::Number,
}
struct LightDispatchIn<'a, B: BlockT> {
behaviour: &'a mut LegacyProto<B, Substream<StreamMuxerBox>>,
struct LightDispatchIn<'a> {
behaviour: &'a mut LegacyProto<Substream<StreamMuxerBox>>,
peerset: peerset::PeersetHandle,
}
impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> {
impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a> {
fn report_peer(&mut self, who: &PeerId, reputation: i32) {
self.peerset.report_peer(who.clone(), reputation)
}
@@ -166,12 +179,12 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> {
}
fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <<B as BlockT>::Header as HeaderT>::Number) {
let message = message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest {
let message: Message<B> = message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest {
id,
block,
});
self.behaviour.send_packet(who, message)
self.behaviour.send_packet(who, message.encode())
}
fn send_read_request(
@@ -181,13 +194,13 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> {
block: <B as BlockT>::Hash,
keys: Vec<Vec<u8>>,
) {
let message = message::generic::Message::RemoteReadRequest(message::RemoteReadRequest {
let message: Message<B> = message::generic::Message::RemoteReadRequest(message::RemoteReadRequest {
id,
block,
keys,
});
self.behaviour.send_packet(who, message)
self.behaviour.send_packet(who, message.encode())
}
fn send_read_child_request(
@@ -198,14 +211,14 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> {
storage_key: Vec<u8>,
keys: Vec<Vec<u8>>,
) {
let message = message::generic::Message::RemoteReadChildRequest(message::RemoteReadChildRequest {
let message: Message<B> = message::generic::Message::RemoteReadChildRequest(message::RemoteReadChildRequest {
id,
block,
storage_key,
keys,
});
self.behaviour.send_packet(who, message)
self.behaviour.send_packet(who, message.encode())
}
fn send_call_request(
@@ -216,14 +229,14 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> {
method: String,
data: Vec<u8>
) {
let message = message::generic::Message::RemoteCallRequest(message::RemoteCallRequest {
let message: Message<B> = message::generic::Message::RemoteCallRequest(message::RemoteCallRequest {
id,
block,
method,
data,
});
self.behaviour.send_packet(who, message)
self.behaviour.send_packet(who, message.encode())
}
fn send_changes_request(
@@ -237,7 +250,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> {
storage_key: Option<Vec<u8>>,
key: Vec<u8>,
) {
let message = message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest {
let message: Message<B> = message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest {
id,
first,
last,
@@ -247,7 +260,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> {
key,
});
self.behaviour.send_packet(who, message)
self.behaviour.send_packet(who, message.encode())
}
fn send_body_request(
@@ -260,7 +273,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> {
direction: Direction,
max: Option<u32>
) {
let message = message::generic::Message::BlockRequest(message::BlockRequest::<B> {
let message: Message<B> = message::generic::Message::BlockRequest(message::BlockRequest::<B> {
id,
fields,
from,
@@ -269,7 +282,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> {
max,
});
self.behaviour.send_packet(who, message)
self.behaviour.send_packet(who, message.encode())
}
}
@@ -291,7 +304,7 @@ pub trait Context<B: BlockT> {
/// Protocol context.
struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
behaviour: &'a mut LegacyProto<B, Substream<StreamMuxerBox>>,
behaviour: &'a mut LegacyProto<Substream<StreamMuxerBox>>,
context_data: &'a mut ContextData<B, H>,
peerset_handle: &'a peerset::PeersetHandle,
}
@@ -299,7 +312,7 @@ struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> {
fn new(
context_data: &'a mut ContextData<B, H>,
behaviour: &'a mut LegacyProto<B, Substream<StreamMuxerBox>>,
behaviour: &'a mut LegacyProto<Substream<StreamMuxerBox>>,
peerset_handle: &'a peerset::PeersetHandle,
) -> Self {
ProtocolContext { context_data, peerset_handle, behaviour }
@@ -316,19 +329,19 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B,
}
fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) {
send_message(
send_message::<B> (
self.behaviour,
&mut self.context_data.peers,
who,
&mut self.context_data.stats,
&who,
GenericMessage::Consensus(consensus)
)
}
fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>) {
send_message(
send_message::<B> (
self.behaviour,
&mut self.context_data.peers,
who,
&mut self.context_data.stats,
&who,
GenericMessage::ChainSpecific(message)
)
}
@@ -338,6 +351,7 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B,
struct ContextData<B: BlockT, H: ExHashT> {
// All connected peers
peers: HashMap<PeerId, Peer<B, H>>,
stats: HashMap<&'static str, PacketStats>,
pub chain: Arc<dyn Client<B>>,
}
@@ -388,6 +402,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
config,
context_data: ContextData {
peers: HashMap::new(),
stats: HashMap::new(),
chain,
},
light_dispatch: LightDispatch::new(checker),
@@ -517,8 +532,22 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
pub fn on_custom_message(
&mut self,
who: PeerId,
message: Message<B>,
data: BytesMut,
) -> CustomMessageOutcome<B> {
let message = match <Message<B> as Decode>::decode(&mut &data[..]) {
Ok(message) => message,
Err(err) => {
debug!(target: "sync", "Couldn't decode packet sent by {}: {:?}: {}", who, data, err.what());
self.peerset_handle.report_peer(who.clone(), BAD_MESSAGE_REPUTATION_CHANGE);
return CustomMessageOutcome::None;
}
};
let mut stats = self.context_data.stats.entry(message.id()).or_default();
stats.bytes_in += data.len() as u64;
stats.count_in += 1;
match message {
GenericMessage::Status(s) => self.on_status_message(who, s),
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
@@ -581,15 +610,25 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
CustomMessageOutcome::None
}
fn send_message(&mut self, who: PeerId, message: Message<B>) {
send_message::<B, H>(
fn send_request(&mut self, who: &PeerId, message: Message<B>) {
send_request::<B, H>(
&mut self.behaviour,
&mut self.context_data.stats,
&mut self.context_data.peers,
who,
message,
);
}
fn send_message(&mut self, who: &PeerId, message: Message<B>) {
send_message::<B>(
&mut self.behaviour,
&mut self.context_data.stats,
who,
message,
);
}
/// Locks `self` and returns a context plus the `ConsensusGossip` struct.
pub fn consensus_gossip_lock<'a>(
&'a mut self,
@@ -622,7 +661,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
GossipMessageRecipient::BroadcastNew =>
self.consensus_gossip.multicast(&mut context, topic, message, false),
GossipMessageRecipient::Peer(who) =>
self.send_message(who, GenericMessage::Consensus(message)),
self.send_message(&who, GenericMessage::Consensus(message)),
}
}
@@ -745,7 +784,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
blocks: blocks,
};
trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len());
self.send_message(peer, GenericMessage::BlockResponse(response))
self.send_message(&peer, GenericMessage::BlockResponse(response))
}
/// Adjusts the reputation of a node.
@@ -792,7 +831,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
Ok(sync::OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(sync::OnBlockData::Request(peer, req)) => {
self.send_message(peer, GenericMessage::BlockRequest(req));
self.send_request(&peer, GenericMessage::BlockRequest(req));
CustomMessageOutcome::None
}
Err(sync::BadPeer(id, repu)) => {
@@ -939,7 +978,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}, who.clone(), status.roles, status.best_number);
match self.sync.new_peer(who.clone(), info) {
Ok(None) => (),
Ok(Some(req)) => self.send_message(who.clone(), GenericMessage::BlockRequest(req)),
Ok(Some(req)) => self.send_request(&who, GenericMessage::BlockRequest(req)),
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
self.peerset_handle.report_peer(id, repu)
@@ -1020,7 +1059,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
.push(who.to_base58());
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
self.behaviour.send_packet(who, GenericMessage::Transactions(to_send))
send_message::<B> (
&mut self.behaviour,
&mut self.context_data.stats,
&who,
GenericMessage::Transactions(to_send)
)
}
}
@@ -1061,7 +1105,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
let inserted = peer.known_blocks.insert(hash);
if inserted || force {
let message = GenericMessage::BlockAnnounce(message::BlockAnnounce {
let message: Message<B> = GenericMessage::BlockAnnounce(message::BlockAnnounce {
header: header.clone(),
state: if peer.info.protocol_version >= 4 {
if is_best {
@@ -1079,7 +1123,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
},
});
self.behaviour.send_packet(who, message)
send_message::<B> (
&mut self.behaviour,
&mut self.context_data.stats,
&who,
message,
)
}
}
}
@@ -1097,7 +1146,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
chain_status: self.specialization.status(),
};
self.send_message(who, GenericMessage::Status(status))
self.send_message(&who, GenericMessage::Status(status))
}
fn on_block_announce(&mut self, who: PeerId, announce: BlockAnnounce<B::Header>) -> CustomMessageOutcome<B> {
@@ -1157,7 +1206,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
match blocks_to_import {
Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks),
Ok(sync::OnBlockData::Request(peer, req)) => {
self.send_message(peer, GenericMessage::BlockRequest(req));
self.send_request(&peer, GenericMessage::BlockRequest(req));
CustomMessageOutcome::None
}
Err(sync::BadPeer(id, repu)) => {
@@ -1226,7 +1275,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
};
self.send_message(
who,
&who,
GenericMessage::RemoteCallResponse(message::RemoteCallResponse {
id: request.id,
proof,
@@ -1269,7 +1318,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
match result {
Ok((id, req)) => {
let msg = GenericMessage::BlockRequest(req);
send_message(&mut self.behaviour, &mut self.context_data.peers, id, msg)
send_request(
&mut self.behaviour,
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
msg
)
}
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
@@ -1342,7 +1397,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
};
self.send_message(
who,
&who,
GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
id: request.id,
proof,
@@ -1385,7 +1440,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
};
self.send_message(
who,
&who,
GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
id: request.id,
proof,
@@ -1425,7 +1480,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
};
self.send_message(
who,
&who,
GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse {
id: request.id,
header,
@@ -1495,7 +1550,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
};
self.send_message(
who,
&who,
GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse {
id: request.id,
max: proof.max_block,
@@ -1545,7 +1600,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
},
};
self.send_message(
who,
&who,
GenericMessage::FinalityProofResponse(message::FinalityProofResponse {
id: 0,
block: request.block,
@@ -1582,6 +1637,22 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
peerset: self.peerset_handle.clone(),
}, peer, response);
}
fn format_stats(&self) -> String {
let mut out = String::new();
for (id, stats) in &self.context_data.stats {
let _ = writeln!(
&mut out,
"{}: In: {} bytes ({}), Out: {} bytes ({})",
id,
stats.bytes_in,
stats.count_in,
stats.bytes_out,
stats.count_out,
);
}
out
}
}
/// Outcome of an incoming custom message.
@@ -1593,14 +1664,15 @@ pub enum CustomMessageOutcome<B: BlockT> {
None,
}
fn send_message<B: BlockT, H: ExHashT>(
behaviour: &mut LegacyProto<B, Substream<StreamMuxerBox>>,
fn send_request<B: BlockT, H: ExHashT>(
behaviour: &mut LegacyProto<Substream<StreamMuxerBox>>,
stats: &mut HashMap<&'static str, PacketStats>,
peers: &mut HashMap<PeerId, Peer<B, H>>,
who: PeerId,
who: &PeerId,
mut message: Message<B>,
) {
if let GenericMessage::BlockRequest(ref mut r) = message {
if let Some(ref mut peer) = peers.get_mut(&who) {
if let Some(ref mut peer) = peers.get_mut(who) {
r.id = peer.next_request_id;
peer.next_request_id = peer.next_request_id + 1;
if let Some((timestamp, request)) = peer.block_request.take() {
@@ -1610,12 +1682,25 @@ fn send_message<B: BlockT, H: ExHashT>(
peer.block_request = Some((time::Instant::now(), r.clone()));
}
}
behaviour.send_packet(&who, message);
send_message::<B>(behaviour, stats, who, message)
}
fn send_message<B: BlockT>(
behaviour: &mut LegacyProto<Substream<StreamMuxerBox>>,
stats: &mut HashMap<&'static str, PacketStats>,
who: &PeerId,
message: Message<B>,
) {
let encoded = message.encode();
let mut stats = stats.entry(message.id()).or_default();
stats.bytes_out += encoded.len() as u64;
stats.count_out += 1;
behaviour.send_packet(who, encoded);
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviour for
Protocol<B, S, H> {
type ProtocolsHandler = <LegacyProto<B, Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler;
type ProtocolsHandler = <LegacyProto<Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = CustomMessageOutcome<B>;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
@@ -1660,13 +1745,30 @@ Protocol<B, S, H> {
}
for (id, r) in self.sync.block_requests() {
send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r))
send_request(
&mut self.behaviour,
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::BlockRequest(r)
)
}
for (id, r) in self.sync.justification_requests() {
send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r))
send_request(
&mut self.behaviour,
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::BlockRequest(r)
)
}
for (id, r) in self.sync.finality_proof_requests() {
send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::FinalityProofRequest(r))
send_request(
&mut self.behaviour,
&mut self.context_data.stats,
&mut self.context_data.peers,
&id,
GenericMessage::FinalityProofRequest(r))
}
let event = match self.behaviour.poll(params) {
@@ -1700,8 +1802,9 @@ Protocol<B, S, H> {
LegacyProtoOut::Clogged { peer_id, messages } => {
debug!(target: "sync", "{} clogging messages:", messages.len());
for msg in messages.into_iter().take(5) {
debug!(target: "sync", "{:?}", msg);
self.on_clogged_peer(peer_id.clone(), Some(msg));
let message: Option<Message<B>> = Decode::decode(&mut &msg[..]).ok();
debug!(target: "sync", "{:?}", message);
self.on_clogged_peer(peer_id.clone(), message);
}
CustomMessageOutcome::None
}
@@ -1749,3 +1852,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> DiscoveryNetBehaviour f
self.behaviour.add_discovered_nodes(peer_ids)
}
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Drop for Protocol<B, S, H> {
fn drop(&mut self) {
debug!(target: "sync", "Network stats:\n{}", self.format_stats());
}
}