From 6e394464b8a5d10c2140a0ef2d003f2d3a573f4c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sat, 23 Mar 2019 10:34:28 +0100 Subject: [PATCH] Replace NodeIndex with PeerId everywhere (#2077) * Replace NodeIndex with PeerId * Fix tests * More test fixing * Whitespace --- substrate/Cargo.lock | 45 ++--- substrate/core/consensus/common/Cargo.toml | 1 + .../core/consensus/common/src/import_queue.rs | 22 ++- substrate/core/network-libp2p/src/lib.rs | 6 - .../core/network-libp2p/src/service_task.rs | 152 +++++---------- substrate/core/network-libp2p/tests/test.rs | 8 +- substrate/core/network/src/blocks.rs | 75 ++++---- .../core/network/src/consensus_gossip.rs | 20 +- substrate/core/network/src/lib.rs | 2 +- substrate/core/network/src/on_demand.rs | 180 ++++++++++-------- substrate/core/network/src/protocol.rs | 138 +++++++------- substrate/core/network/src/service.rs | 52 +++-- substrate/core/network/src/specialization.rs | 8 +- substrate/core/network/src/sync.rs | 68 +++---- .../core/network/src/test/block_import.rs | 17 +- substrate/core/network/src/test/mod.rs | 73 +++---- substrate/core/network/src/test/sync.rs | 19 +- substrate/core/rpc/src/system/helpers.rs | 5 +- substrate/core/rpc/src/system/mod.rs | 5 +- substrate/core/rpc/src/system/tests.rs | 8 +- 20 files changed, 424 insertions(+), 480 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 52eec67507..c744835520 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -1287,7 +1287,7 @@ dependencies = [ [[package]] name = "libp2p" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1322,7 +1322,7 @@ 
dependencies = [ [[package]] name = "libp2p-core" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "asn1_der 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "bs58 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1357,7 +1357,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", "syn 0.15.26 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1366,7 +1366,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p-core 0.5.0 (git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20)", @@ -1379,7 +1379,7 @@ dependencies = [ [[package]] name = "libp2p-floodsub" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "bs58 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.11 
(registry+https://github.com/rust-lang/crates.io-index)", @@ -1398,7 +1398,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1419,7 +1419,7 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "bigint 4.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1447,7 +1447,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "dns-parser 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1468,7 +1468,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "bytes 0.4.11 
(registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1484,7 +1484,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.3.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "curve25519-dalek 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1502,7 +1502,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1521,7 +1521,7 @@ dependencies = [ [[package]] name = "libp2p-plaintext" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p-core 0.5.0 (git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20)", @@ -1531,7 +1531,7 @@ dependencies = [ [[package]] name = "libp2p-ratelimit" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = 
"git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "aio-limited 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1544,7 +1544,7 @@ dependencies = [ [[package]] name = "libp2p-secio" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "aes-ctr 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "asn1_der 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1570,7 +1570,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p-core 0.5.0 (git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20)", @@ -1584,7 +1584,7 @@ dependencies = [ [[package]] name = "libp2p-uds" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p-core 0.5.0 (git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20)", @@ -1596,7 +1596,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.5.0" -source = 
"git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p-core 0.5.0 (git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20)", @@ -1611,7 +1611,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.5.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p-core 0.5.0 (git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20)", @@ -1807,7 +1807,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.3.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2179,7 +2179,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.2.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "bs58 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2193,7 +2193,7 @@ 
dependencies = [ [[package]] name = "parity-multihash" version = "0.1.0" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "blake2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2728,7 +2728,7 @@ dependencies = [ [[package]] name = "rw-stream-sink" version = "0.1.1" -source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#afd9623fccaac5ce1c41db1d82ad4def2652390f" +source = "git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20#f0a2243eddd71c31e924044e80f03abd2dc61267" dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3783,6 +3783,7 @@ dependencies = [ "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "libp2p 0.5.0 (git+https://github.com/tomaka/libp2p-rs?branch=substrate-tmp-2019-03-20)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec-derive 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/consensus/common/Cargo.toml b/substrate/core/consensus/common/Cargo.toml index 2eaf177874..814857fe69 100644 --- a/substrate/core/consensus/common/Cargo.toml +++ b/substrate/core/consensus/common/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] crossbeam-channel = "0.3.4" +libp2p = { git = "https://github.com/tomaka/libp2p-rs", branch = "substrate-tmp-2019-03-20", 
default-features = false } log = "0.4" primitives = { package = "substrate-primitives", path= "../../primitives" } inherents = { package = "substrate-inherents", path = "../../inherents" } diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index 1952869187..2e1c29d4d8 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -46,7 +46,7 @@ pub type SharedBlockImport = Arc + pub type SharedJustificationImport = Arc + Send + Sync>; /// Maps to the Origin used by the network. -pub type Origin = usize; +pub type Origin = libp2p::PeerId; /// Block data used by the queue. #[derive(Debug, PartialEq, Eq, Clone)] @@ -179,7 +179,7 @@ impl ImportQueue for BasicQueue { fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor, justification: Justification) { let _ = self .sender - .send(BlockImportMsg::ImportJustification(who, hash, number, justification)) + .send(BlockImportMsg::ImportJustification(who.clone(), hash, number, justification)) .expect("1. self is holding a sender to the Importer, 2. 
Importer should handle messages while there are senders around; qed"); } } @@ -516,7 +516,7 @@ pub fn import_single_block>( let (header, justification) = match (block.header, block.justification) { (Some(header), justification) => (header, justification), (None, _) => { - if let Some(peer) = peer { + if let Some(ref peer) = peer { debug!(target: "sync", "Header {} was not provided by {} ", block.hash, peer); } else { debug!(target: "sync", "Header {} was not provided ", block.hash); @@ -535,14 +535,14 @@ pub fn import_single_block>( trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); Ok(BlockImportResult::ImportedKnown(number)) }, - Ok(ImportResult::Imported(aux)) => Ok(BlockImportResult::ImportedUnknown(number, aux, peer)), + Ok(ImportResult::Imported(aux)) => Ok(BlockImportResult::ImportedUnknown(number, aux, peer.clone())), Ok(ImportResult::UnknownParent) => { debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent); Err(BlockImportError::UnknownParent) }, Ok(ImportResult::KnownBad) => { debug!(target: "sync", "Peer gave us a bad block {}: {:?}", number, hash); - Err(BlockImportError::BadBlock(peer)) + Err(BlockImportError::BadBlock(peer.clone())) }, Err(e) => { debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); @@ -558,12 +558,12 @@ pub fn import_single_block>( let (import_block, new_authorities) = verifier.verify(block_origin, header, justification, block.body) .map_err(|msg| { - if let Some(peer) = peer { + if let Some(ref peer) = peer { trace!(target: "sync", "Verifying {}({}) from {} failed: {}", number, hash, peer, msg); } else { trace!(target: "sync", "Verifying {}({}) failed: {}", number, hash, msg); } - BlockImportError::VerificationFailed(peer, msg) + BlockImportError::VerificationFailed(peer.clone(), msg) })?; import_error(import_handle.import_block(import_block, new_authorities)) @@ -572,6 +572,7 @@ pub fn import_single_block>( #[cfg(test)] mod tests { use 
super::*; + use libp2p::PeerId; use test_client::runtime::{Block, Hash}; #[derive(Debug, PartialEq)] @@ -639,15 +640,16 @@ mod tests { assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported)); // Send an unknown with peer and bad justification + let peer_id = PeerId::random(); let results = vec![(Ok(BlockImportResult::ImportedUnknown(Default::default(), ImportedAux { needs_justification: true, clear_justification_requests: false, bad_justification: true }, - Some(0))), Default::default())]; + Some(peer_id.clone()))), Default::default())]; let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap(); assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported)); assert_eq!(link_port.recv(), Ok(LinkMsg::Disconnected)); // Send an incomplete header - let results = vec![(Err(BlockImportError::IncompleteHeader(Some(Default::default()))), Default::default())]; + let results = vec![(Err(BlockImportError::IncompleteHeader(Some(peer_id.clone()))), Default::default())]; let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap(); assert_eq!(link_port.recv(), Ok(LinkMsg::Disconnected)); assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted)); @@ -658,7 +660,7 @@ mod tests { assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted)); // Send a verification failed - let results = vec![(Err(BlockImportError::VerificationFailed(Some(0), String::new())), Default::default())]; + let results = vec![(Err(BlockImportError::VerificationFailed(Some(peer_id.clone()), String::new())), Default::default())]; let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap(); assert_eq!(link_port.recv(), Ok(LinkMsg::Disconnected)); assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted)); diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs index 1736fd44d7..34644da440 100644 --- a/substrate/core/network-libp2p/src/lib.rs +++ b/substrate/core/network-libp2p/src/lib.rs @@ -37,12 +37,6 @@ use 
std::{collections::{HashMap, HashSet}, error, fmt, time::Duration}; /// Protocol / handler id pub type ProtocolId = [u8; 3]; -/// Node public key -pub type NodeId = PeerId; - -/// Local (temporary) peer session ID. -pub type NodeIndex = usize; - /// Parses a string address and returns the component, if valid. pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> { let mut addr: Multiaddr = addr_str.parse()?; diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs index 4e7e0a33a8..b8cf445d64 100644 --- a/substrate/core/network-libp2p/src/service_task.rs +++ b/substrate/core/network-libp2p/src/service_task.rs @@ -19,14 +19,13 @@ use crate::{ transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer }; use crate::custom_proto::{CustomMessage, RegisteredProtocol}; -use crate::{NetworkConfiguration, NonReservedPeerMode, NodeIndex, parse_str_addr}; +use crate::{NetworkConfiguration, NonReservedPeerMode, parse_str_addr}; use fnv::FnvHashMap; use futures::{prelude::*, Stream}; use libp2p::{multiaddr::Protocol, Multiaddr, core::swarm::NetworkBehaviour, PeerId}; use libp2p::core::{Swarm, nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox}; use libp2p::core::nodes::ConnectedPoint; -use log::{debug, error, info, warn}; -use std::collections::hash_map::Entry; +use log::{debug, info, warn}; use std::fs; use std::io::Error as IoError; use std::path::Path; @@ -114,8 +113,6 @@ where TMessage: CustomMessage + Send + 'static { swarm, bandwidth, nodes_info: Default::default(), - index_by_id: Default::default(), - next_node_id: 1, injected_events: Vec::new(), }; @@ -127,10 +124,8 @@ where TMessage: CustomMessage + Send + 'static { pub enum ServiceEvent { /// A custom protocol substream has been opened with a node. OpenedCustomProtocol { - /// The Id of the node. + /// Identity of the node. peer_id: PeerId, - /// Index of the node. 
- node_index: NodeIndex, /// Version of the protocol that was opened. version: u8, /// Node debug info @@ -139,16 +134,16 @@ pub enum ServiceEvent { /// A custom protocol substream has been closed. ClosedCustomProtocol { - /// Index of the node. - node_index: NodeIndex, + /// Identity of the node. + peer_id: PeerId, /// Node debug info debug_info: String, }, /// Receives a message on a custom protocol stream. CustomMessage { - /// Index of the node. - node_index: NodeIndex, + /// Identity of the node. + peer_id: PeerId, /// Message that has been received. message: TMessage, }, @@ -156,7 +151,7 @@ pub enum ServiceEvent { /// The substream with a node is clogged. We should avoid sending data to it if possible. Clogged { /// Index of the node. - node_index: NodeIndex, + peer_id: PeerId, /// Copy of the messages that are within the buffer, for further diagnostic. messages: Vec, }, @@ -171,13 +166,7 @@ pub struct Service where TMessage: CustomMessage { bandwidth: Arc, /// Information about all the nodes we're connected to. - nodes_info: FnvHashMap, - - /// Opposite of `nodes_info`. - index_by_id: FnvHashMap, - - /// Next index to assign to a node. - next_node_id: NodeIndex, + nodes_info: FnvHashMap, /// Events to produce on the Stream. injected_events: Vec>, @@ -186,8 +175,6 @@ pub struct Service where TMessage: CustomMessage { /// Information about a node we're connected to. #[derive(Debug)] struct NodeInfo { - /// Hash of the public key of the node. - peer_id: PeerId, /// How we're connected to the node. endpoint: ConnectedPoint, /// Version reported by the remote, or `None` if unknown. 
@@ -202,16 +189,16 @@ where TMessage: CustomMessage + Send + 'static { pub fn state(&mut self) -> NetworkState { let connected_peers = { let swarm = &mut self.swarm; - self.nodes_info.values().map(move |info| { - let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, &info.peer_id) + self.nodes_info.iter().map(move |(peer_id, info)| { + let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, peer_id) .into_iter().collect(); - (info.peer_id.to_base58(), NetworkStatePeer { + (peer_id.to_base58(), NetworkStatePeer { endpoint: info.endpoint.clone().into(), version_string: info.client_version.clone(), latest_ping_time: info.latest_ping, - enabled: swarm.is_enabled(&info.peer_id), - open: swarm.is_open(&info.peer_id), + enabled: swarm.is_enabled(&peer_id), + open: swarm.is_open(&peer_id), known_addresses, }) }).collect() @@ -219,8 +206,8 @@ where TMessage: CustomMessage + Send + 'static { let not_connected_peers = { let swarm = &mut self.swarm; - let index_by_id = &self.index_by_id; - let list = swarm.known_peers().filter(|p| !index_by_id.contains_key(p)) + let nodes_info = &self.nodes_info; + let list = swarm.known_peers().filter(|p| !nodes_info.contains_key(p)) .cloned().collect::>(); list.into_iter().map(move |peer_id| { (peer_id.to_base58(), NetworkStateNotConnectedPeer { @@ -266,25 +253,19 @@ where TMessage: CustomMessage + Send + 'static { /// Returns the list of all the peers we are connected to. #[inline] - pub fn connected_peers<'a>(&'a self) -> impl Iterator + 'a { - self.nodes_info.keys().cloned() - } - - /// Returns the `PeerId` of a node. - #[inline] - pub fn peer_id_of_node(&self, node_index: NodeIndex) -> Option<&PeerId> { - self.nodes_info.get(&node_index).map(|info| &info.peer_id) + pub fn connected_peers<'a>(&'a self) -> impl Iterator + 'a { + self.nodes_info.keys() } /// Returns the way we are connected to a node. 
#[inline] - pub fn node_endpoint(&self, node_index: NodeIndex) -> Option<&ConnectedPoint> { - self.nodes_info.get(&node_index).map(|info| &info.endpoint) + pub fn node_endpoint(&self, peer_id: &PeerId) -> Option<&ConnectedPoint> { + self.nodes_info.get(peer_id).map(|info| &info.endpoint) } /// Returns the client version reported by a node. - pub fn node_client_version(&self, node_index: NodeIndex) -> Option<&str> { - self.nodes_info.get(&node_index) + pub fn node_client_version(&self, peer_id: &PeerId) -> Option<&str> { + self.nodes_info.get(peer_id) .and_then(|info| info.client_version.as_ref().map(|s| &s[..])) } @@ -294,25 +275,21 @@ where TMessage: CustomMessage + Send + 'static { /// invalid. pub fn send_custom_message( &mut self, - node_index: NodeIndex, + peer_id: &PeerId, message: TMessage ) { - if let Some(peer_id) = self.nodes_info.get(&node_index).map(|info| &info.peer_id) { - self.swarm.send_custom_message(peer_id, message); - } else { - warn!(target: "sub-libp2p", "Tried to send message to unknown node: {:}", node_index); - } + self.swarm.send_custom_message(peer_id, message); } /// Disconnects a peer. /// /// This is asynchronous and will not immediately close the peer. /// Corresponding closing events will be generated once the closing actually happens. - pub fn drop_node(&mut self, node_index: NodeIndex) { - if let Some(info) = self.nodes_info.get(&node_index) { - debug!(target: "sub-libp2p", "Dropping {:?} on purpose (#{:?}, {:?}, {:?})", - info.peer_id, node_index, info.endpoint, info.client_version); - self.swarm.drop_node(&info.peer_id); + pub fn drop_node(&mut self, peer_id: &PeerId) { + if let Some(info) = self.nodes_info.get(peer_id) { + debug!(target: "sub-libp2p", "Dropping {:?} on purpose ({:?}, {:?})", + peer_id, info.endpoint, info.client_version); + self.swarm.drop_node(peer_id); } } @@ -322,73 +299,42 @@ where TMessage: CustomMessage + Send + 'static { } /// Get debug info for a given peer. 
- pub fn peer_debug_info(&self, who: NodeIndex) -> String { - if let Some(info) = self.nodes_info.get(&who) { - format!("{:?} (version: {:?}) through {:?}", info.peer_id, info.client_version, info.endpoint) + pub fn peer_debug_info(&self, who: &PeerId) -> String { + if let Some(info) = self.nodes_info.get(who) { + format!("{:?} (version: {:?}) through {:?}", who, info.client_version, info.endpoint) } else { "unknown".to_string() } } - /// Returns the `NodeIndex` of a peer, or assigns one if none exists. - fn index_of_peer_or_assign(&mut self, peer: PeerId, endpoint: ConnectedPoint) -> NodeIndex { - match self.index_by_id.entry(peer) { - Entry::Occupied(entry) => { - let id = *entry.get(); - self.nodes_info.insert(id, NodeInfo { - peer_id: entry.key().clone(), - endpoint, - client_version: None, - latest_ping: None, - }); - id - }, - Entry::Vacant(entry) => { - let id = self.next_node_id; - self.next_node_id += 1; - self.nodes_info.insert(id, NodeInfo { - peer_id: entry.key().clone(), - endpoint, - client_version: None, - latest_ping: None, - }); - entry.insert(id); - id - }, - } - } - /// Polls for what happened on the network. fn poll_swarm(&mut self) -> Poll>, IoError> { loop { match self.swarm.poll() { - Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { peer_id, version, endpoint }))) => { - let node_index = self.index_of_peer_or_assign(peer_id.clone(), endpoint); + Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { peer_id, version, .. }))) => { + let debug_info = self.peer_debug_info(&peer_id); break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol { peer_id, - node_index, version, - debug_info: self.peer_debug_info(node_index), + debug_info, }))) } Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { peer_id, .. 
}))) => { - let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour"); + let debug_info = self.peer_debug_info(&peer_id); break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol { - node_index, - debug_info: self.peer_debug_info(node_index), + peer_id, + debug_info, }))) } Ok(Async::Ready(Some(BehaviourOut::CustomMessage { peer_id, message }))) => { - let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour"); break Ok(Async::Ready(Some(ServiceEvent::CustomMessage { - node_index, + peer_id, message, }))) } Ok(Async::Ready(Some(BehaviourOut::Clogged { peer_id, messages }))) => { - let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour"); break Ok(Async::Ready(Some(ServiceEvent::Clogged { - node_index, + peer_id, messages, }))) } @@ -396,26 +342,16 @@ where TMessage: CustomMessage + Send + 'static { // Contrary to the other events, this one can happen even on nodes which don't // have any open custom protocol slot. Therefore it is not necessarily in the // list. - if let Some(id) = self.index_by_id.get(&peer_id) { - if let Some(n) = self.nodes_info.get_mut(id) { - n.client_version = Some(info.agent_version); - } else { - error!(target: "sub-libp2p", - "State inconsistency between index_by_id and nodes_info"); - } + if let Some(n) = self.nodes_info.get_mut(&peer_id) { + n.client_version = Some(info.agent_version); } } Ok(Async::Ready(Some(BehaviourOut::PingSuccess { peer_id, ping_time }))) => { // Contrary to the other events, this one can happen even on nodes which don't // have any open custom protocol slot. Therefore it is not necessarily in the // list. 
- if let Some(id) = self.index_by_id.get(&peer_id) { - if let Some(n) = self.nodes_info.get_mut(id) { - n.latest_ping = Some(ping_time); - } else { - error!(target: "sub-libp2p", - "State inconsistency between index_by_id and nodes_info"); - } + if let Some(n) = self.nodes_info.get_mut(&peer_id) { + n.latest_ping = Some(ping_time); } } Ok(Async::NotReady) => break Ok(Async::NotReady), diff --git a/substrate/core/network-libp2p/tests/test.rs b/substrate/core/network-libp2p/tests/test.rs index a7761a6fca..437f651184 100644 --- a/substrate/core/network-libp2p/tests/test.rs +++ b/substrate/core/network-libp2p/tests/test.rs @@ -99,9 +99,9 @@ fn two_nodes_transfer_lots_of_packets() { let fut1 = future::poll_fn(move || -> io::Result<_> { loop { match try_ready!(service1.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { node_index, .. }) => { + Some(ServiceEvent::OpenedCustomProtocol { peer_id, .. }) => { for n in 0 .. NUM_PACKETS { - service1.send_custom_message(node_index, vec![(n % 256) as u8]); + service1.send_custom_message(&peer_id, vec![(n % 256) as u8]); } }, _ => panic!(), @@ -227,9 +227,9 @@ fn basic_two_nodes_requests_in_parallel() { let fut1 = future::poll_fn(move || -> io::Result<_> { loop { match try_ready!(service1.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { node_index, .. }) => { + Some(ServiceEvent::OpenedCustomProtocol { peer_id, .. }) => { for msg in to_send.drain(..) 
{ - service1.send_custom_message(node_index, msg); + service1.send_custom_message(&peer_id, msg); } }, _ => panic!(), diff --git a/substrate/core/network/src/blocks.rs b/substrate/core/network/src/blocks.rs index d1e71ca68e..60c6886f09 100644 --- a/substrate/core/network/src/blocks.rs +++ b/substrate/core/network/src/blocks.rs @@ -20,7 +20,7 @@ use std::ops::Range; use std::collections::{HashMap, BTreeMap}; use std::collections::hash_map::Entry; use log::trace; -use network_libp2p::NodeIndex; +use network_libp2p::PeerId; use runtime_primitives::traits::{Block as BlockT, NumberFor, As}; use crate::message; @@ -32,7 +32,7 @@ pub struct BlockData { /// The Block Message from the wire pub block: message::BlockData, /// The peer, we received this from - pub origin: Option, + pub origin: Option, } #[derive(Debug)] @@ -58,7 +58,7 @@ impl BlockRangeState { pub struct BlockCollection { /// Downloaded blocks. blocks: BTreeMap, BlockRangeState>, - peer_requests: HashMap>, + peer_requests: HashMap>, } impl BlockCollection { @@ -77,7 +77,7 @@ impl BlockCollection { } /// Insert a set of blocks into collection. - pub fn insert(&mut self, start: NumberFor, blocks: Vec>, who: NodeIndex) { + pub fn insert(&mut self, start: NumberFor, blocks: Vec>, who: PeerId) { if blocks.is_empty() { return; } @@ -96,11 +96,11 @@ impl BlockCollection { } self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter() - .map(|b| BlockData { origin: Some(who), block: b }).collect())); + .map(|b| BlockData { origin: Some(who.clone()), block: b }).collect())); } /// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded. 
- pub fn needed_blocks(&mut self, who: NodeIndex, count: usize, peer_best: NumberFor, common: NumberFor) -> Option>> { + pub fn needed_blocks(&mut self, who: PeerId, count: usize, peer_best: NumberFor, common: NumberFor) -> Option>> { // First block number that we need to download let first_different = common + As::sa(1); let count = As::sa(count as u64); @@ -166,8 +166,8 @@ impl BlockCollection { drained } - pub fn clear_peer_download(&mut self, who: NodeIndex) { - match self.peer_requests.entry(who) { + pub fn clear_peer_download(&mut self, who: &PeerId) { + match self.peer_requests.entry(who.clone()) { Entry::Occupied(entry) => { let start = entry.remove(); let remove = match self.blocks.get_mut(&start) { @@ -195,7 +195,7 @@ impl BlockCollection { #[cfg(test)] mod test { use super::{BlockCollection, BlockData, BlockRangeState}; - use crate::message; + use crate::{message, PeerId}; use runtime_primitives::testing::{Block as RawBlock, ExtrinsicWrapper}; use primitives::H256; @@ -221,7 +221,7 @@ mod test { fn create_clear() { let mut bc = BlockCollection::new(); assert!(is_empty(&bc)); - bc.insert(1, generate_blocks(100), 0); + bc.insert(1, generate_blocks(100), PeerId::random()); assert!(!is_empty(&bc)); bc.clear(); assert!(is_empty(&bc)); @@ -231,43 +231,43 @@ mod test { fn insert_blocks() { let mut bc = BlockCollection::new(); assert!(is_empty(&bc)); - let peer0 = 0; - let peer1 = 1; - let peer2 = 2; + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); let blocks = generate_blocks(150); - assert_eq!(bc.needed_blocks(peer0, 40, 150, 0), Some(1 .. 41)); - assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(41 .. 81)); - assert_eq!(bc.needed_blocks(peer2, 40, 150, 0), Some(81 .. 121)); + assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0), Some(1 .. 41)); + assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0), Some(41 .. 81)); + assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 0), Some(81 .. 
121)); - bc.clear_peer_download(peer1); - bc.insert(41, blocks[41..81].to_vec(), peer1); + bc.clear_peer_download(&peer1); + bc.insert(41, blocks[41..81].to_vec(), peer1.clone()); assert_eq!(bc.drain(1), vec![]); - assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(121 .. 151)); - bc.clear_peer_download(peer0); - bc.insert(1, blocks[1..11].to_vec(), peer0); + assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0), Some(121 .. 151)); + bc.clear_peer_download(&peer0); + bc.insert(1, blocks[1..11].to_vec(), peer0.clone()); - assert_eq!(bc.needed_blocks(peer0, 40, 150, 0), Some(11 .. 41)); - assert_eq!(bc.drain(1), blocks[1..11].iter().map(|b| BlockData { block: b.clone(), origin: Some(0) }).collect::>()); + assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0), Some(11 .. 41)); + assert_eq!(bc.drain(1), blocks[1..11].iter().map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::>()); - bc.clear_peer_download(peer0); - bc.insert(11, blocks[11..41].to_vec(), peer0); + bc.clear_peer_download(&peer0); + bc.insert(11, blocks[11..41].to_vec(), peer0.clone()); let drained = bc.drain(12); - assert_eq!(drained[..30], blocks[11..41].iter().map(|b| BlockData { block: b.clone(), origin: Some(0) }).collect::>()[..]); - assert_eq!(drained[30..], blocks[41..81].iter().map(|b| BlockData { block: b.clone(), origin: Some(1) }).collect::>()[..]); + assert_eq!(drained[..30], blocks[11..41].iter().map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::>()[..]); + assert_eq!(drained[30..], blocks[41..81].iter().map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::>()[..]); - bc.clear_peer_download(peer2); - assert_eq!(bc.needed_blocks(peer2, 40, 150, 80), Some(81 .. 
121)); - bc.clear_peer_download(peer2); - bc.insert(81, blocks[81..121].to_vec(), peer2); - bc.clear_peer_download(peer1); - bc.insert(121, blocks[121..150].to_vec(), peer1); + bc.clear_peer_download(&peer2); + assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 80), Some(81 .. 121)); + bc.clear_peer_download(&peer2); + bc.insert(81, blocks[81..121].to_vec(), peer2.clone()); + bc.clear_peer_download(&peer1); + bc.insert(121, blocks[121..150].to_vec(), peer1.clone()); assert_eq!(bc.drain(80), vec![]); let drained = bc.drain(81); - assert_eq!(drained[..40], blocks[81..121].iter().map(|b| BlockData { block: b.clone(), origin: Some(2) }).collect::>()[..]); - assert_eq!(drained[40..], blocks[121..150].iter().map(|b| BlockData { block: b.clone(), origin: Some(1) }).collect::>()[..]); + assert_eq!(drained[..40], blocks[81..121].iter().map(|b| BlockData { block: b.clone(), origin: Some(peer2.clone()) }).collect::>()[..]); + assert_eq!(drained[40..], blocks[121..150].iter().map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::>()[..]); } #[test] @@ -280,7 +280,8 @@ mod test { let blocks = generate_blocks(10).into_iter().map(|b| BlockData { block: b, origin: None }).collect(); bc.blocks.insert(114305, BlockRangeState::Complete(blocks)); - assert_eq!(bc.needed_blocks(0, 128, 10000, 000), Some(1 .. 100)); - assert_eq!(bc.needed_blocks(0, 128, 10000, 600), Some(100 + 128 .. 100 + 128 + 128)); + let peer0 = PeerId::random(); + assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 000), Some(1 .. 100)); + assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 600), Some(100 + 128 .. 
100 + 128 + 128)); } } diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs index 4d45f26143..3f6073f1d4 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/consensus_gossip.rs @@ -23,7 +23,7 @@ use log::{trace, debug}; use futures::sync::mpsc; use rand::{self, seq::SliceRandom}; use lru_cache::LruCache; -use network_libp2p::{Severity, NodeIndex}; +use network_libp2p::{Severity, PeerId}; use runtime_primitives::traits::{Block as BlockT, Hash, HashFor}; pub use crate::message::generic::{Message, ConsensusMessage}; use crate::protocol::Context; @@ -79,7 +79,7 @@ pub trait Validator { /// Consensus network protocol handler. Manages statements and candidate requests. pub struct ConsensusGossip { - peers: HashMap>, + peers: HashMap>, live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec>>>, messages: Vec>, known_messages: LruCache, @@ -109,7 +109,7 @@ impl ConsensusGossip { } /// Handle new connected peer. - pub fn new_peer(&mut self, protocol: &mut Context, who: NodeIndex, roles: Roles) { + pub fn new_peer(&mut self, protocol: &mut Context, who: PeerId, roles: Roles) { if roles.intersects(Roles::AUTHORITY) { trace!(target:"gossip", "Registering {:?} {}", roles, who); // Send out all known messages to authorities. 
@@ -118,7 +118,7 @@ impl ConsensusGossip { if let Status::Future = entry.status { continue } known_messages.insert(entry.message_hash); - protocol.send_message(who, Message::Consensus(entry.message.clone())); + protocol.send_message(who.clone(), Message::Consensus(entry.message.clone())); } self.peers.insert(who, PeerConsensus { known_messages, @@ -145,7 +145,7 @@ impl ConsensusGossip { let mut non_authorities: Vec<_> = self.peers.iter() .filter_map(|(id, ref peer)| if !peer.is_authority && (!peer.known_messages.contains(&message_hash) || force) { - Some(*id) + Some(id.clone()) } else { None } @@ -164,12 +164,12 @@ impl ConsensusGossip { if peer.known_messages.insert(message_hash.clone()) || force { let message = get_message(); trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message); - protocol.send_message(*id, Message::Consensus(message)); + protocol.send_message(id.clone(), Message::Consensus(message)); } } else if non_authorities.contains(&id) { let message = get_message(); trace!(target:"gossip", "Propagating to {}: {:?}", id, message); - protocol.send_message(*id, Message::Consensus(message)); + protocol.send_message(id.clone(), Message::Consensus(message)); } } } @@ -194,7 +194,7 @@ impl ConsensusGossip { } /// Call when a peer has been disconnected to stop tracking gossip status. 
- pub fn peer_disconnected(&mut self, _protocol: &mut Context, who: NodeIndex) { + pub fn peer_disconnected(&mut self, _protocol: &mut Context, who: PeerId) { self.peers.remove(&who); } @@ -291,7 +291,7 @@ impl ConsensusGossip { pub fn on_incoming( &mut self, protocol: &mut Context, - who: NodeIndex, + who: PeerId, message: ConsensusMessage, ) -> Option<(B::Hash, ConsensusMessage)> { let message_hash = HashFor::::hash(&message.data[..]); @@ -325,7 +325,7 @@ impl ConsensusGossip { }, None => { protocol.report_peer( - who, + who.clone(), Severity::Useless(format!("Sent unknown consensus engine id")), ); trace!(target:"gossip", "Unknown message engine id {:?} from {}", diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index 296eaeb00c..0f184af773 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -43,7 +43,7 @@ pub use protocol::{ProtocolStatus, PeerInfo, Context}; pub use sync::{Status as SyncStatus, SyncState}; pub use network_libp2p::{ identity, multiaddr, - NodeIndex, ProtocolId, Severity, Multiaddr, + ProtocolId, Severity, Multiaddr, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer, NetworkStatePeerEndpoint, NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret, build_multiaddr, PeerId, PublicKey diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs index f5505cd3aa..6a89014465 100644 --- a/substrate/core/network/src/on_demand.rs +++ b/substrate/core/network/src/on_demand.rs @@ -29,7 +29,7 @@ use client::{error::{Error as ClientError, ErrorKind as ClientErrorKind}}; use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof}; use crate::message; -use network_libp2p::{Severity, NodeIndex}; +use network_libp2p::{Severity, PeerId}; use crate::config::Roles; use crate::service::{NetworkChan, NetworkMsg}; use runtime_primitives::traits::{Block as BlockT, 
Header as HeaderT, NumberFor}; @@ -42,13 +42,13 @@ const RETRY_COUNT: usize = 1; /// On-demand service API. pub trait OnDemandService: Send + Sync { /// When new node is connected. - fn on_connect(&self, peer: NodeIndex, role: Roles, best_number: NumberFor); + fn on_connect(&self, peer: PeerId, role: Roles, best_number: NumberFor); /// When block is announced by the peer. - fn on_block_announce(&self, peer: NodeIndex, best_number: NumberFor); + fn on_block_announce(&self, peer: PeerId, best_number: NumberFor); /// When node is disconnected. - fn on_disconnect(&self, peer: NodeIndex); + fn on_disconnect(&self, peer: PeerId); /// Maintain peers requests. fn maintain_peers(&self); @@ -56,20 +56,20 @@ pub trait OnDemandService: Send + Sync { /// When header response is received from remote node. fn on_remote_header_response( &self, - peer: NodeIndex, + peer: PeerId, response: message::RemoteHeaderResponse ); /// When read response is received from remote node. - fn on_remote_read_response(&self, peer: NodeIndex, response: message::RemoteReadResponse); + fn on_remote_read_response(&self, peer: PeerId, response: message::RemoteReadResponse); /// When call response is received from remote node. - fn on_remote_call_response(&self, peer: NodeIndex, response: message::RemoteCallResponse); + fn on_remote_call_response(&self, peer: PeerId, response: message::RemoteCallResponse); /// When changes response is received from remote node. fn on_remote_changes_response( &self, - peer: NodeIndex, + peer: PeerId, response: message::RemoteChangesResponse, Block::Hash> ); } @@ -90,9 +90,9 @@ pub struct RemoteResponse { struct OnDemandCore { next_request_id: u64, pending_requests: VecDeque>, - active_peers: LinkedHashMap>, - idle_peers: VecDeque, - best_blocks: HashMap>, + active_peers: LinkedHashMap>, + idle_peers: VecDeque, + best_blocks: HashMap>, } struct Request { @@ -170,13 +170,13 @@ impl OnDemand where } /// Try to accept response from given peer. 
- fn accept_response) -> Accept>(&self, rtype: &str, peer: NodeIndex, request_id: u64, try_accept: F) { + fn accept_response) -> Accept>(&self, rtype: &str, peer: PeerId, request_id: u64, try_accept: F) { let mut core = self.core.lock(); - let request = match core.remove(peer, request_id) { + let request = match core.remove(peer.clone(), request_id) { Some(request) => request, None => { let reason = format!("Invalid remote {} response from peer", rtype); - self.send(NetworkMsg::ReportPeer(peer, Severity::Bad(reason))); + self.send(NetworkMsg::ReportPeer(peer.clone(), Severity::Bad(reason))); core.remove_peer(peer); return; }, @@ -187,7 +187,7 @@ impl OnDemand where Accept::Ok => (retry_count, None), Accept::CheckFailed(error, retry_request_data) => { let reason = format!("Failed to check remote {} response from peer: {}", rtype, error); - self.send(NetworkMsg::ReportPeer(peer, Severity::Bad(reason))); + self.send(NetworkMsg::ReportPeer(peer.clone(), Severity::Bad(reason))); core.remove_peer(peer); if retry_count > 0 { @@ -200,7 +200,7 @@ impl OnDemand where }, Accept::Unexpected(retry_request_data) => { let reason = format!("Unexpected response to remote {} from peer", rtype); - self.send(NetworkMsg::ReportPeer(peer, Severity::Bad(reason))); + self.send(NetworkMsg::ReportPeer(peer.clone(), Severity::Bad(reason))); core.remove_peer(peer); (retry_count, Some(retry_request_data)) @@ -219,7 +219,7 @@ impl OnDemandService for OnDemand where B: BlockT, B::Header: HeaderT, { - fn on_connect(&self, peer: NodeIndex, role: Roles, best_number: NumberFor) { + fn on_connect(&self, peer: PeerId, role: Roles, best_number: NumberFor) { if !role.intersects(Roles::FULL | Roles::AUTHORITY) { return; } @@ -229,13 +229,13 @@ impl OnDemandService for OnDemand where core.dispatch(self); } - fn on_block_announce(&self, peer: NodeIndex, best_number: NumberFor) { + fn on_block_announce(&self, peer: PeerId, best_number: NumberFor) { let mut core = self.core.lock(); core.update_peer(peer, 
best_number); core.dispatch(self); } - fn on_disconnect(&self, peer: NodeIndex) { + fn on_disconnect(&self, peer: PeerId) { let mut core = self.core.lock(); core.remove_peer(peer); core.dispatch(self); @@ -249,7 +249,7 @@ impl OnDemandService for OnDemand where core.dispatch(self); } - fn on_remote_header_response(&self, peer: NodeIndex, response: message::RemoteHeaderResponse) { + fn on_remote_header_response(&self, peer: PeerId, response: message::RemoteHeaderResponse) { self.accept_response("header", peer, response.id, |request| match request.data { RequestData::RemoteHeader(request, sender) => match self.checker.check_header_proof(&request, response.header, response.proof) { Ok(response) => { @@ -263,7 +263,7 @@ impl OnDemandService for OnDemand where }) } - fn on_remote_read_response(&self, peer: NodeIndex, response: message::RemoteReadResponse) { + fn on_remote_read_response(&self, peer: PeerId, response: message::RemoteReadResponse) { self.accept_response("read", peer, response.id, |request| match request.data { RequestData::RemoteRead(request, sender) => match self.checker.check_read_proof(&request, response.proof) { Ok(response) => { @@ -277,7 +277,7 @@ impl OnDemandService for OnDemand where }) } - fn on_remote_call_response(&self, peer: NodeIndex, response: message::RemoteCallResponse) { + fn on_remote_call_response(&self, peer: PeerId, response: message::RemoteCallResponse) { self.accept_response("call", peer, response.id, |request| match request.data { RequestData::RemoteCall(request, sender) => match self.checker.check_execution_proof(&request, response.proof) { Ok(response) => { @@ -291,7 +291,7 @@ impl OnDemandService for OnDemand where }) } - fn on_remote_changes_response(&self, peer: NodeIndex, response: message::RemoteChangesResponse, B::Hash>) { + fn on_remote_changes_response(&self, peer: PeerId, response: message::RemoteChangesResponse, B::Hash>) { self.accept_response("changes", peer, response.id, |request| match request.data { 
RequestData::RemoteChanges(request, sender) => match self.checker.check_changes_proof( &request, ChangesProof { @@ -350,16 +350,16 @@ impl OnDemandCore where B: BlockT, B::Header: HeaderT, { - pub fn add_peer(&mut self, peer: NodeIndex, best_number: NumberFor) { - self.idle_peers.push_back(peer); + pub fn add_peer(&mut self, peer: PeerId, best_number: NumberFor) { + self.idle_peers.push_back(peer.clone()); self.best_blocks.insert(peer, best_number); } - pub fn update_peer(&mut self, peer: NodeIndex, best_number: NumberFor) { + pub fn update_peer(&mut self, peer: PeerId, best_number: NumberFor) { self.best_blocks.insert(peer, best_number); } - pub fn remove_peer(&mut self, peer: NodeIndex) { + pub fn remove_peer(&mut self, peer: PeerId) { self.best_blocks.remove(&peer); if let Some(request) = self.active_peers.remove(&peer) { @@ -372,7 +372,7 @@ impl OnDemandCore where } } - pub fn maintain_peers(&mut self) -> Vec { + pub fn maintain_peers(&mut self) -> Vec { let now = Instant::now(); let mut bad_peers = Vec::new(); loop { @@ -399,8 +399,8 @@ impl OnDemandCore where }); } - pub fn remove(&mut self, peer: NodeIndex, id: u64) -> Option> { - match self.active_peers.entry(peer) { + pub fn remove(&mut self, peer: PeerId, id: u64) -> Option> { + match self.active_peers.entry(peer.clone()) { Entry::Occupied(entry) => match entry.get().id == id { true => { self.idle_peers.push_back(peer); @@ -441,7 +441,7 @@ impl OnDemandCore where if !can_be_processed_by_peer { // return peer to the back of the queue - self.idle_peers.push_back(peer); + self.idle_peers.push_back(peer.clone()); // we have enumerated all peers and noone can handle request if Some(peer) == last_peer { @@ -458,7 +458,7 @@ impl OnDemandCore where let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed"); request.timestamp = Instant::now(); trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer); - on_demand.send(NetworkMsg::Outgoing(peer, 
request.message())); + on_demand.send(NetworkMsg::Outgoing(peer.clone(), request.message())); self.active_peers.insert(peer, request); } @@ -532,7 +532,7 @@ pub mod tests { RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof}; use crate::config::Roles; use crate::message; - use network_libp2p::{NodeIndex, Severity}; + use network_libp2p::{PeerId, Severity}; use crate::service::{network_channel, NetworkPort, NetworkMsg}; use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService}; use test_client::runtime::{changes_trie_config, Block, Header}; @@ -586,7 +586,7 @@ pub mod tests { core.idle_peers.len() + core.active_peers.len() } - fn receive_call_response(on_demand: &OnDemand, peer: NodeIndex, id: message::RequestId) { + fn receive_call_response(on_demand: &OnDemand, peer: PeerId, id: message::RequestId) { on_demand.on_remote_call_response(peer, message::RemoteCallResponse { id: id, proof: vec![vec![2]], @@ -621,22 +621,27 @@ pub mod tests { #[test] fn knows_about_peers_roles() { let (_, on_demand) = dummy(true); - on_demand.on_connect(0, Roles::LIGHT, 1000); - on_demand.on_connect(1, Roles::FULL, 2000); - on_demand.on_connect(2, Roles::AUTHORITY, 3000); - assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); - assert_eq!(on_demand.core.lock().best_blocks.get(&1), Some(&2000)); - assert_eq!(on_demand.core.lock().best_blocks.get(&2), Some(&3000)); + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + on_demand.on_connect(peer0, Roles::LIGHT, 1000); + on_demand.on_connect(peer1.clone(), Roles::FULL, 2000); + on_demand.on_connect(peer2.clone(), Roles::AUTHORITY, 3000); + assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); + assert_eq!(on_demand.core.lock().best_blocks.get(&peer1), Some(&2000)); + assert_eq!(on_demand.core.lock().best_blocks.get(&peer2), Some(&3000)); } #[test] fn disconnects_from_idle_peer() { + let 
peer0 = PeerId::random(); + let (_, on_demand) = dummy(true); - on_demand.on_connect(0, Roles::FULL, 100); + on_demand.on_connect(peer0.clone(), Roles::FULL, 100); assert_eq!(1, total_peers(&*on_demand)); assert!(!on_demand.core.lock().best_blocks.is_empty()); - on_demand.on_disconnect(0); + on_demand.on_disconnect(peer0); assert_eq!(0, total_peers(&*on_demand)); assert!(on_demand.core.lock().best_blocks.is_empty()); } @@ -645,10 +650,12 @@ pub mod tests { fn disconnects_from_timeouted_peer() { let (_x, on_demand) = dummy(true); let (network_sender, network_port) = network_channel(); + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); on_demand.set_network_sender(network_sender.clone()); - on_demand.on_connect(0, Roles::FULL, 1000); - on_demand.on_connect(1, Roles::FULL, 1000); - assert_eq!(vec![0, 1], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); + on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); + on_demand.on_connect(peer1.clone(), Roles::FULL, 1000); + assert_eq!(vec![peer0.clone(), peer1.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); assert!(on_demand.core.lock().active_peers.is_empty()); on_demand.remote_call(RemoteCallRequest { @@ -658,22 +665,23 @@ pub mod tests { call_data: vec![], retry_count: None, }); - assert_eq!(vec![1], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); - assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::>()); + assert_eq!(vec![peer1.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); + assert_eq!(vec![peer0.clone()], on_demand.core.lock().active_peers.keys().cloned().collect::>()); - on_demand.core.lock().active_peers[&0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT; + on_demand.core.lock().active_peers[&peer0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT; on_demand.maintain_peers(); assert!(on_demand.core.lock().idle_peers.is_empty()); - assert_eq!(vec![1], 
on_demand.core.lock().active_peers.keys().cloned().collect::>()); + assert_eq!(vec![peer1.clone()], on_demand.core.lock().active_peers.keys().cloned().collect::>()); assert_disconnected_peer(network_port, Severity::Timeout); } #[test] fn disconnects_from_peer_on_response_with_wrong_id() { let (_x, on_demand) = dummy(true); + let peer0 = PeerId::random(); let (network_sender, network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); - on_demand.on_connect(0, Roles::FULL, 1000); + on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); on_demand.remote_call(RemoteCallRequest { block: Default::default(), @@ -682,7 +690,7 @@ pub mod tests { call_data: vec![], retry_count: None, }); - receive_call_response(&*on_demand, 0, 1); + receive_call_response(&*on_demand, peer0, 1); assert_disconnected_peer(network_port, Severity::Bad("Invalid remote call response from peer".to_string())); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } @@ -691,6 +699,7 @@ pub mod tests { fn disconnects_from_peer_on_incorrect_response() { let (_x, on_demand) = dummy(false); let (network_sender, network_port) = network_channel(); + let peer0 = PeerId::random(); on_demand.set_network_sender(network_sender.clone()); on_demand.remote_call(RemoteCallRequest { block: Default::default(), @@ -700,8 +709,8 @@ pub mod tests { retry_count: Some(1), }); - on_demand.on_connect(0, Roles::FULL, 1000); - receive_call_response(&*on_demand, 0, 0); + on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); + receive_call_response(&*on_demand, peer0.clone(), 0); assert_disconnected_peer(network_port, Severity::Bad("Failed to check remote call response from peer: Backend error: Test error".to_string())); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } @@ -710,19 +719,21 @@ pub mod tests { fn disconnects_from_peer_on_unexpected_response() { let (_x, on_demand) = dummy(true); let (network_sender, network_port) = network_channel(); + let peer0 = 
PeerId::random(); on_demand.set_network_sender(network_sender.clone()); - on_demand.on_connect(0, Roles::FULL, 1000); + on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); - receive_call_response(&*on_demand, 0, 0); + receive_call_response(&*on_demand, peer0, 0); assert_disconnected_peer(network_port, Severity::Bad("Invalid remote call response from peer".to_string())); } #[test] fn disconnects_from_peer_on_wrong_response_type() { let (_x, on_demand) = dummy(false); + let peer0 = PeerId::random(); let (network_sender, network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); - on_demand.on_connect(0, Roles::FULL, 1000); + on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); on_demand.remote_call(RemoteCallRequest { block: Default::default(), @@ -732,7 +743,7 @@ pub mod tests { retry_count: Some(1), }); - on_demand.on_remote_read_response(0, message::RemoteReadResponse { + on_demand.on_remote_read_response(peer0.clone(), message::RemoteReadResponse { id: 0, proof: vec![vec![2]], }); @@ -745,11 +756,12 @@ pub mod tests { use parking_lot::{Condvar, Mutex}; let retry_count = 2; + let peer_ids = (0 .. 
retry_count + 1).map(|_| PeerId::random()).collect::>(); let (_x, on_demand) = dummy(false); let (network_sender, _network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); for i in 0..retry_count+1 { - on_demand.on_connect(i, Roles::FULL, 1000); + on_demand.on_connect(peer_ids[i].clone(), Roles::FULL, 1000); } let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new())); @@ -773,7 +785,7 @@ pub mod tests { for i in 0..retry_count+1 { let mut current = current.lock(); *current = *current + 1; - receive_call_response(&*on_demand, i, i as u64); + receive_call_response(&*on_demand, peer_ids[i].clone(), i as u64); } let mut finished_at = finished_at.lock(); @@ -787,8 +799,9 @@ pub mod tests { fn receives_remote_call_response() { let (_x, on_demand) = dummy(true); let (network_sender, _network_port) = network_channel(); + let peer0 = PeerId::random(); on_demand.set_network_sender(network_sender.clone()); - on_demand.on_connect(0, Roles::FULL, 1000); + on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); let response = on_demand.remote_call(RemoteCallRequest { block: Default::default(), @@ -802,7 +815,7 @@ pub mod tests { assert_eq!(result, vec![42]); }); - receive_call_response(&*on_demand, 0, 0); + receive_call_response(&*on_demand, peer0.clone(), 0); thread.join().unwrap(); } @@ -810,8 +823,9 @@ pub mod tests { fn receives_remote_read_response() { let (_x, on_demand) = dummy(true); let (network_sender, _network_port) = network_channel(); + let peer0 = PeerId::random(); on_demand.set_network_sender(network_sender.clone()); - on_demand.on_connect(0, Roles::FULL, 1000); + on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); let response = on_demand.remote_read(RemoteReadRequest { header: dummy_header(), @@ -824,7 +838,7 @@ pub mod tests { assert_eq!(result, Some(vec![42])); }); - on_demand.on_remote_read_response(0, message::RemoteReadResponse { + on_demand.on_remote_read_response(peer0.clone(), message::RemoteReadResponse { 
id: 0, proof: vec![vec![2]], }); @@ -835,8 +849,9 @@ pub mod tests { fn receives_remote_header_response() { let (_x, on_demand) = dummy(true); let (network_sender, _network_port) = network_channel(); + let peer0 = PeerId::random(); on_demand.set_network_sender(network_sender.clone()); - on_demand.on_connect(0, Roles::FULL, 1000); + on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); let response = on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), @@ -852,7 +867,7 @@ pub mod tests { ); }); - on_demand.on_remote_header_response(0, message::RemoteHeaderResponse { + on_demand.on_remote_header_response(peer0.clone(), message::RemoteHeaderResponse { id: 0, header: Some(Header { parent_hash: Default::default(), @@ -870,8 +885,9 @@ pub mod tests { fn receives_remote_changes_response() { let (_x, on_demand) = dummy(true); let (network_sender, _network_port) = network_channel(); + let peer0 = PeerId::random(); on_demand.set_network_sender(network_sender.clone()); - on_demand.on_connect(0, Roles::FULL, 1000); + on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); let response = on_demand.remote_changes(RemoteChangesRequest { changes_trie_config: changes_trie_config(), @@ -887,7 +903,7 @@ pub mod tests { assert_eq!(result, vec![(100, 2)]); }); - on_demand.on_remote_changes_response(0, message::RemoteChangesResponse { + on_demand.on_remote_changes_response(peer0.clone(), message::RemoteChangesResponse { id: 0, max: 1000, proof: vec![vec![2]], @@ -901,9 +917,11 @@ pub mod tests { fn does_not_sends_request_to_peer_who_has_no_required_block() { let (_x, on_demand) = dummy(true); let (network_sender, _network_port) = network_channel(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); on_demand.set_network_sender(network_sender.clone()); - on_demand.on_connect(1, Roles::FULL, 100); + on_demand.on_connect(peer1.clone(), Roles::FULL, 100); on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), @@ -921,22 +939,22 @@ 
pub mod tests { retry_count: None, }); - on_demand.on_connect(2, Roles::FULL, 150); + on_demand.on_connect(peer2.clone(), Roles::FULL, 150); - assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); + assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); assert_eq!(on_demand.core.lock().pending_requests.len(), 3); - on_demand.on_block_announce(1, 250); + on_demand.on_block_announce(peer1.clone(), 250); - assert_eq!(vec![2], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); + assert_eq!(vec![peer2.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); assert_eq!(on_demand.core.lock().pending_requests.len(), 2); - on_demand.on_block_announce(2, 250); + on_demand.on_block_announce(peer2.clone(), 250); assert!(!on_demand.core.lock().idle_peers.iter().any(|_| true)); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); - on_demand.on_remote_header_response(1, message::RemoteHeaderResponse { + on_demand.on_remote_header_response(peer1.clone(), message::RemoteHeaderResponse { id: 0, header: Some(dummy_header()), proof: vec![], @@ -953,6 +971,9 @@ pub mod tests { // last peer was not updated let (_x, on_demand) = dummy(true); let (network_sender, _network_port) = network_channel(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let peer3 = PeerId::random(); on_demand.set_network_sender(network_sender.clone()); on_demand.remote_header(RemoteHeaderRequest { @@ -966,11 +987,11 @@ pub mod tests { retry_count: None, }); - on_demand.on_connect(1, Roles::FULL, 200); - on_demand.on_connect(2, Roles::FULL, 200); - on_demand.on_connect(3, Roles::FULL, 250); + on_demand.on_connect(peer1.clone(), Roles::FULL, 200); + on_demand.on_connect(peer2.clone(), Roles::FULL, 200); + on_demand.on_connect(peer3.clone(), Roles::FULL, 250); - assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); + 
assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } @@ -978,6 +999,7 @@ pub mod tests { fn tries_to_send_all_pending_requests() { let (_x, on_demand) = dummy(true); let (network_sender, _network_port) = network_channel(); + let peer1 = PeerId::random(); on_demand.set_network_sender(network_sender.clone()); on_demand.remote_header(RemoteHeaderRequest { @@ -991,7 +1013,7 @@ pub mod tests { retry_count: None, }); - on_demand.on_connect(1, Roles::FULL, 250); + on_demand.on_connect(peer1.clone(), Roles::FULL, 250); assert!(on_demand.core.lock().idle_peers.iter().cloned().collect::>().is_empty()); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index d33e0d0af1..27e615e56f 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -17,7 +17,7 @@ use crossbeam_channel::{self as channel, Receiver, Sender, select}; use futures::sync::mpsc; use parking_lot::Mutex; -use network_libp2p::{NodeIndex, PeerId, Severity}; +use network_libp2p::{PeerId, Severity}; use primitives::storage::StorageKey; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor, Zero}; @@ -75,10 +75,10 @@ pub struct Protocol, H: ExHashT> { consensus_gossip: ConsensusGossip, context_data: ContextData, // Connected peers pending Status message. - handshaking_peers: HashMap, + handshaking_peers: HashMap, // Connected peers from whom we received a Status message, // similar to context_data.peers but shared with the SyncProvider. - connected_peers: Arc>>>, + connected_peers: Arc>>>, transaction_pool: Arc>, } @@ -92,7 +92,6 @@ pub struct ConnectedPeer { /// and from whom we have not yet received a Status message. 
struct HandshakingPeer { timestamp: time::Instant, - peer_id: PeerId, } /// Syncing status and statistics @@ -125,8 +124,6 @@ struct Peer { /// Info about a peer's known state. #[derive(Clone, Debug)] pub struct PeerInfo { - /// Network id. - pub peer_id: PeerId, /// Roles pub roles: Roles, /// Protocol version @@ -143,13 +140,13 @@ pub trait Context { fn client(&self) -> &crate::chain::Client; /// Point out that a peer has been malign or irresponsible or appeared lazy. - fn report_peer(&mut self, who: NodeIndex, reason: Severity); + fn report_peer(&mut self, who: PeerId, reason: Severity); /// Get peer info. - fn peer_info(&self, peer: NodeIndex) -> Option>; + fn peer_info(&self, peer: &PeerId) -> Option>; /// Send a message to a peer. - fn send_message(&mut self, who: NodeIndex, data: crate::message::Message); + fn send_message(&mut self, who: PeerId, data: crate::message::Message); } /// Protocol context. @@ -165,16 +162,16 @@ impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { } impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, H> { - fn send_message(&mut self, who: NodeIndex, message: Message) { + fn send_message(&mut self, who: PeerId, message: Message) { send_message(&mut self.context_data.peers, &self.network_chan, who, message) } - fn report_peer(&mut self, who: NodeIndex, reason: Severity) { + fn report_peer(&mut self, who: PeerId, reason: Severity) { self.network_chan.send(NetworkMsg::ReportPeer(who, reason)) } - fn peer_info(&self, who: NodeIndex) -> Option> { - self.context_data.peers.get(&who).map(|p| p.info.clone()) + fn peer_info(&self, who: &PeerId) -> Option> { + self.context_data.peers.get(who).map(|p| p.info.clone()) } fn client(&self) -> &Client { @@ -185,7 +182,7 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, /// Data necessary to create a context. 
struct ContextData { // All connected peers - peers: HashMap>, + peers: HashMap>, pub chain: Arc>, } @@ -254,13 +251,13 @@ pub enum ProtocolMsg> { /// Messages sent to Protocol from Network-libp2p. pub enum FromNetworkMsg { /// A peer connected, with debug info. - PeerConnected(PeerId, NodeIndex, String), + PeerConnected(PeerId, String), /// A peer disconnected, with debug info. - PeerDisconnected(NodeIndex, String), + PeerDisconnected(PeerId, String), /// A custom message from another peer. - CustomMessage(NodeIndex, Message), + CustomMessage(PeerId, Message), /// Let protocol know a peer is currenlty clogged. - PeerClogged(NodeIndex, Option>), + PeerClogged(PeerId, Option>), } enum Incoming> { @@ -274,7 +271,7 @@ impl, H: ExHashT> Protocol { status_sinks: Arc>>>>, is_offline: Arc, is_major_syncing: Arc, - connected_peers: Arc>>>, + connected_peers: Arc>>>, network_chan: NetworkChan, config: ProtocolConfig, chain: Arc>, @@ -418,7 +415,7 @@ impl, H: ExHashT> Protocol { fn handle_network_msg(&mut self, msg: FromNetworkMsg) -> bool { match msg { FromNetworkMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info), - FromNetworkMsg::PeerConnected(peer_id, who, debug_info) => self.on_peer_connected(peer_id, who, debug_info), + FromNetworkMsg::PeerConnected(who, debug_info) => self.on_peer_connected(who, debug_info), FromNetworkMsg::PeerClogged(who, message) => self.on_clogged_peer(who, message), FromNetworkMsg::CustomMessage(who, message) => { self.on_custom_message(who, message) @@ -427,7 +424,7 @@ impl, H: ExHashT> Protocol { true } - fn handle_response(&mut self, who: NodeIndex, response: &message::BlockResponse) -> Option> { + fn handle_response(&mut self, who: PeerId, response: &message::BlockResponse) -> Option> { if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { if let Some(_) = peer.obsolete_requests.remove(&response.id) { trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", who, 
response.id,); @@ -445,14 +442,14 @@ impl, H: ExHashT> Protocol { None } - fn update_peer_info(&mut self, who: NodeIndex) { + fn update_peer_info(&mut self, who: &PeerId) { if let Some(info) = self.sync.peer_info(who) { - if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { + if let Some(ref mut peer) = self.context_data.peers.get_mut(who) { peer.info.best_hash = info.best_hash; peer.info.best_number = info.best_number; } let mut peers = self.connected_peers.write(); - if let Some(ref mut peer) = peers.get_mut(&who) { + if let Some(ref mut peer) = peers.get_mut(who) { peer.peer_info.best_hash = info.best_hash; peer.peer_info.best_number = info.best_number; } @@ -474,19 +471,19 @@ impl, H: ExHashT> Protocol { self.status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok()); } - fn on_custom_message(&mut self, who: NodeIndex, message: Message) { + fn on_custom_message(&mut self, who: PeerId, message: Message) { match message { GenericMessage::Status(s) => self.on_status_message(who, s), GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { - if let Some(request) = self.handle_response(who, &r) { - self.on_block_response(who, request, r); - self.update_peer_info(who); + if let Some(request) = self.handle_response(who.clone(), &r) { + self.on_block_response(who.clone(), request, r); + self.update_peer_info(&who); } }, GenericMessage::BlockAnnounce(announce) => { - self.on_block_announce(who, announce); - self.update_peer_info(who); + self.on_block_announce(who.clone(), announce); + self.update_peer_info(&who); }, GenericMessage::Transactions(m) => self.on_extrinsics(who, m), GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request), @@ -512,7 +509,7 @@ impl, H: ExHashT> Protocol { } } - fn send_message(&mut self, who: NodeIndex, message: Message) { + fn send_message(&mut self, who: PeerId, message: Message) { send_message::( &mut self.context_data.peers, 
&self.network_chan, @@ -537,14 +534,14 @@ impl, H: ExHashT> Protocol { } /// Called when a new peer is connected - fn on_peer_connected(&mut self, peer_id: PeerId, who: NodeIndex, debug_info: String) { + fn on_peer_connected(&mut self, who: PeerId, debug_info: String) { trace!(target: "sync", "Connecting {}: {}", who, debug_info); - self.handshaking_peers.insert(who, HandshakingPeer { timestamp: time::Instant::now(), peer_id }); + self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: time::Instant::now() }); self.send_status(who); } /// Called by peer when it is disconnecting - fn on_peer_disconnected(&mut self, peer: NodeIndex, debug_info: String) { + fn on_peer_disconnected(&mut self, peer: PeerId, debug_info: String) { trace!(target: "sync", "Disconnecting {}: {}", peer, debug_info); // lock all the the peer lists so that add/remove peer events are in order let removed = { @@ -554,16 +551,16 @@ impl, H: ExHashT> Protocol { }; if removed { let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); - self.consensus_gossip.peer_disconnected(&mut context, peer); - self.sync.peer_disconnected(&mut context, peer); - self.specialization.on_disconnect(&mut context, peer); + self.consensus_gossip.peer_disconnected(&mut context, peer.clone()); + self.sync.peer_disconnected(&mut context, peer.clone()); + self.specialization.on_disconnect(&mut context, peer.clone()); self.on_demand.as_ref().map(|s| s.on_disconnect(peer)); } } /// Called as a back-pressure mechanism if the networking detects that the peer cannot process /// our messaging rate fast enough. - pub fn on_clogged_peer(&self, who: NodeIndex, _msg: Option>) { + pub fn on_clogged_peer(&self, who: PeerId, _msg: Option>) { // We don't do anything but print some diagnostics for now. 
if let Some(peer) = self.context_data.peers.get(&who) { debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \ @@ -575,7 +572,7 @@ impl, H: ExHashT> Protocol { } } - fn on_block_request(&mut self, peer: NodeIndex, request: message::BlockRequest) { + fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest) { trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, @@ -641,7 +638,7 @@ impl, H: ExHashT> Protocol { fn on_block_response( &mut self, - peer: NodeIndex, + peer: PeerId, request: message::BlockRequest, response: message::BlockResponse, ) { @@ -687,15 +684,15 @@ impl, H: ExHashT> Protocol { for (who, peer) in self.context_data.peers.iter() { if peer.block_request.as_ref().map_or(false, |(t, _)| (tick - *t).as_secs() > REQUEST_TIMEOUT_SEC) { trace!(target: "sync", "Reqeust timeout {}", who); - aborting.push(*who); + aborting.push(who.clone()); } else if peer.obsolete_requests.values().any(|t| (tick - *t).as_secs() > REQUEST_TIMEOUT_SEC) { trace!(target: "sync", "Obsolete timeout {}", who); - aborting.push(*who); + aborting.push(who.clone()); } } for (who, _) in self.handshaking_peers.iter().filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC) { trace!(target: "sync", "Handshake timeout {}", who); - aborting.push(*who); + aborting.push(who.clone()); } } @@ -708,7 +705,7 @@ impl, H: ExHashT> Protocol { } /// Called by peer to report status - fn on_status_message(&mut self, who: NodeIndex, status: message::Status) { + fn on_status_message(&mut self, who: PeerId, status: message::Status) { trace!(target: "sync", "New peer {} {:?}", who, status); { if self.context_data.peers.contains_key(&who) { @@ -761,9 +758,8 @@ impl, H: ExHashT> Protocol { let cache_limit = NonZeroUsize::new(1_000_000).expect("1_000_000 > 0; qed"); let info = match self.handshaking_peers.remove(&who) { - Some(handshaking) => { + Some(_handshaking) => { let peer_info = 
PeerInfo { - peer_id: handshaking.peer_id, protocol_version: status.version, roles: status.roles, best_hash: status.best_hash, @@ -771,7 +767,7 @@ impl, H: ExHashT> Protocol { }; self.connected_peers .write() - .insert(who, ConnectedPeer { peer_info: peer_info.clone() }); + .insert(who.clone(), ConnectedPeer { peer_info: peer_info.clone() }); peer_info }, None => { @@ -796,15 +792,15 @@ impl, H: ExHashT> Protocol { let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); self.on_demand .as_ref() - .map(|s| s.on_connect(who, status.roles, status.best_number)); - self.sync.new_peer(&mut context, who); + .map(|s| s.on_connect(who.clone(), status.roles, status.best_number)); + self.sync.new_peer(&mut context, who.clone()); self.consensus_gossip - .new_peer(&mut context, who, status.roles); + .new_peer(&mut context, who.clone(), status.roles); self.specialization.on_connect(&mut context, who, status); } /// Called when peer sends us new extrinsics - fn on_extrinsics(&mut self, who: NodeIndex, extrinsics: message::Transactions) { + fn on_extrinsics(&mut self, who: PeerId, extrinsics: message::Transactions) { // Accept extrinsics only when fully synced if self.sync.status().state != SyncState::Idle { trace!(target: "sync", "{} Ignoring extrinsics while syncing", who); @@ -845,10 +841,10 @@ impl, H: ExHashT> Protocol { propagated_to .entry(hash) .or_insert_with(Vec::new) - .push(peer.info.peer_id.to_base58()); + .push(who.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); - self.network_chan.send(NetworkMsg::Outgoing(*who, GenericMessage::Transactions(to_send))) + self.network_chan.send(NetworkMsg::Outgoing(who.clone(), GenericMessage::Transactions(to_send))) } } self.transaction_pool.on_broadcasted(propagated_to); @@ -877,12 +873,12 @@ impl, H: ExHashT> Protocol { for (who, ref mut peer) in self.context_data.peers.iter_mut() { trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who); 
peer.known_blocks.insert(hash); - self.network_chan.send(NetworkMsg::Outgoing(*who, message.clone())) + self.network_chan.send(NetworkMsg::Outgoing(who.clone(), message.clone())) } } /// Send Status message - fn send_status(&mut self, who: NodeIndex) { + fn send_status(&mut self, who: PeerId) { if let Ok(info) = self.context_data.chain.info() { let status = message::generic::Status { version: CURRENT_VERSION, @@ -913,7 +909,7 @@ impl, H: ExHashT> Protocol { self.abort(); } - fn on_block_announce(&mut self, who: NodeIndex, announce: message::BlockAnnounce) { + fn on_block_announce(&mut self, who: PeerId, announce: message::BlockAnnounce) { let header = announce.header; let hash = header.hash(); { @@ -923,7 +919,7 @@ impl, H: ExHashT> Protocol { } self.on_demand .as_ref() - .map(|s| s.on_block_announce(who, *header.number())); + .map(|s| s.on_block_announce(who.clone(), *header.number())); self.sync.on_block_announce( &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), who, @@ -952,7 +948,7 @@ impl, H: ExHashT> Protocol { for (who, ref mut peer) in self.context_data.peers.iter_mut() { if peer.known_blocks.insert(hash.clone()) { trace!(target: "sync", "Announcing block {:?} to {}", hash, who); - self.network_chan.send(NetworkMsg::Outgoing(*who, message.clone())) + self.network_chan.send(NetworkMsg::Outgoing(who.clone(), message.clone())) } } } @@ -967,7 +963,7 @@ impl, H: ExHashT> Protocol { fn on_remote_call_request( &mut self, - who: NodeIndex, + who: PeerId, request: message::RemoteCallRequest, ) { trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, who, request.method, request.block); @@ -993,7 +989,7 @@ impl, H: ExHashT> Protocol { ); } - fn on_remote_call_response(&mut self, who: NodeIndex, response: message::RemoteCallResponse) { + fn on_remote_call_response(&mut self, who: PeerId, response: message::RemoteCallResponse) { trace!(target: "sync", "Remote call response {} from {}", response.id, who); self.on_demand 
.as_ref() @@ -1002,7 +998,7 @@ impl, H: ExHashT> Protocol { fn on_remote_read_request( &mut self, - who: NodeIndex, + who: PeerId, request: message::RemoteReadRequest, ) { trace!(target: "sync", "Remote read request {} from {} ({} at {})", @@ -1023,7 +1019,7 @@ impl, H: ExHashT> Protocol { }), ); } - fn on_remote_read_response(&mut self, who: NodeIndex, response: message::RemoteReadResponse) { + fn on_remote_read_response(&mut self, who: PeerId, response: message::RemoteReadResponse) { trace!(target: "sync", "Remote read response {} from {}", response.id, who); self.on_demand .as_ref() @@ -1032,7 +1028,7 @@ impl, H: ExHashT> Protocol { fn on_remote_header_request( &mut self, - who: NodeIndex, + who: PeerId, request: message::RemoteHeaderRequest>, ) { trace!(target: "sync", "Remote header proof request {} from {} ({})", @@ -1057,7 +1053,7 @@ impl, H: ExHashT> Protocol { fn on_remote_header_response( &mut self, - who: NodeIndex, + who: PeerId, response: message::RemoteHeaderResponse, ) { trace!(target: "sync", "Remote header proof response {} from {}", response.id, who); @@ -1068,7 +1064,7 @@ impl, H: ExHashT> Protocol { fn on_remote_changes_request( &mut self, - who: NodeIndex, + who: PeerId, request: message::RemoteChangesRequest, ) { trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{})", @@ -1101,7 +1097,7 @@ impl, H: ExHashT> Protocol { fn on_remote_changes_response( &mut self, - who: NodeIndex, + who: PeerId, response: message::RemoteChangesResponse, B::Hash>, ) { trace!(target: "sync", "Remote changes proof response {} from {} (max={})", @@ -1113,9 +1109,9 @@ impl, H: ExHashT> Protocol { } fn send_message( - peers: &mut HashMap>, + peers: &mut HashMap>, network_chan: &NetworkChan, - who: NodeIndex, + who: PeerId, mut message: Message, ) { if let GenericMessage::BlockRequest(ref mut r) = message { @@ -1192,20 +1188,20 @@ macro_rules! 
construct_simple_protocol { fn on_connect( &mut self, _ctx: &mut $crate::Context<$block>, - _who: $crate::NodeIndex, + _who: $crate::PeerId, _status: $crate::StatusMessage<$block> ) { $( self.$sub_protocol_name.on_connect(_ctx, _who, _status); )* } - fn on_disconnect(&mut self, _ctx: &mut $crate::Context<$block>, _who: $crate::NodeIndex) { + fn on_disconnect(&mut self, _ctx: &mut $crate::Context<$block>, _who: $crate::PeerId) { $( self.$sub_protocol_name.on_disconnect(_ctx, _who); )* } fn on_message( &mut self, _ctx: &mut $crate::Context<$block>, - _who: $crate::NodeIndex, + _who: $crate::PeerId, _message: &mut Option<$crate::message::Message<$block>> ) { $( self.$sub_protocol_name.on_message(_ctx, _who, _message); )* diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index e55db474ee..643fd3bbe7 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -21,7 +21,7 @@ use std::{io, thread}; use log::{warn, debug, error, trace, info}; use futures::{Async, Future, Stream, stream, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; -use network_libp2p::{ProtocolId, NetworkConfiguration, NodeIndex, Severity}; +use network_libp2p::{ProtocolId, NetworkConfiguration, Severity}; use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; use network_libp2p::{multiaddr, RegisteredProtocol, NetworkState}; use peerset::Peerset; @@ -51,7 +51,7 @@ pub trait SyncProvider: Send + Sync { /// Get network state. fn network_state(&self) -> NetworkState; /// Get currently connected peers - fn peers(&self) -> Vec<(NodeIndex, PeerInfo)>; + fn peers(&self) -> Vec<(PeerId, PeerInfo)>; /// Are we in the process of downloading the chain? 
fn is_major_syncing(&self) -> bool; } @@ -94,7 +94,7 @@ impl> Link for NetworkLink { let _ = self.protocol_sender.send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error)); } - fn justification_imported(&self, who: NodeIndex, hash: &B::Hash, number: NumberFor, success: bool) { + fn justification_imported(&self, who: PeerId, hash: &B::Hash, number: NumberFor, success: bool) { let _ = self.protocol_sender.send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success)); if !success { let reason = Severity::Bad(format!("Invalid justification provided for #{}", hash).to_string()); @@ -110,12 +110,12 @@ impl> Link for NetworkLink { let _ = self.protocol_sender.send(ProtocolMsg::RequestJustification(hash.clone(), number)); } - fn useless_peer(&self, who: NodeIndex, reason: &str) { + fn useless_peer(&self, who: PeerId, reason: &str) { trace!(target:"sync", "Useless peer {}, {}", who, reason); self.network_sender.send(NetworkMsg::ReportPeer(who, Severity::Useless(reason.to_string()))); } - fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str) { + fn note_useless_and_restart_sync(&self, who: PeerId, reason: &str) { trace!(target:"sync", "Bad peer {}, {}", who, reason); // is this actually malign or just useless? self.network_sender.send(NetworkMsg::ReportPeer(who, Severity::Useless(reason.to_string()))); @@ -136,7 +136,7 @@ pub struct Service> { /// Are we actively catching up with the chain? is_major_syncing: Arc, /// Peers whom we are connected with. - peers: Arc>>>, + peers: Arc>>>, /// Network service network: Arc>>>, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which @@ -162,7 +162,7 @@ impl> Service { // Start in off-line mode, since we're not connected to any nodes yet. 
let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); - let peers: Arc>>> = Arc::new(Default::default()); + let peers: Arc>>> = Arc::new(Default::default()); let (protocol_sender, network_to_protocol_sender) = Protocol::new( status_sinks.clone(), is_offline.clone(), @@ -326,7 +326,7 @@ impl> SyncProvider for Servi self.network.lock().state() } - fn peers(&self) -> Vec<(NodeIndex, PeerInfo)> { + fn peers(&self) -> Vec<(PeerId, PeerInfo)> { let peers = (*self.peers.read()).clone(); peers.into_iter().map(|(idx, connected)| (idx, connected.peer_info)).collect() } @@ -458,12 +458,10 @@ impl NetworkPort { /// Messages to be handled by NetworkService. #[derive(Debug)] pub enum NetworkMsg { - /// Ask network to convert a list of nodes, to a list of peers. - PeerIds(Vec, Sender)>>), /// Send an outgoing custom message. - Outgoing(NodeIndex, Message), + Outgoing(PeerId, Message), /// Report a peer. - ReportPeer(NodeIndex, Severity), + ReportPeer(PeerId, Severity), } /// Starts the background thread that handles the networking. @@ -521,16 +519,10 @@ fn run_thread( }).for_each(move |msg| { // Handle message from Protocol. 
match msg { - NetworkMsg::PeerIds(node_idxs, sender) => { - let reply = node_idxs.into_iter().map(|idx| { - (idx, network_service_2.lock().peer_id_of_node(idx).map(|p| p.clone())) - }).collect::>(); - let _ = sender.send(reply); - } NetworkMsg::Outgoing(who, outgoing_message) => { network_service_2 .lock() - .send_custom_message(who, outgoing_message); + .send_custom_message(&who, outgoing_message); }, NetworkMsg::ReportPeer(who, severity) => { match severity { @@ -538,15 +530,15 @@ fn run_thread( info!(target: "sync", "Banning {:?} because {:?}", who, message); warn!(target: "sync", "Banning a node is a deprecated mechanism that \ should be removed"); - network_service_2.lock().drop_node(who) + network_service_2.lock().drop_node(&who) }, Severity::Useless(message) => { info!(target: "sync", "Dropping {:?} because {:?}", who, message); - network_service_2.lock().drop_node(who) + network_service_2.lock().drop_node(&who) }, Severity::Timeout => { info!(target: "sync", "Dropping {:?} because it timed out", who); - network_service_2.lock().drop_node(who) + network_service_2.lock().drop_node(&who) }, } }, @@ -564,22 +556,22 @@ fn run_thread( // The network service produces events about what happens on the network. Let's process them. let network = stream::poll_fn(move || network_service.lock().poll()).for_each(move |event| { match event { - NetworkServiceEvent::OpenedCustomProtocol { peer_id, node_index, version, debug_info, .. } => { + NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. } => { debug_assert_eq!(version, protocol::CURRENT_VERSION as u8); - let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(peer_id, node_index, debug_info)); + let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(peer_id, debug_info)); } - NetworkServiceEvent::ClosedCustomProtocol { node_index, debug_info, .. 
} => { - let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(node_index, debug_info)); + NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. } => { + let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(peer_id, debug_info)); } - NetworkServiceEvent::CustomMessage { node_index, message, .. } => { - let _ = protocol_sender.send(FromNetworkMsg::CustomMessage(node_index, message)); + NetworkServiceEvent::CustomMessage { peer_id, message, .. } => { + let _ = protocol_sender.send(FromNetworkMsg::CustomMessage(peer_id, message)); return Ok(()) } - NetworkServiceEvent::Clogged { node_index, messages, .. } => { + NetworkServiceEvent::Clogged { peer_id, messages, .. } => { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { debug!(target: "sync", "{:?}", msg); - let _ = protocol_sender.send(FromNetworkMsg::PeerClogged(node_index, Some(msg))); + let _ = protocol_sender.send(FromNetworkMsg::PeerClogged(peer_id.clone(), Some(msg))); } } }; diff --git a/substrate/core/network/src/specialization.rs b/substrate/core/network/src/specialization.rs index 3a772f1f3a..e440097dd1 100644 --- a/substrate/core/network/src/specialization.rs +++ b/substrate/core/network/src/specialization.rs @@ -16,7 +16,7 @@ //! Specializations of the substrate network protocol to allow more complex forms of communication. -use crate::NodeIndex; +use crate::PeerId; use runtime_primitives::traits::Block as BlockT; use crate::protocol::Context; @@ -26,13 +26,13 @@ pub trait NetworkSpecialization: Send + Sync + 'static { fn status(&self) -> Vec; /// Called when a peer successfully handshakes. - fn on_connect(&mut self, ctx: &mut Context, who: NodeIndex, status: crate::message::Status); + fn on_connect(&mut self, ctx: &mut Context, who: PeerId, status: crate::message::Status); /// Called when a peer is disconnected. If the peer ID is unknown, it should be ignored. 
- fn on_disconnect(&mut self, ctx: &mut Context, who: NodeIndex); + fn on_disconnect(&mut self, ctx: &mut Context, who: PeerId); /// Called when a network-specific message arrives. - fn on_message(&mut self, ctx: &mut Context, who: NodeIndex, message: &mut Option>); + fn on_message(&mut self, ctx: &mut Context, who: PeerId, message: &mut Option>); /// Called on abort. fn on_abort(&mut self) { } diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index f1464cbed9..80ff8221a1 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -20,7 +20,7 @@ use std::time::{Duration, Instant}; use log::{debug, trace, warn}; use crate::protocol::Context; use fork_tree::ForkTree; -use network_libp2p::{Severity, NodeIndex}; +use network_libp2p::{Severity, PeerId}; use client::{BlockStatus, ClientInfo}; use consensus::BlockOrigin; use consensus::import_queue::{ImportQueue, IncomingBlock}; @@ -96,8 +96,8 @@ type PendingJustification = (::Hash, NumberFor); struct PendingJustifications { justifications: ForkTree, ()>, pending_requests: VecDeque>, - peer_requests: HashMap>, - previous_requests: HashMap, Vec<(NodeIndex, Instant)>>, + peer_requests: HashMap>, + previous_requests: HashMap, Vec<(PeerId, Instant)>>, importing_requests: HashSet>, } @@ -117,7 +117,7 @@ impl PendingJustifications { /// justification request for block #10 to a peer at block #2), and we also /// throttle requests to the same peer if a previous justification request /// yielded no results. 
- fn dispatch(&mut self, peers: &mut HashMap>, protocol: &mut Context) { + fn dispatch(&mut self, peers: &mut HashMap>, protocol: &mut Context) { if self.pending_requests.is_empty() { return; } @@ -134,11 +134,11 @@ impl PendingJustifications { if sync.state != PeerSyncState::Available || self.peer_requests.contains_key(&peer) { None } else { - Some((*peer, sync.best_number)) + Some((peer.clone(), sync.best_number)) } }).collect::>(); - let mut last_peer = available_peers.back().map(|p| p.0); + let mut last_peer = available_peers.back().map(|p| p.0.clone()); let mut unhandled_requests = VecDeque::new(); loop { @@ -164,11 +164,11 @@ impl PendingJustifications { }; if !peer_eligible { - available_peers.push_back((peer, peer_best_number)); + available_peers.push_back((peer.clone(), peer_best_number)); // we tried all peers and none can answer this request if Some(peer) == last_peer { - last_peer = available_peers.back().map(|p| p.0); + last_peer = available_peers.back().map(|p| p.0.clone()); let request = self.pending_requests.pop_front() .expect("verified to be Some in the beginning of the loop; qed"); @@ -179,12 +179,12 @@ impl PendingJustifications { continue; } - last_peer = available_peers.back().map(|p| p.0); + last_peer = available_peers.back().map(|p| p.0.clone()); let request = self.pending_requests.pop_front() .expect("verified to be Some in the beginning of the loop; qed"); - self.peer_requests.insert(peer, request); + self.peer_requests.insert(peer.clone(), request); peers.get_mut(&peer) .expect("peer was is taken from available_peers; available_peers is a subset of peers; qed") @@ -235,7 +235,7 @@ impl PendingJustifications { } /// Retry any pending request if a peer disconnected. - fn peer_disconnected(&mut self, who: NodeIndex) { + fn peer_disconnected(&mut self, who: PeerId) { if let Some(request) = self.peer_requests.remove(&who) { self.pending_requests.push_front(request); } @@ -281,7 +281,7 @@ impl PendingJustifications { /// was `None`. 
fn on_response( &mut self, - who: NodeIndex, + who: PeerId, justification: Option, import_queue: &ImportQueue, ) { @@ -343,7 +343,7 @@ impl PendingJustifications { /// Relay chain sync strategy. pub struct ChainSync { genesis_hash: B::Hash, - peers: HashMap>, + peers: HashMap>, blocks: BlockCollection, best_queued_number: NumberFor, best_queued_hash: B::Hash, @@ -436,8 +436,8 @@ impl ChainSync { } /// Returns peer sync status (if any). - pub(crate) fn peer_info(&self, who: NodeIndex) -> Option> { - self.peers.get(&who).map(|peer| { + pub(crate) fn peer_info(&self, who: &PeerId) -> Option> { + self.peers.get(who).map(|peer| { PeerInfo { best_hash: peer.best_hash, best_number: peer.best_number, @@ -457,7 +457,7 @@ impl ChainSync { } /// Handle new connected peer. - pub(crate) fn new_peer(&mut self, protocol: &mut Context, who: NodeIndex) { + pub(crate) fn new_peer(&mut self, protocol: &mut Context, who: PeerId) { // Initialize some variables to determine if // is_offline or is_major_syncing should be updated // after processing this new peer. 
@@ -465,7 +465,7 @@ impl ChainSync { let previous_best_seen = self.best_seen_block(); let previous_state = self.state(&previous_best_seen); - if let Some(info) = protocol.peer_info(who) { + if let Some(info) = protocol.peer_info(&who) { let status = block_status(&*protocol.client(), &self.queue_blocks, info.best_hash); match (status, info.best_number) { (Err(e), _) => { @@ -497,7 +497,7 @@ impl ChainSync { if our_best > As::sa(0) { let common_best = ::std::cmp::min(our_best, info.best_number); debug!(target:"sync", "New peer with unknown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number); - self.peers.insert(who, PeerSync { + self.peers.insert(who.clone(), PeerSync { common_number: As::sa(0), best_hash: info.best_hash, best_number: info.best_number, @@ -508,7 +508,7 @@ impl ChainSync { } else { // We are at genesis, just start downloading debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number); - self.peers.insert(who, PeerSync { + self.peers.insert(who.clone(), PeerSync { common_number: As::sa(0), best_hash: info.best_hash, best_number: info.best_number, @@ -520,7 +520,7 @@ impl ChainSync { }, (Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChainWithState), _) | (Ok(BlockStatus::InChainPruned), _) => { debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number); - self.peers.insert(who, PeerSync { + self.peers.insert(who.clone(), PeerSync { common_number: info.best_number, best_hash: info.best_hash, best_number: info.best_number, @@ -590,7 +590,7 @@ impl ChainSync { pub(crate) fn on_block_data( &mut self, protocol: &mut Context, - who: NodeIndex, + who: PeerId, request: message::BlockRequest, response: message::BlockResponse ) { @@ -603,7 +603,7 @@ impl ChainSync { let peer_state = peer.state.clone(); match peer_state { PeerSyncState::DownloadingNew(start_block) => { - self.blocks.clear_peer_download(who); + self.blocks.clear_peer_download(&who); 
peer.state = PeerSyncState::Available; self.blocks.insert(start_block, blocks, who); self.blocks @@ -627,7 +627,7 @@ impl ChainSync { header: b.header, body: b.body, justification: b.justification, - origin: Some(who), + origin: Some(who.clone()), } }).collect() }, @@ -698,7 +698,7 @@ impl ChainSync { pub(crate) fn on_block_justification_data( &mut self, protocol: &mut Context, - who: NodeIndex, + who: PeerId, _request: message::BlockRequest, response: message::BlockResponse, ) { @@ -757,7 +757,7 @@ impl ChainSync { if self.is_stopping.load(Ordering::SeqCst) { return } - let peers: Vec = self.peers.keys().map(|p| *p).collect(); + let peers: Vec = self.peers.keys().map(|p| p.clone()).collect(); for peer in peers { self.download_new(protocol, peer); } @@ -846,7 +846,7 @@ impl ChainSync { } /// Handle new block announcement. - pub(crate) fn on_block_announce(&mut self, protocol: &mut Context, who: NodeIndex, hash: B::Hash, header: &B::Header) { + pub(crate) fn on_block_announce(&mut self, protocol: &mut Context, who: PeerId, hash: B::Hash, header: &B::Header) { let number = *header.number(); if number <= As::sa(0) { trace!(target: "sync", "Ignored invalid block announcement from {}: {}", who, hash); @@ -921,10 +921,10 @@ impl ChainSync { } /// Handle disconnected peer. - pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context, who: NodeIndex) { + pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context, who: PeerId) { let previous_best_seen = self.best_seen_block(); let previous_state = self.state(&previous_best_seen); - self.blocks.clear_peer_download(who); + self.blocks.clear_peer_download(&who); self.peers.remove(&who); if self.peers.len() == 0 { // We're not connected to any peer anymore. 
@@ -958,7 +958,7 @@ impl ChainSync { self.best_queued_number = As::sa(0); } } - let ids: Vec = self.peers.drain().map(|(id, _)| id).collect(); + let ids: Vec = self.peers.drain().map(|(id, _)| id).collect(); for id in ids { self.new_peer(protocol, id); } @@ -971,7 +971,7 @@ impl ChainSync { } // Download old block with known parent. - fn download_stale(&mut self, protocol: &mut Context, who: NodeIndex, hash: &B::Hash) { + fn download_stale(&mut self, protocol: &mut Context, who: PeerId, hash: &B::Hash) { if let Some(ref mut peer) = self.peers.get_mut(&who) { match peer.state { PeerSyncState::Available => { @@ -992,7 +992,7 @@ impl ChainSync { } // Download old block with unknown parent. - fn download_unknown_stale(&mut self, protocol: &mut Context, who: NodeIndex, hash: &B::Hash) { + fn download_unknown_stale(&mut self, protocol: &mut Context, who: PeerId, hash: &B::Hash) { if let Some(ref mut peer) = self.peers.get_mut(&who) { match peer.state { PeerSyncState::Available => { @@ -1013,7 +1013,7 @@ impl ChainSync { } // Issue a request for a peer to download new blocks, if any are available - fn download_new(&mut self, protocol: &mut Context, who: NodeIndex) { + fn download_new(&mut self, protocol: &mut Context, who: PeerId) { if let Some(ref mut peer) = self.peers.get_mut(&who) { // when there are too many blocks in the queue => do not try to download new blocks if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { @@ -1023,7 +1023,7 @@ impl ChainSync { match peer.state { PeerSyncState::Available => { trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", who, peer.common_number, peer.best_number); - if let Some(range) = self.blocks.needed_blocks(who, MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) { + if let Some(range) = self.blocks.needed_blocks(who.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) { trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, 
range.end); let request = message::generic::BlockRequest { id: 0, @@ -1044,7 +1044,7 @@ impl ChainSync { } } - fn request_ancestry(protocol: &mut Context, who: NodeIndex, block: NumberFor) { + fn request_ancestry(protocol: &mut Context, who: PeerId, block: NumberFor) { trace!(target: "sync", "Requesting ancestry block #{} from {}", block, who); let request = message::generic::BlockRequest { id: 0, diff --git a/substrate/core/network/src/test/block_import.rs b/substrate/core/network/src/test/block_import.rs index 0916d698c3..3b5e44cc47 100644 --- a/substrate/core/network/src/test/block_import.rs +++ b/substrate/core/network/src/test/block_import.rs @@ -26,7 +26,7 @@ struct TestLink {} impl Link for TestLink {} -fn prepare_good_block() -> (client::Client, Hash, u64, IncomingBlock) { +fn prepare_good_block() -> (client::Client, Hash, u64, PeerId, IncomingBlock) { let client = test_client::new(); let block = client.new_block().unwrap().bake().unwrap(); client.import(BlockOrigin::File, block).unwrap(); @@ -34,27 +34,28 @@ fn prepare_good_block() -> (client::Client for DummySpecialization { vec![] } - fn on_connect(&mut self, _ctx: &mut Context, _peer_id: NodeIndex, _status: crate::message::Status) { + fn on_connect(&mut self, _ctx: &mut Context, _peer_id: PeerId, _status: crate::message::Status) { } - fn on_disconnect(&mut self, _ctx: &mut Context, _peer_id: NodeIndex) { + fn on_disconnect(&mut self, _ctx: &mut Context, _peer_id: PeerId) { } fn on_message( &mut self, _ctx: &mut Context, - _peer_id: NodeIndex, + _peer_id: PeerId, _message: &mut Option>, ) { } @@ -166,7 +166,7 @@ impl + Clone> Link for TestLink { self.link.blocks_processed(processed_blocks, has_error); } - fn justification_imported(&self, who: NodeIndex, hash: &Hash, number:NumberFor, success: bool) { + fn justification_imported(&self, who: PeerId, hash: &Hash, number:NumberFor, success: bool) { self.link.justification_imported(who, hash, number, success); } @@ -174,11 +174,11 @@ impl + Clone> Link for 
TestLink { self.link.request_justification(hash, number); } - fn useless_peer(&self, who: NodeIndex, reason: &str) { + fn useless_peer(&self, who: PeerId, reason: &str) { self.link.useless_peer(who, reason); } - fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str) { + fn note_useless_and_restart_sync(&self, who: PeerId, reason: &str) { self.link.note_useless_and_restart_sync(who, reason); } @@ -190,7 +190,8 @@ impl + Clone> Link for TestLink { pub struct Peer + Clone> { pub is_offline: Arc, pub is_major_syncing: Arc, - pub peers: Arc>>>, + pub peers: Arc>>>, + pub peer_id: PeerId, client: Arc, network_to_protocol_sender: Sender>, pub protocol_sender: Sender>, @@ -206,7 +207,7 @@ impl + Clone> Peer { fn new( is_offline: Arc, is_major_syncing: Arc, - peers: Arc>>>, + peers: Arc>>>, client: Arc, import_queue: Box>, network_to_protocol_sender: Sender>, @@ -222,6 +223,7 @@ impl + Clone> Peer { is_offline, is_major_syncing, peers, + peer_id: PeerId::random(), client, network_to_protocol_sender, protocol_sender, @@ -268,22 +270,22 @@ impl + Clone> Peer { } /// Called on connection to other indicated peer. - fn on_connect(&self, other: NodeIndex) { - let _ = self.network_to_protocol_sender.send(FromNetworkMsg::PeerConnected(PeerId::random(), other, String::new())); + fn on_connect(&self, other: &Self) { + let _ = self.network_to_protocol_sender.send(FromNetworkMsg::PeerConnected(other.peer_id.clone(), String::new())); } /// Called on disconnect from other indicated peer. - fn on_disconnect(&self, other: NodeIndex) { + fn on_disconnect(&self, other: &Self) { let _ = self .network_to_protocol_sender - .send(FromNetworkMsg::PeerDisconnected(other, String::new())); + .send(FromNetworkMsg::PeerDisconnected(other.peer_id.clone(), String::new())); } /// Receive a message from another peer. Return a set of peers to disconnect. 
- fn receive_message(&self, from: NodeIndex, msg: Message) { + fn receive_message(&self, from: &Self, msg: Message) { let _ = self .network_to_protocol_sender - .send(FromNetworkMsg::CustomMessage(from, msg)); + .send(FromNetworkMsg::CustomMessage(from.peer_id.clone(), msg)); } /// Produce the next pending message to send to another peer. @@ -567,7 +569,7 @@ pub trait TestNetFactory: Sized { let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let specialization = self::SpecializationFactory::create(); - let peers: Arc>>> = Arc::new(Default::default()); + let peers: Arc>>> = Arc::new(Default::default()); let (protocol_sender, network_to_protocol_sender) = Protocol::new( status_sinks, @@ -606,39 +608,38 @@ pub trait TestNetFactory: Sized { if self.started() { return; } - self.mut_peers(|peers| { - for peer in 0..peers.len() { - peers[peer].start(); - for client in 0..peers.len() { - if peer != client { - peers[peer].on_connect(client as NodeIndex); - } + for peer in self.peers() { + peer.start(); + for client in self.peers() { + if peer.peer_id != client.peer_id { + peer.on_connect(client); } } - }); + } self.route(None); self.set_started(true); } /// Do one step of routing. - fn route(&mut self, disconnected: Option>) { + fn route(&mut self, disconnected: Option>) { self.mut_peers(move |peers| { let mut to_disconnect = HashSet::new(); - for peer in 0..peers.len() { - let packet = peers[peer].pending_message(); + for (peer_pos, peer) in peers.iter().enumerate() { + let packet = peer.pending_message(); match packet { None => continue, Some(NetworkMsg::Outgoing(recipient, packet)) => { + let recipient = peers.iter().position(|p| p.peer_id == recipient).unwrap(); if let Some(disconnected) = disconnected.as_ref() { let mut current = HashSet::new(); - current.insert(peer); + current.insert(peer_pos); current.insert(recipient); // Not routing message between "disconnected" nodes. 
if disconnected.is_subset(&current) { continue; } } } peers[recipient].receive_message(peer, packet) } Some(NetworkMsg::ReportPeer(who, _)) => { to_disconnect.insert(who); @@ -647,8 +648,10 @@ pub trait TestNetFactory: Sized { } } for d in to_disconnect { - for peer in 0..peers.len() { - peers[peer].on_disconnect(d); + if let Some(d) = peers.iter().find(|p| p.peer_id == d) { + for peer in 0..peers.len() { + peers[peer].on_disconnect(d); + } } } }); @@ -659,7 +662,9 @@ pub trait TestNetFactory: Sized { self.mut_peers(move |peers| { for peer in 0..peers.len() { while let Some(NetworkMsg::Outgoing(recipient, packet)) = peers[peer].pending_message_fast() { - peers[recipient].receive_message(peer as NodeIndex, packet) + if let Some(p) = peers.iter().find(|p| p.peer_id == recipient) { + p.receive_message(&peers[peer], packet) + } } } }); @@ -701,7 +706,7 @@ pub trait TestNetFactory: Sized { /// Perform synchronization until complete, if provided the /// given nodes set are excluded from sync. - fn sync_with(&mut self, disconnected: Option>) -> u32 { + fn sync_with(&mut self, disconnected: Option>) -> u32 { self.start(); let mut total_steps = 0; let mut done = 0; @@ -730,7 +735,7 @@ pub trait TestNetFactory: Sized { /// Perform synchronization until complete, /// excluding sync between certain nodes.
- fn sync_with_disconnected(&mut self, disconnected: HashSet) -> u32 { + fn sync_with_disconnected(&mut self, disconnected: HashSet) -> u32 { self.sync_with(Some(disconnected)) } diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index cb54c05526..9bbf0a32b7 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -18,7 +18,6 @@ use client::backend::Backend; use client::blockchain::HeaderBackend as BlockchainHeaderBackend; use crate::config::Roles; use consensus::BlockOrigin; -use network_libp2p::NodeIndex; use std::collections::HashSet; use std::thread; use std::time::Duration; @@ -54,7 +53,7 @@ fn sync_peers_works() { // And then disconnect. for other in 0..3 { if other != peer { - net.peer(peer).on_disconnect(other); + net.peer(peer).on_disconnect(net.peer(other)); } } } @@ -100,7 +99,7 @@ fn sync_cycle_from_offline_to_syncing_to_offline() { for peer in 0..3 { for other in 0..3 { if other != peer { - net.peer(peer).on_disconnect(other); + net.peer(peer).on_disconnect(net.peer(other)); } } thread::sleep(Duration::from_millis(100)); @@ -125,8 +124,8 @@ fn syncing_node_not_major_syncing_when_disconnected() { assert!(net.peer(1).is_major_syncing()); // Disconnect peer 1 form everyone else. - net.peer(1).on_disconnect(0); - net.peer(1).on_disconnect(2); + net.peer(1).on_disconnect(net.peer(0)); + net.peer(1).on_disconnect(net.peer(2)); thread::sleep(Duration::from_millis(100)); // Peer 1 is not major-syncing. 
@@ -162,7 +161,7 @@ fn sync_from_two_peers_with_ancestry_search_works() { fn ancestry_search_works_when_backoff_is_one() { let _ = ::env_logger::try_init(); let mut net = TestNet::new(3); - + net.peer(0).push_blocks(1, false); net.peer(1).push_blocks(2, false); net.peer(2).push_blocks(2, false); @@ -357,13 +356,13 @@ fn blocks_are_not_announced_by_light_nodes() { net.peer(0).start(); net.peer(1).start(); net.peer(2).start(); - net.peer(0).on_connect(1); - net.peer(1).on_connect(2); + net.peer(0).on_connect(net.peer(1)); + net.peer(1).on_connect(net.peer(2)); // Only sync between 0 -> 1, and 1 -> 2 let mut disconnected = HashSet::new(); - disconnected.insert(0 as NodeIndex); - disconnected.insert(2 as NodeIndex); + disconnected.insert(0); + disconnected.insert(2); net.sync_with_disconnected(disconnected); // peer 0 has the best chain diff --git a/substrate/core/rpc/src/system/helpers.rs b/substrate/core/rpc/src/system/helpers.rs index ea35b8803c..9f64318d5d 100644 --- a/substrate/core/rpc/src/system/helpers.rs +++ b/substrate/core/rpc/src/system/helpers.rs @@ -54,8 +54,6 @@ pub struct Health { #[derive(Debug, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct PeerInfo { - /// Peer Node Index - pub index: usize, /// Peer ID pub peer_id: String, /// Roles @@ -96,14 +94,13 @@ mod tests { fn should_serialize_peer_info() { assert_eq!( ::serde_json::to_string(&PeerInfo { - index: 1, peer_id: "2".into(), roles: "a".into(), protocol_version: 2, best_hash: 5u32, best_number: 6u32, }).unwrap(), - r#"{"index":1,"peerId":"2","roles":"a","protocolVersion":2,"bestHash":5,"bestNumber":6}"#, + r#"{"peerId":"2","roles":"a","protocolVersion":2,"bestHash":5,"bestNumber":6}"#, ); } } diff --git a/substrate/core/rpc/src/system/mod.rs b/substrate/core/rpc/src/system/mod.rs index b0fc4e5679..331d9cd85b 100644 --- a/substrate/core/rpc/src/system/mod.rs +++ b/substrate/core/rpc/src/system/mod.rs @@ -117,9 +117,8 @@ impl SystemApi::Number> for Sy } fn system_peers(&self) -> 
Result::Number>>> { - Ok(self.sync.peers().into_iter().map(|(index, p)| PeerInfo { - index, - peer_id: p.peer_id.to_base58(), + Ok(self.sync.peers().into_iter().map(|(peer_id, p)| PeerInfo { + peer_id: peer_id.to_base58(), roles: format!("{:?}", p.roles), protocol_version: p.protocol_version, best_hash: p.best_hash, diff --git a/substrate/core/rpc/src/system/tests.rs b/substrate/core/rpc/src/system/tests.rs index 66f10694fb..8f85d9c2df 100644 --- a/substrate/core/rpc/src/system/tests.rs +++ b/substrate/core/rpc/src/system/tests.rs @@ -16,7 +16,7 @@ use super::*; -use network::{self, ProtocolStatus, NodeIndex, PeerId, PeerInfo as NetworkPeerInfo}; +use network::{self, ProtocolStatus, PeerId, PeerInfo as NetworkPeerInfo}; use network::config::Roles; use test_client::runtime::Block; use assert_matches::assert_matches; @@ -57,12 +57,11 @@ impl network::SyncProvider for Status { } } - fn peers(&self) -> Vec<(NodeIndex, NetworkPeerInfo)> { + fn peers(&self) -> Vec<(PeerId, NetworkPeerInfo)> { let mut peers = vec![]; for _peer in 0..self.peers { peers.push( - (1, NetworkPeerInfo { - peer_id: self.peer_id.clone(), + (self.peer_id.clone(), NetworkPeerInfo { roles: Roles::FULL, protocol_version: 1, best_hash: Default::default(), @@ -187,7 +186,6 @@ fn system_peers() { is_dev: true, }).system_peers().unwrap(), vec![PeerInfo { - index: 1, peer_id: peer_id.to_base58(), roles: "FULL".into(), protocol_version: 1,