diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 5679292967..0a9efbb3ba 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -54,9 +54,9 @@ use sp_runtime::traits::{ use sp_arithmetic::traits::SaturatedConversion; use sync::{ChainSync, SyncState}; use std::borrow::Cow; +use std::convert::TryFrom as _; use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry}; use std::sync::Arc; -use std::fmt::Write; use std::{io, iter, num::NonZeroUsize, pin::Pin, task::Poll, time}; mod generic_proto; @@ -213,7 +213,9 @@ pub struct Protocol { config: ProtocolConfig, genesis_hash: B::Hash, sync: ChainSync, - context_data: ContextData, + // All connected peers + peers: HashMap>, + chain: Arc>, /// List of nodes for which we perform additional logging because they are important for the /// user. important_peers: HashSet, @@ -230,14 +232,6 @@ pub struct Protocol { boot_node_ids: HashSet, } -#[derive(Default)] -struct PacketStats { - bytes_in: u64, - bytes_out: u64, - count_in: u64, - count_out: u64, -} - /// Peer information #[derive(Debug)] struct Peer { @@ -251,8 +245,6 @@ struct Peer { known_transactions: LruHashSet, /// Holds a set of blocks known to this peer. known_blocks: LruHashSet, - /// Request counter, - next_request_id: message::RequestId, } /// Info about a peer's known state. @@ -266,14 +258,6 @@ pub struct PeerInfo { pub best_number: ::Number, } -/// Data necessary to create a context. -struct ContextData { - // All connected peers - peers: HashMap>, - stats: HashMap<&'static str, PacketStats>, - pub chain: Arc>, -} - /// Configuration for the Substrate-specific part of the networking layer. #[derive(Clone)] pub struct ProtocolConfig { @@ -511,11 +495,8 @@ impl Protocol { pending_transactions: FuturesUnordered::new(), pending_transactions_peers: HashMap::new(), config, - context_data: ContextData { - peers: HashMap::new(), - stats: HashMap::new(), - chain, - }, + peers: HashMap::new(), + chain, genesis_hash: info.genesis_hash, sync, important_peers, @@ -567,13 +548,12 @@ impl Protocol { /// Returns the number of peers we're connected to. pub fn num_connected_peers(&self) -> usize { - self.context_data.peers.values().count() + self.peers.values().count() } /// Returns the number of peers we're connected to and that are being queried. pub fn num_active_peers(&self) -> usize { - self.context_data - .peers + self.peers .values() .filter(|p| p.block_request.is_some()) .count() @@ -631,7 +611,7 @@ impl Protocol { fn update_peer_info(&mut self, who: &PeerId) { if let Some(info) = self.sync.peer_info(who) { - if let Some(ref mut peer) = self.context_data.peers.get_mut(who) { + if let Some(ref mut peer) = self.peers.get_mut(who) { peer.info.best_hash = info.best_hash; peer.info.best_number = info.best_number; } @@ -640,7 +620,7 @@ impl Protocol { /// Returns information about all the peers we are connected to after the handshake message. pub fn peers_info(&self) -> impl Iterator)> { - self.context_data.peers.iter().map(|(id, peer)| (id, &peer.info)) + self.peers.iter().map(|(id, peer)| (id, &peer.info)) } fn on_custom_message( @@ -663,10 +643,6 @@ impl Protocol { } }; - let mut stats = self.context_data.stats.entry(message.id()).or_default(); - stats.bytes_in += data.len() as u64; - stats.count_in += 1; - match message { GenericMessage::Status(_) => debug!(target: "sub-libp2p", "Received unexpected Status"), @@ -710,7 +686,7 @@ impl Protocol { who: PeerId, request: message::BlockRequest, ) -> CustomMessageOutcome { - prepare_block_request::(&mut self.context_data.peers, who, request) + prepare_block_request::(&mut self.peers, who, request) } /// Called by peer when it is disconnecting. @@ -723,7 +699,7 @@ impl Protocol { trace!(target: "sync", "{} disconnected", peer); } - if let Some(_peer_data) = self.context_data.peers.remove(&peer) { + if let Some(_peer_data) = self.peers.remove(&peer) { self.sync.peer_disconnected(&peer); Ok(()) } else { @@ -854,7 +830,7 @@ impl Protocol { ) -> Result<(), ()> { trace!(target: "sync", "New peer {} {:?}", who, status); - if self.context_data.peers.contains_key(&who) { + if self.peers.contains_key(&who) { log::error!(target: "sync", "Called on_sync_peer_connected with already connected peer {}", who); debug_assert!(false); return Err(()); @@ -894,7 +870,6 @@ impl Protocol { // we don't interested in peers that are far behind us let self_best_block = self - .context_data .chain .info() .best_number; @@ -921,7 +896,6 @@ impl Protocol { .expect("Constant is nonzero")), known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS) .expect("Constant is nonzero")), - next_request_id: 0, }; let req = if peer.info.roles.is_full() { @@ -939,7 +913,7 @@ impl Protocol { debug!(target: "sync", "Connected {}", who); - self.context_data.peers.insert(who.clone(), peer); + self.peers.insert(who.clone(), peer); self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who.clone(), status.best_number)); if let Some(req) = req { @@ -971,7 +945,7 @@ impl Protocol { } trace!(target: "sync", "Received {} transactions from {}", transactions.len(), who); - if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { + if let Some(ref mut peer) = self.peers.get_mut(&who) { for t in transactions { if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS { debug!( @@ -1035,7 +1009,7 @@ impl Protocol { let mut propagated_to = HashMap::<_, Vec<_>>::new(); let mut propagated_transactions = 0; - for (who, peer) in self.context_data.peers.iter_mut() { + for (who, peer) in self.peers.iter_mut() { // never send transactions to the light node if !peer.info.roles.is_full() { continue; @@ -1093,7 +1067,7 @@ impl Protocol { /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. pub fn announce_block(&mut self, hash: B::Hash, data: Vec) { - let header = match self.context_data.chain.header(BlockId::Hash(hash)) { + let header = match self.chain.header(BlockId::Hash(hash)) { Ok(Some(header)) => header, Ok(None) => { warn!("Trying to announce unknown block: {}", hash); @@ -1110,10 +1084,10 @@ impl Protocol { return; } - let is_best = self.context_data.chain.info().best_hash == hash; + let is_best = self.chain.info().best_hash == hash; debug!(target: "sync", "Reannouncing block {:?} is_best: {}", hash, is_best); - for (who, ref mut peer) in self.context_data.peers.iter_mut() { + for (who, ref mut peer) in self.peers.iter_mut() { let inserted = peer.known_blocks.insert(hash); if inserted { trace!(target: "sync", "Announcing block {:?} to {}", hash, who); @@ -1156,7 +1130,7 @@ impl Protocol { ) { let hash = announce.header.hash(); - let peer = match self.context_data.peers.get_mut(&who) { + let peer = match self.peers.get_mut(&who) { Some(p) => p, None => { log::error!(target: "sync", "Received block announce from disconnected peer {}", who); @@ -1294,7 +1268,7 @@ impl Protocol { match result { Ok((id, req)) => { self.pending_messages.push_back( - prepare_block_request(&mut self.context_data.peers, id, req) + prepare_block_request(&mut self.peers, id, req) ); } Err(sync::BadPeer(id, repu)) => { @@ -1404,27 +1378,9 @@ impl Protocol { } } - fn format_stats(&self) -> String { - let mut out = String::new(); - for (id, stats) in &self.context_data.stats { - let _ = writeln!( - &mut out, - "{}: In: {} bytes ({}), Out: {} bytes ({})", - id, - stats.bytes_in, - stats.count_in, - stats.bytes_out, - stats.count_out, - ); - } - out - } - fn report_metrics(&self) { - use std::convert::TryInto; - if let Some(metrics) = &self.metrics { - let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX); + let n = u64::try_from(self.peers.len()).unwrap_or(std::u64::MAX); metrics.peers.set(n); let m = self.sync.metrics(); @@ -1447,13 +1403,11 @@ impl Protocol { fn prepare_block_request( peers: &mut HashMap>, who: PeerId, - mut request: message::BlockRequest, + request: message::BlockRequest, ) -> CustomMessageOutcome { let (tx, rx) = oneshot::channel(); if let Some(ref mut peer) = peers.get_mut(&who) { - request.id = peer.next_request_id; - peer.next_request_id += 1; peer.block_request = Some((request.clone(), rx)); } @@ -1568,7 +1522,7 @@ impl NetworkBehaviour for Protocol { // Check for finished outgoing requests. let mut finished_block_requests = Vec::new(); - for (id, peer) in self.context_data.peers.iter_mut() { + for (id, peer) in self.peers.iter_mut() { if let Peer { block_request: Some((_, pending_response)), .. } = peer { match pending_response.poll_unpin(cx) { Poll::Ready(Ok(Ok(resp))) => { @@ -1649,11 +1603,11 @@ impl NetworkBehaviour for Protocol { } for (id, request) in self.sync.block_requests() { - let event = prepare_block_request(&mut self.context_data.peers, id.clone(), request); + let event = prepare_block_request(&mut self.peers, id.clone(), request); self.pending_messages.push_back(event); } for (id, request) in self.sync.justification_requests() { - let event = prepare_block_request(&mut self.context_data.peers, id, request); + let event = prepare_block_request(&mut self.peers, id, request); self.pending_messages.push_back(event); } if let Poll::Ready(Some((tx_hash, result))) = self.pending_transactions.poll_next_unpin(cx) { @@ -1816,15 +1770,15 @@ impl NetworkBehaviour for Protocol { } }, GenericProtoOut::LegacyMessage { peer_id, message } => { - if self.context_data.peers.contains_key(&peer_id) { + if self.peers.contains_key(&peer_id) { self.on_custom_message(peer_id, message) } else { CustomMessageOutcome::None } }, GenericProtoOut::Notification { peer_id, set_id, message } => - match usize::from(set_id) { - 0 if self.context_data.peers.contains_key(&peer_id) => { + match set_id { + HARDCODED_PEERSETS_SYNC if self.peers.contains_key(&peer_id) => { if let Ok(announce) = message::BlockAnnounce::decode(&mut message.as_ref()) { self.push_block_announce_validation(peer_id, announce); @@ -1840,7 +1794,7 @@ impl NetworkBehaviour for Protocol { CustomMessageOutcome::None } } - 1 if self.context_data.peers.contains_key(&peer_id) => { + HARDCODED_PEERSETS_TX if self.peers.contains_key(&peer_id) => { if let Ok(m) = as Decode>::decode( &mut message.as_ref(), ) { @@ -1850,7 +1804,7 @@ impl NetworkBehaviour for Protocol { } CustomMessageOutcome::None } - 0 | 1 => { + HARDCODED_PEERSETS_SYNC | HARDCODED_PEERSETS_TX => { debug!( target: "sync", "Received sync or transaction for peer earlier refused by sync layer: {}", @@ -1916,9 +1870,3 @@ impl NetworkBehaviour for Protocol { self.behaviour.inject_listener_closed(id, reason); } } - -impl Drop for Protocol { - fn drop(&mut self) { - debug!(target: "sync", "Network stats:\n{}", self.format_stats()); - } -} diff --git a/substrate/client/network/src/protocol/message.rs b/substrate/client/network/src/protocol/message.rs index c0a92629d9..3aa1e2cf34 100644 --- a/substrate/client/network/src/protocol/message.rs +++ b/substrate/client/network/src/protocol/message.rs @@ -286,30 +286,6 @@ pub mod generic { ConsensusBatch(Vec), } - impl Message { - /// Message id useful for logging. - pub fn id(&self) -> &'static str { - match self { - Message::Status(_) => "Status", - Message::BlockRequest(_) => "BlockRequest", - Message::BlockResponse(_) => "BlockResponse", - Message::BlockAnnounce(_) => "BlockAnnounce", - Message::Transactions(_) => "Transactions", - Message::Consensus(_) => "Consensus", - Message::RemoteCallRequest(_) => "RemoteCallRequest", - Message::RemoteCallResponse(_) => "RemoteCallResponse", - Message::RemoteReadRequest(_) => "RemoteReadRequest", - Message::RemoteReadResponse(_) => "RemoteReadResponse", - Message::RemoteHeaderRequest(_) => "RemoteHeaderRequest", - Message::RemoteHeaderResponse(_) => "RemoteHeaderResponse", - Message::RemoteChangesRequest(_) => "RemoteChangesRequest", - Message::RemoteChangesResponse(_) => "RemoteChangesResponse", - Message::RemoteReadChildRequest(_) => "RemoteReadChildRequest", - Message::ConsensusBatch(_) => "ConsensusBatch", - } - } - } - /// Status sent on connection. // TODO https://github.com/paritytech/substrate/issues/4674: replace the `Status` // struct with this one, after waiting a few releases beyond `NetworkSpecialization`'s