Clean-up pass in network/src/protocol.rs (#7889)

* Remove statistics system

* Remove ContextData struct

* Remove next_request_id

* Some TryFrom nit-picking

* Use constants for peer sets
This commit is contained in:
Pierre Krieger
2021-01-13 11:27:49 +01:00
committed by GitHub
parent 2d1b9a8d38
commit 4678da3efb
2 changed files with 31 additions and 107 deletions
+31 -83
View File
@@ -54,9 +54,9 @@ use sp_runtime::traits::{
use sp_arithmetic::traits::SaturatedConversion;
use sync::{ChainSync, SyncState};
use std::borrow::Cow;
use std::convert::TryFrom as _;
use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{io, iter, num::NonZeroUsize, pin::Pin, task::Poll, time};
mod generic_proto;
@@ -213,7 +213,9 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
config: ProtocolConfig,
genesis_hash: B::Hash,
sync: ChainSync<B>,
context_data: ContextData<B, H>,
// All connected peers
peers: HashMap<PeerId, Peer<B, H>>,
chain: Arc<dyn Client<B>>,
/// List of nodes for which we perform additional logging because they are important for the
/// user.
important_peers: HashSet<PeerId>,
@@ -230,14 +232,6 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
boot_node_ids: HashSet<PeerId>,
}
#[derive(Default)]
struct PacketStats {
bytes_in: u64,
bytes_out: u64,
count_in: u64,
count_out: u64,
}
/// Peer information
#[derive(Debug)]
struct Peer<B: BlockT, H: ExHashT> {
@@ -251,8 +245,6 @@ struct Peer<B: BlockT, H: ExHashT> {
known_transactions: LruHashSet<H>,
/// Holds a set of blocks known to this peer.
known_blocks: LruHashSet<B::Hash>,
/// Request counter,
next_request_id: message::RequestId,
}
/// Info about a peer's known state.
@@ -266,14 +258,6 @@ pub struct PeerInfo<B: BlockT> {
pub best_number: <B::Header as HeaderT>::Number,
}
/// Data necessary to create a context.
struct ContextData<B: BlockT, H: ExHashT> {
// All connected peers
peers: HashMap<PeerId, Peer<B, H>>,
stats: HashMap<&'static str, PacketStats>,
pub chain: Arc<dyn Client<B>>,
}
/// Configuration for the Substrate-specific part of the networking layer.
#[derive(Clone)]
pub struct ProtocolConfig {
@@ -511,11 +495,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
pending_transactions: FuturesUnordered::new(),
pending_transactions_peers: HashMap::new(),
config,
context_data: ContextData {
peers: HashMap::new(),
stats: HashMap::new(),
chain,
},
peers: HashMap::new(),
chain,
genesis_hash: info.genesis_hash,
sync,
important_peers,
@@ -567,13 +548,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
/// Returns the number of peers we're connected to.
pub fn num_connected_peers(&self) -> usize {
self.context_data.peers.values().count()
self.peers.values().count()
}
/// Returns the number of peers we're connected to and that are being queried.
pub fn num_active_peers(&self) -> usize {
self.context_data
.peers
self.peers
.values()
.filter(|p| p.block_request.is_some())
.count()
@@ -631,7 +611,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
fn update_peer_info(&mut self, who: &PeerId) {
if let Some(info) = self.sync.peer_info(who) {
if let Some(ref mut peer) = self.context_data.peers.get_mut(who) {
if let Some(ref mut peer) = self.peers.get_mut(who) {
peer.info.best_hash = info.best_hash;
peer.info.best_number = info.best_number;
}
@@ -640,7 +620,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
/// Returns information about all the peers we are connected to after the handshake message.
pub fn peers_info(&self) -> impl Iterator<Item = (&PeerId, &PeerInfo<B>)> {
self.context_data.peers.iter().map(|(id, peer)| (id, &peer.info))
self.peers.iter().map(|(id, peer)| (id, &peer.info))
}
fn on_custom_message(
@@ -663,10 +643,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
};
let mut stats = self.context_data.stats.entry(message.id()).or_default();
stats.bytes_in += data.len() as u64;
stats.count_in += 1;
match message {
GenericMessage::Status(_) =>
debug!(target: "sub-libp2p", "Received unexpected Status"),
@@ -710,7 +686,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
who: PeerId,
request: message::BlockRequest<B>,
) -> CustomMessageOutcome<B> {
prepare_block_request::<B, H>(&mut self.context_data.peers, who, request)
prepare_block_request::<B, H>(&mut self.peers, who, request)
}
/// Called by peer when it is disconnecting.
@@ -723,7 +699,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
trace!(target: "sync", "{} disconnected", peer);
}
if let Some(_peer_data) = self.context_data.peers.remove(&peer) {
if let Some(_peer_data) = self.peers.remove(&peer) {
self.sync.peer_disconnected(&peer);
Ok(())
} else {
@@ -854,7 +830,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
) -> Result<(), ()> {
trace!(target: "sync", "New peer {} {:?}", who, status);
if self.context_data.peers.contains_key(&who) {
if self.peers.contains_key(&who) {
log::error!(target: "sync", "Called on_sync_peer_connected with already connected peer {}", who);
debug_assert!(false);
return Err(());
@@ -894,7 +870,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
// we don't interested in peers that are far behind us
let self_best_block = self
.context_data
.chain
.info()
.best_number;
@@ -921,7 +896,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
.expect("Constant is nonzero")),
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
next_request_id: 0,
};
let req = if peer.info.roles.is_full() {
@@ -939,7 +913,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
debug!(target: "sync", "Connected {}", who);
self.context_data.peers.insert(who.clone(), peer);
self.peers.insert(who.clone(), peer);
self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who.clone(), status.best_number));
if let Some(req) = req {
@@ -971,7 +945,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
trace!(target: "sync", "Received {} transactions from {}", transactions.len(), who);
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
for t in transactions {
if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
debug!(
@@ -1035,7 +1009,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
let mut propagated_to = HashMap::<_, Vec<_>>::new();
let mut propagated_transactions = 0;
for (who, peer) in self.context_data.peers.iter_mut() {
for (who, peer) in self.peers.iter_mut() {
// never send transactions to the light node
if !peer.info.roles.is_full() {
continue;
@@ -1093,7 +1067,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
/// In chain-based consensus, we often need to make sure non-best forks are
/// at least temporarily synced.
pub fn announce_block(&mut self, hash: B::Hash, data: Vec<u8>) {
let header = match self.context_data.chain.header(BlockId::Hash(hash)) {
let header = match self.chain.header(BlockId::Hash(hash)) {
Ok(Some(header)) => header,
Ok(None) => {
warn!("Trying to announce unknown block: {}", hash);
@@ -1110,10 +1084,10 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
return;
}
let is_best = self.context_data.chain.info().best_hash == hash;
let is_best = self.chain.info().best_hash == hash;
debug!(target: "sync", "Reannouncing block {:?} is_best: {}", hash, is_best);
for (who, ref mut peer) in self.context_data.peers.iter_mut() {
for (who, ref mut peer) in self.peers.iter_mut() {
let inserted = peer.known_blocks.insert(hash);
if inserted {
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
@@ -1156,7 +1130,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
) {
let hash = announce.header.hash();
let peer = match self.context_data.peers.get_mut(&who) {
let peer = match self.peers.get_mut(&who) {
Some(p) => p,
None => {
log::error!(target: "sync", "Received block announce from disconnected peer {}", who);
@@ -1294,7 +1268,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
match result {
Ok((id, req)) => {
self.pending_messages.push_back(
prepare_block_request(&mut self.context_data.peers, id, req)
prepare_block_request(&mut self.peers, id, req)
);
}
Err(sync::BadPeer(id, repu)) => {
@@ -1404,27 +1378,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}
fn format_stats(&self) -> String {
let mut out = String::new();
for (id, stats) in &self.context_data.stats {
let _ = writeln!(
&mut out,
"{}: In: {} bytes ({}), Out: {} bytes ({})",
id,
stats.bytes_in,
stats.count_in,
stats.bytes_out,
stats.count_out,
);
}
out
}
fn report_metrics(&self) {
use std::convert::TryInto;
if let Some(metrics) = &self.metrics {
let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX);
let n = u64::try_from(self.peers.len()).unwrap_or(std::u64::MAX);
metrics.peers.set(n);
let m = self.sync.metrics();
@@ -1447,13 +1403,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
fn prepare_block_request<B: BlockT, H: ExHashT>(
peers: &mut HashMap<PeerId, Peer<B, H>>,
who: PeerId,
mut request: message::BlockRequest<B>,
request: message::BlockRequest<B>,
) -> CustomMessageOutcome<B> {
let (tx, rx) = oneshot::channel();
if let Some(ref mut peer) = peers.get_mut(&who) {
request.id = peer.next_request_id;
peer.next_request_id += 1;
peer.block_request = Some((request.clone(), rx));
}
@@ -1568,7 +1522,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
// Check for finished outgoing requests.
let mut finished_block_requests = Vec::new();
for (id, peer) in self.context_data.peers.iter_mut() {
for (id, peer) in self.peers.iter_mut() {
if let Peer { block_request: Some((_, pending_response)), .. } = peer {
match pending_response.poll_unpin(cx) {
Poll::Ready(Ok(Ok(resp))) => {
@@ -1649,11 +1603,11 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
}
for (id, request) in self.sync.block_requests() {
let event = prepare_block_request(&mut self.context_data.peers, id.clone(), request);
let event = prepare_block_request(&mut self.peers, id.clone(), request);
self.pending_messages.push_back(event);
}
for (id, request) in self.sync.justification_requests() {
let event = prepare_block_request(&mut self.context_data.peers, id, request);
let event = prepare_block_request(&mut self.peers, id, request);
self.pending_messages.push_back(event);
}
if let Poll::Ready(Some((tx_hash, result))) = self.pending_transactions.poll_next_unpin(cx) {
@@ -1816,15 +1770,15 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
}
},
GenericProtoOut::LegacyMessage { peer_id, message } => {
if self.context_data.peers.contains_key(&peer_id) {
if self.peers.contains_key(&peer_id) {
self.on_custom_message(peer_id, message)
} else {
CustomMessageOutcome::None
}
},
GenericProtoOut::Notification { peer_id, set_id, message } =>
match usize::from(set_id) {
0 if self.context_data.peers.contains_key(&peer_id) => {
match set_id {
HARDCODED_PEERSETS_SYNC if self.peers.contains_key(&peer_id) => {
if let Ok(announce) = message::BlockAnnounce::decode(&mut message.as_ref()) {
self.push_block_announce_validation(peer_id, announce);
@@ -1840,7 +1794,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
CustomMessageOutcome::None
}
}
1 if self.context_data.peers.contains_key(&peer_id) => {
HARDCODED_PEERSETS_TX if self.peers.contains_key(&peer_id) => {
if let Ok(m) = <message::Transactions<B::Extrinsic> as Decode>::decode(
&mut message.as_ref(),
) {
@@ -1850,7 +1804,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
}
CustomMessageOutcome::None
}
0 | 1 => {
HARDCODED_PEERSETS_SYNC | HARDCODED_PEERSETS_TX => {
debug!(
target: "sync",
"Received sync or transaction for peer earlier refused by sync layer: {}",
@@ -1916,9 +1870,3 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
self.behaviour.inject_listener_closed(id, reason);
}
}
impl<B: BlockT, H: ExHashT> Drop for Protocol<B, H> {
fn drop(&mut self) {
debug!(target: "sync", "Network stats:\n{}", self.format_stats());
}
}