diff --git a/substrate/.gitlab-ci.yml b/substrate/.gitlab-ci.yml index 82fb604112..127aba4781 100644 --- a/substrate/.gitlab-ci.yml +++ b/substrate/.gitlab-ci.yml @@ -153,7 +153,6 @@ check-web-wasm: - time cargo web build -p substrate-keystore - time cargo web build -p substrate-executor - time cargo web build -p substrate-network - - time cargo web build -p substrate-network-libp2p - time cargo web build -p substrate-panic-handler - time cargo web build -p substrate-peerset - time cargo web build -p substrate-primitives diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index a9d7945194..6e30fb4a75 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4304,10 +4304,14 @@ name = "substrate-network" version = "2.0.0" dependencies = [ "bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", + "erased-serde 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "fork-tree 2.0.0", "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "libp2p 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "linked_hash_set 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4316,39 +4320,18 @@ dependencies = [ "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "sr-primitives 2.0.0", - "substrate-client 2.0.0", - "substrate-consensus-common 2.0.0", - "substrate-keyring 2.0.0", - "substrate-network-libp2p 2.0.0", - "substrate-peerset 2.0.0", - "substrate-primitives 2.0.0", - "substrate-test-runtime-client 2.0.0", - "tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", - "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "substrate-network-libp2p" -version = "2.0.0" -dependencies = [ - "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "erased-serde 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", - "libp2p 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.92 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "slog_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 2.0.0", + "substrate-client 2.0.0", + "substrate-consensus-common 2.0.0", + "substrate-keyring 2.0.0", "substrate-peerset 2.0.0", + "substrate-primitives 2.0.0", + "substrate-test-runtime-client 2.0.0", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/network-libp2p/Cargo.toml b/substrate/core/network-libp2p/Cargo.toml deleted file mode 100644 index 16978ed02c..0000000000 --- a/substrate/core/network-libp2p/Cargo.toml +++ /dev/null @@ -1,36 +0,0 @@ -[package] -description = "libp2p implementation of the ethcore network library" -homepage = "http://parity.io" -license = "GPL-3.0" -name = "substrate-network-libp2p" -version = "2.0.0" -authors = ["Parity Technologies "] -edition = "2018" - -[dependencies] -byteorder = "1.3" -bytes = "0.4" -fnv = "1.0" -futures = "0.1" -libp2p = { version = "0.9.1", default-features = false, features = ["secp256k1", "libp2p-websocket"] } -parking_lot = "0.8.0" -lazy_static = "1.2" -log = "0.4" -rand = "0.6" -serde = { version = "1.0.70", features = ["derive"] } -serde_json = "1.0.24" -smallvec = "0.6" -substrate-peerset = { path = "../peerset" } -tokio-io = "0.1" -tokio-timer = "0.2" -unsigned-varint = { version = "0.2.1", features = ["codec"] } -void = "1.0" -zeroize = "0.6.0" - -slog = { version = "^2", features = ["nested-values"] } -slog_derive = "0.1.1" -erased-serde = "0.3.9" - -[dev-dependencies] -tempdir = "0.3" -tokio = "0.1" diff --git a/substrate/core/network-libp2p/src/config.rs b/substrate/core/network-libp2p/src/config.rs deleted file mode 100644 index b9ccbb6428..0000000000 --- a/substrate/core/network-libp2p/src/config.rs +++ /dev/null @@ -1,304 +0,0 @@ -// Copyright 2015-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Libp2p network configuration. - -use libp2p::identity::{Keypair, secp256k1, ed25519}; -use libp2p::wasm_ext; -use libp2p::{Multiaddr, multiaddr::Protocol}; -use std::error::Error; -use std::{io::{self, Write}, iter, fs, net::Ipv4Addr, path::{Path, PathBuf}}; -use zeroize::Zeroize; - -/// Network service configuration. -#[derive(Clone)] -pub struct NetworkConfiguration { - /// Directory path to store general network configuration. None means nothing will be saved. - pub config_path: Option, - /// Directory path to store network-specific configuration. None means nothing will be saved. - pub net_config_path: Option, - /// Multiaddresses to listen for incoming connections. - pub listen_addresses: Vec, - /// Multiaddresses to advertise. Detected automatically if empty. - pub public_addresses: Vec, - /// List of initial node addresses - pub boot_nodes: Vec, - /// The node key configuration, which determines the node's network identity keypair. - pub node_key: NodeKeyConfig, - /// Maximum allowed number of incoming connections. - pub in_peers: u32, - /// Number of outgoing connections we're trying to maintain. - pub out_peers: u32, - /// List of reserved node addresses. - pub reserved_nodes: Vec, - /// The non-reserved peer mode. - pub non_reserved_mode: NonReservedPeerMode, - /// Client identifier. Sent over the wire for debugging purposes. - pub client_version: String, - /// Name of the node. Sent over the wire for debugging purposes. - pub node_name: String, - /// If true, the network will use mDNS to discover other libp2p nodes on the local network - /// and connect to them if they support the same chain. - pub enable_mdns: bool, - /// Optional external implementation of a libp2p transport. Used in WASM contexts where we need - /// some binding between the networking provided by the operating system or environment and - /// libp2p. - /// - /// This parameter exists whatever the target platform is, but it is expected to be set to - /// `Some` only when compiling for WASM. - pub wasm_external_transport: Option, -} - -impl Default for NetworkConfiguration { - fn default() -> Self { - NetworkConfiguration { - config_path: None, - net_config_path: None, - listen_addresses: Vec::new(), - public_addresses: Vec::new(), - boot_nodes: Vec::new(), - node_key: NodeKeyConfig::Ed25519(Secret::New), - in_peers: 25, - out_peers: 75, - reserved_nodes: Vec::new(), - non_reserved_mode: NonReservedPeerMode::Accept, - client_version: "unknown".into(), - node_name: "unknown".into(), - enable_mdns: false, - wasm_external_transport: None, - } - } -} - -impl NetworkConfiguration { - /// Create a new instance of default settings. - pub fn new() -> Self { - Self::default() - } - - /// Create new default configuration for localhost-only connection with random port (useful for testing) - pub fn new_local() -> NetworkConfiguration { - let mut config = NetworkConfiguration::new(); - config.listen_addresses = vec![ - iter::once(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) - .chain(iter::once(Protocol::Tcp(0))) - .collect() - ]; - config - } -} - -/// The policy for connections to non-reserved peers. -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum NonReservedPeerMode { - /// Accept them. This is the default. - Accept, - /// Deny them. - Deny, -} - -impl NonReservedPeerMode { - /// Attempt to parse the peer mode from a string. - pub fn parse(s: &str) -> Option { - match s { - "accept" => Some(NonReservedPeerMode::Accept), - "deny" => Some(NonReservedPeerMode::Deny), - _ => None, - } - } -} - -/// The configuration of a node's secret key, describing the type of key -/// and how it is obtained. A node's identity keypair is the result of -/// the evaluation of the node key configuration. -#[derive(Clone)] -pub enum NodeKeyConfig { - /// A Secp256k1 secret key configuration. - Secp256k1(Secret), - /// A Ed25519 secret key configuration. - Ed25519(Secret) -} - -/// The options for obtaining a Secp256k1 secret key. -pub type Secp256k1Secret = Secret; - -/// The options for obtaining a Ed25519 secret key. -pub type Ed25519Secret = Secret; - -/// The configuration options for obtaining a secret key `K`. -#[derive(Clone)] -pub enum Secret { - /// Use the given secret key `K`. - Input(K), - /// Read the secret key from a file. If the file does not exist, - /// it is created with a newly generated secret key `K`. The format - /// of the file is determined by `K`: - /// - /// * `secp256k1::SecretKey`: An unencoded 32 bytes Secp256k1 secret key. - /// * `ed25519::SecretKey`: An unencoded 32 bytes Ed25519 secret key. - File(PathBuf), - /// Always generate a new secret key `K`. - New -} - -impl NodeKeyConfig { - /// Evaluate a `NodeKeyConfig` to obtain an identity `Keypair`: - /// - /// * If the secret is configured as input, the corresponding keypair is returned. - /// - /// * If the secret is configured as a file, it is read from that file, if it exists. - /// Otherwise a new secret is generated and stored. In either case, the - /// keypair obtained from the secret is returned. - /// - /// * If the secret is configured to be new, it is generated and the corresponding - /// keypair is returned. - pub fn into_keypair(self) -> io::Result { - use NodeKeyConfig::*; - match self { - Secp256k1(Secret::New) => - Ok(Keypair::generate_secp256k1()), - - Secp256k1(Secret::Input(k)) => - Ok(Keypair::Secp256k1(k.into())), - - Secp256k1(Secret::File(f)) => - get_secret(f, - |mut b| secp256k1::SecretKey::from_bytes(&mut b), - secp256k1::SecretKey::generate, - |b| b.to_bytes().to_vec()) - .map(secp256k1::Keypair::from) - .map(Keypair::Secp256k1), - - Ed25519(Secret::New) => - Ok(Keypair::generate_ed25519()), - - Ed25519(Secret::Input(k)) => - Ok(Keypair::Ed25519(k.into())), - - Ed25519(Secret::File(f)) => - get_secret(f, - |mut b| ed25519::SecretKey::from_bytes(&mut b), - ed25519::SecretKey::generate, - |b| b.as_ref().to_vec()) - .map(ed25519::Keypair::from) - .map(Keypair::Ed25519), - } - } -} - -/// Load a secret key from a file, if it exists, or generate a -/// new secret key and write it to that file. In either case, -/// the secret key is returned. -fn get_secret(file: P, parse: F, generate: G, serialize: W) -> io::Result -where - P: AsRef, - F: for<'r> FnOnce(&'r mut [u8]) -> Result, - G: FnOnce() -> K, - E: Error + Send + Sync + 'static, - W: Fn(&K) -> Vec, -{ - std::fs::read(&file) - .and_then(|mut sk_bytes| - parse(&mut sk_bytes) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))) - .or_else(|e| { - if e.kind() == io::ErrorKind::NotFound { - file.as_ref().parent().map_or(Ok(()), fs::create_dir_all)?; - let sk = generate(); - let mut sk_vec = serialize(&sk); - write_secret_file(file, &sk_vec)?; - sk_vec.zeroize(); - Ok(sk) - } else { - Err(e) - } - }) -} - -/// Write secret bytes to a file. -fn write_secret_file

(path: P, sk_bytes: &[u8]) -> io::Result<()> -where - P: AsRef -{ - let mut file = open_secret_file(&path)?; - file.write_all(sk_bytes) -} - -/// Opens a file containing a secret key in write mode. -#[cfg(unix)] -fn open_secret_file

(path: P) -> io::Result -where - P: AsRef -{ - use std::os::unix::fs::OpenOptionsExt; - fs::OpenOptions::new() - .write(true) - .create_new(true) - .mode(0o600) - .open(path) -} - -/// Opens a file containing a secret key in write mode. -#[cfg(not(unix))] -fn open_secret_file

(path: P) -> Result -where - P: AsRef -{ - fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(path) -} - -#[cfg(test)] -mod tests { - use super::*; - use tempdir::TempDir; - - fn secret_bytes(kp: &Keypair) -> Vec { - match kp { - Keypair::Ed25519(p) => p.secret().as_ref().iter().cloned().collect(), - Keypair::Secp256k1(p) => p.secret().to_bytes().to_vec(), - _ => panic!("Unexpected keypair.") - } - } - - #[test] - fn test_secret_file() { - let tmp = TempDir::new("x").unwrap(); - std::fs::remove_dir(tmp.path()).unwrap(); // should be recreated - let file = tmp.path().join("x").to_path_buf(); - let kp1 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap(); - let kp2 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap(); - assert!(file.is_file() && secret_bytes(&kp1) == secret_bytes(&kp2)) - } - - #[test] - fn test_secret_input() { - let sk = secp256k1::SecretKey::generate(); - let kp1 = NodeKeyConfig::Secp256k1(Secret::Input(sk.clone())).into_keypair().unwrap(); - let kp2 = NodeKeyConfig::Secp256k1(Secret::Input(sk)).into_keypair().unwrap(); - assert!(secret_bytes(&kp1) == secret_bytes(&kp2)); - } - - #[test] - fn test_secret_new() { - let kp1 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap(); - let kp2 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap(); - assert!(secret_bytes(&kp1) != secret_bytes(&kp2)); - } -} - diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs deleted file mode 100644 index 540d8d7f0b..0000000000 --- a/substrate/core/network-libp2p/src/lib.rs +++ /dev/null @@ -1,282 +0,0 @@ -// Copyright 2018-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Networking layer of Substrate. -//! -//! # Overview -//! -//! This crate handles the following network mechanisms: -//! -//! - Discovering nodes that are part of the network. -//! - Connecting to said nodes and accepting incoming connections. -//! - Encrypting communications between nodes. -//! - Ensure that nodes are really the `PeerId` that they pretend to be. -//! - Ensuring that the nodes belong to the same chain as us before reporting a new connection. -//! -//! From the point of view of our node, each other node on the network has a reputation value in -//! the form of an `i32`. We try to establish connections towards nodes with a higher reputation -//! over nodes with a lower reputation. -//! -//! Establishing connections to other nodes is automatically performed by this crate, and there is -//! no way to influence this, except by adjusting reputations. -//! -//! ## About the term "connecting" -//! -//! The documentation of this crate uses the words "connected" and "disconnected". It is important -//! to note that this doesn't correspond to actual TCP/IP connections and disconnections. Libp2p -//! will maintain connections that aren't reported by the API of this crate, and TCP/IP connections -//! can be kept alive even after we have reported a disconnect in order to potentially reuse them. -//! -//! # Usage -//! -//! > **Important**: This crate is unstable and the API and usage may change. -//! -//! The first step is to crate a [`RegisteredProtocol`] describing the protocol, and a -//! [`NetworkConfiguration`] describing the network. Then call [`start_service`] with them, which -//! returns a [`Service`] object and a [`substrate_peerset::PeersetHandle`]. -//! -//! The former allows you to know what happens on the network and to send messages, while the -//! latter can be used to adjust the reputations of nodes. -//! -//! You must call the `poll` method of [`Service`] in order to make the network progress and in -//! order to update the internal state of the [`Service`]. Calling `poll` will produce -//! [`ServiceEvent`]s, which inform you of what happened on the network. -//! -//! Please keep in mind that the state of the [`Service`] only updates itself in a way -//! corresponding to the [`ServiceEvent`] that `poll` returns. -//! -//! Illustration: -//! -//! - You call [`Service::connected_peers`] to get the list of nodes we are connected to. -//! - If you then call [`Service::connected_peers`] again, the returned list will always be the -//! same, no matter what happened on the wire. -//! - If you then call [`Service::poll`] and a [`ServiceEvent::OpenedCustomProtocol`] event is -//! returned, then the concerned node, and only the concerned node, will be added to the list of -//! nodes we are connected to. -//! - Similarly, if [`Service::poll`] produces a [`ServiceEvent::ClosedCustomProtocol`] event, then -//! only the concerned node will disappear from the list. -//! - And if [`Service::poll`] returns neither [`ServiceEvent::OpenedCustomProtocol`] nor -//! [`ServiceEvent::ClosedCustomProtocol`], then the list of connected nodes doesn't change. -//! -//! ## Example -//! -//! ```no_run -//! # use futures::prelude::*; -//! use substrate_network_libp2p::ServiceEvent; -//! -//! let proto = substrate_network_libp2p::RegisteredProtocol::new(&b"hello"[..], &[0]); -//! let config = substrate_network_libp2p::NetworkConfiguration::default(); -//! let (mut service, _peerset) = substrate_network_libp2p::start_service(config, proto).unwrap(); -//! -//! tokio::run(futures::future::poll_fn(move || { -//! loop { -//! match service.poll().unwrap() { -//! Async::NotReady => return Ok(Async::NotReady), -//! Async::Ready(Some(ServiceEvent::OpenedCustomProtocol { peer_id, .. })) => { -//! println!("now connected to {:?}", peer_id); -//! service.send_custom_message(&peer_id, b"hello world!".to_vec()); -//! } -//! Async::Ready(Some(ServiceEvent::ClosedCustomProtocol { peer_id, .. })) => -//! println!("now disconnected from {:?}", peer_id), -//! Async::Ready(Some(ServiceEvent::CustomMessage { peer_id, message })) => -//! println!("received message from {:?}: {:?}", peer_id, message), -//! Async::Ready(None) => return Ok(Async::Ready(())), -//! _ => {} -//! } -//! } -//! })); -//! ``` -//! - -mod behaviour; -mod config; -mod custom_proto; -mod service_task; -mod transport; - -pub use crate::config::*; -pub use crate::custom_proto::{CustomMessage, RegisteredProtocol}; -pub use crate::config::{NetworkConfiguration, NodeKeyConfig, Secret, NonReservedPeerMode}; -pub use crate::service_task::{start_service, Service, ServiceEvent}; -pub use libp2p::{Multiaddr, multiaddr, build_multiaddr}; -pub use libp2p::{identity, PeerId, core::PublicKey}; - -use libp2p::core::nodes::ConnectedPoint; -use serde::{Deserialize, Serialize}; -use slog_derive::SerdeValue; -use std::{collections::{HashMap, HashSet}, error, fmt, time::Duration}; - -/// Extension trait for `NetworkBehaviour` that also accepts discovering nodes. -pub trait DiscoveryNetBehaviour { - /// Notify the protocol that we have learned about the existence of nodes. - /// - /// Can (or most likely will) be called multiple times with the same `PeerId`s. - /// - /// Also note that there is no notification for expired nodes. The implementer must add a TTL - /// system, or remove nodes that will fail to reach. - fn add_discovered_nodes(&mut self, nodes: impl Iterator); -} - -/// Name of a protocol, transmitted on the wire. Should be unique for each chain. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>); - -impl<'a> From<&'a [u8]> for ProtocolId { - fn from(bytes: &'a [u8]) -> ProtocolId { - ProtocolId(bytes.into()) - } -} - -impl ProtocolId { - /// Exposes the `ProtocolId` as bytes. - pub fn as_bytes(&self) -> &[u8] { - self.0.as_ref() - } -} - -/// Parses a string address and returns the component, if valid. -pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> { - let mut addr: Multiaddr = addr_str.parse()?; - - let who = match addr.pop() { - Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key) - .map_err(|_| ParseErr::InvalidPeerId)?, - _ => return Err(ParseErr::PeerIdMissing), - }; - - Ok((who, addr)) -} - -/// Error that can be generated by `parse_str_addr`. -#[derive(Debug)] -pub enum ParseErr { - /// Error while parsing the multiaddress. - MultiaddrParse(multiaddr::Error), - /// Multihash of the peer ID is invalid. - InvalidPeerId, - /// The peer ID is missing from the address. - PeerIdMissing, -} - -impl fmt::Display for ParseErr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ParseErr::MultiaddrParse(err) => write!(f, "{}", err), - ParseErr::InvalidPeerId => write!(f, "Peer id at the end of the address is invalid"), - ParseErr::PeerIdMissing => write!(f, "Peer id is missing from the address"), - } - } -} - -impl error::Error for ParseErr { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match self { - ParseErr::MultiaddrParse(err) => Some(err), - ParseErr::InvalidPeerId => None, - ParseErr::PeerIdMissing => None, - } - } -} - -impl From for ParseErr { - fn from(err: multiaddr::Error) -> ParseErr { - ParseErr::MultiaddrParse(err) - } -} - -/// Returns general information about the networking. -/// -/// Meant for general diagnostic purposes. -/// -/// **Warning**: This API is not stable. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, SerdeValue)] -#[serde(rename_all = "camelCase")] -pub struct NetworkState { - /// PeerId of the local node. - pub peer_id: String, - /// List of addresses the node is currently listening on. - pub listened_addresses: HashSet, - /// List of addresses the node knows it can be reached as. - pub external_addresses: HashSet, - /// List of node we're connected to. - pub connected_peers: HashMap, - /// List of node that we know of but that we're not connected to. - pub not_connected_peers: HashMap, - /// Downloaded bytes per second averaged over the past few seconds. - pub average_download_per_sec: u64, - /// Uploaded bytes per second averaged over the past few seconds. - pub average_upload_per_sec: u64, - /// State of the peerset manager. - pub peerset: serde_json::Value, -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct NetworkStatePeer { - /// How we are connected to the node. - pub endpoint: NetworkStatePeerEndpoint, - /// Node information, as provided by the node itself. Can be empty if not known yet. - pub version_string: Option, - /// Latest ping duration with this node. - pub latest_ping_time: Option, - /// If true, the peer is "enabled", which means that we try to open Substrate-related protocols - /// with this peer. If false, we stick to Kademlia and/or other network-only protocols. - pub enabled: bool, - /// If true, the peer is "open", which means that we have a Substrate-related protocol - /// with this peer. - pub open: bool, - /// List of addresses known for this node. - pub known_addresses: HashSet, -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct NetworkStateNotConnectedPeer { - /// List of addresses known for this node. - pub known_addresses: HashSet, - /// Node information, as provided by the node itself, if we were ever connected to this node. - pub version_string: Option, - /// Latest ping duration with this node, if we were ever connected to this node. - pub latest_ping_time: Option, -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum NetworkStatePeerEndpoint { - /// We are dialing the given address. - Dialing(Multiaddr), - /// We are listening. - Listening { - /// Address we're listening on that received the connection. - listen_addr: Multiaddr, - /// Address data is sent back to. - send_back_addr: Multiaddr, - }, -} - -impl From for NetworkStatePeerEndpoint { - fn from(endpoint: ConnectedPoint) -> Self { - match endpoint { - ConnectedPoint::Dialer { address } => - NetworkStatePeerEndpoint::Dialing(address), - ConnectedPoint::Listener { listen_addr, send_back_addr } => - NetworkStatePeerEndpoint::Listening { - listen_addr, - send_back_addr - } - } - } -} diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs deleted file mode 100644 index e74757e0b1..0000000000 --- a/substrate/core/network-libp2p/src/service_task.rs +++ /dev/null @@ -1,368 +0,0 @@ -// Copyright 2018-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -use crate::{ - behaviour::Behaviour, - config::NodeKeyConfig, - transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer -}; -use crate::custom_proto::{CustomProto, CustomProtoOut, CustomMessage, RegisteredProtocol}; -use crate::{NetworkConfiguration, NonReservedPeerMode, parse_str_addr}; -use futures::{prelude::*, Stream}; -use libp2p::{Multiaddr, core::swarm::NetworkBehaviour, PeerId}; -use libp2p::core::{Swarm, nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox}; -use libp2p::core::nodes::ConnectedPoint; -use log::{info, error, warn}; -use std::fs; -use std::io::Error as IoError; -use std::path::Path; -use std::sync::Arc; - -/// Starts the substrate libp2p service. -/// -/// Returns a stream that must be polled regularly in order for the networking to function. -pub fn start_service( - config: NetworkConfiguration, - registered_custom: RegisteredProtocol, -) -> Result<(Service, substrate_peerset::PeersetHandle), IoError> -where TMessage: CustomMessage + Send + 'static { - - if let Some(ref path) = config.net_config_path { - fs::create_dir_all(Path::new(path))?; - } - - // List of multiaddresses that we know in the network. - let mut known_addresses = Vec::new(); - let mut bootnodes = Vec::new(); - let mut reserved_nodes = Vec::new(); - - // Process the bootnodes. - for bootnode in config.boot_nodes.iter() { - match parse_str_addr(bootnode) { - Ok((peer_id, addr)) => { - bootnodes.push(peer_id.clone()); - known_addresses.push((peer_id, addr)); - }, - Err(_) => warn!(target: "sub-libp2p", "Not a valid bootnode address: {}", bootnode), - } - } - - // Initialize the reserved peers. - for reserved in config.reserved_nodes.iter() { - if let Ok((peer_id, addr)) = parse_str_addr(reserved) { - reserved_nodes.push(peer_id.clone()); - known_addresses.push((peer_id, addr)); - } else { - warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved); - } - } - - // Build the peerset. - let (peerset, peerset_handle) = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig { - in_peers: config.in_peers, - out_peers: config.out_peers, - bootnodes, - reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny, - reserved_nodes, - }); - - // Private and public keys configuration. - if let NodeKeyConfig::Secp256k1(_) = config.node_key { - warn!(target: "sub-libp2p", "Secp256k1 keys are deprecated in favour of ed25519"); - } - let local_identity = config.node_key.clone().into_keypair()?; - let local_public = local_identity.public(); - let local_peer_id = local_public.clone().into_peer_id(); - info!(target: "sub-libp2p", "Local node identity is: {}", local_peer_id.to_base58()); - - // Build the swarm. - let (mut swarm, bandwidth) = { - let user_agent = format!("{} ({})", config.client_version, config.node_name); - let proto = CustomProto::new(registered_custom, peerset); - let behaviour = Behaviour::new(proto, user_agent, local_public, known_addresses, config.enable_mdns); - let (transport, bandwidth) = transport::build_transport( - local_identity, - config.wasm_external_transport - ); - (Swarm::new(transport, behaviour, local_peer_id.clone()), bandwidth) - }; - - // Listen on multiaddresses. - for addr in &config.listen_addresses { - if let Err(err) = Swarm::listen_on(&mut swarm, addr.clone()) { - warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err) - } - } - - // Add external addresses. - for addr in &config.public_addresses { - Swarm::add_external_address(&mut swarm, addr.clone()); - } - - let service = Service { - swarm, - bandwidth, - injected_events: Vec::new(), - }; - - Ok((service, peerset_handle)) -} - -/// Event produced by the service. -#[derive(Debug)] -pub enum ServiceEvent { - /// A custom protocol substream has been opened with a node. - OpenedCustomProtocol { - /// Identity of the node. - peer_id: PeerId, - /// Version of the protocol that was opened. - version: u8, - /// Node debug info - debug_info: String, - }, - - /// A custom protocol substream has been closed. - ClosedCustomProtocol { - /// Identity of the node. - peer_id: PeerId, - /// Node debug info - debug_info: String, - }, - - /// Receives a message on a custom protocol stream. - CustomMessage { - /// Identity of the node. - peer_id: PeerId, - /// Message that has been received. - message: TMessage, - }, - - /// The substream with a node is clogged. We should avoid sending data to it if possible. - Clogged { - /// Index of the node. - peer_id: PeerId, - /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec, - }, -} - -/// Network service. Must be polled regularly in order for the networking to work. -pub struct Service where TMessage: CustomMessage { - /// Stream of events of the swarm. - swarm: Swarm< - Boxed<(PeerId, StreamMuxerBox), IoError>, - Behaviour>, CustomProtoOut, Substream> - >, - - /// Bandwidth logging system. Can be queried to know the average bandwidth consumed. - bandwidth: Arc, - - /// Events to produce on the Stream. - injected_events: Vec>, -} - -impl Service -where TMessage: CustomMessage + Send + 'static { - /// Returns a struct containing tons of useful information about the network. - pub fn state(&mut self) -> NetworkState { - let open = self.swarm.user_protocol().open_peers().cloned().collect::>(); - - let connected_peers = { - let swarm = &mut self.swarm; - open.iter().filter_map(move |peer_id| { - let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, peer_id) - .into_iter().collect(); - - let endpoint = if let Some(e) = swarm.node(peer_id).map(|i| i.endpoint()) { - e.clone().into() - } else { - error!(target: "sub-libp2p", "Found state inconsistency between custom protocol \ - and debug information about {:?}", peer_id); - return None - }; - - Some((peer_id.to_base58(), NetworkStatePeer { - endpoint, - version_string: swarm.node(peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), - latest_ping_time: swarm.node(peer_id).and_then(|i| i.latest_ping()), - enabled: swarm.user_protocol().is_enabled(&peer_id), - open: swarm.user_protocol().is_open(&peer_id), - known_addresses, - })) - }).collect() - }; - - let not_connected_peers = { - let swarm = &mut self.swarm; - let list = swarm.known_peers().filter(|p| open.iter().all(|n| n != *p)) - .cloned().collect::>(); - list.into_iter().map(move |peer_id| { - (peer_id.to_base58(), NetworkStateNotConnectedPeer { - version_string: swarm.node(&peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), - latest_ping_time: swarm.node(&peer_id).and_then(|i| i.latest_ping()), - known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id) - .into_iter().collect(), - }) - }).collect() - }; - - NetworkState { - peer_id: Swarm::local_peer_id(&self.swarm).to_base58(), - listened_addresses: Swarm::listeners(&self.swarm).cloned().collect(), - external_addresses: Swarm::external_addresses(&self.swarm).cloned().collect(), - average_download_per_sec: self.bandwidth.average_download_per_sec(), - average_upload_per_sec: self.bandwidth.average_upload_per_sec(), - connected_peers, - not_connected_peers, - peerset: self.swarm.user_protocol_mut().peerset_debug_info(), - } - } - - /// Returns an iterator that produces the list of addresses we're listening on. - #[inline] - pub fn listeners(&self) -> impl Iterator { - Swarm::listeners(&self.swarm) - } - - /// Returns the downloaded bytes per second averaged over the past few seconds. - #[inline] - pub fn average_download_per_sec(&self) -> u64 { - self.bandwidth.average_download_per_sec() - } - - /// Returns the uploaded bytes per second averaged over the past few seconds. - #[inline] - pub fn average_upload_per_sec(&self) -> u64 { - self.bandwidth.average_upload_per_sec() - } - - /// Returns the peer id of the local node. - pub fn peer_id(&self) -> &PeerId { - Swarm::local_peer_id(&self.swarm) - } - - /// Returns the list of all the peers we are connected to. - pub fn connected_peers<'a>(&'a self) -> impl Iterator + 'a { - self.swarm.user_protocol().open_peers() - } - - /// Returns the way we are connected to a node. Returns `None` if we are not connected to it. - pub fn node_endpoint(&self, peer_id: &PeerId) -> Option<&ConnectedPoint> { - if self.swarm.user_protocol().is_open(peer_id) { - self.swarm.node(peer_id).map(|n| n.endpoint()) - } else { - None - } - } - - /// Returns the latest client version reported by a node. Can return `Some` even for nodes - /// we're not connected to. - pub fn node_client_version(&self, peer_id: &PeerId) -> Option<&str> { - self.swarm.node(peer_id).and_then(|n| n.client_version()) - } - - /// Sends a message to a peer using the custom protocol. - /// - /// Has no effect if the connection to the node has been closed, or if the node index is - /// invalid. - pub fn send_custom_message( - &mut self, - peer_id: &PeerId, - message: TMessage - ) { - self.swarm.user_protocol_mut().send_packet(peer_id, message); - } - - /// Disconnects a peer. - /// - /// This is asynchronous and will not immediately close the peer. - /// Corresponding closing events will be generated once the closing actually happens. - pub fn drop_node(&mut self, peer_id: &PeerId) { - self.swarm.user_protocol_mut().disconnect_peer(peer_id); - } - - /// Adds a hard-coded address for the given peer, that never expires. - pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { - self.swarm.add_known_address(peer_id, addr) - } - - /// Get debug info for a given peer. - pub fn peer_debug_info(&self, who: &PeerId) -> String { - if let Some(node) = self.swarm.node(who) { - format!("{:?} {}", who, node.debug_info()) - } else { - format!("{:?} (unknown)", who) - } - } - - /// Polls for what happened on the network. - fn poll_swarm(&mut self) -> Poll>, IoError> { - loop { - match self.swarm.poll() { - Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolOpen { peer_id, version, .. }))) => { - let debug_info = self.peer_debug_info(&peer_id); - break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol { - peer_id, - version, - debug_info, - }))) - } - Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolClosed { peer_id, .. }))) => { - let debug_info = self.peer_debug_info(&peer_id); - break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol { - peer_id, - debug_info, - }))) - } - Ok(Async::Ready(Some(CustomProtoOut::CustomMessage { peer_id, message }))) => { - break Ok(Async::Ready(Some(ServiceEvent::CustomMessage { - peer_id, - message, - }))) - } - Ok(Async::Ready(Some(CustomProtoOut::Clogged { peer_id, messages }))) => { - break Ok(Async::Ready(Some(ServiceEvent::Clogged { - peer_id, - messages, - }))) - } - Ok(Async::NotReady) => break Ok(Async::NotReady), - Ok(Async::Ready(None)) => unreachable!("The Swarm stream never ends"), - Err(_) => unreachable!("The Swarm never errors"), - } - } - } -} - -impl Stream for Service where TMessage: CustomMessage + Send + 'static { - type Item = ServiceEvent; - type Error = IoError; - - fn poll(&mut self) -> Poll, Self::Error> { - if !self.injected_events.is_empty() { - return Ok(Async::Ready(Some(self.injected_events.remove(0)))); - } - - match self.poll_swarm()? { - Async::Ready(value) => return Ok(Async::Ready(value)), - Async::NotReady => (), - } - - // The only way we reach this is if we went through all the `NotReady` paths above, - // ensuring the current task is registered everywhere. - Ok(Async::NotReady) - } -} diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index 1b3a473288..85fef9c3c7 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -6,38 +6,48 @@ license = "GPL-3.0" authors = ["Parity Technologies "] edition = "2018" -[lib] - [dependencies] +bytes = "0.4" derive_more = "0.14.0" log = "0.4" parking_lot = "0.8.0" bitflags = "1.0" +fnv = "1.0" futures = "0.1.17" linked-hash-map = "0.5" linked_hash_set = "0.1.3" lru-cache = "0.1.1" rustc-hex = "2.0" rand = "0.6" +libp2p = { version = "0.9.1", default-features = false, features = ["secp256k1", "libp2p-websocket"] } fork-tree = { path = "../../core/util/fork-tree" } primitives = { package = "substrate-primitives", path = "../../core/primitives" } consensus = { package = "substrate-consensus-common", path = "../../core/consensus/common" } client = { package = "substrate-client", path = "../../core/client" } runtime_primitives = { package = "sr-primitives", path = "../../core/sr-primitives" } parity-codec = { version = "3.3", features = ["derive"] } -network_libp2p = { package = "substrate-network-libp2p", path = "../../core/network-libp2p" } peerset = { package = "substrate-peerset", path = "../../core/peerset" } +serde = { version = "1.0.70", features = ["derive"] } +serde_json = "1.0.24" +slog = { version = "^2", features = ["nested-values"] } +slog_derive = "0.1.1" +smallvec = "0.6" +tokio-io = "0.1" tokio-timer = "0.2.11" tokio = { version = "0.1.11", optional = true } +unsigned-varint = { version = "0.2.1", features = ["codec"] } keyring = { package = "substrate-keyring", path = "../../core/keyring", optional = true } test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client", optional = true } +erased-serde = "0.3.9" void = "1.0" +zeroize = "0.6.0" [dev-dependencies] env_logger = { version = "0.6" } keyring = { package = "substrate-keyring", path = "../../core/keyring" } test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } consensus = { package = "substrate-consensus-common", path = "../../core/consensus/common", features = ["test-helpers"] } +tempdir = "0.3" tokio = "0.1.11" [features] diff --git a/substrate/core/network-libp2p/src/behaviour.rs b/substrate/core/network/src/behaviour.rs similarity index 61% rename from substrate/core/network-libp2p/src/behaviour.rs rename to substrate/core/network/src/behaviour.rs index 379983a3fb..35684bc257 100644 --- a/substrate/core/network-libp2p/src/behaviour.rs +++ b/substrate/core/network/src/behaviour.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::DiscoveryNetBehaviour; +use crate::{debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour}; use futures::prelude::*; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, protocols_handler::IntoProtocolsHandler, PublicKey}; @@ -22,18 +22,12 @@ use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourActi use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters}; #[cfg(not(target_os = "unknown"))] use libp2p::core::swarm::toggle::Toggle; -use libp2p::kad::{Kademlia, KademliaOut}; #[cfg(not(target_os = "unknown"))] use libp2p::mdns::{Mdns, MdnsEvent}; -use libp2p::multiaddr::Protocol; -use log::{debug, info, trace, warn}; -use std::{cmp, iter, time::Duration}; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer::{Delay, clock::Clock}; +use log::warn; +use std::iter; use void; -mod debug_info; - /// General behaviour of the network. #[derive(NetworkBehaviour)] #[behaviour(out_event = "TBehaviourEv", poll_method = "poll")] @@ -65,28 +59,15 @@ impl Behaviour Self { let debug_info = debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()); - let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id()); - for (peer_id, addr) in &known_addresses { - kademlia.add_address(peer_id, addr.clone()); - } - if enable_mdns { #[cfg(target_os = "unknown")] warn!(target: "sub-libp2p", "mDNS is not available on this platform"); } - let clock = Clock::new(); Behaviour { user_protocol: UserBehaviourWrap(user_protocol), debug_info, - discovery: DiscoveryBehaviour { - user_defined: known_addresses, - kademlia, - next_kad_random_query: Delay::new(clock.now()), - duration_to_next_kad: Duration::from_secs(1), - clock, - local_peer_id: local_public_key.into_peer_id(), - }, + discovery: DiscoveryBehaviour::new(local_public_key, known_addresses), #[cfg(not(target_os = "unknown"))] mdns: if enable_mdns { match Mdns::new() { @@ -105,14 +86,12 @@ impl Behaviour impl Iterator { - self.discovery.kademlia.kbuckets_entries() + self.discovery.known_peers() } /// Adds a hard-coded address for the given peer, that never expires. pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { - if self.discovery.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) { - self.discovery.user_defined.push((peer_id, addr)); - } + self.discovery.add_known_address(peer_id, addr) } /// Borrows `self` and returns a struct giving access to the information about a node. @@ -165,33 +144,20 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess +impl NetworkBehaviourEventProcess for Behaviour where TBehaviour: DiscoveryNetBehaviour { - fn inject_event(&mut self, out: KademliaOut) { + fn inject_event(&mut self, out: DiscoveryOut) { match out { - KademliaOut::Discovered { .. } => {} - KademliaOut::KBucketAdded { peer_id, .. } => { + DiscoveryOut::Discovered(peer_id) => { self.user_protocol.0.add_discovered_nodes(iter::once(peer_id)); } - KademliaOut::FindNodeResult { key, closer_peers } => { - trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results", - key, closer_peers.len()); - if closer_peers.is_empty() { - warn!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \ - results"); - } - } - // We never start any other type of query. - KademliaOut::GetProvidersResult { .. } => {} - KademliaOut::GetValueResult(_) => {} - KademliaOut::PutValueResult(_) => {} } } } @@ -287,120 +253,3 @@ impl NetworkBehaviour for UserBehaviourWrap { self.0.inject_new_external_addr(addr) } } - -/// Implementation of `NetworkBehaviour` that discovers the nodes on the network. -pub struct DiscoveryBehaviour { - /// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and - /// reserved nodes. - user_defined: Vec<(PeerId, Multiaddr)>, - /// Kademlia requests and answers. - kademlia: Kademlia, - /// Stream that fires when we need to perform the next random Kademlia query. - next_kad_random_query: Delay, - /// After `next_kad_random_query` triggers, the next one triggers after this duration. - duration_to_next_kad: Duration, - /// `Clock` instance that uses the current execution context's source of time. - clock: Clock, - /// Identity of our local node. - local_peer_id: PeerId, -} - -impl NetworkBehaviour for DiscoveryBehaviour -where - TSubstream: AsyncRead + AsyncWrite, -{ - type ProtocolsHandler = as NetworkBehaviour>::ProtocolsHandler; - type OutEvent = as NetworkBehaviour>::OutEvent; - - fn new_handler(&mut self) -> Self::ProtocolsHandler { - NetworkBehaviour::new_handler(&mut self.kademlia) - } - - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - let mut list = self.user_defined.iter() - .filter_map(|(p, a)| if p == peer_id { Some(a.clone()) } else { None }) - .collect::>(); - list.extend(self.kademlia.addresses_of_peer(peer_id)); - trace!(target: "sub-libp2p", "Addresses of {:?} are {:?}", peer_id, list); - if list.is_empty() { - if self.kademlia.kbuckets_entries().any(|p| p == peer_id) { - debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer in k-buckets), \ - and no address was found", peer_id); - } else { - debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer not in k-buckets), \ - and no address was found", peer_id); - } - } - list - } - - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id, endpoint) - } - - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { - NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id, endpoint) - } - - fn inject_replaced(&mut self, peer_id: PeerId, closed: ConnectedPoint, opened: ConnectedPoint) { - NetworkBehaviour::inject_replaced(&mut self.kademlia, peer_id, closed, opened) - } - - fn inject_node_event( - &mut self, - peer_id: PeerId, - event: ::OutEvent, - ) { - NetworkBehaviour::inject_node_event(&mut self.kademlia, peer_id, event) - } - - fn inject_new_external_addr(&mut self, addr: &Multiaddr) { - let new_addr = addr.clone() - .with(Protocol::P2p(self.local_peer_id.clone().into())); - info!(target: "sub-libp2p", "Discovered new external address for our node: {}", new_addr); - } - - fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { - info!(target: "sub-libp2p", "No longer listening on {}", addr); - } - - fn poll( - &mut self, - params: &mut PollParameters, - ) -> Async< - NetworkBehaviourAction< - ::InEvent, - Self::OutEvent, - >, - > { - // Poll Kademlia. - match self.kademlia.poll(params) { - Async::Ready(action) => return Async::Ready(action), - Async::NotReady => (), - } - - // Poll the stream that fires when we need to start a random Kademlia query. - loop { - match self.next_kad_random_query.poll() { - Ok(Async::NotReady) => break, - Ok(Async::Ready(_)) => { - let random_peer_id = PeerId::random(); - debug!(target: "sub-libp2p", "Libp2p <= Starting random Kademlia request for \ - {:?}", random_peer_id); - self.kademlia.find_node(random_peer_id); - - // Reset the `Delay` to the next random. - self.next_kad_random_query.reset(self.clock.now() + self.duration_to_next_kad); - self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2, - Duration::from_secs(60)); - }, - Err(err) => { - warn!(target: "sub-libp2p", "Kademlia query timer errored: {:?}", err); - break - } - } - } - - Async::NotReady - } -} diff --git a/substrate/core/network/src/config.rs b/substrate/core/network/src/config.rs index d8fd0f68c7..fd0a3a924e 100644 --- a/substrate/core/network/src/config.rs +++ b/substrate/core/network/src/config.rs @@ -17,16 +17,22 @@ //! Configuration for the networking layer of Substrate. pub use crate::protocol::ProtocolConfig; -pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, NodeKeyConfig, ProtocolId, Secret}; +use crate::ProtocolId; +use crate::chain::{Client, FinalityProofProvider}; +use crate::on_demand_layer::OnDemand; +use crate::service::{ExHashT, TransactionPool}; use bitflags::bitflags; use consensus::import_queue::ImportQueue; -use crate::chain::{Client, FinalityProofProvider}; use parity_codec; -use crate::on_demand_layer::OnDemand; use runtime_primitives::traits::{Block as BlockT}; -use crate::service::{ExHashT, TransactionPool}; use std::sync::Arc; +use libp2p::identity::{Keypair, secp256k1, ed25519}; +use libp2p::wasm_ext; +use libp2p::{Multiaddr, multiaddr::Protocol}; +use std::error::Error; +use std::{io::{self, Write}, iter, fs, net::Ipv4Addr, path::{Path, PathBuf}}; +use zeroize::Zeroize; /// Service initialization parameters. pub struct Params { @@ -87,3 +93,282 @@ impl parity_codec::Decode for Roles { Self::from_bits(input.read_byte()?) } } + +/// Network service configuration. +#[derive(Clone)] +pub struct NetworkConfiguration { + /// Directory path to store general network configuration. None means nothing will be saved. + pub config_path: Option, + /// Directory path to store network-specific configuration. None means nothing will be saved. + pub net_config_path: Option, + /// Multiaddresses to listen for incoming connections. + pub listen_addresses: Vec, + /// Multiaddresses to advertise. Detected automatically if empty. + pub public_addresses: Vec, + /// List of initial node addresses + pub boot_nodes: Vec, + /// The node key configuration, which determines the node's network identity keypair. + pub node_key: NodeKeyConfig, + /// Maximum allowed number of incoming connections. + pub in_peers: u32, + /// Number of outgoing connections we're trying to maintain. + pub out_peers: u32, + /// List of reserved node addresses. + pub reserved_nodes: Vec, + /// The non-reserved peer mode. + pub non_reserved_mode: NonReservedPeerMode, + /// Client identifier. Sent over the wire for debugging purposes. + pub client_version: String, + /// Name of the node. Sent over the wire for debugging purposes. + pub node_name: String, + /// If true, the network will use mDNS to discover other libp2p nodes on the local network + /// and connect to them if they support the same chain. + pub enable_mdns: bool, + /// Optional external implementation of a libp2p transport. Used in WASM contexts where we need + /// some binding between the networking provided by the operating system or environment and + /// libp2p. + /// + /// This parameter exists whatever the target platform is, but it is expected to be set to + /// `Some` only when compiling for WASM. + pub wasm_external_transport: Option, +} + +impl Default for NetworkConfiguration { + fn default() -> Self { + NetworkConfiguration { + config_path: None, + net_config_path: None, + listen_addresses: Vec::new(), + public_addresses: Vec::new(), + boot_nodes: Vec::new(), + node_key: NodeKeyConfig::Ed25519(Secret::New), + in_peers: 25, + out_peers: 75, + reserved_nodes: Vec::new(), + non_reserved_mode: NonReservedPeerMode::Accept, + client_version: "unknown".into(), + node_name: "unknown".into(), + enable_mdns: false, + wasm_external_transport: None, + } + } +} + +impl NetworkConfiguration { + /// Create a new instance of default settings. + pub fn new() -> Self { + Self::default() + } + + /// Create new default configuration for localhost-only connection with random port (useful for testing) + pub fn new_local() -> NetworkConfiguration { + let mut config = NetworkConfiguration::new(); + config.listen_addresses = vec![ + iter::once(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) + .chain(iter::once(Protocol::Tcp(0))) + .collect() + ]; + config + } +} + +/// The policy for connections to non-reserved peers. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum NonReservedPeerMode { + /// Accept them. This is the default. + Accept, + /// Deny them. + Deny, +} + +impl NonReservedPeerMode { + /// Attempt to parse the peer mode from a string. + pub fn parse(s: &str) -> Option { + match s { + "accept" => Some(NonReservedPeerMode::Accept), + "deny" => Some(NonReservedPeerMode::Deny), + _ => None, + } + } +} + +/// The configuration of a node's secret key, describing the type of key +/// and how it is obtained. A node's identity keypair is the result of +/// the evaluation of the node key configuration. +#[derive(Clone)] +pub enum NodeKeyConfig { + /// A Secp256k1 secret key configuration. + Secp256k1(Secret), + /// A Ed25519 secret key configuration. + Ed25519(Secret) +} + +/// The options for obtaining a Secp256k1 secret key. +pub type Secp256k1Secret = Secret; + +/// The options for obtaining a Ed25519 secret key. +pub type Ed25519Secret = Secret; + +/// The configuration options for obtaining a secret key `K`. +#[derive(Clone)] +pub enum Secret { + /// Use the given secret key `K`. + Input(K), + /// Read the secret key from a file. If the file does not exist, + /// it is created with a newly generated secret key `K`. The format + /// of the file is determined by `K`: + /// + /// * `secp256k1::SecretKey`: An unencoded 32 bytes Secp256k1 secret key. + /// * `ed25519::SecretKey`: An unencoded 32 bytes Ed25519 secret key. + File(PathBuf), + /// Always generate a new secret key `K`. + New +} + +impl NodeKeyConfig { + /// Evaluate a `NodeKeyConfig` to obtain an identity `Keypair`: + /// + /// * If the secret is configured as input, the corresponding keypair is returned. + /// + /// * If the secret is configured as a file, it is read from that file, if it exists. + /// Otherwise a new secret is generated and stored. In either case, the + /// keypair obtained from the secret is returned. + /// + /// * If the secret is configured to be new, it is generated and the corresponding + /// keypair is returned. + pub fn into_keypair(self) -> io::Result { + use NodeKeyConfig::*; + match self { + Secp256k1(Secret::New) => + Ok(Keypair::generate_secp256k1()), + + Secp256k1(Secret::Input(k)) => + Ok(Keypair::Secp256k1(k.into())), + + Secp256k1(Secret::File(f)) => + get_secret(f, + |mut b| secp256k1::SecretKey::from_bytes(&mut b), + secp256k1::SecretKey::generate, + |b| b.to_bytes().to_vec()) + .map(secp256k1::Keypair::from) + .map(Keypair::Secp256k1), + + Ed25519(Secret::New) => + Ok(Keypair::generate_ed25519()), + + Ed25519(Secret::Input(k)) => + Ok(Keypair::Ed25519(k.into())), + + Ed25519(Secret::File(f)) => + get_secret(f, + |mut b| ed25519::SecretKey::from_bytes(&mut b), + ed25519::SecretKey::generate, + |b| b.as_ref().to_vec()) + .map(ed25519::Keypair::from) + .map(Keypair::Ed25519), + } + } +} + +/// Load a secret key from a file, if it exists, or generate a +/// new secret key and write it to that file. In either case, +/// the secret key is returned. +fn get_secret(file: P, parse: F, generate: G, serialize: W) -> io::Result +where + P: AsRef, + F: for<'r> FnOnce(&'r mut [u8]) -> Result, + G: FnOnce() -> K, + E: Error + Send + Sync + 'static, + W: Fn(&K) -> Vec, +{ + std::fs::read(&file) + .and_then(|mut sk_bytes| + parse(&mut sk_bytes) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))) + .or_else(|e| { + if e.kind() == io::ErrorKind::NotFound { + file.as_ref().parent().map_or(Ok(()), fs::create_dir_all)?; + let sk = generate(); + let mut sk_vec = serialize(&sk); + write_secret_file(file, &sk_vec)?; + sk_vec.zeroize(); + Ok(sk) + } else { + Err(e) + } + }) +} + +/// Write secret bytes to a file. +fn write_secret_file

(path: P, sk_bytes: &[u8]) -> io::Result<()> +where + P: AsRef +{ + let mut file = open_secret_file(&path)?; + file.write_all(sk_bytes) +} + +/// Opens a file containing a secret key in write mode. +#[cfg(unix)] +fn open_secret_file

(path: P) -> io::Result +where + P: AsRef +{ + use std::os::unix::fs::OpenOptionsExt; + fs::OpenOptions::new() + .write(true) + .create_new(true) + .mode(0o600) + .open(path) +} + +/// Opens a file containing a secret key in write mode. +#[cfg(not(unix))] +fn open_secret_file

(path: P) -> Result +where + P: AsRef +{ + fs::OpenOptions::new() + .write(true) + .create_new(true) + .open(path) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempdir::TempDir; + + fn secret_bytes(kp: &Keypair) -> Vec { + match kp { + Keypair::Ed25519(p) => p.secret().as_ref().iter().cloned().collect(), + Keypair::Secp256k1(p) => p.secret().to_bytes().to_vec(), + _ => panic!("Unexpected keypair.") + } + } + + #[test] + fn test_secret_file() { + let tmp = TempDir::new("x").unwrap(); + std::fs::remove_dir(tmp.path()).unwrap(); // should be recreated + let file = tmp.path().join("x").to_path_buf(); + let kp1 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap(); + let kp2 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap(); + assert!(file.is_file() && secret_bytes(&kp1) == secret_bytes(&kp2)) + } + + #[test] + fn test_secret_input() { + let sk = secp256k1::SecretKey::generate(); + let kp1 = NodeKeyConfig::Secp256k1(Secret::Input(sk.clone())).into_keypair().unwrap(); + let kp2 = NodeKeyConfig::Secp256k1(Secret::Input(sk)).into_keypair().unwrap(); + assert!(secret_bytes(&kp1) == secret_bytes(&kp2)); + } + + #[test] + fn test_secret_new() { + let kp1 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap(); + let kp2 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap(); + assert!(secret_bytes(&kp1) != secret_bytes(&kp2)); + } +} diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network/src/custom_proto/behaviour.rs similarity index 97% rename from substrate/core/network-libp2p/src/custom_proto/behaviour.rs rename to substrate/core/network/src/custom_proto/behaviour.rs index 41b3b32218..975a1d2f3a 100644 --- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/substrate/core/network/src/custom_proto/behaviour.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::DiscoveryNetBehaviour; +use crate::{DiscoveryNetBehaviour, ProtocolId}; use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn}; use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol}; use fnv::FnvHashMap; @@ -23,7 +23,8 @@ use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourActi use libp2p::core::{Multiaddr, PeerId}; use log::{debug, error, trace, warn}; use smallvec::SmallVec; -use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, time::Duration, time::Instant}; +use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem}; +use std::time::{Duration, Instant}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::clock::Clock; @@ -62,7 +63,7 @@ pub struct CustomProto { protocol: RegisteredProtocol, /// Receiver for instructions about who to connect to or disconnect from. - peerset: substrate_peerset::Peerset, + peerset: peerset::Peerset, /// List of peers in our state. peers: FnvHashMap, @@ -73,7 +74,7 @@ pub struct CustomProto { /// We generate indices to identify incoming connections. This is the next value for the index /// to use when a connection is incoming. - next_incoming_index: substrate_peerset::IncomingIndex, + next_incoming_index: peerset::IncomingIndex, /// Events to produce from `poll()`. events: SmallVec<[NetworkBehaviourAction, CustomProtoOut>; 4]>, @@ -179,7 +180,7 @@ struct IncomingPeer { /// connection corresponding to it has been closed or replaced already. alive: bool, /// Id that the we sent to the peerset. - incoming_id: substrate_peerset::IncomingIndex, + incoming_id: peerset::IncomingIndex, } /// Event that can be emitted by the `CustomProto`. @@ -224,15 +225,18 @@ pub enum CustomProtoOut { impl CustomProto { /// Creates a `CustomProtos`. pub fn new( - protocol: RegisteredProtocol, - peerset: substrate_peerset::Peerset, + protocol: impl Into, + versions: &[u8], + peerset: peerset::Peerset, ) -> Self { + let protocol = RegisteredProtocol::new(protocol, versions); + CustomProto { protocol, peerset, peers: FnvHashMap::default(), incoming: SmallVec::new(), - next_incoming_index: substrate_peerset::IncomingIndex(0), + next_incoming_index: peerset::IncomingIndex(0), events: SmallVec::new(), marker: PhantomData, clock: Clock::new(), @@ -514,7 +518,7 @@ impl CustomProto { } /// Function that is called when the peerset wants us to accept an incoming node. - fn peerset_report_accept(&mut self, index: substrate_peerset::IncomingIndex) { + fn peerset_report_accept(&mut self, index: peerset::IncomingIndex) { let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) { self.incoming.remove(pos) } else { @@ -558,7 +562,7 @@ impl CustomProto { } /// Function that is called when the peerset wants us to reject an incoming node. - fn peerset_report_reject(&mut self, index: substrate_peerset::IncomingIndex) { + fn peerset_report_reject(&mut self, index: peerset::IncomingIndex) { let incoming = if let Some(pos) = self.incoming.iter().position(|i| i.incoming_id == index) { self.incoming.remove(pos) } else { @@ -939,16 +943,16 @@ where // Note that the peerset is a *best effort* crate, and we have to use defensive programming. loop { match self.peerset.poll() { - Ok(Async::Ready(Some(substrate_peerset::Message::Accept(index)))) => { + Ok(Async::Ready(Some(peerset::Message::Accept(index)))) => { self.peerset_report_accept(index); } - Ok(Async::Ready(Some(substrate_peerset::Message::Reject(index)))) => { + Ok(Async::Ready(Some(peerset::Message::Reject(index)))) => { self.peerset_report_reject(index); } - Ok(Async::Ready(Some(substrate_peerset::Message::Connect(id)))) => { + Ok(Async::Ready(Some(peerset::Message::Connect(id)))) => { self.peerset_report_connect(id); } - Ok(Async::Ready(Some(substrate_peerset::Message::Drop(id)))) => { + Ok(Async::Ready(Some(peerset::Message::Drop(id)))) => { self.peerset_report_disconnect(id); } Ok(Async::Ready(None)) => { diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network/src/custom_proto/handler.rs similarity index 100% rename from substrate/core/network-libp2p/src/custom_proto/handler.rs rename to substrate/core/network/src/custom_proto/handler.rs diff --git a/substrate/core/network-libp2p/src/custom_proto/mod.rs b/substrate/core/network/src/custom_proto/mod.rs similarity index 93% rename from substrate/core/network-libp2p/src/custom_proto/mod.rs rename to substrate/core/network/src/custom_proto/mod.rs index 261f710d8d..a4fdebbb31 100644 --- a/substrate/core/network-libp2p/src/custom_proto/mod.rs +++ b/substrate/core/network/src/custom_proto/mod.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . pub use self::behaviour::{CustomProto, CustomProtoOut}; -pub use self::upgrade::{CustomMessage, RegisteredProtocol}; +pub use self::upgrade::CustomMessage; mod behaviour; mod handler; diff --git a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs b/substrate/core/network/src/custom_proto/upgrade.rs similarity index 97% rename from substrate/core/network-libp2p/src/custom_proto/upgrade.rs rename to substrate/core/network/src/custom_proto/upgrade.rs index bc61ff74e8..0df280ad1a 100644 --- a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs +++ b/substrate/core/network/src/custom_proto/upgrade.rs @@ -62,11 +62,6 @@ impl RegisteredProtocol { marker: PhantomData, } } - - /// Returns the ID of the protocol. - pub fn id(&self) -> &ProtocolId { - &self.id - } } impl Clone for RegisteredProtocol { @@ -93,8 +88,6 @@ pub struct RegisteredProtocolSubstream { requires_poll_complete: bool, /// The underlying substream. inner: stream::Fuse, UviBytes>>>, - /// Id of the protocol. - protocol_id: ProtocolId, /// Version of the protocol that was negotiated. protocol_version: u8, /// If true, we have sent a "remote is clogged" event recently and shouldn't send another one @@ -105,14 +98,7 @@ pub struct RegisteredProtocolSubstream { } impl RegisteredProtocolSubstream { - /// Returns the protocol id. - #[inline] - pub fn protocol_id(&self) -> &ProtocolId { - &self.protocol_id - } - /// Returns the version of the protocol that was negotiated. - #[inline] pub fn protocol_version(&self) -> u8 { self.protocol_version } @@ -310,7 +296,6 @@ where TSubstream: AsyncRead + AsyncWrite, send_queue: VecDeque::new(), requires_poll_complete: false, inner: framed.fuse(), - protocol_id: self.id, protocol_version: info.version, clogged_fuse: false, marker: PhantomData, @@ -338,7 +323,6 @@ where TSubstream: AsyncRead + AsyncWrite, send_queue: VecDeque::new(), requires_poll_complete: false, inner: framed.fuse(), - protocol_id: self.id, protocol_version: info.version, clogged_fuse: false, marker: PhantomData, diff --git a/substrate/core/network-libp2p/src/behaviour/debug_info.rs b/substrate/core/network/src/debug_info.rs similarity index 98% rename from substrate/core/network-libp2p/src/behaviour/debug_info.rs rename to substrate/core/network/src/debug_info.rs index 46c7422fd7..f482f13fc2 100644 --- a/substrate/core/network-libp2p/src/behaviour/debug_info.rs +++ b/substrate/core/network/src/debug_info.rs @@ -133,11 +133,6 @@ impl<'a> Node<'a> { pub fn latest_ping(&self) -> Option { self.0.latest_ping } - - /// Generates an arbitrary string containing debug information about the node. - pub fn debug_info(&self) -> String { - format!("(version: {:?}) through {:?}", self.0.client_version, self.0.endpoint) - } } /// Event that can be emitted by the behaviour. diff --git a/substrate/core/network/src/discovery.rs b/substrate/core/network/src/discovery.rs new file mode 100644 index 0000000000..4e44d9fa9e --- /dev/null +++ b/substrate/core/network/src/discovery.rs @@ -0,0 +1,302 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use futures::prelude::*; +use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey}; +use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; +use libp2p::core::swarm::PollParameters; +use libp2p::kad::{Kademlia, KademliaOut}; +use libp2p::multiaddr::Protocol; +use log::{debug, info, trace, warn}; +use std::{cmp, time::Duration}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::{Delay, clock::Clock}; + +/// Implementation of `NetworkBehaviour` that discovers the nodes on the network. +pub struct DiscoveryBehaviour { + /// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and + /// reserved nodes. + user_defined: Vec<(PeerId, Multiaddr)>, + /// Kademlia requests and answers. + kademlia: Kademlia, + /// Stream that fires when we need to perform the next random Kademlia query. + next_kad_random_query: Delay, + /// After `next_kad_random_query` triggers, the next one triggers after this duration. + duration_to_next_kad: Duration, + /// `Clock` instance that uses the current execution context's source of time. + clock: Clock, + /// Identity of our local node. + local_peer_id: PeerId, +} + +impl DiscoveryBehaviour { + /// Builds a new `DiscoveryBehaviour`. + /// + /// `user_defined` is a list of known address for nodes that never expire. + pub fn new(local_public_key: PublicKey, user_defined: Vec<(PeerId, Multiaddr)>) -> Self { + let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id()); + for (peer_id, addr) in &user_defined { + kademlia.add_address(peer_id, addr.clone()); + } + + let clock = Clock::new(); + DiscoveryBehaviour { + user_defined, + kademlia, + next_kad_random_query: Delay::new(clock.now()), + duration_to_next_kad: Duration::from_secs(1), + clock, + local_peer_id: local_public_key.into_peer_id(), + } + } + + /// Returns the list of nodes that we know exist in the network. + pub fn known_peers(&mut self) -> impl Iterator { + self.kademlia.kbuckets_entries() + } + + /// Adds a hard-coded address for the given peer, that never expires. + /// + /// This adds an entry to the parameter that was passed to `new`. + pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { + if self.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) { + self.user_defined.push((peer_id, addr)); + } + } + + /// Call this method when a node reports an address for itself. + pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) { + self.kademlia.add_address(peer_id, addr); + } +} + +/// Event generated by the `DiscoveryBehaviour`. +pub enum DiscoveryOut { + /// We have discovered a node. Can be called multiple times with the same identity. + Discovered(PeerId), +} + +impl NetworkBehaviour for DiscoveryBehaviour +where + TSubstream: AsyncRead + AsyncWrite, +{ + type ProtocolsHandler = as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = DiscoveryOut; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + NetworkBehaviour::new_handler(&mut self.kademlia) + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + let mut list = self.user_defined.iter() + .filter_map(|(p, a)| if p == peer_id { Some(a.clone()) } else { None }) + .collect::>(); + list.extend(self.kademlia.addresses_of_peer(peer_id)); + trace!(target: "sub-libp2p", "Addresses of {:?} are {:?}", peer_id, list); + if list.is_empty() { + if self.kademlia.kbuckets_entries().any(|p| p == peer_id) { + debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer in k-buckets), \ + and no address was found", peer_id); + } else { + debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer not in k-buckets), \ + and no address was found", peer_id); + } + } + list + } + + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id, endpoint) + } + + fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { + NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id, endpoint) + } + + fn inject_replaced(&mut self, peer_id: PeerId, closed: ConnectedPoint, opened: ConnectedPoint) { + NetworkBehaviour::inject_replaced(&mut self.kademlia, peer_id, closed, opened) + } + + fn inject_node_event( + &mut self, + peer_id: PeerId, + event: ::OutEvent, + ) { + NetworkBehaviour::inject_node_event(&mut self.kademlia, peer_id, event) + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + let new_addr = addr.clone() + .with(Protocol::P2p(self.local_peer_id.clone().into())); + info!(target: "sub-libp2p", "Discovered new external address for our node: {}", new_addr); + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + info!(target: "sub-libp2p", "No longer listening on {}", addr); + } + + fn poll( + &mut self, + params: &mut PollParameters, + ) -> Async< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + // Poll Kademlia. + match self.kademlia.poll(params) { + Async::NotReady => (), + Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => { + match ev { + KademliaOut::Discovered { .. } => {} + KademliaOut::KBucketAdded { peer_id, .. } => { + let ev = DiscoveryOut::Discovered(peer_id); + return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)); + } + KademliaOut::FindNodeResult { key, closer_peers } => { + trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results", + key, closer_peers.len()); + if closer_peers.is_empty() { + warn!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \ + results"); + } + } + // We never start any other type of query. + KademliaOut::GetProvidersResult { .. } => {} + KademliaOut::GetValueResult(_) => {} + KademliaOut::PutValueResult(_) => {} + } + }, + Async::Ready(NetworkBehaviourAction::DialAddress { address }) => + return Async::Ready(NetworkBehaviourAction::DialAddress { address }), + Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => + return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => + return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), + Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => + return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), + } + + // Poll the stream that fires when we need to start a random Kademlia query. + loop { + match self.next_kad_random_query.poll() { + Ok(Async::NotReady) => break, + Ok(Async::Ready(_)) => { + let random_peer_id = PeerId::random(); + debug!(target: "sub-libp2p", "Libp2p <= Starting random Kademlia request for \ + {:?}", random_peer_id); + self.kademlia.find_node(random_peer_id); + + // Reset the `Delay` to the next random. + self.next_kad_random_query.reset(self.clock.now() + self.duration_to_next_kad); + self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2, + Duration::from_secs(60)); + }, + Err(err) => { + warn!(target: "sub-libp2p", "Kademlia query timer errored: {:?}", err); + break + } + } + } + + Async::NotReady + } +} + +#[cfg(test)] +mod tests { + use futures::prelude::*; + use libp2p::identity::Keypair; + use libp2p::Multiaddr; + use libp2p::core::{upgrade, Swarm}; + use libp2p::core::transport::{Transport, MemoryTransport}; + use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt}; + use std::collections::HashSet; + use super::{DiscoveryBehaviour, DiscoveryOut}; + + #[test] + fn discovery_working() { + let mut user_defined = Vec::new(); + + // Build swarms whose behaviour is `DiscoveryBehaviour`. + let mut swarms = (0..25).map(|_| { + let keypair = Keypair::generate_ed25519(); + + let transport = MemoryTransport + .with_upgrade(libp2p::secio::SecioConfig::new(keypair.clone())) + .and_then(move |out, endpoint| { + let peer_id = out.remote_key.into_peer_id(); + let peer_id2 = peer_id.clone(); + let upgrade = libp2p::yamux::Config::default() + .map_inbound(move |muxer| (peer_id, muxer)) + .map_outbound(move |muxer| (peer_id2, muxer)); + upgrade::apply(out.stream, upgrade, endpoint) + }); + + let behaviour = DiscoveryBehaviour::new(keypair.public(), user_defined.clone()); + let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); + let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + + if user_defined.is_empty() { + user_defined.push((keypair.public().into_peer_id(), listen_addr.clone())); + } + + Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap(); + (swarm, listen_addr) + }).collect::>(); + + // Build a `Vec>` with the list of nodes remaining to be discovered. + let mut to_discover = (0..swarms.len()).map(|n| { + (0..swarms.len()).filter(|p| *p != n) + .map(|p| Swarm::local_peer_id(&swarms[p].0).clone()) + .collect::>() + }).collect::>(); + + let fut = futures::future::poll_fn(move || -> Result<_, ()> { + loop { + let mut keep_polling = false; + + for swarm_n in 0..swarms.len() { + if let Async::Ready(Some(DiscoveryOut::Discovered(other))) = + swarms[swarm_n].0.poll().unwrap() { + if to_discover[swarm_n].remove(&other) { + keep_polling = true; + // Call `add_self_reported_address` to simulate identify happening. + let addr = swarms.iter() + .find(|s| *Swarm::local_peer_id(&s.0) == other) + .unwrap() + .1.clone(); + swarms[swarm_n].0.add_self_reported_address(&other, addr); + } + } + } + + if !keep_polling { + break; + } + } + + if to_discover.iter().all(|l| l.is_empty()) { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + }); + + tokio::runtime::Runtime::new().unwrap().block_on(fut).unwrap(); + } +} diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index a65df59fd8..2ef682f33f 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -23,11 +23,17 @@ //! **Important**: This crate is unstable and the API and usage may change. //! -mod service; +mod behaviour; +mod chain; +mod custom_proto; +mod debug_info; +mod discovery; +mod on_demand_layer; #[macro_use] mod protocol; -mod chain; -mod on_demand_layer; +mod service; +mod transport; + pub mod config; pub mod error; @@ -39,18 +45,185 @@ pub use service::{ NetworkService, NetworkWorker, FetchFuture, TransactionPool, ManageNetwork, NetworkMsg, SyncProvider, ExHashT, ReportHandle, }; +pub use config::{NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret}; pub use protocol::{ProtocolStatus, PeerInfo, Context, consensus_gossip, message, specialization}; pub use protocol::sync::{Status as SyncStatus, SyncState}; -pub use network_libp2p::{ - identity, multiaddr, - ProtocolId, Multiaddr, - NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer, NetworkStatePeerEndpoint, - NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret, - build_multiaddr, PeerId, PublicKey -}; +pub use libp2p::{Multiaddr, multiaddr, build_multiaddr}; +pub use libp2p::{identity, PeerId, core::PublicKey}; + pub use message::{generic as generic_message, RequestId, Status as StatusMessage}; pub use error::Error; pub use protocol::on_demand::AlwaysBadChecker; pub use on_demand_layer::{OnDemand, RemoteResponse}; #[doc(hidden)] pub use runtime_primitives::traits::Block as BlockT; + +use libp2p::core::nodes::ConnectedPoint; +use serde::{Deserialize, Serialize}; +use slog_derive::SerdeValue; +use std::{collections::{HashMap, HashSet}, fmt, time::Duration}; + +/// Extension trait for `NetworkBehaviour` that also accepts discovering nodes. +pub trait DiscoveryNetBehaviour { + /// Notify the protocol that we have learned about the existence of nodes. + /// + /// Can (or most likely will) be called multiple times with the same `PeerId`s. + /// + /// Also note that there is no notification for expired nodes. The implementer must add a TTL + /// system, or remove nodes that will fail to reach. + fn add_discovered_nodes(&mut self, nodes: impl Iterator); +} + +/// Name of a protocol, transmitted on the wire. Should be unique for each chain. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>); + +impl<'a> From<&'a [u8]> for ProtocolId { + fn from(bytes: &'a [u8]) -> ProtocolId { + ProtocolId(bytes.into()) + } +} + +impl ProtocolId { + /// Exposes the `ProtocolId` as bytes. + pub fn as_bytes(&self) -> &[u8] { + self.0.as_ref() + } +} + +/// Parses a string address and returns the component, if valid. +pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> { + let mut addr: Multiaddr = addr_str.parse()?; + + let who = match addr.pop() { + Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key) + .map_err(|_| ParseErr::InvalidPeerId)?, + _ => return Err(ParseErr::PeerIdMissing), + }; + + Ok((who, addr)) +} + +/// Error that can be generated by `parse_str_addr`. +#[derive(Debug)] +pub enum ParseErr { + /// Error while parsing the multiaddress. + MultiaddrParse(multiaddr::Error), + /// Multihash of the peer ID is invalid. + InvalidPeerId, + /// The peer ID is missing from the address. + PeerIdMissing, +} + +impl fmt::Display for ParseErr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ParseErr::MultiaddrParse(err) => write!(f, "{}", err), + ParseErr::InvalidPeerId => write!(f, "Peer id at the end of the address is invalid"), + ParseErr::PeerIdMissing => write!(f, "Peer id is missing from the address"), + } + } +} + +impl std::error::Error for ParseErr { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + ParseErr::MultiaddrParse(err) => Some(err), + ParseErr::InvalidPeerId => None, + ParseErr::PeerIdMissing => None, + } + } +} + +impl From for ParseErr { + fn from(err: multiaddr::Error) -> ParseErr { + ParseErr::MultiaddrParse(err) + } +} + +/// Returns general information about the networking. +/// +/// Meant for general diagnostic purposes. +/// +/// **Warning**: This API is not stable. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, SerdeValue)] +#[serde(rename_all = "camelCase")] +pub struct NetworkState { + /// PeerId of the local node. + pub peer_id: String, + /// List of addresses the node is currently listening on. + pub listened_addresses: HashSet, + /// List of addresses the node knows it can be reached as. + pub external_addresses: HashSet, + /// List of node we're connected to. + pub connected_peers: HashMap, + /// List of node that we know of but that we're not connected to. + pub not_connected_peers: HashMap, + /// Downloaded bytes per second averaged over the past few seconds. + pub average_download_per_sec: u64, + /// Uploaded bytes per second averaged over the past few seconds. + pub average_upload_per_sec: u64, + /// State of the peerset manager. + pub peerset: serde_json::Value, +} + +/// Part of the `NetworkState` struct. Unstable. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NetworkStatePeer { + /// How we are connected to the node. + pub endpoint: NetworkStatePeerEndpoint, + /// Node information, as provided by the node itself. Can be empty if not known yet. + pub version_string: Option, + /// Latest ping duration with this node. + pub latest_ping_time: Option, + /// If true, the peer is "enabled", which means that we try to open Substrate-related protocols + /// with this peer. If false, we stick to Kademlia and/or other network-only protocols. + pub enabled: bool, + /// If true, the peer is "open", which means that we have a Substrate-related protocol + /// with this peer. + pub open: bool, + /// List of addresses known for this node. + pub known_addresses: HashSet, +} + +/// Part of the `NetworkState` struct. Unstable. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NetworkStateNotConnectedPeer { + /// List of addresses known for this node. + pub known_addresses: HashSet, + /// Node information, as provided by the node itself, if we were ever connected to this node. + pub version_string: Option, + /// Latest ping duration with this node, if we were ever connected to this node. + pub latest_ping_time: Option, +} + +/// Part of the `NetworkState` struct. Unstable. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum NetworkStatePeerEndpoint { + /// We are dialing the given address. + Dialing(Multiaddr), + /// We are listening. + Listening { + /// Address we're listening on that received the connection. + listen_addr: Multiaddr, + /// Address data is sent back to. + send_back_addr: Multiaddr, + }, +} + +impl From for NetworkStatePeerEndpoint { + fn from(endpoint: ConnectedPoint) -> Self { + match endpoint { + ConnectedPoint::Dialer { address } => + NetworkStatePeerEndpoint::Dialing(address), + ConnectedPoint::Listener { listen_addr, send_back_addr } => + NetworkStatePeerEndpoint::Listening { + listen_addr, + send_back_addr + } + } + } +} diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 576d37a82f..4b300e7a3e 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . use futures::prelude::*; -use network_libp2p::PeerId; +use libp2p::PeerId; use primitives::storage::StorageKey; use consensus::{import_queue::IncomingBlock, import_queue::Origin, BlockOrigin}; use runtime_primitives::{generic::BlockId, ConsensusEngineId, Justification}; @@ -618,15 +618,15 @@ impl, H: ExHashT> Protocol { } /// Called when a new peer is connected - pub fn on_peer_connected(&mut self, network_out: &mut dyn NetworkOut, who: PeerId, debug_info: String) { - trace!(target: "sync", "Connecting {}: {}", who, debug_info); + pub fn on_peer_connected(&mut self, network_out: &mut dyn NetworkOut, who: PeerId) { + trace!(target: "sync", "Connecting {}", who); self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: time::Instant::now() }); self.send_status(network_out, who); } /// Called by peer when it is disconnecting - pub fn on_peer_disconnected(&mut self, mut network_out: &mut dyn NetworkOut, peer: PeerId, debug_info: String) { - trace!(target: "sync", "Disconnecting {}: {}", peer, debug_info); + pub fn on_peer_disconnected(&mut self, mut network_out: &mut dyn NetworkOut, peer: PeerId) { + trace!(target: "sync", "Disconnecting {}", peer); // lock all the the peer lists so that add/remove peer events are in order let removed = { self.handshaking_peers.remove(&peer); diff --git a/substrate/core/network/src/protocol/consensus_gossip.rs b/substrate/core/network/src/protocol/consensus_gossip.rs index 5de6fb2602..a1c9783b91 100644 --- a/substrate/core/network/src/protocol/consensus_gossip.rs +++ b/substrate/core/network/src/protocol/consensus_gossip.rs @@ -24,7 +24,7 @@ use std::time; use log::{trace, debug}; use futures::sync::mpsc; use lru_cache::LruCache; -use network_libp2p::PeerId; +use libp2p::PeerId; use runtime_primitives::traits::{Block as BlockT, Hash, HashFor}; use runtime_primitives::ConsensusEngineId; pub use crate::message::generic::{Message, ConsensusMessage}; diff --git a/substrate/core/network/src/protocol/message.rs b/substrate/core/network/src/protocol/message.rs index 6a38c106b7..7b9b684cd8 100644 --- a/substrate/core/network/src/protocol/message.rs +++ b/substrate/core/network/src/protocol/message.rs @@ -125,8 +125,8 @@ pub struct RemoteReadResponse { /// Generic types. pub mod generic { + use crate::custom_proto::CustomMessage; use parity_codec::{Encode, Decode}; - use network_libp2p::CustomMessage; use runtime_primitives::Justification; use crate::config::Roles; use super::{ diff --git a/substrate/core/network/src/protocol/on_demand.rs b/substrate/core/network/src/protocol/on_demand.rs index 90051e9caf..76c926df10 100644 --- a/substrate/core/network/src/protocol/on_demand.rs +++ b/substrate/core/network/src/protocol/on_demand.rs @@ -27,7 +27,7 @@ use client::light::fetcher::{FetchChecker, RemoteHeaderRequest, RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof, RemoteReadChildRequest, RemoteBodyRequest}; use crate::message::{self, BlockAttributes, Direction, FromBlock, RequestId}; -use network_libp2p::PeerId; +use libp2p::PeerId; use crate::config::Roles; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; @@ -644,7 +644,7 @@ pub mod tests { RemoteReadChildRequest, RemoteChangesRequest, RemoteBodyRequest}; use crate::config::Roles; use crate::message::{self, BlockAttributes, Direction, FromBlock, RequestId}; - use network_libp2p::PeerId; + use libp2p::PeerId; use super::{REQUEST_TIMEOUT, OnDemandCore, OnDemandNetwork, RequestData}; use test_client::runtime::{changes_trie_config, Block, Extrinsic, Header}; diff --git a/substrate/core/network/src/protocol/specialization.rs b/substrate/core/network/src/protocol/specialization.rs index e8a0a0c949..41b10bf707 100644 --- a/substrate/core/network/src/protocol/specialization.rs +++ b/substrate/core/network/src/protocol/specialization.rs @@ -16,9 +16,9 @@ //! Specializations of the substrate network protocol to allow more complex forms of communication. -use crate::PeerId; -use runtime_primitives::traits::Block as BlockT; use crate::protocol::Context; +use libp2p::PeerId; +use runtime_primitives::traits::Block as BlockT; /// A specialization of the substrate network protocol. Handles events and sends messages. pub trait NetworkSpecialization: Send + Sync + 'static { diff --git a/substrate/core/network/src/protocol/sync.rs b/substrate/core/network/src/protocol/sync.rs index 4ec8f18e35..0f78348b18 100644 --- a/substrate/core/network/src/protocol/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -35,7 +35,7 @@ use std::ops::Range; use std::collections::{HashMap, VecDeque}; use log::{debug, trace, warn, info, error}; use crate::protocol::PeerInfo as ProtocolPeerInfo; -use network_libp2p::PeerId; +use libp2p::PeerId; use client::{BlockStatus, ClientInfo}; use consensus::{BlockOrigin, import_queue::{IncomingBlock, SharedFinalityProofRequestBuilder}}; use client::error::Error as ClientError; diff --git a/substrate/core/network/src/protocol/sync/blocks.rs b/substrate/core/network/src/protocol/sync/blocks.rs index 66730fcc3e..ff8d9907af 100644 --- a/substrate/core/network/src/protocol/sync/blocks.rs +++ b/substrate/core/network/src/protocol/sync/blocks.rs @@ -20,7 +20,7 @@ use std::ops::Range; use std::collections::{HashMap, BTreeMap}; use std::collections::hash_map::Entry; use log::trace; -use network_libp2p::PeerId; +use libp2p::PeerId; use runtime_primitives::traits::{Block as BlockT, NumberFor, One}; use crate::message; diff --git a/substrate/core/network/src/protocol/sync/extra_requests.rs b/substrate/core/network/src/protocol/sync/extra_requests.rs index f41997a05c..589a5d3787 100644 --- a/substrate/core/network/src/protocol/sync/extra_requests.rs +++ b/substrate/core/network/src/protocol/sync/extra_requests.rs @@ -20,7 +20,7 @@ use log::{trace, warn}; use client::error::Error as ClientError; use consensus::import_queue::SharedFinalityProofRequestBuilder; use fork_tree::ForkTree; -use network_libp2p::PeerId; +use libp2p::PeerId; use runtime_primitives::Justification; use runtime_primitives::traits::{Block as BlockT, NumberFor}; use crate::protocol::message; diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 6a328e854d..7009310a8d 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -15,16 +15,20 @@ // along with Substrate. If not, see . use std::collections::HashMap; -use std::io; +use std::{fs, io, path::Path}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use log::{warn, debug, error, info}; +use libp2p::core::swarm::NetworkBehaviour; +use libp2p::core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox}; use futures::{prelude::*, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; -use network_libp2p::{start_service, parse_str_addr, Service as Libp2pNetService, ServiceEvent as Libp2pNetServiceEvent}; -use network_libp2p::{RegisteredProtocol, NetworkState}; +use crate::custom_proto::{CustomProto, CustomProtoOut}; +use crate::{behaviour::Behaviour, parse_str_addr, ProtocolId}; +use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer}; +use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode, config::NetworkConfiguration}; use peerset::PeersetHandle; use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder}; use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; @@ -40,12 +44,14 @@ use crate::config::Params; use crate::error::Error; use crate::protocol::specialization::NetworkSpecialization; +mod tests; + /// Interval at which we send status updates on the SyncProvider status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); /// Interval at which we update the `peers` field on the main thread. const CONNECTED_PEERS_INTERVAL: Duration = Duration::from_millis(500); -pub use network_libp2p::PeerId; +pub use libp2p::PeerId; /// Type that represents fetch completion future. pub type FetchFuture = oneshot::Receiver>; @@ -187,7 +193,9 @@ pub struct NetworkService> { /// Channel for networking messages processed by the background thread. network_chan: mpsc::UnboundedSender>, /// Network service - network: Arc>>>, + network: Arc>>, + /// Bandwidth logging system. Can be queried to know the average bandwidth consumed. + bandwidth: Arc, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which /// nodes it should be connected to or not. peerset: PeersetHandle, @@ -227,19 +235,20 @@ impl, H: ExHashT> NetworkWorker params.specialization, )?; let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect(); - let registered = RegisteredProtocol::new(params.protocol_id, &versions); // Start the main service. - let (network, peerset) = match start_service(params.network_config, registered) { - Ok((network, peerset)) => (Arc::new(Mutex::new(network)), peerset), - Err(err) => { - warn!("Error starting network: {}", err); - return Err(err.into()) - }, - }; + let (network, bandwidth, peerset) = + match start_service::(params.network_config, params.protocol_id, &versions) { + Ok((network, bandwidth, peerset)) => (Arc::new(Mutex::new(network)), bandwidth, peerset), + Err(err) => { + warn!("Error starting network: {}", err); + return Err(err.into()) + }, + }; let service = Arc::new(NetworkService { status_sinks: status_sinks.clone(), + bandwidth, is_offline: is_offline.clone(), is_major_syncing: is_major_syncing.clone(), network_chan, @@ -278,20 +287,18 @@ impl, H: ExHashT> NetworkWorker impl> NetworkService { /// Returns the downloaded bytes per second averaged over the past few seconds. - #[inline] pub fn average_download_per_sec(&self) -> u64 { - self.network.lock().average_download_per_sec() + self.bandwidth.average_download_per_sec() } /// Returns the uploaded bytes per second averaged over the past few seconds. - #[inline] pub fn average_upload_per_sec(&self) -> u64 { - self.network.lock().average_upload_per_sec() + self.bandwidth.average_upload_per_sec() } /// Returns the network identity of the node. pub fn local_peer_id(&self) -> PeerId { - self.network.lock().peer_id().clone() + Swarm::::local_peer_id(&*self.network.lock()).clone() } /// Called when a new block is imported by the client. @@ -404,7 +411,58 @@ impl> SyncProvider for Netwo } fn network_state(&self) -> NetworkState { - self.network.lock().state() + let mut swarm = self.network.lock(); + let open = swarm.user_protocol().open_peers().cloned().collect::>(); + + let connected_peers = { + let swarm = &mut *swarm; + open.iter().filter_map(move |peer_id| { + let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, peer_id) + .into_iter().collect(); + + let endpoint = if let Some(e) = swarm.node(peer_id).map(|i| i.endpoint()) { + e.clone().into() + } else { + error!(target: "sub-libp2p", "Found state inconsistency between custom protocol \ + and debug information about {:?}", peer_id); + return None + }; + + Some((peer_id.to_base58(), NetworkStatePeer { + endpoint, + version_string: swarm.node(peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), + latest_ping_time: swarm.node(peer_id).and_then(|i| i.latest_ping()), + enabled: swarm.user_protocol().is_enabled(&peer_id), + open: swarm.user_protocol().is_open(&peer_id), + known_addresses, + })) + }).collect() + }; + + let not_connected_peers = { + let swarm = &mut *swarm; + let list = swarm.known_peers().filter(|p| open.iter().all(|n| n != *p)) + .cloned().collect::>(); + list.into_iter().map(move |peer_id| { + (peer_id.to_base58(), NetworkStateNotConnectedPeer { + version_string: swarm.node(&peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), + latest_ping_time: swarm.node(&peer_id).and_then(|i| i.latest_ping()), + known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id) + .into_iter().collect(), + }) + }).collect() + }; + + NetworkState { + peer_id: Swarm::::local_peer_id(&swarm).to_base58(), + listened_addresses: Swarm::::listeners(&swarm).cloned().collect(), + external_addresses: Swarm::::external_addresses(&swarm).cloned().collect(), + average_download_per_sec: self.bandwidth.average_download_per_sec(), + average_upload_per_sec: self.bandwidth.average_upload_per_sec(), + connected_peers, + not_connected_peers, + peerset: swarm.user_protocol_mut().peerset_debug_info(), + } } fn peers_debug_info(&self) -> Vec<(PeerId, PeerInfo)> { @@ -533,7 +591,7 @@ pub struct NetworkWorker, H: Ex protocol: Protocol, /// The network service that can be extracted and shared through the codebase. service: Arc>, - network_service: Arc>>>, + network_service: Arc>>, peers: Arc>>>, import_queue: Box>, transaction_pool: Arc>, @@ -556,16 +614,16 @@ impl, H: ExHashT> Future for Ne fn poll(&mut self) -> Poll { // Implementation of `protocol::NetworkOut` using the available local variables. - struct Context<'a, B: BlockT>(&'a mut Libp2pNetService>, &'a PeersetHandle); + struct Context<'a, B: BlockT>(&'a mut Swarm, &'a PeersetHandle); impl<'a, B: BlockT> NetworkOut for Context<'a, B> { fn report_peer(&mut self, who: PeerId, reputation: i32) { self.1.report_peer(who, reputation) } fn disconnect_peer(&mut self, who: PeerId) { - self.0.drop_node(&who) + self.0.user_protocol_mut().disconnect_peer(&who) } fn send_message(&mut self, who: PeerId, message: Message) { - self.0.send_custom_message(&who, message) + self.0.user_protocol_mut().send_packet(&who, message) } } @@ -598,11 +656,11 @@ impl, H: ExHashT> Future for Ne match self.network_port.poll() { Ok(Async::NotReady) => break, Ok(Async::Ready(Some(NetworkMsg::Outgoing(who, outgoing_message)))) => - self.network_service.lock().send_custom_message(&who, outgoing_message), + self.network_service.lock().user_protocol_mut().send_packet(&who, outgoing_message), Ok(Async::Ready(Some(NetworkMsg::ReportPeer(who, reputation)))) => self.peerset.report_peer(who, reputation), Ok(Async::Ready(Some(NetworkMsg::DisconnectPeer(who)))) => - self.network_service.lock().drop_node(&who), + self.network_service.lock().user_protocol_mut().disconnect_peer(&who), #[cfg(any(test, feature = "test-helpers"))] Ok(Async::Ready(Some(NetworkMsg::Synchronized))) => {} @@ -672,19 +730,19 @@ impl, H: ExHashT> Future for Ne let outcome = match poll_value { Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(Libp2pNetServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => { + Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolOpen { peer_id, version, .. }))) => { debug_assert!( version <= protocol::CURRENT_VERSION as u8 && version >= protocol::MIN_VERSION as u8 ); - self.protocol.on_peer_connected(&mut network_out, peer_id, debug_info); + self.protocol.on_peer_connected(&mut network_out, peer_id); CustomMessageOutcome::None } - Ok(Async::Ready(Some(Libp2pNetServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => { - self.protocol.on_peer_disconnected(&mut network_out, peer_id, debug_info); + Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolClosed { peer_id, .. }))) => { + self.protocol.on_peer_disconnected(&mut network_out, peer_id); CustomMessageOutcome::None }, - Ok(Async::Ready(Some(Libp2pNetServiceEvent::CustomMessage { peer_id, message, .. }))) => + Ok(Async::Ready(Some(CustomProtoOut::CustomMessage { peer_id, message }))) => self.protocol.on_custom_message( &mut network_out, &*self.transaction_pool, @@ -692,7 +750,7 @@ impl, H: ExHashT> Future for Ne message, self.finality_proof_provider.as_ref().map(|p| &**p) ), - Ok(Async::Ready(Some(Libp2pNetServiceEvent::Clogged { peer_id, messages, .. }))) => { + Ok(Async::Ready(Some(CustomProtoOut::Clogged { peer_id, messages, .. }))) => { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { debug!(target: "sync", "{:?}", msg); @@ -724,3 +782,93 @@ impl, H: ExHashT> Future for Ne Ok(Async::NotReady) } } + +/// The libp2p swarm, customized for our needs. +type Swarm = libp2p::core::Swarm< + Boxed<(PeerId, StreamMuxerBox), io::Error>, + Behaviour, Substream>, CustomProtoOut>, Substream> +>; + +/// Starts the substrate libp2p service. +/// +/// Returns a stream that must be polled regularly in order for the networking to function. +fn start_service>( + config: NetworkConfiguration, + protocol_id: Pid, + versions: &[u8], +) -> Result<(Swarm, Arc, peerset::PeersetHandle), io::Error> { + + if let Some(ref path) = config.net_config_path { + fs::create_dir_all(Path::new(path))?; + } + + // List of multiaddresses that we know in the network. + let mut known_addresses = Vec::new(); + let mut bootnodes = Vec::new(); + let mut reserved_nodes = Vec::new(); + + // Process the bootnodes. + for bootnode in config.boot_nodes.iter() { + match parse_str_addr(bootnode) { + Ok((peer_id, addr)) => { + bootnodes.push(peer_id.clone()); + known_addresses.push((peer_id, addr)); + }, + Err(_) => warn!(target: "sub-libp2p", "Not a valid bootnode address: {}", bootnode), + } + } + + // Initialize the reserved peers. + for reserved in config.reserved_nodes.iter() { + if let Ok((peer_id, addr)) = parse_str_addr(reserved) { + reserved_nodes.push(peer_id.clone()); + known_addresses.push((peer_id, addr)); + } else { + warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved); + } + } + + // Build the peerset. + let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset::PeersetConfig { + in_peers: config.in_peers, + out_peers: config.out_peers, + bootnodes, + reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny, + reserved_nodes, + }); + + // Private and public keys configuration. + if let NodeKeyConfig::Secp256k1(_) = config.node_key { + warn!(target: "sub-libp2p", "Secp256k1 keys are deprecated in favour of ed25519"); + } + let local_identity = config.node_key.clone().into_keypair()?; + let local_public = local_identity.public(); + let local_peer_id = local_public.clone().into_peer_id(); + info!(target: "sub-libp2p", "Local node identity is: {}", local_peer_id.to_base58()); + + // Build the swarm. + let (mut swarm, bandwidth) = { + let user_agent = format!("{} ({})", config.client_version, config.node_name); + let proto = CustomProto::new(protocol_id, versions, peerset); + let behaviour = Behaviour::new(proto, user_agent, local_public, known_addresses, config.enable_mdns); + let (transport, bandwidth) = transport::build_transport( + local_identity, + config.wasm_external_transport + ); + (Swarm::::new(transport, behaviour, local_peer_id.clone()), bandwidth) + }; + + // Listen on multiaddresses. + for addr in &config.listen_addresses { + if let Err(err) = Swarm::::listen_on(&mut swarm, addr.clone()) { + warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err) + } + } + + // Add external addresses. + for addr in &config.public_addresses { + Swarm::::add_external_address(&mut swarm, addr.clone()); + } + + Ok((swarm, bandwidth, peerset_handle)) +} diff --git a/substrate/core/network-libp2p/tests/test.rs b/substrate/core/network/src/service/tests.rs similarity index 79% rename from substrate/core/network-libp2p/tests/test.rs rename to substrate/core/network/src/service/tests.rs index 36e00e1318..1bcd4e90f9 100644 --- a/substrate/core/network-libp2p/tests/test.rs +++ b/substrate/core/network/src/service/tests.rs @@ -14,17 +14,23 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +#![cfg(test)] + use futures::{future, stream, prelude::*, try_ready}; +use libp2p::core::swarm::ExpandedSwarm; use rand::seq::SliceRandom; +use runtime_primitives::traits::Block as BlockT; use std::{io, time::Duration, time::Instant}; -use substrate_network_libp2p::{CustomMessage, Multiaddr, multiaddr::Protocol, ServiceEvent, build_multiaddr}; +use test_client::runtime::Block; +use crate::protocol::message::generic::Message; +use crate::{Multiaddr, multiaddr::Protocol, build_multiaddr}; +use crate::custom_proto::CustomProtoOut; +use super::{start_service, Swarm}; /// Builds two services. The second one and further have the first one as its bootstrap node. /// This is to be used only for testing, and a panic will happen if something goes wrong. -fn build_nodes(num: usize, base_port: u16) -> Vec> - where TMsg: CustomMessage + Send + 'static -{ - let mut result: Vec> = Vec::with_capacity(num); +fn build_nodes(num: usize, base_port: u16) -> Vec> { + let mut result: Vec> = Vec::with_capacity(num); let mut first_addr = None::; for index in 0 .. num { @@ -32,22 +38,21 @@ fn build_nodes(num: usize, base_port: u16) -> Vec(config, &b"tst"[..], &[1]).unwrap().0); } result @@ -56,7 +61,7 @@ fn build_nodes(num: usize, base_port: u16) -> Vec>(2, 50400).into_iter(); + let mut l = build_nodes::(2, 50400).into_iter(); let a = l.next().unwrap(); let b = l.next().unwrap(); (a, b) @@ -64,7 +69,7 @@ fn basic_two_nodes_connectivity() { let fut1 = future::poll_fn(move || -> io::Result<_> { match try_ready!(service1.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { version, .. }) => { + Some(CustomProtoOut::CustomProtocolOpen { version, .. }) => { assert_eq!(version, 1); Ok(Async::Ready(())) }, @@ -74,7 +79,7 @@ fn basic_two_nodes_connectivity() { let fut2 = future::poll_fn(move || -> io::Result<_> { match try_ready!(service2.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { version, .. }) => { + Some(CustomProtoOut::CustomProtocolOpen { version, .. }) => { assert_eq!(version, 1); Ok(Async::Ready(())) }, @@ -96,7 +101,7 @@ fn two_nodes_transfer_lots_of_packets() { const NUM_PACKETS: u32 = 5000; let (mut service1, mut service2) = { - let mut l = build_nodes::>(2, 50450).into_iter(); + let mut l = build_nodes::(2, 50450).into_iter(); let a = l.next().unwrap(); let b = l.next().unwrap(); (a, b) @@ -105,9 +110,12 @@ fn two_nodes_transfer_lots_of_packets() { let fut1 = future::poll_fn(move || -> io::Result<_> { loop { match try_ready!(service1.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { peer_id, .. }) => { + Some(CustomProtoOut::CustomProtocolOpen { peer_id, .. }) => { for n in 0 .. NUM_PACKETS { - service1.send_custom_message(&peer_id, vec![(n % 256) as u8]); + service1.user_protocol_mut().send_packet( + &peer_id, + Message::ChainSpecific(vec![(n % 256) as u8]) + ); } }, _ => panic!(), @@ -119,8 +127,8 @@ fn two_nodes_transfer_lots_of_packets() { let fut2 = future::poll_fn(move || -> io::Result<_> { loop { match try_ready!(service2.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { .. }) => {}, - Some(ServiceEvent::CustomMessage { message, .. }) => { + Some(CustomProtoOut::CustomProtocolOpen { .. }) => {}, + Some(CustomProtoOut::CustomMessage { message: Message::ChainSpecific(message), .. }) => { assert_eq!(message.len(), 1); packet_counter += 1; if packet_counter == NUM_PACKETS { @@ -144,25 +152,21 @@ fn many_nodes_connectivity() { // increased in the `NetworkConfiguration`. const NUM_NODES: usize = 25; - let mut futures = build_nodes::>(NUM_NODES, 50500) + let mut futures = build_nodes::(NUM_NODES, 50500) .into_iter() .map(move |mut node| { let mut num_connecs = 0; stream::poll_fn(move || -> io::Result<_> { loop { - const MAX_BANDWIDTH: u64 = NUM_NODES as u64 * 2048; // 2kiB/s/node - assert!(node.average_download_per_sec() < MAX_BANDWIDTH); - assert!(node.average_upload_per_sec() < MAX_BANDWIDTH); - match try_ready!(node.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { .. }) => { + Some(CustomProtoOut::CustomProtocolOpen { .. }) => { num_connecs += 1; assert!(num_connecs < NUM_NODES); if num_connecs == NUM_NODES - 1 { return Ok(Async::Ready(Some(true))) } } - Some(ServiceEvent::ClosedCustomProtocol { .. }) => { + Some(CustomProtoOut::CustomProtocolClosed { .. }) => { let was_success = num_connecs == NUM_NODES - 1; num_connecs -= 1; if was_success && num_connecs < NUM_NODES - 1 { @@ -200,7 +204,7 @@ fn many_nodes_connectivity() { #[test] fn basic_two_nodes_requests_in_parallel() { let (mut service1, mut service2) = { - let mut l = build_nodes::>(2, 50550).into_iter(); + let mut l = build_nodes::(2, 50550).into_iter(); let a = l.next().unwrap(); let b = l.next().unwrap(); (a, b) @@ -211,7 +215,7 @@ fn basic_two_nodes_requests_in_parallel() { let mut to_send = Vec::new(); for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode. let msg = (0..10).map(|_| rand::random::()).collect::>(); - to_send.push(msg); + to_send.push(Message::ChainSpecific(msg)); } to_send }; @@ -224,9 +228,9 @@ fn basic_two_nodes_requests_in_parallel() { let fut1 = future::poll_fn(move || -> io::Result<_> { loop { match try_ready!(service1.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { peer_id, .. }) => { + Some(CustomProtoOut::CustomProtocolOpen { peer_id, .. }) => { for msg in to_send.drain(..) { - service1.send_custom_message(&peer_id, msg); + service1.user_protocol_mut().send_packet(&peer_id, msg); } }, _ => panic!(), @@ -237,8 +241,8 @@ fn basic_two_nodes_requests_in_parallel() { let fut2 = future::poll_fn(move || -> io::Result<_> { loop { match try_ready!(service2.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { .. }) => {}, - Some(ServiceEvent::CustomMessage { message, .. }) => { + Some(CustomProtoOut::CustomProtocolOpen { .. }) => {}, + Some(CustomProtoOut::CustomMessage { message, .. }) => { let pos = to_receive.iter().position(|m| *m == message).unwrap(); to_receive.remove(pos); if to_receive.is_empty() { @@ -260,7 +264,7 @@ fn reconnect_after_disconnect() { // check that the disconnect worked, and finally check whether they successfully reconnect. let (mut service1, mut service2) = { - let mut l = build_nodes::>(2, 50350).into_iter(); + let mut l = build_nodes::(2, 50350).into_iter(); let a = l.next().unwrap(); let b = l.next().unwrap(); (a, b) @@ -281,19 +285,19 @@ fn reconnect_after_disconnect() { let mut service1_not_ready = false; match service1.poll().unwrap() { - Async::Ready(Some(ServiceEvent::OpenedCustomProtocol { .. })) => { + Async::Ready(Some(CustomProtoOut::CustomProtocolOpen { .. })) => { match service1_state { ServiceState::NotConnected => { service1_state = ServiceState::FirstConnec; if service2_state == ServiceState::FirstConnec { - service1.drop_node(service2.peer_id()); + service1.user_protocol_mut().disconnect_peer(ExpandedSwarm::local_peer_id(&service2)); } }, ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain, ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(), } }, - Async::Ready(Some(ServiceEvent::ClosedCustomProtocol { .. })) => { + Async::Ready(Some(CustomProtoOut::CustomProtocolClosed { .. })) => { match service1_state { ServiceState::FirstConnec => service1_state = ServiceState::Disconnected, ServiceState::ConnectedAgain| ServiceState::NotConnected | @@ -305,19 +309,19 @@ fn reconnect_after_disconnect() { } match service2.poll().unwrap() { - Async::Ready(Some(ServiceEvent::OpenedCustomProtocol { .. })) => { + Async::Ready(Some(CustomProtoOut::CustomProtocolOpen { .. })) => { match service2_state { ServiceState::NotConnected => { service2_state = ServiceState::FirstConnec; if service1_state == ServiceState::FirstConnec { - service1.drop_node(service2.peer_id()); + service1.user_protocol_mut().disconnect_peer(ExpandedSwarm::local_peer_id(&service2)); } }, ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain, ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(), } }, - Async::Ready(Some(ServiceEvent::ClosedCustomProtocol { .. })) => { + Async::Ready(Some(CustomProtoOut::CustomProtocolClosed { .. })) => { match service2_state { ServiceState::FirstConnec => service2_state = ServiceState::Disconnected, ServiceState::ConnectedAgain| ServiceState::NotConnected | diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index b27214d07b..1f41c43e07 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -42,7 +42,7 @@ use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImpor use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient, TopicNotification}; use futures::{prelude::*, sync::{mpsc, oneshot}}; use crate::message::Message; -use network_libp2p::PeerId; +use libp2p::PeerId; use parking_lot::{Mutex, RwLock}; use primitives::{H256, sr25519::Public as AuthorityId, Blake2Hasher}; use crate::protocol::{Context, Protocol, ProtocolConfig, ProtocolStatus, CustomMessageOutcome, NetworkOut}; @@ -296,10 +296,10 @@ pub struct Peer> { type MessageFilter = dyn Fn(&NetworkMsg) -> bool; pub enum FromNetworkMsg { - /// A peer connected, with debug info. - PeerConnected(PeerId, String), - /// A peer disconnected, with debug info. - PeerDisconnected(PeerId, String), + /// A peer connected. + PeerConnected(PeerId), + /// A peer disconnected. + PeerDisconnected(PeerId), /// A custom message from another peer. CustomMessage(PeerId, Message), /// Synchronization request. @@ -504,12 +504,12 @@ impl> Peer { /// Called on connection to other indicated peer. fn on_connect(&self, other: &Self) { - self.net_proto_channel.send_from_net(FromNetworkMsg::PeerConnected(other.peer_id.clone(), String::new())); + self.net_proto_channel.send_from_net(FromNetworkMsg::PeerConnected(other.peer_id.clone())); } /// Called on disconnect from other indicated peer. fn on_disconnect(&self, other: &Self) { - self.net_proto_channel.send_from_net(FromNetworkMsg::PeerDisconnected(other.peer_id.clone(), String::new())); + self.net_proto_channel.send_from_net(FromNetworkMsg::PeerDisconnected(other.peer_id.clone())); } /// Receive a message from another peer. Return a set of peers to disconnect. @@ -828,12 +828,12 @@ pub trait TestNetFactory: Sized { tokio::runtime::current_thread::run(futures::future::poll_fn(move || { while let Async::Ready(msg) = network_to_protocol_rx.poll().unwrap() { let outcome = match msg { - Some(FromNetworkMsg::PeerConnected(peer_id, debug_msg)) => { - protocol.on_peer_connected(&mut Ctxt(&network_sender), peer_id, debug_msg); + Some(FromNetworkMsg::PeerConnected(peer_id)) => { + protocol.on_peer_connected(&mut Ctxt(&network_sender), peer_id); CustomMessageOutcome::None }, - Some(FromNetworkMsg::PeerDisconnected(peer_id, debug_msg)) => { - protocol.on_peer_disconnected(&mut Ctxt(&network_sender), peer_id, debug_msg); + Some(FromNetworkMsg::PeerDisconnected(peer_id)) => { + protocol.on_peer_disconnected(&mut Ctxt(&network_sender), peer_id); CustomMessageOutcome::None }, Some(FromNetworkMsg::CustomMessage(peer_id, message)) => diff --git a/substrate/core/network-libp2p/src/transport.rs b/substrate/core/network/src/transport.rs similarity index 100% rename from substrate/core/network-libp2p/src/transport.rs rename to substrate/core/network/src/transport.rs