diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs index 9db3d41bd7..77948d29a3 100644 --- a/substrate/core/network-libp2p/src/service_task.rs +++ b/substrate/core/network-libp2p/src/service_task.rs @@ -134,6 +134,8 @@ where TProtos: IntoIterator>, pub enum ServiceEvent { /// A custom protocol substream has been opened with a node. OpenedCustomProtocol { + /// The Id of the node. + peer_id: PeerId, /// Index of the node. node_index: NodeIndex, /// Protocol that has been opened. @@ -381,8 +383,9 @@ where TMessage: CustomMessage + Send + 'static { match self.swarm.poll() { Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { protocol_id, peer_id, version, endpoint }))) => { debug!(target: "sub-libp2p", "Opened custom protocol with {:?}", peer_id); - let node_index = self.index_of_peer_or_assign(peer_id, endpoint); + let node_index = self.index_of_peer_or_assign(peer_id.clone(), endpoint); break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol { + peer_id, node_index, protocol: protocol_id, version, diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 82e763bdd3..65c02a39aa 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . use crossbeam_channel::{self as channel, Receiver, Sender, select}; -use network_libp2p::{NodeIndex, Severity}; +use network_libp2p::{NodeIndex, PeerId, Severity}; use primitives::storage::StorageKey; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor, Zero}; @@ -28,6 +28,7 @@ use crate::specialization::NetworkSpecialization; use crate::sync::{ChainSync, Status as SyncStatus, SyncState}; use crate::service::{NetworkChan, NetworkMsg, TransactionPool, ExHashT}; use crate::config::{ProtocolConfig, Roles}; +use parking_lot::RwLock; use rustc_hex::ToHex; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; @@ -67,10 +68,26 @@ pub struct Protocol, H: ExHashT> { consensus_gossip: ConsensusGossip, context_data: ContextData, // Connected peers pending Status message. - handshaking_peers: HashMap, + handshaking_peers: HashMap, + // Connected peers from whom we received a Status message, + // similar to context_data.peers but shared with the SyncProvider. + connected_peers: Arc>>>, transaction_pool: Arc>, } +/// A peer from whom we have received a Status message. +#[derive(Clone)] +pub struct ConnectedPeer { + pub peer_info: PeerInfo +} + +/// A peer that we are connected to +/// and from whom we have not yet received a Status message. +struct HandshakingPeer { + timestamp: time::Instant, + peer_id: PeerId, +} + /// Syncing status and statistics #[derive(Clone)] pub struct ProtocolStatus { @@ -101,6 +118,8 @@ struct Peer { /// Info about a peer's known state. #[derive(Clone, Debug)] pub struct PeerInfo { + /// Network id. + pub peer_id: PeerId, /// Roles pub roles: Roles, /// Protocol version @@ -213,8 +232,6 @@ pub enum ProtocolMsg> { ExecuteWithGossip(Box + Send + 'static>), /// Incoming gossip consensus message. GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec), - /// Return a list of peers currently known to protocol. - Peers(Sender)>>), /// Tell protocol to abort sync (does not stop protocol). /// Only used in tests. #[cfg(any(test, feature = "test-helpers"))] @@ -228,7 +245,7 @@ pub enum ProtocolMsg> { /// Messages sent to Protocol from Network-libp2p. pub enum FromNetworkMsg { /// A peer connected, with debug info. - PeerConnected(NodeIndex, String), + PeerConnected(PeerId, NodeIndex, String), /// A peer disconnected, with debug info. PeerDisconnected(NodeIndex, String), /// A custom message from another peer. @@ -247,6 +264,7 @@ impl, H: ExHashT> Protocol { pub fn new( is_offline: Arc, is_major_syncing: Arc, + connected_peers: Arc>>>, network_chan: NetworkChan, config: ProtocolConfig, chain: Arc>, @@ -277,6 +295,7 @@ impl, H: ExHashT> Protocol { specialization: specialization, consensus_gossip: ConsensusGossip::new(), handshaking_peers: HashMap::new(), + connected_peers, transaction_pool: transaction_pool, }; let tick_timeout = channel::tick(TICK_TIMEOUT); @@ -332,10 +351,6 @@ impl, H: ExHashT> Protocol { fn handle_client_msg(&mut self, msg: ProtocolMsg) -> bool { match msg { - ProtocolMsg::Peers(sender) => { - let peers = self.context_data.peers.iter().map(|(idx, p)| (*idx, p.info.clone())).collect(); - let _ = sender.send(peers); - }, ProtocolMsg::Status(sender) => self.status(sender), ProtocolMsg::BlockImported(hash, header) => self.on_block_imported(hash, &header), ProtocolMsg::BlockFinalized(hash, header) => self.on_block_finalized(hash, &header), @@ -385,7 +400,7 @@ impl, H: ExHashT> Protocol { fn handle_network_msg(&mut self, msg: FromNetworkMsg) -> bool { match msg { FromNetworkMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info), - FromNetworkMsg::PeerConnected(who, debug_info) => self.on_peer_connected(who, debug_info), + FromNetworkMsg::PeerConnected(peer_id, who, debug_info) => self.on_peer_connected(peer_id, who, debug_info), FromNetworkMsg::PeerClogged(who, message) => self.on_clogged_peer(who, message), FromNetworkMsg::CustomMessage(who, message) => { self.on_custom_message(who, message) @@ -480,9 +495,9 @@ impl, H: ExHashT> Protocol { } /// Called when a new peer is connected - fn on_peer_connected(&mut self, who: NodeIndex, debug_info: String) { + fn on_peer_connected(&mut self, peer_id: PeerId, who: NodeIndex, debug_info: String) { trace!(target: "sync", "Connecting {}: {}", who, debug_info); - self.handshaking_peers.insert(who, time::Instant::now()); + self.handshaking_peers.insert(who, HandshakingPeer { timestamp: time::Instant::now(), peer_id }); self.send_status(who); } @@ -492,6 +507,7 @@ impl, H: ExHashT> Protocol { // lock all the the peer lists so that add/remove peer events are in order let removed = { self.handshaking_peers.remove(&peer); + self.connected_peers.write().remove(&peer); self.context_data.peers.remove(&peer).is_some() }; if removed { @@ -644,7 +660,7 @@ impl, H: ExHashT> Protocol { aborting.push(*who); } } - for (who, _) in self.handshaking_peers.iter().filter(|(_, t)| (tick - **t).as_secs() > REQUEST_TIMEOUT_SEC) { + for (who, _) in self.handshaking_peers.iter().filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC) { trace!(target: "sync", "Handshake timeout {}", who); aborting.push(*who); } @@ -711,13 +727,28 @@ impl, H: ExHashT> Protocol { let cache_limit = NonZeroUsize::new(1_000_000).expect("1_000_000 > 0; qed"); - let peer = Peer { - info: PeerInfo { - protocol_version: status.version, - roles: status.roles, - best_hash: status.best_hash, - best_number: status.best_number + let info = match self.handshaking_peers.remove(&who) { + Some(handshaking) => { + let peer_info = PeerInfo { + peer_id: handshaking.peer_id, + protocol_version: status.version, + roles: status.roles, + best_hash: status.best_hash, + best_number: status.best_number + }; + self.connected_peers + .write() + .insert(who, ConnectedPeer { peer_info: peer_info.clone() }); + peer_info }, + None => { + debug!(target: "sync", "Received status from previously unconnected node {}", who); + return; + }, + }; + + let peer = Peer { + info, block_request: None, known_extrinsics: LruHashSet::new(cache_limit), known_blocks: LruHashSet::new(cache_limit), @@ -725,7 +756,7 @@ impl, H: ExHashT> Protocol { obsolete_requests: HashMap::new(), }; self.context_data.peers.insert(who.clone(), peer); - self.handshaking_peers.remove(&who); + debug!(target: "sync", "Connected {}", who); } @@ -769,7 +800,7 @@ impl, H: ExHashT> Protocol { let extrinsics = self.transaction_pool.transactions(); let mut propagated_to = HashMap::new(); - for (who, ref mut peer) in self.context_data.peers.iter_mut() { + for (who, peer) in self.context_data.peers.iter_mut() { let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics .iter() .filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone())) @@ -777,18 +808,11 @@ impl, H: ExHashT> Protocol { .unzip(); if !to_send.is_empty() { - let (sender, port) = channel::unbounded(); - let _ = self - .network_chan - .send(NetworkMsg::GetPeerId(who.clone(), sender)); - let node_id = port.recv().expect("1. We are running 2. Network should be running too."); - if let Some(id) = node_id { - for hash in hashes { - propagated_to - .entry(hash) - .or_insert_with(Vec::new) - .push(id.clone()); - } + for hash in hashes { + propagated_to + .entry(hash) + .or_insert_with(Vec::new) + .push(peer.info.peer_id.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); self.network_chan.send(NetworkMsg::Outgoing(*who, GenericMessage::Transactions(to_send))) diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index bcdc636d4c..0741c9f1d9 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -20,14 +20,14 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::{io, thread}; use log::{warn, debug, error, trace, info}; use futures::{Async, Future, Stream, stream, sync::oneshot}; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use network_libp2p::{ProtocolId, NetworkConfiguration, NodeIndex, ErrorKind, Severity}; use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol}; use consensus::import_queue::{ImportQueue, Link}; use crate::consensus_gossip::ConsensusGossip; use crate::message::{Message, ConsensusEngineId}; -use crate::protocol::{self, Context, FromNetworkMsg, Protocol, ProtocolMsg, ProtocolStatus, PeerInfo}; +use crate::protocol::{self, Context, FromNetworkMsg, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo}; use crate::config::Params; use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError}; use crate::error::Error; @@ -47,7 +47,7 @@ pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> ProtocolStatus; /// Get currently connected peers - fn peers(&self) -> Vec<(NodeIndex, Option, PeerInfo)>; + fn peers(&self) -> Vec<(NodeIndex, PeerInfo)>; } /// Minimum Requirements for a Hash within Networking @@ -118,10 +118,12 @@ impl> Link for NetworkLink { /// Substrate network service. Handles network IO and manages connectivity. pub struct Service> { - // Are we connected to any peer? + /// Are we connected to any peer? is_offline: Arc, - // Are we actively catching up with the chain? + /// Are we actively catching up with the chain? is_major_syncing: Arc, + /// Peers whom we are connected with. + peers: Arc>>>, /// Network service network: Arc>>>, /// Protocol sender @@ -143,9 +145,11 @@ impl> Service { // Start in off-line mode, since we're not connected to any nodes yet. let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); + let peers: Arc>>> = Arc::new(Default::default()); let (protocol_sender, network_to_protocol_sender) = Protocol::new( is_offline.clone(), is_major_syncing.clone(), + peers.clone(), network_chan.clone(), params.config, params.chain, @@ -166,6 +170,7 @@ impl> Service { let service = Arc::new(Service { is_offline, is_major_syncing, + peers, network, protocol_sender: protocol_sender.clone(), bg_thread: Some(thread), @@ -284,16 +289,9 @@ impl> SyncProvider for Servi 2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.") } - fn peers(&self) -> Vec<(NodeIndex, Option, PeerInfo)> { - let (sender, port) = channel::unbounded(); - let _ = self.protocol_sender.send(ProtocolMsg::Peers(sender)); - let peers = port.recv().expect("1. Protocol keeps handling messages until all senders are dropped, - or the ProtocolMsg::Stop message is received, - 2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent."); - let network = self.network.lock(); - peers.into_iter().map(|(idx, info)| { - (idx, network.peer_id_of_node(idx).map(|p| p.clone()), info) - }).collect::>() + fn peers(&self) -> Vec<(NodeIndex, PeerInfo)> { + let peers = (*self.peers.read()).clone(); + peers.into_iter().map(|(idx, connected)| (idx, connected.peer_info)).collect() } } @@ -430,8 +428,6 @@ pub enum NetworkMsg { Outgoing(NodeIndex, Message), /// Report a peer. ReportPeer(NodeIndex, Severity), - /// Get a peer id. - GetPeerId(NodeIndex, Sender>), } /// Starts the background thread that handles the networking. @@ -517,14 +513,6 @@ fn run_thread( Severity::Timeout => network_service_2.lock().drop_node(who), } }, - NetworkMsg::GetPeerId(who, sender) => { - let node_id = network_service_2 - .lock() - .peer_id_of_node(who) - .cloned() - .map(|id| id.to_base58()); - let _ = sender.send(node_id); - }, } Ok(()) }) @@ -546,9 +534,9 @@ fn run_thread( FromNetworkMsg::PeerDisconnected(node_index, debug_info)); } } - NetworkServiceEvent::OpenedCustomProtocol { node_index, version, debug_info, .. } => { + NetworkServiceEvent::OpenedCustomProtocol { peer_id, node_index, version, debug_info, .. } => { debug_assert_eq!(version, protocol::CURRENT_VERSION as u8); - let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(node_index, debug_info)); + let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(peer_id, node_index, debug_info)); } NetworkServiceEvent::ClosedCustomProtocol { node_index, debug_info, .. } => { let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(node_index, debug_info)); diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index ae51b9b0da..1ff52c9916 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -41,11 +41,11 @@ use futures::Future; use futures::sync::{mpsc, oneshot}; use keyring::Keyring; use crate::message::{Message, ConsensusEngineId}; -use network_libp2p::{NodeIndex, ProtocolId}; +use network_libp2p::{NodeIndex, ProtocolId, PeerId}; use parity_codec::Encode; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use primitives::{H256, Ed25519AuthorityId}; -use crate::protocol::{Context, FromNetworkMsg, Protocol, ProtocolMsg}; +use crate::protocol::{ConnectedPeer, Context, FromNetworkMsg, Protocol, ProtocolMsg}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor}; use runtime_primitives::Justification; @@ -120,6 +120,7 @@ pub type PeersClient = client::Client { pub is_offline: Arc, pub is_major_syncing: Arc, + pub peers: Arc>>>, client: Arc, network_to_protocol_sender: Sender>, pub protocol_sender: Sender>, @@ -136,6 +137,7 @@ impl Peer { fn new( is_offline: Arc, is_major_syncing: Arc, + peers: Arc>>>, client: Arc, import_queue: Box>, network_to_protocol_sender: Sender>, @@ -148,6 +150,7 @@ impl Peer { Peer { is_offline, is_major_syncing, + peers, client, network_to_protocol_sender, protocol_sender, @@ -201,7 +204,7 @@ impl Peer { /// Called on connection to other indicated peer. fn on_connect(&self, other: NodeIndex) { - let _ = self.network_to_protocol_sender.send(FromNetworkMsg::PeerConnected(other, String::new())); + let _ = self.network_to_protocol_sender.send(FromNetworkMsg::PeerConnected(PeerId::random(), other, String::new())); } /// Called on disconnect from other indicated peer. @@ -483,9 +486,11 @@ pub trait TestNetFactory: Sized { let specialization = DummySpecialization {}; let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); + let peers: Arc>>> = Arc::new(Default::default()); let (protocol_sender, network_to_protocol_sender) = Protocol::new( is_offline.clone(), is_major_syncing.clone(), + peers.clone(), network_sender.clone(), config.clone(), client.clone(), @@ -498,6 +503,7 @@ pub trait TestNetFactory: Sized { let peer = Arc::new(Peer::new( is_offline, is_major_syncing, + peers, client, import_queue, network_to_protocol_sender, diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index 795ac202f5..d23b98496b 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -24,6 +24,30 @@ use std::thread; use std::time::Duration; use super::*; +#[test] +fn sync_peers_works() { + let _ = ::env_logger::try_init(); + let mut net = TestNet::new(3); + net.sync(); + for peer in 0..3 { + // Assert peers is up to date. + let peers = net.peer(peer).peers.read(); + assert_eq!(peers.len(), 2); + // And then disconnect. + for other in 0..3 { + if other != peer { + net.peer(peer).on_disconnect(other); + } + } + } + net.sync(); + // Now peers are disconnected. + for peer in 0..3 { + let peers = net.peer(peer).peers.read(); + assert_eq!(peers.len(), 0); + } +} + #[test] fn sync_cycle_from_offline_to_syncing_to_offline() { let _ = ::env_logger::try_init(); diff --git a/substrate/core/rpc/src/system/mod.rs b/substrate/core/rpc/src/system/mod.rs index e3856fac9f..a7684f2379 100644 --- a/substrate/core/rpc/src/system/mod.rs +++ b/substrate/core/rpc/src/system/mod.rs @@ -111,9 +111,9 @@ impl SystemApi::Number> for Sy } fn system_peers(&self) -> Result::Number>>> { - Ok(self.sync.peers().into_iter().map(|(idx, peer_id, p)| PeerInfo { - index: idx, - peer_id: peer_id.map_or_else(Default::default, |p| p.to_base58()), + Ok(self.sync.peers().into_iter().map(|(index, p)| PeerInfo { + index, + peer_id: p.peer_id.to_base58(), roles: format!("{:?}", p.roles), protocol_version: p.protocol_version, best_hash: p.best_hash, diff --git a/substrate/core/rpc/src/system/tests.rs b/substrate/core/rpc/src/system/tests.rs index d28fa202af..18c753f533 100644 --- a/substrate/core/rpc/src/system/tests.rs +++ b/substrate/core/rpc/src/system/tests.rs @@ -16,16 +16,27 @@ use super::*; -use network::{self, SyncState, SyncStatus, ProtocolStatus, NodeIndex, PeerId, PeerInfo as NetworkPeerInfo, PublicKey}; +use network::{self, SyncState, SyncStatus, ProtocolStatus, NodeIndex, PeerId, PeerInfo as NetworkPeerInfo}; use network::config::Roles; use test_client::runtime::Block; use assert_matches::assert_matches; -#[derive(Default)] struct Status { pub peers: usize, pub is_syncing: bool, pub is_dev: bool, + pub peer_id: PeerId, +} + +impl Default for Status { + fn default() -> Status { + Status { + peer_id: PeerId::random(), + peers: 0, + is_syncing: false, + is_dev: false, + } + } } impl network::SyncProvider for Status { @@ -41,8 +52,9 @@ impl network::SyncProvider for Status { } } - fn peers(&self) -> Vec<(NodeIndex, Option, NetworkPeerInfo)> { - vec![(1, Some(PublicKey::Ed25519((0 .. 32).collect::>()).into()), NetworkPeerInfo { + fn peers(&self) -> Vec<(NodeIndex, NetworkPeerInfo)> { + vec![(1, NetworkPeerInfo { + peer_id: self.peer_id.clone(), roles: Roles::FULL, protocol_version: 1, best_hash: Default::default(), @@ -108,6 +120,7 @@ fn system_health() { assert_matches!( api(Status { + peer_id: PeerId::random(), peers: 5, is_syncing: true, is_dev: true, @@ -121,6 +134,7 @@ fn system_health() { assert_eq!( api(Status { + peer_id: PeerId::random(), peers: 5, is_syncing: false, is_dev: false, @@ -134,6 +148,7 @@ fn system_health() { assert_eq!( api(Status { + peer_id: PeerId::random(), peers: 0, is_syncing: false, is_dev: true, @@ -148,11 +163,17 @@ fn system_health() { #[test] fn system_peers() { + let peer_id = PeerId::random(); assert_eq!( - api(None).system_peers().unwrap(), + api(Status { + peer_id: peer_id.clone(), + peers: 1, + is_syncing: false, + is_dev: true, + }).system_peers().unwrap(), vec![PeerInfo { index: 1, - peer_id: "QmS5oyTmdjwBowwAH1D9YQnoe2HyWpVemH8qHiU5RqWPh4".into(), + peer_id: peer_id.to_base58(), roles: "FULL".into(), protocol_version: 1, best_hash: Default::default(),