diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 96fe0e2871..9bd2d282c4 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -46,7 +46,7 @@ dependencies = [ "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1063,7 +1063,7 @@ dependencies = [ "multiaddr 0.3.0 (git+https://github.com/tomaka/libp2p-rs?rev=8111062f0177fd7423626f2db9560273644a4c4d)", "stdweb 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-current-thread 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1085,7 +1085,7 @@ dependencies = [ "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "rw-stream-sink 0.1.0 (git+https://github.com/tomaka/libp2p-rs?rev=8111062f0177fd7423626f2db9560273644a4c4d)", "smallvec 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1227,7 +1227,7 @@ dependencies = [ "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", 
"libp2p-core 0.1.0 (git+https://github.com/tomaka/libp2p-rs?rev=8111062f0177fd7423626f2db9560273644a4c4d)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1585,7 +1585,7 @@ dependencies = [ "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "node-service 0.1.0", "substrate-cli 0.3.0", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1608,7 +1608,7 @@ dependencies = [ "substrate-client 0.1.0", "substrate-keyring 0.1.0", "substrate-primitives 0.1.0", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1651,7 +1651,7 @@ dependencies = [ "substrate-bft 0.1.0", "substrate-network 0.1.0", "substrate-primitives 0.1.0", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1730,7 +1730,7 @@ dependencies = [ "substrate-service-test 0.3.0", "substrate-telemetry 0.3.0", "substrate-test-client 0.1.0", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2839,7 +2839,7 @@ dependencies = [ "substrate-executor 0.1.0", "substrate-keyring 0.1.0", "substrate-primitives 0.1.0", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2870,7 +2870,7 @@ dependencies = [ "substrate-telemetry 0.3.0", "sysinfo 0.6.2 
(registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2961,7 +2961,7 @@ dependencies = [ "substrate-keyring 0.1.0", "substrate-network 0.1.0", "substrate-primitives 0.1.0", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3033,6 +3033,7 @@ dependencies = [ "substrate-network-libp2p 0.1.0", "substrate-primitives 0.1.0", "substrate-test-client 0.1.0", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3053,7 +3054,7 @@ dependencies = [ "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "unsigned-varint 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3107,7 +3108,7 @@ dependencies = [ "substrate-primitives 0.1.0", "substrate-test-client 0.1.0", "substrate-transaction-pool 0.1.0", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3159,7 +3160,7 @@ dependencies = [ "substrate-telemetry 0.3.0", "substrate-transaction-pool 0.1.0", "target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + 
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3176,7 +3177,7 @@ dependencies = [ "substrate-primitives 0.1.0", "substrate-service 0.3.0", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3455,25 +3456,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "tokio" -version = "0.1.7" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ + "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-fs 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-fs 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tcp 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.4 
(registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-uds 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3497,8 +3502,8 @@ dependencies = [ "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", "scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3506,11 +3511,11 @@ dependencies = [ [[package]] name = "tokio-current-thread" -version = "0.1.0" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3521,12 +3526,12 @@ dependencies = [ "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = 
"tokio-executor" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3534,7 +3539,7 @@ dependencies = [ [[package]] name = "tokio-fs" -version = "0.1.0" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3578,7 +3583,7 @@ dependencies = [ "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -3613,7 +3618,7 @@ dependencies = [ "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3624,7 +3629,7 @@ dependencies = [ "crossbeam-utils 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -4355,13 +4360,13 @@ dependencies = [ "checksum time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = 
"d825be0eb33fda1a7e68012d51e9c7f451dc1a69391e7fdc197060bb8c56667b" "checksum tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e9175261fbdb60781fcd388a4d6cc7e14764a2b629a7ad94abb439aed223a44f" "checksum tk-listen 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "dec7ba6a80b7695fc2abb21af18bed445a362ffd80b64704771ce142d6d2151d" -"checksum tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8ee337e5f4e501fc32966fec6fe0ca0cc1c237b0b1b14a335f8bfe3c5f06e286" +"checksum tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "6e93c78d23cc61aa245a8acd2c4a79c4d7fa7fb5c3ca90d5737029f043a84895" "checksum tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "881e9645b81c2ce95fcb799ded2c29ffb9f25ef5bef909089a420e5961dd8ccb" "checksum tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71" -"checksum tokio-current-thread 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9f785265962bde425bf3b77dd6abac6674b8c6d5e8831427383aa9c56c5210e1" +"checksum tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f90fcd90952f0a496d438a976afba8e5c205fb12123f813d8ab3aa1c8436638c" "checksum tokio-dns-unofficial 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bb9bf62ca2c53bf2f2faec3e48a98b6d8c9577c27011cb0203a4beacdc8ab328" -"checksum tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "84823b932d566bc3c6aa644df4ca36cb38593c50b7db06011fd4e12e31e4047e" -"checksum tokio-fs 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "76766830bbf9a2d5bfb50c95350d56a2e79e2c80f675967fff448bc615899708" +"checksum tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "c117b6cf86bb730aab4834f10df96e4dd586eff2c3c27d3781348da49e255bde" +"checksum tokio-fs 0.1.3 
(registry+https://github.com/rust-lang/crates.io-index)" = "b5cbe4ca6e71cb0b62a66e4e6f53a8c06a6eefe46cc5f665ad6f274c9906f135" "checksum tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8d6cc2de7725863c86ac71b0b9068476fec50834f055a243558ef1655bbd34cb" "checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389" "checksum tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b3cedc8e5af5131dc3423ffa4f877cce78ad25259a9a62de0613735a13ebc64b" diff --git a/substrate/core/network-libp2p/src/connection_filter.rs b/substrate/core/network-libp2p/src/connection_filter.rs deleted file mode 100644 index 46d9d86b58..0000000000 --- a/substrate/core/network-libp2p/src/connection_filter.rs +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2015-2018 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Connection filter trait. - -use super::NodeId; - -/// Filtered connection direction. -pub enum ConnectionDirection { - Inbound, - Outbound, -} - -/// Connection filter. Each connection is checked against `connection_allowed`. -pub trait ConnectionFilter : Send + Sync { - /// Filter a connection. Returns `true` if connection should be allowed. `false` if rejected. 
- fn connection_allowed(&self, own_id: &NodeId, connecting_id: &NodeId, direction: ConnectionDirection) -> bool; -} diff --git a/substrate/core/network-libp2p/src/custom_proto.rs b/substrate/core/network-libp2p/src/custom_proto.rs index 6d0a5d5b1e..3cdd8e2b43 100644 --- a/substrate/core/network-libp2p/src/custom_proto.rs +++ b/substrate/core/network-libp2p/src/custom_proto.rs @@ -14,12 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use libp2p::core::{Multiaddr, ConnectionUpgrade, Endpoint}; use libp2p::tokio_codec::Framed; -use std::collections::VecDeque; -use std::io::Error as IoError; -use std::vec::IntoIter as VecIntoIter; +use std::{collections::VecDeque, io, vec::IntoIter as VecIntoIter}; use futures::{prelude::*, future, stream, task}; use tokio_io::{AsyncRead, AsyncWrite}; use unsigned_varint::codec::UviBytes; @@ -30,24 +28,21 @@ use ProtocolId; /// Note that "a single protocol" here refers to `par` for example. However /// each protocol can have multiple different versions for networking purposes. #[derive(Clone)] -pub struct RegisteredProtocol { +pub struct RegisteredProtocol { /// Id of the protocol for API purposes. id: ProtocolId, /// Base name of the protocol as advertised on the network. /// Ends with `/` so that we can append a version number behind. base_name: Bytes, - /// List of protocol versions that we support, plus their packet count. + /// List of protocol versions that we support. /// Ordered in descending order so that the best comes first. - /// The packet count is used to filter out invalid messages. - supported_versions: Vec<(u8, u8)>, - /// Custom data. - custom_data: TUserData, + supported_versions: Vec, } -impl RegisteredProtocol { +impl RegisteredProtocol { /// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be /// passed inside the `RegisteredProtocolOutput`. 
- pub fn new(custom_data: TUserData, protocol: ProtocolId, versions: &[(u8, u8)]) + pub fn new(protocol: ProtocolId, versions: &[u8]) -> Self { let mut base_name = Bytes::from_static(b"/substrate/"); base_name.extend_from_slice(&protocol); @@ -57,11 +52,10 @@ impl RegisteredProtocol { base_name, id: protocol, supported_versions: { - let mut tmp: Vec<_> = versions.iter().rev().cloned().collect(); - tmp.sort_unstable_by(|a, b| b.1.cmp(&a.1)); + let mut tmp = versions.to_vec(); + tmp.sort_unstable_by(|a, b| b.cmp(&a)); tmp }, - custom_data, } } @@ -70,12 +64,6 @@ impl RegisteredProtocol { pub fn id(&self) -> ProtocolId { self.id } - - /// Returns the custom data that was passed to `new`. - #[inline] - pub fn custom_data(&self) -> &TUserData { - &self.custom_data - } } /// Output of a `RegisteredProtocol` upgrade. @@ -88,8 +76,6 @@ pub struct RegisteredProtocolSubstream { requires_poll_complete: bool, /// The underlying substream. inner: stream::Fuse>>, - /// Maximum packet id. - packet_count: u8, /// Id of the protocol. protocol_id: ProtocolId, /// Version of the protocol that was negotiated. @@ -98,15 +84,6 @@ pub struct RegisteredProtocolSubstream { to_notify: Option, } -/// Packet of data that can be sent or received. -#[derive(Debug, Clone)] -pub struct Packet { - /// Identifier of the packet. - pub id: u8, - /// The raw data. - pub data: Bytes, -} - impl RegisteredProtocolSubstream { /// Returns the protocol id. #[inline] @@ -134,14 +111,10 @@ impl RegisteredProtocolSubstream { } /// Sends a message to the substream. 
- pub fn send_message(&mut self, Packet { id: packet_id, data }: Packet) { - if packet_id >= self.packet_count { - error!(target: "sub-libp2p", "Tried to send a packet with an invalid ID {}", packet_id); - return; - } - + pub fn send_message(&mut self, data: Bytes) { + // TODO: remove the packet id system let mut message = Bytes::with_capacity(1 + data.len()); - message.extend_from_slice(&[packet_id]); + message.extend_from_slice(&[0]); message.extend_from_slice(&data); self.send_queue.push_back(message); @@ -156,36 +129,13 @@ impl RegisteredProtocolSubstream { task.notify(); } } - - /// Turns raw data into a packet and checks whether it is valid. - fn data_to_packet(&self, mut data: BytesMut) -> Result { - // The `data` should be prefixed by the packet ID, therefore an empty packet is invalid. - if data.is_empty() { - debug!(target: "sub-libp2p", "ignoring incoming packet because it was empty"); - return Err(()); - } - - let packet = { - let id = data[0]; - let data = data.split_off(1); - Packet { id, data: data.freeze() } - }; - - if packet.id >= self.packet_count { - debug!(target: "sub-libp2p", "ignoring incoming packet because packet_id {} is \ - too large", packet.id); - return Err(()) - } - - Ok(packet) - } } impl Stream for RegisteredProtocolSubstream where TSubstream: AsyncRead + AsyncWrite, { - type Item = Packet; - type Error = IoError; + type Item = Bytes; + type Error = io::Error; fn poll(&mut self) -> Poll, Self::Error> { // If we are closing, close as soon as the Sink is closed. @@ -215,10 +165,16 @@ where TSubstream: AsyncRead + AsyncWrite, // Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever. loop { match self.inner.poll()? { - Async::Ready(Some(data)) => - if let Ok(packet) = self.data_to_packet(data) { - return Ok(Async::Ready(Some(packet))) - }, + Async::Ready(Some(mut data)) => { + // The `data` should be prefixed by the packet ID, therefore an empty + // packet is invalid. 
+ // TODO: remove the packet id system + if data.is_empty() { + return Err(io::Error::new(io::ErrorKind::Other, "bad packet")); + } + let data = data.split_off(1); + return Ok(Async::Ready(Some(data.freeze()))) + }, Async::Ready(None) => if !self.requires_poll_complete && self.send_queue.is_empty() { return Ok(Async::Ready(None)) @@ -234,9 +190,8 @@ where TSubstream: AsyncRead + AsyncWrite, } } -impl ConnectionUpgrade for RegisteredProtocol +impl ConnectionUpgrade for RegisteredProtocol where TSubstream: AsyncRead + AsyncWrite, - TUserData: Clone, { type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>; type UpgradeIdentifier = u8; // Protocol version @@ -244,7 +199,7 @@ where TSubstream: AsyncRead + AsyncWrite, #[inline] fn protocol_names(&self) -> Self::NamesIter { // Report each version as an individual protocol. - self.supported_versions.iter().map(|&(ver, _)| { + self.supported_versions.iter().map(|&ver| { let num = ver.to_string(); let mut name = self.base_name.clone(); name.extend_from_slice(num.as_bytes()); @@ -253,7 +208,7 @@ where TSubstream: AsyncRead + AsyncWrite, } type Output = RegisteredProtocolSubstream; - type Future = future::FutureResult; + type Future = future::FutureResult; #[allow(deprecated)] fn upgrade( @@ -263,13 +218,6 @@ where TSubstream: AsyncRead + AsyncWrite, _: Endpoint, _: &Multiaddr ) -> Self::Future { - let packet_count = self.supported_versions - .iter() - .find(|&(v, _)| *v == protocol_version) - .expect("negotiated protocol version that wasn't advertised ; \ - programmer error") - .1; - let framed = Framed::new(socket, UviBytes::default()); future::ok(RegisteredProtocolSubstream { @@ -277,7 +225,6 @@ where TSubstream: AsyncRead + AsyncWrite, send_queue: VecDeque::new(), requires_poll_complete: false, inner: framed.fuse(), - packet_count, protocol_id: self.id, protocol_version, to_notify: None, @@ -287,9 +234,9 @@ where TSubstream: AsyncRead + AsyncWrite, // Connection upgrade for all the protocols contained in it. 
#[derive(Clone)] -pub struct RegisteredProtocols(pub Vec>); +pub struct RegisteredProtocols(pub Vec); -impl RegisteredProtocols { +impl RegisteredProtocols { /// Returns the number of protocols. #[inline] pub fn len(&self) -> usize { @@ -298,7 +245,7 @@ impl RegisteredProtocols { /// Finds a protocol in the list by its id. pub fn find_protocol(&self, protocol: ProtocolId) - -> Option<&RegisteredProtocol> { + -> Option<&RegisteredProtocol> { self.0.iter().find(|p| p.id == protocol) } @@ -308,19 +255,18 @@ impl RegisteredProtocols { } } -impl Default for RegisteredProtocols { +impl Default for RegisteredProtocols { fn default() -> Self { RegisteredProtocols(Vec::new()) } } -impl ConnectionUpgrade for RegisteredProtocols +impl ConnectionUpgrade for RegisteredProtocols where TSubstream: AsyncRead + AsyncWrite, - TUserData: Clone, { type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>; type UpgradeIdentifier = (usize, - as ConnectionUpgrade>::UpgradeIdentifier); + >::UpgradeIdentifier); fn protocol_names(&self) -> Self::NamesIter { // We concat the lists of `RegisteredProtocol::protocol_names` for @@ -331,8 +277,8 @@ where TSubstream: AsyncRead + AsyncWrite, ).collect::>().into_iter() } - type Output = as ConnectionUpgrade>::Output; - type Future = as ConnectionUpgrade>::Future; + type Output = >::Output; + type Future = >::Future; #[inline] fn upgrade( diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs index 46d5e84fc1..9d53b01b01 100644 --- a/substrate/core/network-libp2p/src/lib.rs +++ b/substrate/core/network-libp2p/src/lib.rs @@ -44,29 +44,21 @@ extern crate log; #[cfg(test)] #[macro_use] extern crate assert_matches; -pub use connection_filter::{ConnectionFilter, ConnectionDirection}; -pub use error::{Error, ErrorKind, DisconnectReason}; -pub use libp2p::{Multiaddr, multiaddr::Protocol, PeerId}; -pub use traits::*; - -pub type TimerToken = usize; - -// TODO: remove as it is unused ; however modifying `network` 
causes a clusterfuck of dependencies -// resolve errors at the moment -mod connection_filter; mod custom_proto; mod error; mod node_handler; mod secret; -mod service; mod service_task; mod swarm; -mod timeouts; mod topology; mod traits; mod transport; -pub use service::NetworkService; +pub use custom_proto::RegisteredProtocol; +pub use error::{Error, ErrorKind, DisconnectReason}; +pub use libp2p::{Multiaddr, multiaddr::Protocol, PeerId}; +pub use service_task::{start_service, Service, ServiceEvent}; +pub use traits::*; // TODO: expand to actual items /// Check if node url is valid pub fn validate_node_url(url: &str) -> Result<(), Error> { @@ -77,7 +69,7 @@ pub fn validate_node_url(url: &str) -> Result<(), Error> { } /// Parses a string address and returns the component, if valid. -pub(crate) fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), Error> { +pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), Error> { let mut addr: Multiaddr = addr_str.parse().map_err(|_| ErrorKind::AddressParse)?; let who = match addr.pop() { Some(Protocol::P2p(key)) => diff --git a/substrate/core/network-libp2p/src/node_handler.rs b/substrate/core/network-libp2p/src/node_handler.rs index 8e6c50ae80..f7a388ddcd 100644 --- a/substrate/core/network-libp2p/src/node_handler.rs +++ b/substrate/core/network-libp2p/src/node_handler.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . 
use bytes::Bytes; -use custom_proto::{Packet, RegisteredProtocols, RegisteredProtocolSubstream}; +use custom_proto::{RegisteredProtocols, RegisteredProtocolSubstream}; use futures::{prelude::*, task}; use libp2p::core::{ConnectionUpgrade, Endpoint, PeerId, PublicKey, upgrade}; use libp2p::core::nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent}; @@ -28,7 +28,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::{Delay, Interval}; -use {Multiaddr, PacketId, ProtocolId}; +use {Multiaddr, ProtocolId}; /// Duration after which we consider that a ping failed. const PING_TIMEOUT: Duration = Duration::from_secs(30); @@ -48,9 +48,9 @@ const DELAY_TO_FIRST_IDENTIFY: Duration = Duration::from_secs(2); /// /// The node will be pinged at a regular interval to determine whether it's still alive. We will /// also regularly query the remote for identification information, for statistics purposes. -pub struct SubstrateNodeHandler { +pub struct SubstrateNodeHandler { /// List of registered custom protocols. - registered_custom: Arc>, + registered_custom: Arc, /// Substreams open for "custom" protocols (eg. dot). custom_protocols_substreams: Vec>, @@ -140,8 +140,6 @@ pub enum SubstrateOutEvent { CustomMessage { /// Protocol which generated the message. protocol_id: ProtocolId, - /// Identifier of the packet. - packet_id: u8, /// Data that has been received. data: Bytes, }, @@ -235,7 +233,6 @@ pub enum SubstrateInEvent { /// Sends a message through a custom protocol substream. SendCustomMessage { protocol: ProtocolId, - packet_id: PacketId, data: Vec, }, @@ -258,13 +255,12 @@ macro_rules! listener_upgrade { ) } -impl SubstrateNodeHandler +impl SubstrateNodeHandler where TSubstream: AsyncRead + AsyncWrite + Send + 'static, - TUserData: Clone + Send + 'static, { /// Creates a new node handler. 
#[inline] - pub fn new(registered_custom: Arc>, endpoint: ConnectedPoint) -> Self { + pub fn new(registered_custom: Arc, endpoint: ConnectedPoint) -> Self { let registered_custom_len = registered_custom.len(); let queued_dial_upgrades = registered_custom.0 .iter() @@ -298,9 +294,8 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, } } -impl NodeHandler for SubstrateNodeHandler +impl NodeHandler for SubstrateNodeHandler where TSubstream: AsyncRead + AsyncWrite + Send + 'static, - TUserData: Clone + Send + 'static, { type InEvent = SubstrateInEvent; type OutEvent = SubstrateOutEvent; @@ -383,8 +378,8 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, fn inject_event(&mut self, event: Self::InEvent) { match event { - SubstrateInEvent::SendCustomMessage { protocol, packet_id, data } => { - self.send_custom_message(protocol, packet_id, data); + SubstrateInEvent::SendCustomMessage { protocol, data } => { + self.send_custom_message(protocol, data); }, SubstrateInEvent::OpenKademlia => self.open_kademlia(), SubstrateInEvent::Accept => { @@ -449,15 +444,13 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, } } -impl SubstrateNodeHandler +impl SubstrateNodeHandler where TSubstream: AsyncRead + AsyncWrite + Send + 'static, - TUserData: Clone + Send + 'static, { /// Sends a message on a custom protocol substream. 
fn send_custom_message( &mut self, protocol: ProtocolId, - packet_id: PacketId, data: Vec, ) { debug_assert!(self.registered_custom.has_protocol(protocol), @@ -471,7 +464,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, }, }; - proto.send_message(Packet { id: packet_id, data: data.into() }); + proto.send_message(data.into()); } /// The node will try to open a Kademlia substream and produce a `KadOpen` event containing the @@ -688,12 +681,11 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static, let mut custom_proto = self.custom_protocols_substreams.swap_remove(n); match custom_proto.poll() { Ok(Async::NotReady) => self.custom_protocols_substreams.push(custom_proto), - Ok(Async::Ready(Some(Packet { id, data }))) => { + Ok(Async::Ready(Some(data))) => { let protocol_id = custom_proto.protocol_id(); self.custom_protocols_substreams.push(custom_proto); return Ok(Async::Ready(Some(SubstrateOutEvent::CustomMessage { protocol_id, - packet_id: id, data, }))); }, diff --git a/substrate/core/network-libp2p/src/service.rs b/substrate/core/network-libp2p/src/service.rs deleted file mode 100644 index 1b96edc174..0000000000 --- a/substrate/core/network-libp2p/src/service.rs +++ /dev/null @@ -1,597 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . 
- -use fnv::FnvHashMap; -use parking_lot::Mutex; -use libp2p::core::{nodes::swarm::ConnectedPoint, Multiaddr, PeerId as PeerstorePeerId}; -use libp2p::multiaddr::Protocol; -use {PacketId, SessionInfo, TimerToken}; -use service_task::ServiceEvent; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::sync::Arc; -use std::sync::mpsc as sync_mpsc; -use std::thread; -use std::time::Duration; -use futures::{prelude::*, Future, stream, Stream, select_all}; -use futures::sync::{mpsc, oneshot}; -use tokio::runtime::current_thread; -use {Error, ErrorKind, NetworkConfiguration, NetworkProtocolHandler, parse_str_addr}; -use {NonReservedPeerMode, NetworkContext, Severity, NodeIndex, ProtocolId}; - -use custom_proto::{RegisteredProtocol, RegisteredProtocols}; -use service_task::start_service; -use timeouts; - -/// IO Service with networking. -pub struct NetworkService { - /// Information about peers. - peer_infos: Arc>>, - - /// Use this channel to send a timeout request to the background thread's - /// events loop. After the timeout, elapsed, it will call `timeout` on the - /// `NetworkProtocolHandler`. - timeouts_register_tx: mpsc::UnboundedSender<(Duration, (Arc, ProtocolId, TimerToken))>, - - /// Sender for messages to send to the background thread. - msg_tx: mpsc::UnboundedSender, - - /// Sender for messages to the backgound service task, and handle for the background thread. - /// Dropping the sender should close the task and the thread. - bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>, - - /// Original configuration of the service. - config: NetworkConfiguration, - - /// List of registered protocols. - registered_custom: Arc>>, - - /// The external URL to report to the user. - external_url: Option, -} - -/// Known information about a peer. -struct PeerInfos { - /// Id of the peer. - id: PeerstorePeerId, - - /// List of open custom protocols, and their version. 
- protocols: Vec<(ProtocolId, u8)>, - - /// How we are connected to the remote. - endpoint: ConnectedPoint, - - /// Latest known ping duration. - ping: Option, - - /// The client version of the remote, or `None` if not known. - client_version: Option, - - /// The local multiaddress used to communicate with the remote, or `None` - /// if not known. - // TODO: never filled ; also shouldn't be an `Option` - local_address: Option, -} - -/// Message to send to the service task. -#[derive(Debug, Clone)] -enum MsgToBgThread { - /// Call `add_reserved_peer` on the network service. - AddReservedPeer(PeerstorePeerId, Multiaddr), - /// Call `remove_reserved_peer` on the network service. - RemoveReservedPeer(PeerstorePeerId, Multiaddr), - /// Call `set_non_reserved_mode` on the network service. - SetNonReserved(NonReservedPeerMode), - /// Call `send_custom_message` on the network service. - SendCustomMessage(NodeIndex, ProtocolId, PacketId, Vec), - /// Call `drop_peer` on the network service. - DropNode(NodeIndex), - /// Call `ban_peer` on the network service. - BanNode(NodeIndex), -} - -impl NetworkService { - /// Starts the networking service. - /// - /// Note that we could use an iterator for `protocols`, but having a - /// generic here is too much and crashes the Rust compiler. - pub fn new( - config: NetworkConfiguration, - protocols: Vec<(Arc, ProtocolId, &[(u8, u8)])> - ) -> Result { - // Start by creating the protocols list. 
- let registered_custom = Arc::new(RegisteredProtocols(protocols.into_iter() - .map(|(handler, protocol, versions)| - RegisteredProtocol::new(handler.clone(), protocol, versions)) - .collect())); - - let (init_tx, init_rx) = sync_mpsc::channel(); - let (close_tx, close_rx) = oneshot::channel(); - let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded(); - let peer_infos = Arc::new(Mutex::new(Default::default())); - let timeouts_register_tx_clone = timeouts_register_tx.clone(); - let (msg_tx, msg_rx) = mpsc::unbounded(); - let registered_custom_clone = registered_custom.clone(); - let config_clone = config.clone(); - let peer_infos_clone = peer_infos.clone(); - let msg_tx_clone = msg_tx.clone(); - - // Initialize all the protocols now. - // TODO: remove this `initialize` method eventually, as it's only used for timers - for protocol in registered_custom.0.iter() { - protocol.custom_data().initialize(&NetworkContextImpl { - peer_infos: peer_infos.clone(), - registered_custom: registered_custom.clone(), - msg_tx: msg_tx.clone(), - timeouts_register_tx: timeouts_register_tx.clone(), - protocol: protocol.id(), - current_peer: None, - }); - } - - let join_handle = thread::spawn(move || { - // Tokio runtime that is going to run everything in this thread. 
- let mut runtime = match current_thread::Runtime::new() { - Ok(c) => c, - Err(err) => { - let _ = init_tx.send(Err(err.into())); - return - } - }; - - let fut = match init_thread( - config_clone, - registered_custom_clone, - peer_infos_clone, - timeouts_register_tx_clone, - timeouts_register_rx, - msg_tx_clone, - msg_rx - ) { - Ok((future, external_url)) => { - debug!(target: "sub-libp2p", "Successfully started networking service"); - let _ = init_tx.send(Ok(external_url)); - future - }, - Err(err) => { - let _ = init_tx.send(Err(err)); - return - } - }; - - let fut = fut.map_err(|_| ()) - .select(close_rx.then(|_| Ok(()))) - .map(|_| ()).map_err(|_| ()); - match runtime.block_on(fut) { - Ok(()) => debug!(target: "sub-libp2p", "libp2p future finished"), - Err(err) => error!(target: "sub-libp2p", "Error while running libp2p: {:?}", err), - } - }); - - let external_url = init_rx.recv().expect("libp2p background thread panicked")?; - - Ok(NetworkService { - config, - peer_infos, - timeouts_register_tx, - msg_tx, - external_url, - bg_thread: Some((close_tx, join_handle)), - registered_custom, - }) - } - - /// Returns network configuration. - // TODO: is this method really necessary? we could remove the `config` field if not - pub fn config(&self) -> &NetworkConfiguration { - &self.config - } - - pub fn external_url(&self) -> Option { - // TODO: cleanup this in external API layers - self.external_url.clone() - } - - /// Get a list of all connected peers by id. - pub fn connected_peers(&self) -> Vec { - self.peer_infos.lock().keys().cloned().collect() - } - - /// Try to add a reserved peer. - pub fn add_reserved_peer(&self, peer: &str) -> Result<(), Error> { - let (peer_id, addr) = parse_str_addr(peer)?; - let _ = self.msg_tx.unbounded_send(MsgToBgThread::AddReservedPeer(peer_id, addr)); - Ok(()) - } - - /// Try to remove a reserved peer. 
- pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), Error> { - let (peer_id, addr) = parse_str_addr(peer)?; - let _ = self.msg_tx.unbounded_send(MsgToBgThread::RemoveReservedPeer(peer_id, addr)); - Ok(()) - } - - /// Set the non-reserved peer mode. - pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode) { - let _ = self.msg_tx.unbounded_send(MsgToBgThread::SetNonReserved(mode)); - } - - /// Executes action in the network context - pub fn with_context(&self, protocol: ProtocolId, action: F) - where F: FnOnce(&NetworkContext) { - self.with_context_eval(protocol, action); - } - - /// Evaluates function in the network context - pub fn with_context_eval(&self, protocol: ProtocolId, action: F) - -> Option - where F: FnOnce(&NetworkContext) -> T { - if !self.registered_custom.has_protocol(protocol) { - return None - } - - Some(action(&NetworkContextImpl { - peer_infos: self.peer_infos.clone(), - registered_custom: self.registered_custom.clone(), - msg_tx: self.msg_tx.clone(), - timeouts_register_tx: self.timeouts_register_tx.clone(), - protocol: protocol.clone(), - current_peer: None, - })) - } -} - -impl Drop for NetworkService { - fn drop(&mut self) { - if let Some((sender, join)) = self.bg_thread.take() { - drop(sender); - if let Err(e) = join.join() { - warn!(target: "sub-libp2p", "error while waiting on libp2p background thread: {:?}", e); - } - } - } -} - -/// Builds the main `Future` for the network service. -/// -/// Also returns a diagnostic external node address, to report to the user. 
-fn init_thread( - config: NetworkConfiguration, - registered_custom: Arc>>, - peers: Arc>>, - timeouts_register_tx: mpsc::UnboundedSender< - (Duration, (Arc, ProtocolId, TimerToken)) - >, - timeouts_register_rx: mpsc::UnboundedReceiver< - (Duration, (Arc, ProtocolId, TimerToken)) - >, - msg_tx: mpsc::UnboundedSender, - mut msg_rx: mpsc::UnboundedReceiver, -) -> Result<(impl Future, Option), Error> { - // Build the timeouts system for the `register_timeout` function. - // (note: this has nothing to do with socket timeouts) - let timeouts = timeouts::build_timeouts_stream(timeouts_register_rx) - .for_each({ - let peers = peers.clone(); - let msg_tx = msg_tx.clone(); - let registered_custom = registered_custom.clone(); - let timeouts_register_tx = timeouts_register_tx.clone(); - move |(handler, protocol_id, timer_token)| { - handler.timeout(&NetworkContextImpl { - peer_infos: peers.clone(), - registered_custom: registered_custom.clone(), - msg_tx: msg_tx.clone(), - protocol: protocol_id, - current_peer: None, - timeouts_register_tx: timeouts_register_tx.clone(), - }, timer_token); - Ok(()) - } - }) - .then(|val| { - warn!(target: "sub-libp2p", "Timeouts stream closed unexpectedly: {:?}", val); - val - }); - - // Start the main service. - let mut service = start_service(config, registered_custom.clone())?; - - // Build the external URL to report to the user. 
- let external_url = service - .listeners() - .next() - .map(|addr| { - let mut addr = addr.clone(); - addr.append(Protocol::P2p(service.peer_id().clone().into())); - addr.to_string() - }); - - let service_stream = stream::poll_fn(move || { - loop { - match msg_rx.poll() { - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), - Ok(Async::Ready(Some(MsgToBgThread::AddReservedPeer(peer_id, addr)))) => { - service.add_reserved_peer(peer_id, addr); - }, - Ok(Async::Ready(Some(MsgToBgThread::RemoveReservedPeer(peer_id, addr)))) => { - service.remove_reserved_peer(peer_id, addr); - }, - Ok(Async::Ready(Some(MsgToBgThread::SetNonReserved(mode)))) => { - service.set_non_reserved_mode(mode); - }, - Ok(Async::Ready(Some(MsgToBgThread::SendCustomMessage(node_index, protocol_id, packet_id, data)))) => { - service.send_custom_message(node_index, protocol_id, packet_id, data); - }, - Ok(Async::Ready(Some(MsgToBgThread::DropNode(node_index)))) => { - service.drop_node(node_index); - }, - Ok(Async::Ready(Some(MsgToBgThread::BanNode(node_index)))) => { - service.ban_node(node_index); - }, - Err(()) => unreachable!("An unbounded receiver never errors"), - } - } - - service.poll() - }) - .for_each(move |event| { - macro_rules! 
ctxt { - ($protocol:expr, $node_index:expr) => ( - NetworkContextImpl { - peer_infos: peers.clone(), - registered_custom: registered_custom.clone(), - msg_tx: msg_tx.clone(), - timeouts_register_tx: timeouts_register_tx.clone(), - protocol: $protocol, - current_peer: Some($node_index), - } - ); - } - - match event { - ServiceEvent::NewNode { node_index, peer_id, endpoint } => { - peers.lock().insert(node_index, PeerInfos { - id: peer_id, - protocols: Vec::new(), - endpoint, - ping: None, - client_version: None, - local_address: None, // TODO: fill - }); - }, - ServiceEvent::NodeClosed { node_index, closed_custom_protocols } => { - let old = peers.lock().remove(&node_index); - debug_assert!(old.is_some()); - for protocol in closed_custom_protocols { - registered_custom.find_protocol(protocol) - .expect("we passed a list of protocols when building the service, and never \ - modify that list ; therefore all the reported ids should always be valid") - .custom_data() - .disconnected(&ctxt!(protocol, node_index), &node_index); - } - }, - ServiceEvent::ClosedCustomProtocols { node_index, protocols } => { - peers.lock().get_mut(&node_index) - .expect("peers is kept in sync with the state in the service") - .protocols - .retain(|&(ref p, _)| !protocols.iter().any(|pr| pr == p)); - for protocol in protocols { - registered_custom.find_protocol(protocol) - .expect("we passed a list of protocols when building the service, and never \ - modify that list ; therefore all the reported ids should always be valid") - .custom_data() - .disconnected(&ctxt!(protocol, node_index), &node_index); - } - }, - ServiceEvent::PingDuration(node_index, ping) => - peers.lock().get_mut(&node_index) - .expect("peers is kept in sync with the state in the service") - .ping = Some(ping), - ServiceEvent::NodeInfos { node_index, client_version } => - peers.lock().get_mut(&node_index) - .expect("peers is kept in sync with the state in the service") - .client_version = Some(client_version), - 
ServiceEvent::OpenedCustomProtocol { node_index, protocol, version } => { - peers.lock().get_mut(&node_index) - .expect("peers is kept in sync with the state in the service") - .protocols - .push((protocol, version)); - registered_custom.find_protocol(protocol) - .expect("we passed a list of protocols when building the service, and never \ - modify that list ; therefore all the reported ids should always be valid") - .custom_data() - .connected(&ctxt!(protocol, node_index), &node_index) - }, - ServiceEvent::ClosedCustomProtocol { node_index, protocol } => { - peers.lock().get_mut(&node_index) - .expect("peers is kept in sync with the state in the service") - .protocols - .retain(|&(ref p, _)| p != &protocol); - registered_custom.find_protocol(protocol) - .expect("we passed a list of protocols when building the service, and never \ - modify that list ; therefore all the reported ids should always be valid") - .custom_data() - .disconnected(&ctxt!(protocol, node_index), &node_index) - }, - ServiceEvent::CustomMessage { node_index, protocol_id, packet_id, data } => { - registered_custom.find_protocol(protocol_id) - .expect("we passed a list of protocols when building the service, and never \ - modify that list ; therefore all the reported ids should always be valid") - .custom_data() - .read(&ctxt!(protocol_id, node_index), &node_index, packet_id, &data) - }, - }; - Ok(()) - }); - - // Merge all futures into one. - let futures: Vec>> = vec![ - Box::new(service_stream), - Box::new(timeouts), - ]; - let final_future = select_all(futures) - .and_then(move |_| { - debug!(target: "sub-libp2p", "Networking ended"); - Ok(()) - }) - .map_err(|(r, _, _)| r); - - Ok((final_future, external_url)) -} - -#[derive(Clone)] -struct NetworkContextImpl { - peer_infos: Arc>>, - msg_tx: mpsc::UnboundedSender, - protocol: ProtocolId, - current_peer: Option, - registered_custom: Arc>>, - /// Clone of `NetworkService::timeouts_register_tx`. 
- timeouts_register_tx: mpsc::UnboundedSender<(Duration, (Arc, ProtocolId, TimerToken))>, -} - -impl NetworkContext for NetworkContextImpl { - fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec) { - self.send_protocol(self.protocol, peer, packet_id, data) - } - - fn send_protocol( - &self, - protocol: ProtocolId, - peer: NodeIndex, - packet_id: PacketId, - data: Vec - ) { - let msg = MsgToBgThread::SendCustomMessage(peer, protocol, packet_id, data); - let _ = self.msg_tx.unbounded_send(msg); - } - - fn respond(&self, packet_id: PacketId, data: Vec) { - if let Some(peer) = self.current_peer { - self.send_protocol(self.protocol, peer, packet_id, data) - } else { - panic!("respond() called outside of a received message"); - } - } - - fn report_peer(&self, node_index: NodeIndex, reason: Severity) { - let peer_infos = self.peer_infos.lock(); - if let Some(info) = peer_infos.get(&node_index) { - if let Some(ref client_version) = info.client_version { - info!(target: "sub-libp2p", - "Peer {:?} ({:?} {}) reported by client: {}", - info.id, - info.endpoint, - client_version, - reason - ); - } else { - info!(target: "sub-libp2p", "Peer {:?} reported by client: {}", info.id, reason); - } - } - - let _ = self.msg_tx.unbounded_send(match reason { - Severity::Bad(_) => MsgToBgThread::BanNode(node_index), - Severity::Useless(_) => MsgToBgThread::DropNode(node_index), - Severity::Timeout => MsgToBgThread::DropNode(node_index), - }); - } - - fn is_expired(&self) -> bool { - let peer_infos = self.peer_infos.lock(); - let current_peer = self.current_peer.as_ref() - .expect("Called is_expired outside of a context"); - !peer_infos.contains_key(current_peer) - } - - fn register_timer(&self, token: usize, duration: Duration) - -> Result<(), Error> { - let handler = self.registered_custom - .find_protocol(self.protocol) - .ok_or(ErrorKind::BadProtocol)? 
- .custom_data() - .clone(); - self.timeouts_register_tx - .unbounded_send((duration, (handler, self.protocol, token))) - .map_err(|err| ErrorKind::Io(IoError::new(IoErrorKind::Other, err)))?; - Ok(()) - } - - fn peer_client_version(&self, peer: NodeIndex) -> String { - // Devp2p returns "unknown" on unknown peer ID, so we do the same. - // TODO: implement more directly, without going through `session_info` - self.session_info(peer) - .map(|info| info.client_version) - .unwrap_or_else(|| "unknown".to_string()) - } - - fn session_info(&self, peer: NodeIndex) -> Option { - let peer_infos = self.peer_infos.lock(); - let info = match peer_infos.get(&peer) { - Some(info) => info, - None => return None, - }; - - let protocol_id = self.protocol; - let protocol_version = match info.protocols.iter().find(|&(ref p, _)| p == &protocol_id) { - Some(&(_, vers)) => vers, - None => return None, - }; - - Some(SessionInfo { - id: info.id.clone(), - client_version: info.client_version.clone().take().unwrap_or(String::new()), - protocol_version: From::from(protocol_version), - capabilities: Vec::new(), // TODO: list of supported protocols ; hard - peer_capabilities: Vec::new(), // TODO: difference with `peer_capabilities`? - ping: info.ping, - originated: if let ConnectedPoint::Dialer { .. 
} = info.endpoint { true } else { false }, - remote_address: String::new(), // TODO: - local_address: info.local_address.as_ref().map(|a| a.to_string()) - .unwrap_or(String::new()), - }) - } - - fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option { - let peer_infos = self.peer_infos.lock(); - let info = match peer_infos.get(&peer) { - Some(info) => info, - None => return None, - }; - - let protocol_version = match info.protocols.iter().find(|&(ref p, _)| p == &protocol) { - Some(&(_, vers)) => vers, - None => return None, - }; - - Some(protocol_version) - } - - fn subprotocol_name(&self) -> ProtocolId { - self.protocol.clone() - } -} - -#[cfg(test)] -mod tests { - use super::NetworkService; - - #[test] - fn builds_and_finishes_in_finite_time() { - // Checks that merely starting the network doesn't end up in an infinite loop. - let _service = NetworkService::new(Default::default(), vec![]).unwrap(); - } -} diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs index e51ee36009..8a8a7e2004 100644 --- a/substrate/core/network-libp2p/src/service_task.rs +++ b/substrate/core/network-libp2p/src/service_task.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . use bytes::Bytes; -use custom_proto::RegisteredProtocols; +use custom_proto::{RegisteredProtocol, RegisteredProtocols}; use fnv::{FnvHashMap, FnvHashSet}; use futures::{prelude::*, task, Stream}; use futures::sync::{oneshot, mpsc}; @@ -36,8 +36,8 @@ use std::time::{Duration, Instant}; use swarm::{self, Swarm, SwarmEvent}; use topology::{DisconnectReason, NetTopology}; use tokio_timer::{Delay, Interval}; -use {Error, ErrorKind, NetworkConfiguration, NetworkProtocolHandler, NodeIndex, parse_str_addr}; -use {NonReservedPeerMode, PacketId, ProtocolId}; +use {Error, ErrorKind, NetworkConfiguration, NodeIndex, parse_str_addr}; +use {NonReservedPeerMode, ProtocolId}; // File where the network topology is stored. 
const NODES_FILE: &str = "nodes.json"; @@ -47,16 +47,18 @@ const PEER_DISABLE_DURATION: Duration = Duration::from_secs(5 * 60); /// Starts the substrate libp2p service. /// /// Returns a stream that must be polled regularly in order for the networking to function. -pub fn start_service( +pub fn start_service( config: NetworkConfiguration, - registered_custom: Arc>>, -) -> Result { + registered_custom: TProtos, +) -> Result +where TProtos: IntoIterator { // Private and public keys configuration. let local_private_key = obtain_private_key(&config)?; let local_public_key = local_private_key.to_public_key(); let local_peer_id = local_public_key.clone().into_peer_id(); // Build the swarm. + let registered_custom = RegisteredProtocols(registered_custom.into_iter().collect()); let mut swarm = swarm::start_swarm(registered_custom, local_private_key)?; // Listen on multiaddresses. @@ -170,18 +172,6 @@ pub fn start_service( /// Event produced by the service. pub enum ServiceEvent { - /// We have successfully connected to a new node. - NewNode { - /// Index that was attributed for this node. Will be used for all further interaction with - /// it. - node_index: NodeIndex, - /// Public key of the node as a peer id. - peer_id: PeerId, - /// Whether we dialed the node or if it came to us. Should be used only for statistics - /// purposes. - endpoint: ConnectedPoint, - }, - /// Closed connection to a node. /// /// It is guaranteed that this node has been opened with a `NewNode` event beforehand. However @@ -193,17 +183,6 @@ pub enum ServiceEvent { closed_custom_protocols: Vec, }, - /// Report the duration of the ping for the given node. - PingDuration(NodeIndex, Duration), - - /// Report information about the node. - NodeInfos { - /// Index of the node. - node_index: NodeIndex, - /// The client version. Note that it can be anything and should not be trusted. - client_version: String, - }, - /// A custom protocol substream has been opened with a node. 
OpenedCustomProtocol { /// Index of the node. @@ -238,8 +217,6 @@ pub enum ServiceEvent { node_index: NodeIndex, /// Protocol which generated the message. protocol_id: ProtocolId, - /// Identifier of the packet. - packet_id: u8, /// Data that has been received. data: Bytes, }, @@ -248,7 +225,7 @@ pub enum ServiceEvent { /// Network service. Must be polled regularly in order for the networking to work. pub struct Service { /// Stream of events of the swarm. - swarm: Swarm>, + swarm: Swarm, /// Maximum number of incoming non-reserved connections, taken from the config. max_incoming_connections: usize, @@ -256,10 +233,8 @@ pub struct Service { /// Maximum number of outgoing non-reserved connections, taken from the config. max_outgoing_connections: usize, - /// For each node we're connected to, its address if known. - /// - /// This is used purely to report disconnections to the topology. - nodes_addresses: FnvHashMap, + /// For each node we're connected to, how we're connected to it. + nodes_addresses: FnvHashMap, /// If true, only reserved peers can connect. reserved_only: bool, @@ -322,6 +297,12 @@ impl Service { self.kad_system.local_peer_id() } + /// Returns the list of all the peers we are connected to. + #[inline] + pub fn connected_peers<'a>(&'a self) -> impl Iterator + 'a { + self.nodes_addresses.keys().cloned() + } + /// Try to add a reserved peer. pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) { self.reserved_peers.insert(peer_id.clone()); @@ -330,35 +311,59 @@ impl Service { } /// Try to remove a reserved peer. - // TODO: remove `_addr` parameter? - pub fn remove_reserved_peer(&mut self, peer_id: PeerId, _addr: Multiaddr) { + /// + /// If we are in reserved mode and we were connected to a node with this peer ID, then this + /// method will disconnect it and return its index. 
+ pub fn remove_reserved_peer(&mut self, peer_id: PeerId) -> Option { self.reserved_peers.remove(&peer_id); if self.reserved_only { if let Some(node_index) = self.swarm.latest_node_by_peer_id(&peer_id) { self.drop_node_inner(node_index, DisconnectReason::NoSlot, None); + return Some(node_index); } } + None + } + + /// Start accepting all peers again if we weren't. + pub fn accept_unreserved_peers(&mut self) { + if self.reserved_only { + self.reserved_only = false; + self.connect_to_nodes(); + } } - /// Set the non-reserved peer mode. - pub fn set_non_reserved_mode(&mut self, mode: NonReservedPeerMode) { - self.reserved_only = mode == NonReservedPeerMode::Deny; - if self.reserved_only { - // Disconnect the nodes that are not reserved. - let to_disconnect: Vec = self.swarm - .nodes() - .filter(|&n| { - let peer_id = self.swarm.peer_id_of_node(n) - .expect("swarm.nodes() always returns valid node indices"); - !self.reserved_peers.contains(peer_id) - }) - .collect(); - for node_index in to_disconnect { - self.drop_node_inner(node_index, DisconnectReason::NoSlot, None); - } - } else { - self.connect_to_nodes(); + /// Start refusing non-reserved nodes. Returns the list of nodes that have been disconnected. + pub fn deny_unreserved_peers(&mut self) -> Vec { + self.reserved_only = true; + + // Disconnect the nodes that are not reserved. + let to_disconnect: Vec = self.swarm + .nodes() + .filter(|&n| { + let peer_id = self.swarm.peer_id_of_node(n) + .expect("swarm.nodes() always returns valid node indices"); + !self.reserved_peers.contains(peer_id) + }) + .collect(); + + for &node_index in &to_disconnect { + self.drop_node_inner(node_index, DisconnectReason::NoSlot, None); } + + to_disconnect + } + + /// Returns the `PeerId` of a node. + #[inline] + pub fn peer_id_of_node(&self, node_index: NodeIndex) -> Option<&PeerId> { + self.swarm.peer_id_of_node(node_index) + } + + /// Returns the way we are connected to a node. 
+ #[inline] + pub fn node_endpoint(&self, node_index: NodeIndex) -> Option<&ConnectedPoint> { + self.nodes_addresses.get(&node_index) } /// Sends a message to a peer using the custom protocol. @@ -367,10 +372,9 @@ impl Service { &mut self, node_index: NodeIndex, protocol: ProtocolId, - packet_id: PacketId, data: Vec ) { - self.swarm.send_custom_message(node_index, protocol, packet_id, data) + self.swarm.send_custom_message(node_index, protocol, data) } /// Disconnects a peer and bans it for a little while. @@ -378,6 +382,10 @@ impl Service { /// Same as `drop_node`, except that the same peer will not be able to reconnect later. #[inline] pub fn ban_node(&mut self, node_index: NodeIndex) { + if let Some(peer_id) = self.swarm.peer_id_of_node(node_index) { + info!(target: "sub-libp2p", "Banned {:?}", peer_id); + } + self.drop_node_inner(node_index, DisconnectReason::Banned, Some(PEER_DISABLE_DURATION)); } @@ -387,6 +395,10 @@ impl Service { /// Corresponding closing events will be generated once the closing actually happens. 
#[inline] pub fn drop_node(&mut self, node_index: NodeIndex) { + if let Some(peer_id) = self.swarm.peer_id_of_node(node_index) { + info!(target: "sub-libp2p", "Dropped {:?}", peer_id); + } + self.drop_node_inner(node_index, DisconnectReason::Useless, None); } @@ -414,8 +426,8 @@ impl Service { to_notify.notify(); } - if let Some(addr) = self.nodes_addresses.remove(&node_index) { - self.topology.report_disconnected(&addr, reason); + if let Some(ConnectedPoint::Dialer { address }) = self.nodes_addresses.remove(&node_index) { + self.topology.report_disconnected(&address, reason); } if let Some(disable_duration) = disable_duration { @@ -472,10 +484,8 @@ impl Service { continue; } - // TODO: it is possible that we are connected to this peer, but the topology - // doesn't know about that because we don't know its multiaddress yet - // TODO: after some changes in libp2p, we can avoid this situation and also remove - // the `num_to_open` variable + // It is possible that we are connected to this peer, but the topology doesn't know + // about that because it is an incoming connection. match self.swarm.ensure_connection(peer_id.clone(), addr.clone()) { Ok(true) => (), Ok(false) => num_to_open -= 1, @@ -578,15 +588,13 @@ impl Service { /// Handles the swarm opening a connection to the given peer. /// - /// Returns the `NewNode` event to produce. - /// /// > **Note**: Must be called from inside `poll()`, otherwise it will panic. fn handle_connection( &mut self, node_index: NodeIndex, peer_id: PeerId, endpoint: ConnectedPoint - ) -> Option { + ) { // Reject connections to our own node, which can happen if the DHT contains `127.0.0.1` // for example. if &peer_id == self.kad_system.local_peer_id() { @@ -595,7 +603,7 @@ impl Service { if let ConnectedPoint::Dialer { ref address } = endpoint { self.topology.report_failed_to_connect(address); } - return None; + return; } // Reject non-reserved nodes if we're in reserved mode. 
@@ -606,7 +614,7 @@ impl Service { if let ConnectedPoint::Dialer { ref address } = endpoint { self.topology.report_failed_to_connect(address); } - return None; + return; } // Reject connections from disabled peers. @@ -617,7 +625,7 @@ impl Service { if let ConnectedPoint::Dialer { ref address } = endpoint { self.topology.report_failed_to_connect(address); } - return None; + return; } } @@ -629,18 +637,17 @@ impl Service { } else { info!(target: "sub-libp2p", "Rejected incoming peer {:?} because we are full", peer_id); assert_eq!(self.swarm.drop_node(node_index), Ok(Vec::new())); - return None; + return; } }, ConnectedPoint::Dialer { ref address } => { if is_reserved || self.num_outgoing_connections() < self.max_outgoing_connections { debug!(target: "sub-libp2p", "Connected to {:?} through {}", peer_id, address); self.topology.report_connected(address, &peer_id); - self.nodes_addresses.insert(node_index, address.clone()); } else { debug!(target: "sub-libp2p", "Rejected dialed peer {:?} because we are full", peer_id); assert_eq!(self.swarm.drop_node(node_index), Ok(Vec::new())); - return None; + return; } }, }; @@ -649,19 +656,19 @@ impl Service { error!(target: "sub-libp2p", "accept_node returned an error"); } + // We are finally sure that we're connected. + + if let ConnectedPoint::Dialer { ref address } = endpoint { + self.topology.report_connected(address, &peer_id); + } + self.nodes_addresses.insert(node_index, endpoint.clone()); + // If we're waiting for a Kademlia substream for this peer id, open one. let kad_pending_ctrls = self.kad_pending_ctrls.lock(); if kad_pending_ctrls.contains_key(&peer_id) { let res = self.swarm.open_kademlia(node_index); debug_assert!(res.is_ok()); } - drop(kad_pending_ctrls); - - Some(ServiceEvent::NewNode { - node_index, - peer_id, - endpoint - }) } /// Processes an event received by the swarm. 
@@ -674,22 +681,20 @@ impl Service { event: SwarmEvent ) -> Option { match event { - SwarmEvent::NodePending { node_index, peer_id, endpoint } => - if let Some(event) = self.handle_connection(node_index, peer_id, endpoint) { - Some(event) - } else { - None - }, + SwarmEvent::NodePending { node_index, peer_id, endpoint } => { + self.handle_connection(node_index, peer_id, endpoint); + None + }, SwarmEvent::Reconnected { node_index, endpoint, closed_custom_protocols } => { - if let Some(addr) = self.nodes_addresses.remove(&node_index) { - self.topology.report_disconnected(&addr, DisconnectReason::FoundBetterAddr); + if let Some(ConnectedPoint::Dialer { address }) = self.nodes_addresses.remove(&node_index) { + self.topology.report_disconnected(&address, DisconnectReason::FoundBetterAddr); } - if let ConnectedPoint::Dialer { address } = endpoint { + if let ConnectedPoint::Dialer { ref address } = endpoint { let peer_id = self.swarm.peer_id_of_node(node_index) .expect("the swarm always produces events containing valid node indices"); - self.nodes_addresses.insert(node_index, address.clone()); - self.topology.report_connected(&address, peer_id); + self.topology.report_connected(address, peer_id); } + self.nodes_addresses.insert(node_index, endpoint); Some(ServiceEvent::ClosedCustomProtocols { node_index, protocols: closed_custom_protocols, @@ -697,8 +702,8 @@ impl Service { }, SwarmEvent::NodeClosed { node_index, peer_id, closed_custom_protocols } => { debug!(target: "sub-libp2p", "Connection to {:?} closed gracefully", peer_id); - if let Some(addr) = self.nodes_addresses.get(&node_index) { - self.topology.report_disconnected(addr, DisconnectReason::RemoteClosed); + if let Some(ConnectedPoint::Dialer { ref address }) = self.nodes_addresses.get(&node_index) { + self.topology.report_disconnected(address, DisconnectReason::RemoteClosed); } self.connect_to_nodes(); Some(ServiceEvent::NodeClosed { @@ -715,8 +720,8 @@ impl Service { SwarmEvent::UnresponsiveNode { node_index } 
=> { let closed_custom_protocols = self.swarm.drop_node(node_index) .expect("the swarm always produces events containing valid node indices"); - if let Some(addr) = self.nodes_addresses.remove(&node_index) { - self.topology.report_disconnected(&addr, DisconnectReason::Useless); + if let Some(ConnectedPoint::Dialer { address }) = self.nodes_addresses.remove(&node_index) { + self.topology.report_disconnected(&address, DisconnectReason::Useless); } Some(ServiceEvent::NodeClosed { node_index, @@ -730,27 +735,22 @@ impl Service { let closed_custom_protocols = self.swarm.drop_node(node_index) .expect("the swarm always produces events containing valid node indices"); self.topology.report_useless(&peer_id); - if let Some(addr) = self.nodes_addresses.remove(&node_index) { - self.topology.report_disconnected(&addr, DisconnectReason::Useless); + if let Some(ConnectedPoint::Dialer { address }) = self.nodes_addresses.remove(&node_index) { + self.topology.report_disconnected(&address, DisconnectReason::Useless); } Some(ServiceEvent::NodeClosed { node_index, closed_custom_protocols, }) }, - SwarmEvent::PingDuration(node_index, ping) => - Some(ServiceEvent::PingDuration(node_index, ping)), - SwarmEvent::NodeInfos { node_index, client_version, listen_addrs } => { + SwarmEvent::NodeInfos { node_index, listen_addrs, .. } => { let peer_id = self.swarm.peer_id_of_node(node_index) .expect("the swarm always produces events containing valid node indices"); self.topology.add_self_reported_listen_addrs( peer_id, listen_addrs.into_iter() ); - Some(ServiceEvent::NodeInfos { - node_index, - client_version, - }) + None }, SwarmEvent::KadFindNode { searched, responder, .. 
} => { let response = self.build_kademlia_response(&searched); @@ -786,14 +786,13 @@ impl Service { node_index, protocol, }), - SwarmEvent::CustomMessage { node_index, protocol_id, packet_id, data } => { + SwarmEvent::CustomMessage { node_index, protocol_id, data } => { let peer_id = self.swarm.peer_id_of_node(node_index) .expect("the swarm always produces events containing valid node indices"); self.kad_system.update_kbuckets(peer_id.clone()); Some(ServiceEvent::CustomMessage { node_index, protocol_id, - packet_id, data, }) }, diff --git a/substrate/core/network-libp2p/src/swarm.rs b/substrate/core/network-libp2p/src/swarm.rs index 7cd44ae2d7..497472d93f 100644 --- a/substrate/core/network-libp2p/src/swarm.rs +++ b/substrate/core/network-libp2p/src/swarm.rs @@ -28,18 +28,17 @@ use libp2p::kad::{KadConnecController, KadFindNodeRespond}; use libp2p::secio; use node_handler::{SubstrateOutEvent, SubstrateNodeHandler, SubstrateInEvent, IdentificationRequest}; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::{mem, sync::Arc, time::Duration}; +use std::{mem, sync::Arc}; use transport; -use {Error, NodeIndex, PacketId, ProtocolId}; +use {Error, NodeIndex, ProtocolId}; /// Starts a swarm. /// /// Returns a stream that must be polled regularly in order for the networking to function. -pub fn start_swarm( - registered_custom: Arc>, +pub fn start_swarm( + registered_custom: RegisteredProtocols, local_private_key: secio::SecioKeyPair, -) -> Result, Error> -where TUserData: Send + Sync + Clone + 'static { +) -> Result { // Private and public keys. let local_public_key = local_private_key.to_public_key(); let local_peer_id = local_public_key.clone().into_peer_id(); @@ -48,7 +47,7 @@ where TUserData: Send + Sync + Clone + 'static { let transport = transport::build_transport(local_private_key); // Build the underlying libp2p swarm. 
- let swarm = Libp2pSwarm::with_handler_builder(transport, HandlerBuilder(registered_custom)); + let swarm = Libp2pSwarm::with_handler_builder(transport, HandlerBuilder(Arc::new(registered_custom))); Ok(Swarm { swarm, @@ -64,11 +63,10 @@ where TUserData: Send + Sync + Clone + 'static { /// Dummy structure that exists because we need to be able to express the type. Otherwise we would /// use a closure. #[derive(Clone)] -struct HandlerBuilder(Arc>); -impl HandlerFactory for HandlerBuilder -where TUserData: Clone + Send + Sync + 'static +struct HandlerBuilder(Arc); +impl HandlerFactory for HandlerBuilder { - type Handler = SubstrateNodeHandler, TUserData>; + type Handler = SubstrateNodeHandler>; #[inline] fn new_handler(&self, addr: ConnectedPoint) -> Self::Handler { @@ -122,9 +120,6 @@ pub enum SwarmEvent { error: IoError, }, - /// Report the duration of the ping for the given node. - PingDuration(NodeIndex, Duration), - /// Report information about the node. NodeInfos { /// Index of the node. @@ -159,8 +154,6 @@ pub enum SwarmEvent { node_index: NodeIndex, /// Protocol which generated the message. protocol_id: ProtocolId, - /// Identifier of the packet. - packet_id: u8, /// Data that has been received. data: Bytes, }, @@ -209,13 +202,13 @@ pub enum SwarmEvent { } /// Network swarm. Must be polled regularly in order for the networking to work. -pub struct Swarm { +pub struct Swarm { /// Stream of events of the swarm. swarm: Libp2pSwarm< Boxed<(PeerId, Muxer)>, SubstrateInEvent, SubstrateOutEvent>, - HandlerBuilder + HandlerBuilder >, /// Public key of the local node. @@ -252,8 +245,7 @@ struct NodeInfo { /// The muxer used by the transport. type Muxer = muxing::StreamMuxerBox; -impl Swarm - where TUserData: Clone + Send + Sync + 'static { +impl Swarm { /// Start listening on a multiaddr. 
#[inline] pub fn listen_on(&mut self, addr: Multiaddr) -> Result { @@ -311,12 +303,11 @@ impl Swarm &mut self, node_index: NodeIndex, protocol: ProtocolId, - packet_id: PacketId, data: Vec ) { if let Some(info) = self.nodes_info.get_mut(&node_index) { if let Some(mut connected) = self.swarm.peer(info.peer_id.clone()).as_connected() { - connected.send_event(SubstrateInEvent::SendCustomMessage { protocol, packet_id, data }); + connected.send_event(SubstrateInEvent::SendCustomMessage { protocol, data }); } else { error!(target: "sub-libp2p", "Tried to send message to {:?}, but we're not \ connected to it", info.peer_id); @@ -637,7 +628,7 @@ impl Swarm }, SubstrateOutEvent::PingSuccess(ping) => { trace!(target: "sub-libp2p", "Pong from {:?} in {:?}", peer_id, ping); - Some(SwarmEvent::PingDuration(node_index, ping)) + None }, SubstrateOutEvent::Identified { info, observed_addr } => { self.add_observed_addr(&peer_id, &observed_addr); @@ -689,11 +680,10 @@ impl Swarm protocol: protocol_id, }) }, - SubstrateOutEvent::CustomMessage { protocol_id, packet_id, data } => { + SubstrateOutEvent::CustomMessage { protocol_id, data } => { Some(SwarmEvent::CustomMessage { node_index, protocol_id, - packet_id, data, }) }, @@ -706,8 +696,7 @@ impl Swarm } } -impl Stream for Swarm - where TUserData: Clone + Send + Sync + 'static { +impl Stream for Swarm { type Item = SwarmEvent; type Error = IoError; diff --git a/substrate/core/network-libp2p/src/timeouts.rs b/substrate/core/network-libp2p/src/timeouts.rs deleted file mode 100644 index 9b5615b0d5..0000000000 --- a/substrate/core/network-libp2p/src/timeouts.rs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. 
- -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -use futures::{Async, future, Future, Poll, stream, Stream, sync::mpsc}; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::marker::PhantomData; -use std::time::{Duration, Instant}; -use tokio_timer::{self, Delay}; - -/// Builds the timeouts system. -/// -/// The `timeouts_rx` should be a stream receiving newly-created timeout -/// requests. Returns a stream that produces items as their timeout elapses. -/// `T` can be anything you want, as it is transparently passed from the input -/// to the output. Timeouts continue to fire forever, as there is no way to -/// unregister them. -pub fn build_timeouts_stream<'a, T>( - timeouts_rx: mpsc::UnboundedReceiver<(Duration, T)> -) -> Box + 'a> - where T: Clone + 'a { - let next_timeout = next_in_timeouts_stream(timeouts_rx); - - // The `unfold` function is essentially a loop turned into a stream. The - // first parameter is the initial state, and the closure returns the new - // state and an item. - let stream = stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| { - // `timeouts` is a `Vec` of futures that produce an `Out`. - - // `select_ok` panics if `timeouts` is empty anyway. - if timeouts.is_empty() { - return None - } - - Some(future::select_ok(timeouts.into_iter()) - .and_then(move |(item, mut timeouts)| - match item { - Out::NewTimeout((Some((duration, item)), next_timeouts)) => { - // Received a new timeout request on the channel. 
- let next_timeout = next_in_timeouts_stream(next_timeouts); - let timeout = Delay::new(Instant::now() + duration); - let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData); - timeouts.push(future::Either::B(timeout)); - timeouts.push(future::Either::A(next_timeout)); - Ok((None, timeouts)) - }, - Out::NewTimeout((None, _)) => - // The channel has been closed. - Ok((None, timeouts)), - Out::Timeout(duration, item) => { - // A timeout has happened. - let returned = item.clone(); - let timeout = Delay::new(Instant::now() + duration); - let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData); - timeouts.push(future::Either::B(timeout)); - Ok((Some(returned), timeouts)) - }, - } - ) - ) - }).filter_map(|item| item); - - // Note that we use a `Box` in order to speed up compilation time. - Box::new(stream) as Box> -} - -/// Local enum representing the output of the selection. -enum Out { - NewTimeout(A), - Timeout(Duration, B), -} - -/// Convenience function that calls `.into_future()` on the timeouts stream, -/// and applies some modifiers. -/// This function is necessary. Otherwise if we copy-paste its content we run -/// into errors because the type of the copy-pasted closures differs. -fn next_in_timeouts_stream( - stream: mpsc::UnboundedReceiver -) -> impl Future, mpsc::UnboundedReceiver), B>, Error = IoError> { - stream - .into_future() - .map(Out::NewTimeout) - .map_err(|_| unreachable!("an UnboundedReceiver can never error")) -} - -/// Does the equivalent to `future.map(move |()| (duration, item)).map_err(|err| to_io_err(err))`. 
-struct TimeoutWrapper(F, Duration, Option, PhantomData); -impl Future for TimeoutWrapper - where F: Future { - type Item = Out; - type Error = IoError; - - fn poll(&mut self) -> Poll { - match self.0.poll() { - Ok(Async::Ready(())) => (), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => return Err(IoError::new(IoErrorKind::Other, err.to_string())), - } - - let out = Out::Timeout(self.1, self.2.take().expect("poll() called again after success")); - Ok(Async::Ready(out)) - } -} diff --git a/substrate/core/network-libp2p/src/traits.rs b/substrate/core/network-libp2p/src/traits.rs index 07c83474f3..d81702f314 100644 --- a/substrate/core/network-libp2p/src/traits.rs +++ b/substrate/core/network-libp2p/src/traits.rs @@ -14,18 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use std::fmt; -use std::cmp::Ordering; -use std::iter; -use std::net::Ipv4Addr; -use std::str; -use std::time::Duration; -use TimerToken; +use std::{fmt, iter, net::Ipv4Addr, str}; use libp2p::{multiaddr::Protocol, Multiaddr, PeerId}; -use error::Error; -/// Protocol handler level packet id -pub type PacketId = u8; /// Protocol / handler id pub type ProtocolId = [u8; 3]; @@ -38,66 +29,6 @@ pub type NodeIndex = usize; /// secio secret key; pub type Secret = [u8; 32]; -/// Shared session information -#[derive(Debug, Clone)] -pub struct SessionInfo { - /// Peer public key - pub id: NodeId, - /// Peer client ID - pub client_version: String, - /// Peer RLPx protocol version - pub protocol_version: u32, - /// Session protocol capabilities - pub capabilities: Vec, - /// Peer protocol capabilities - pub peer_capabilities: Vec, - /// Peer ping delay - pub ping: Option, - /// True if this session was originated by us. 
- pub originated: bool, - /// Remote endpoint address of the session - pub remote_address: String, - /// Local endpoint address of the session - pub local_address: String, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct PeerCapabilityInfo { - pub protocol: ProtocolId, - pub version: u8, -} - -impl ToString for PeerCapabilityInfo { - fn to_string(&self) -> String { - format!("{}/{}", str::from_utf8(&self.protocol[..]).unwrap_or("???"), self.version) - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct SessionCapabilityInfo { - pub protocol: [u8; 3], - pub version: u8, - pub packet_count: u8, - pub id_offset: u8, -} - -impl PartialOrd for SessionCapabilityInfo { - fn partial_cmp(&self, other: &SessionCapabilityInfo) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for SessionCapabilityInfo { - fn cmp(&self, b: &SessionCapabilityInfo) -> Ordering { - // By protocol id first - if self.protocol != b.protocol { - return self.protocol.cmp(&b.protocol); - } - // By version - self.version.cmp(&b.version) - } -} - /// Network service configuration #[derive(Debug, PartialEq, Clone)] pub struct NetworkConfiguration { @@ -188,97 +119,6 @@ impl<'a> fmt::Display for Severity<'a> { } } -/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub trait NetworkContext { - /// Send a packet over the network to another peer. - fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec); - - /// Send a packet over the network to another peer using specified protocol. - fn send_protocol(&self, protocol: ProtocolId, peer: NodeIndex, packet_id: PacketId, data: Vec); - - /// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing. - fn respond(&self, packet_id: PacketId, data: Vec); - - /// Report peer. Depending on the report, peer may be disconnected and possibly banned. 
- fn report_peer(&self, peer: NodeIndex, reason: Severity); - - /// Check if the session is still active. - fn is_expired(&self) -> bool; - - /// Register a new IO timer. 'IoHandler::timeout' will be called with the token. - fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error>; - - /// Returns peer identification string - fn peer_client_version(&self, peer: NodeIndex) -> String; - - /// Returns information on p2p session - fn session_info(&self, peer: NodeIndex) -> Option; - - /// Returns max version for a given protocol. - fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option; - - /// Returns this object's subprotocol name. - fn subprotocol_name(&self) -> ProtocolId; -} - -impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext { - fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec) { - (**self).send(peer, packet_id, data) - } - - fn send_protocol(&self, protocol: ProtocolId, peer: NodeIndex, packet_id: PacketId, data: Vec) { - (**self).send_protocol(protocol, peer, packet_id, data) - } - - fn respond(&self, packet_id: PacketId, data: Vec) { - (**self).respond(packet_id, data) - } - - fn report_peer(&self, peer: NodeIndex, reason: Severity) { - (**self).report_peer(peer, reason) - } - - fn is_expired(&self) -> bool { - (**self).is_expired() - } - - fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error> { - (**self).register_timer(token, delay) - } - - fn peer_client_version(&self, peer: NodeIndex) -> String { - (**self).peer_client_version(peer) - } - - fn session_info(&self, peer: NodeIndex) -> Option { - (**self).session_info(peer) - } - - fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option { - (**self).protocol_version(protocol, peer) - } - - fn subprotocol_name(&self) -> ProtocolId { - (**self).subprotocol_name() - } -} - -/// Network IO protocol handler. This needs to be implemented for each new subprotocol. 
-/// All the handler function are called from within IO event loop. -/// `Message` is the type for message data. -pub trait NetworkProtocolHandler: Sync + Send { - /// Initialize the handler - fn initialize(&self, _io: &NetworkContext) {} - /// Called when new network packet received. - fn read(&self, io: &NetworkContext, peer: &NodeIndex, packet_id: u8, data: &[u8]); - /// Called when new peer is connected. Only called when peer supports the same protocol. - fn connected(&self, io: &NetworkContext, peer: &NodeIndex); - /// Called when a previously connected peer disconnects. - fn disconnected(&self, io: &NetworkContext, peer: &NodeIndex); - /// Timer function called after a timeout created with `NetworkContext::timeout`. - fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) {} -} - /// Non-reserved peer modes. #[derive(Clone, Debug, PartialEq, Eq)] pub enum NonReservedPeerMode { diff --git a/substrate/core/network-libp2p/tests/tests.rs b/substrate/core/network-libp2p/tests/tests.rs deleted file mode 100644 index 480253c1ea..0000000000 --- a/substrate/core/network-libp2p/tests/tests.rs +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . 
- -extern crate parking_lot; -extern crate parity_bytes; -extern crate substrate_network_libp2p; - -use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; -use std::sync::Arc; -use std::thread; -use std::time::*; -use parking_lot::Mutex; -use parity_bytes::Bytes; -use substrate_network_libp2p::*; -use TimerToken; - -pub struct TestProtocol { - drop_session: bool, - pub packet: Mutex, - pub got_timeout: AtomicBool, - pub got_disconnect: AtomicBool, -} - -impl TestProtocol { - pub fn new(drop_session: bool) -> Self { - TestProtocol { - packet: Mutex::new(Vec::new()), - got_timeout: AtomicBool::new(false), - got_disconnect: AtomicBool::new(false), - drop_session: drop_session, - } - } - - pub fn got_packet(&self) -> bool { - self.packet.lock()[..] == b"hello"[..] - } - - pub fn got_timeout(&self) -> bool { - self.got_timeout.load(AtomicOrdering::Relaxed) - } - - pub fn got_disconnect(&self) -> bool { - self.got_disconnect.load(AtomicOrdering::Relaxed) - } -} - -impl NetworkProtocolHandler for TestProtocol { - fn initialize(&self, io: &NetworkContext) { - io.register_timer(0, Duration::from_millis(10)).unwrap(); - } - - fn read(&self, _io: &NetworkContext, _peer: &NodeIndex, packet_id: u8, data: &[u8]) { - assert_eq!(packet_id, 33); - self.packet.lock().extend(data); - } - - fn connected(&self, io: &NetworkContext, peer: &NodeIndex) { - if self.drop_session { - io.report_peer(*peer, Severity::Bad("We are evil and just want to drop")) - } else { - io.respond(33, "hello".to_owned().into_bytes()); - } - } - - fn disconnected(&self, _io: &NetworkContext, _peer: &NodeIndex) { - self.got_disconnect.store(true, AtomicOrdering::Relaxed); - } - - /// Timer function called after a timeout created with `NetworkContext::timeout`. 
- fn timeout(&self, _io: &NetworkContext, timer: TimerToken) { - assert_eq!(timer, 0); - self.got_timeout.store(true, AtomicOrdering::Relaxed); - } -} - - -#[test] -fn net_service() { - let _service = NetworkService::new( - NetworkConfiguration::new_local(), - vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])] - ).expect("Error creating network service"); -} - -#[test] -#[ignore] // TODO: how is this test even supposed to work? -fn net_disconnect() { - let mut config1 = NetworkConfiguration::new_local(); - config1.boot_nodes = vec![ ]; - let handler1 = Arc::new(TestProtocol::new(false)); - let service1 = NetworkService::new(config1, vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap(); - let mut config2 = NetworkConfiguration::new_local(); - config2.boot_nodes = vec![ service1.external_url().unwrap() ]; - let handler2 = Arc::new(TestProtocol::new(true)); - let _service2 = NetworkService::new(config2, vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap(); - while !(handler1.got_disconnect() && handler2.got_disconnect()) { - thread::sleep(Duration::from_millis(50)); - } - assert!(handler1.got_disconnect()); - assert!(handler2.got_disconnect()); -} - -#[test] -fn net_timeout() { - let config = NetworkConfiguration::new_local(); - let handler = Arc::new(TestProtocol::new(false)); - let _service = NetworkService::new(config, vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap(); - while !handler.got_timeout() { - thread::sleep(Duration::from_millis(50)); - } -} diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index d124359765..28330229d5 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -22,6 +22,7 @@ sr-primitives = { path = "../../core/sr-primitives" } parity-codec = "2.0" parity-codec-derive = "2.0" substrate-network-libp2p = { path = "../../core/network-libp2p" } +tokio = "0.1.11" env_logger = { version = "0.4", optional = true } 
substrate-keyring = { path = "../../core/keyring", optional = true } diff --git a/substrate/core/network/src/io.rs b/substrate/core/network/src/io.rs index 4a7d57b158..67c09ab436 100644 --- a/substrate/core/network/src/io.rs +++ b/substrate/core/network/src/io.rs @@ -14,7 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use network_libp2p::{NetworkContext, Severity, NodeIndex, SessionInfo}; +use parking_lot::Mutex; +use network_libp2p::{Service, Severity, NodeIndex, PeerId, ProtocolId}; +use std::sync::Arc; /// IO interface for the syncing handler. /// Provides peer connection management and an interface to the blockchain client. @@ -24,49 +26,54 @@ pub trait SyncIo { /// Send a packet to a peer. fn send(&mut self, who: NodeIndex, data: Vec); /// Returns peer identifier string - fn peer_info(&self, who: NodeIndex) -> String { + fn peer_debug_info(&self, who: NodeIndex) -> String { who.to_string() } /// Returns information on p2p session - fn peer_session_info(&self, who: NodeIndex) -> Option; - /// Check if the session is expired - fn is_expired(&self) -> bool; + fn peer_id(&self, who: NodeIndex) -> Option; } -/// Wraps `NetworkContext` and the blockchain client +/// Wraps the network service. pub struct NetSyncIo<'s> { - network: &'s NetworkContext, + network: &'s Arc>, + protocol: ProtocolId, } impl<'s> NetSyncIo<'s> { - /// Creates a new instance from the `NetworkContext` and the blockchain client reference. - pub fn new(network: &'s NetworkContext) -> NetSyncIo<'s> { + /// Creates a new instance. 
+ pub fn new(network: &'s Arc>, protocol: ProtocolId) -> NetSyncIo<'s> { NetSyncIo { - network: network, + network, + protocol, } } } impl<'s> SyncIo for NetSyncIo<'s> { fn report_peer(&mut self, who: NodeIndex, reason: Severity) { - self.network.report_peer(who, reason); + info!("Purposefully dropping {} ; reason: {:?}", who, reason); + match reason { + Severity::Bad(_) => self.network.lock().ban_node(who), + Severity::Useless(_) => self.network.lock().drop_node(who), + Severity::Timeout => self.network.lock().drop_node(who), + } } fn send(&mut self, who: NodeIndex, data: Vec) { - self.network.send(who, 0, data) + self.network.lock().send_custom_message(who, self.protocol, data) } - fn peer_session_info(&self, who: NodeIndex) -> Option { - self.network.session_info(who) + fn peer_id(&self, who: NodeIndex) -> Option { + let net = self.network.lock(); + net.peer_id_of_node(who).cloned() } - fn is_expired(&self) -> bool { - self.network.is_expired() - } - - fn peer_info(&self, who: NodeIndex) -> String { - self.network.peer_client_version(who) + fn peer_debug_info(&self, who: NodeIndex) -> String { + let net = self.network.lock(); + if let (Some(peer_id), Some(addr)) = (net.peer_id_of_node(who), net.node_endpoint(who)) { + format!("{:?} through {:?}", peer_id, addr) + } else { + "unknown".to_string() + } } } - - diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index 7da1728f0e..bf2ac25a88 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -32,6 +32,7 @@ extern crate parity_codec as codec; extern crate futures; extern crate rustc_hex; extern crate rand; +extern crate tokio; #[macro_use] extern crate log; #[macro_use] extern crate bitflags; #[macro_use] extern crate error_chain; diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 9d31741229..b91a07ee90 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ 
-41,8 +41,6 @@ const REQUEST_TIMEOUT_SEC: u64 = 40; /// Current protocol version. pub (crate) const CURRENT_VERSION: u32 = 1; -/// Current packet count. -pub (crate) const CURRENT_PACKET_COUNT: u8 = 1; // Maximum allowed entries in `BlockResponse` const MAX_BLOCK_DATA_RESPONSE: u32 = 128; @@ -289,14 +287,14 @@ impl, H: ExHashT> Protocol { /// Called when a new peer is connected pub fn on_peer_connected(&self, io: &mut SyncIo, who: NodeIndex) { - trace!(target: "sync", "Connected {}: {}", who, io.peer_info(who)); + trace!(target: "sync", "Connected {}: {}", who, io.peer_debug_info(who)); self.handshaking_peers.write().insert(who, time::Instant::now()); self.send_status(io, who); } /// Called by peer when it is disconnecting pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: NodeIndex) { - trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_info(peer)); + trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_debug_info(peer)); // lock all the the peer lists so that add/remove peer events are in order let mut sync = self.sync.write(); @@ -420,16 +418,12 @@ impl, H: ExHashT> Protocol { /// Called by peer to report status fn on_status_message(&self, io: &mut SyncIo, who: NodeIndex, status: message::Status) { trace!(target: "sync", "New peer {} {:?}", who, status); - if io.is_expired() { - trace!(target: "sync", "Status packet from expired session {}:{}", who, io.peer_info(who)); - return; - } { let mut peers = self.context_data.peers.write(); let mut handshaking_peers = self.handshaking_peers.write(); if peers.contains_key(&who) { - debug!(target: "sync", "Unexpected status packet from {}:{}", who, io.peer_info(who)); + debug!(target: "sync", "Unexpected status packet from {}:{}", who, io.peer_debug_info(who)); return; } if status.genesis_hash != self.genesis_hash { @@ -464,7 +458,7 @@ impl, H: ExHashT> Protocol { }; peers.insert(who.clone(), peer); handshaking_peers.remove(&who); - debug!(target: "sync", "Connected {} {}", who, 
io.peer_info(who)); + debug!(target: "sync", "Connected {} {}", who, io.peer_debug_info(who)); } let mut context = ProtocolContext::new(&self.context_data, io); @@ -514,9 +508,7 @@ impl, H: ExHashT> Protocol { .unzip(); if !to_send.is_empty() { - let node_id = io.peer_session_info(*who) - .map(|info| format!("{}@{:?}", info.remote_address, info.id)); - + let node_id = io.peer_id(*who).map(|id| id.to_base58()); if let Some(id) = node_id { for hash in hashes { propagated_to.entry(hash).or_insert_with(Vec::new).push(id.clone()); diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 9d35e653b7..b90a2fb72f 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -16,14 +16,15 @@ use std::collections::HashMap; use std::sync::Arc; -use std::io; +use std::{io, thread}; use std::time::Duration; -use futures::sync::{oneshot, mpsc}; -use network_libp2p::{NetworkProtocolHandler, NetworkContext, NodeIndex, ProtocolId, -NetworkConfiguration , NonReservedPeerMode, ErrorKind}; -use network_libp2p::{NetworkService, PeerId}; +use futures::{self, Future, Stream, stream, sync::{oneshot, mpsc}}; +use parking_lot::Mutex; +use network_libp2p::{ProtocolId, PeerId, NetworkConfiguration, ErrorKind}; +use network_libp2p::{start_service, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; +use network_libp2p::{RegisteredProtocol, parse_str_addr, Protocol as Libp2pProtocol}; use io::NetSyncIo; -use protocol::{Protocol, ProtocolContext, Context, ProtocolStatus, PeerInfo as ProtocolPeerInfo}; +use protocol::{self, Protocol, ProtocolContext, Context, ProtocolStatus}; use config::{ProtocolConfig}; use error::Error; use chain::Client; @@ -32,18 +33,14 @@ use specialization::Specialization; use on_demand::OnDemandService; use import_queue::AsyncImportQueue; use runtime_primitives::traits::{Block as BlockT}; +use tokio::{runtime::Runtime, timer::Interval}; /// Type that represents fetch completion future. 
pub type FetchFuture = oneshot::Receiver>; /// Type that represents bft messages stream. pub type BftMessageStream = mpsc::UnboundedReceiver>; -type TimerToken = usize; - -const TICK_TOKEN: TimerToken = 0; const TICK_TIMEOUT: Duration = Duration::from_millis(1000); - -const PROPAGATE_TOKEN: TimerToken = 1; const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000); bitflags! { @@ -76,8 +73,6 @@ impl ::codec::Decode for Roles { pub trait SyncProvider: Send + Sync { /// Get sync status fn status(&self) -> ProtocolStatus; - /// Get peers information - fn peers(&self) -> Vec>; /// Get this node id if available. fn node_id(&self) -> Option; } @@ -113,28 +108,6 @@ pub trait ExecuteInContext: Send + Sync { fn execute_in_context)>(&self, closure: F); } -/// Network protocol handler -struct ProtocolHandler, H: ExHashT> { - protocol: Protocol, -} - -/// Peer connection information -#[derive(Debug)] -pub struct PeerInfo { - /// Public node id - pub id: PeerId, - /// Node client ID - pub client_version: String, - /// Capabilities - pub capabilities: Vec, - /// Remote endpoint address - pub remote_address: String, - /// Local endpoint address - pub local_address: String, - /// Dot protocol info. - pub dot_info: Option>, -} - /// Service initialization parameters. pub struct Params { /// Configuration. @@ -154,50 +127,43 @@ pub struct Params { /// Substrate network service. Handles network IO and manages connectivity. pub struct Service, H: ExHashT> { /// Network service - network: NetworkService, - /// Devp2p protocol handler - handler: Arc>, - /// Devp2p protocol ID. + network: Arc>, + /// Protocol handler + handler: Arc>, + /// Protocol ID. protocol_id: ProtocolId, + /// Sender for messages to the backgound service task, and handle for the background thread. + /// Dropping the sender should close the task and the thread. + /// This is an `Option` because we need to extract it in the destructor. 
+ bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>, } impl, H: ExHashT> Service { /// Creates and register protocol with the network service pub fn new(params: Params, protocol_id: ProtocolId) -> Result>, Error> { let chain = params.chain.clone(); - // TODO: non-insatnt finality. + // TODO: non-instant finality. let import_queue = Arc::new(AsyncImportQueue::new(true)); - let handler = Arc::new(ProtocolHandler { - protocol: Protocol::new( - params.config, - params.chain, - import_queue.clone(), - params.on_demand, - params.transaction_pool, - params.specialization, - )?, - }); - let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)]; - let protocols = vec![(handler.clone() as Arc<_>, protocol_id, &versions[..])]; - let service = match NetworkService::new(params.network_config.clone(), protocols) { - Ok(service) => service, - Err(err) => { - match err.kind() { - ErrorKind::Io(ref e) if e.kind() == io::ErrorKind::AddrInUse => - warn!("Network port is already in use, make sure that another instance of Substrate client is not running or change the port using the --port option."), - _ => warn!("Error starting network: {}", err), - }; - return Err(err.into()) - }, - }; + let handler = Arc::new(Protocol::new( + params.config, + params.chain, + import_queue.clone(), + params.on_demand, + params.transaction_pool, + params.specialization, + )?); + let versions = [(protocol::CURRENT_VERSION as u8)]; + let registered = RegisteredProtocol::new(protocol_id, &versions[..]); + let (thread, network) = start_thread(params.network_config, handler.clone(), registered)?; let sync = Arc::new(Service { - network: service, + network, protocol_id, handler, + bg_thread: Some(thread), }); import_queue.start( - Arc::downgrade(sync.handler.protocol.sync()), + Arc::downgrade(sync.handler.sync()), Arc::downgrade(&sync), Arc::downgrade(&chain) )?; @@ -207,106 +173,57 @@ impl, H: ExHashT> Service { /// Called when a new block is imported by the 
client. pub fn on_block_imported(&self, hash: B::Hash, header: &B::Header) { - self.network.with_context(self.protocol_id, |context| { - self.handler.protocol.on_block_imported(&mut NetSyncIo::new(context), hash, header) - }); + self.handler.on_block_imported(&mut NetSyncIo::new(&self.network, self.protocol_id), hash, header) } /// Called when new transactons are imported by the client. pub fn trigger_repropagate(&self) { - self.network.with_context(self.protocol_id, |context| { - self.handler.protocol.propagate_extrinsics(&mut NetSyncIo::new(context)); - }); + self.handler.propagate_extrinsics(&mut NetSyncIo::new(&self.network, self.protocol_id)); } /// Execute a closure with the chain-specific network specialization. - /// If the network is unavailable, this will return `None`. - pub fn with_spec(&self, f: F) -> Option + pub fn with_spec(&self, f: F) -> U where F: FnOnce(&mut S, &mut Context) -> U { - let mut res = None; - self.network.with_context(self.protocol_id, |context| { - res = Some(self.handler.protocol.with_spec(&mut NetSyncIo::new(context), f)) - }); - - res + self.handler.with_spec(&mut NetSyncIo::new(&self.network, self.protocol_id), f) } } impl, H:ExHashT> Drop for Service { fn drop(&mut self) { - self.handler.protocol.stop(); + self.handler.stop(); + if let Some((sender, join)) = self.bg_thread.take() { + let _ = sender.send(()); + if let Err(e) = join.join() { + error!("Error while waiting on background thread: {:?}", e); + } + } } } + impl, H: ExHashT> ExecuteInContext for Service { fn execute_in_context)>(&self, closure: F) { - self.network.with_context(self.protocol_id, |context| { - closure(&mut ProtocolContext::new(self.handler.protocol.context_data(), &mut NetSyncIo::new(context))) - }); + closure(&mut ProtocolContext::new(self.handler.context_data(), &mut NetSyncIo::new(&self.network, self.protocol_id))) } } impl, H: ExHashT> SyncProvider for Service { /// Get sync status fn status(&self) -> ProtocolStatus { - self.handler.protocol.status() 
- } - - /// Get sync peers - fn peers(&self) -> Vec> { - self.network.with_context_eval(self.protocol_id, |ctx| { - let peer_ids = self.network.connected_peers(); - - peer_ids.into_iter().filter_map(|who| { - let session_info = match ctx.session_info(who) { - None => return None, - Some(info) => info, - }; - - Some(PeerInfo { - id: session_info.id, - client_version: session_info.client_version, - capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(), - remote_address: session_info.remote_address, - local_address: session_info.local_address, - dot_info: self.handler.protocol.peer_info(who), - }) - }).collect() - }).unwrap_or_else(Vec::new) + self.handler.status() } fn node_id(&self) -> Option { - self.network.external_url() - } -} - -impl, H: ExHashT> NetworkProtocolHandler for ProtocolHandler { - fn initialize(&self, io: &NetworkContext) { - io.register_timer(TICK_TOKEN, TICK_TIMEOUT) - .expect("Error registering sync timer"); - - io.register_timer(PROPAGATE_TOKEN, PROPAGATE_TIMEOUT) - .expect("Error registering transaction propagation timer"); - } - - fn read(&self, io: &NetworkContext, peer: &NodeIndex, _packet_id: u8, data: &[u8]) { - self.protocol.handle_packet(&mut NetSyncIo::new(io), *peer, data); - } - - fn connected(&self, io: &NetworkContext, peer: &NodeIndex) { - self.protocol.on_peer_connected(&mut NetSyncIo::new(io), *peer); - } - - fn disconnected(&self, io: &NetworkContext, peer: &NodeIndex) { - self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer); - } - - fn timeout(&self, io: &NetworkContext, timer: TimerToken) { - match timer { - TICK_TOKEN => self.protocol.tick(&mut NetSyncIo::new(io)), - PROPAGATE_TOKEN => self.protocol.propagate_extrinsics(&mut NetSyncIo::new(io)), - _ => {} - } + let network = self.network.lock(); + let ret = network + .listeners() + .next() + .map(|addr| { + let mut addr = addr.clone(); + addr.append(Libp2pProtocol::P2p(network.peer_id().clone().into())); + addr.to_string() + 
}); + ret } } @@ -317,26 +234,170 @@ pub trait ManageNetwork: Send + Sync { /// Set to deny unreserved peers to connect fn deny_unreserved_peers(&self); /// Remove reservation for the peer - fn remove_reserved_peer(&self, peer: String) -> Result<(), String>; + fn remove_reserved_peer(&self, peer: PeerId); /// Add reserved peer fn add_reserved_peer(&self, peer: String) -> Result<(), String>; } - impl, H: ExHashT> ManageNetwork for Service { fn accept_unreserved_peers(&self) { - self.network.set_non_reserved_mode(NonReservedPeerMode::Accept); + self.network.lock().accept_unreserved_peers(); } fn deny_unreserved_peers(&self) { - self.network.set_non_reserved_mode(NonReservedPeerMode::Deny); + // This method can disconnect nodes, in which case we have to properly close them in the + // protocol. + let disconnected = self.network.lock().deny_unreserved_peers(); + let mut net_sync = NetSyncIo::new(&self.network, self.protocol_id); + for node_index in disconnected { + self.handler.on_peer_disconnected(&mut net_sync, node_index) + } } - fn remove_reserved_peer(&self, peer: String) -> Result<(), String> { - self.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) + fn remove_reserved_peer(&self, peer: PeerId) { + // This method can disconnect a node, in which case we have to properly close it in the + // protocol. + let disconnected = self.network.lock().remove_reserved_peer(peer); + if let Some(node_index) = disconnected { + let mut net_sync = NetSyncIo::new(&self.network, self.protocol_id); + self.handler.on_peer_disconnected(&mut net_sync, node_index) + } } fn add_reserved_peer(&self, peer: String) -> Result<(), String> { - self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) + let (addr, peer_id) = parse_str_addr(&peer).map_err(|e| format!("{:?}", e))?; + self.network.lock().add_reserved_peer(addr, peer_id); + Ok(()) } } + +/// Starts the background thread that handles the networking. 
+fn start_thread, H: ExHashT>( + config: NetworkConfiguration, + protocol: Arc>, + registered: RegisteredProtocol, +) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc>), Error> { + let protocol_id = registered.id(); + + // Start the main service. + let service = match start_service(config, Some(registered)) { + Ok(service) => Arc::new(Mutex::new(service)), + Err(err) => { + match err.kind() { + ErrorKind::Io(ref e) if e.kind() == io::ErrorKind::AddrInUse => + warn!("Network port is already in use, make sure that another instance of Substrate client is not running or change the port using the --port option."), + _ => warn!("Error starting network: {}", err), + }; + return Err(err.into()) + }, + }; + + let (close_tx, close_rx) = oneshot::channel(); + let service_clone = service.clone(); + let mut runtime = Runtime::new()?; + let thread = thread::Builder::new().name("network".to_string()).spawn(move || { + let fut = run_thread(service_clone, protocol, protocol_id) + .select(close_rx.then(|_| Ok(()))) + .map(|(val, _)| val) + .map_err(|(err,_ )| err); + + // Note that we use `block_on` and not `block_on_all` because we want to kill the thread + // instantly if `close_rx` receives something. + match runtime.block_on(fut) { + Ok(()) => debug!(target: "sub-libp2p", "Networking thread finished"), + Err(err) => error!(target: "sub-libp2p", "Error while running libp2p: {:?}", err), + }; + })?; + + Ok(((close_tx, thread), service)) +} + +/// Runs the background thread that handles the networking. +fn run_thread, H: ExHashT>( + network_service: Arc>, + protocol: Arc>, + protocol_id: ProtocolId, +) -> impl Future { + // Interval for performing maintenance on the protocol handler. 
+ let tick = Interval::new_interval(TICK_TIMEOUT) + .for_each({ + let protocol = protocol.clone(); + let network_service = network_service.clone(); + move |_| { + protocol.tick(&mut NetSyncIo::new(&network_service, protocol_id)); + Ok(()) + } + }) + .then(|res| { + match res { + Ok(()) => (), + Err(err) => error!("Error in the tick timer: {:?}", err), + }; + Ok(()) + }); + + // Interval at which we gossip extrinsics over the network. + let propagate = Interval::new_interval(PROPAGATE_TIMEOUT) + .for_each({ + let protocol = protocol.clone(); + let network_service = network_service.clone(); + move |_| { + protocol.propagate_extrinsics(&mut NetSyncIo::new(&network_service, protocol_id)); + Ok(()) + } + }) + .then(|res| { + match res { + Ok(()) => (), + Err(err) => error!("Error in the propagation timer: {:?}", err), + }; + Ok(()) + }); + + // The network service produces events about what happens on the network. Let's process them. + let network_service2 = network_service.clone(); + let network = stream::poll_fn(move || network_service2.lock().poll()).for_each(move |event| { + let mut net_sync = NetSyncIo::new(&network_service, protocol_id); + + match event { + NetworkServiceEvent::NodeClosed { node_index, closed_custom_protocols } => { + if !closed_custom_protocols.is_empty() { + debug_assert_eq!(closed_custom_protocols, &[protocol_id]); + protocol.on_peer_disconnected(&mut net_sync, node_index); + } + } + NetworkServiceEvent::ClosedCustomProtocols { node_index, protocols } => { + if !protocols.is_empty() { + debug_assert_eq!(protocols, &[protocol_id]); + protocol.on_peer_disconnected(&mut net_sync, node_index); + } + } + NetworkServiceEvent::OpenedCustomProtocol { node_index, version, .. } => { + debug_assert_eq!(version, protocol::CURRENT_VERSION as u8); + protocol.on_peer_connected(&mut net_sync, node_index); + } + NetworkServiceEvent::ClosedCustomProtocol { node_index, .. 
} => { + protocol.on_peer_disconnected(&mut net_sync, node_index); + } + NetworkServiceEvent::CustomMessage { node_index, data, .. } => { + protocol.handle_packet(&mut net_sync, node_index, &data); + } + }; + + Ok(()) + }); + + // Merge all futures into one. + let futures: Vec + Send>> = vec![ + Box::new(tick) as Box<_>, + Box::new(propagate) as Box<_>, + Box::new(network) as Box<_> + ]; + + futures::select_all(futures) + .and_then(move |_| { + debug!("Networking ended"); + Ok(()) + }) + .map_err(|(r, _, _)| r) +} diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 89ea306be6..bff7cd9f65 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -31,7 +31,7 @@ use protocol::{Context, Protocol}; use primitives::{Blake2Hasher}; use config::ProtocolConfig; use service::TransactionPool; -use network_libp2p::{NodeIndex, SessionInfo, Severity}; +use network_libp2p::{NodeIndex, PeerId, Severity}; use keyring::Keyring; use codec::{Encode, Decode}; use import_queue::SyncImportQueue; @@ -79,7 +79,6 @@ pub struct TestIo<'p> { queue: &'p RwLock>, pub to_disconnect: HashSet, packets: Vec, - peers_info: HashMap, _sender: Option, } @@ -90,7 +89,6 @@ impl<'p> TestIo<'p> where { _sender: sender, to_disconnect: HashSet::new(), packets: Vec::new(), - peers_info: HashMap::new(), } } } @@ -106,10 +104,6 @@ impl<'p> SyncIo for TestIo<'p> { self.to_disconnect.insert(who); } - fn is_expired(&self) -> bool { - false - } - fn send(&mut self, who: NodeIndex, data: Vec) { self.packets.push(TestPacket { data: data, @@ -117,13 +111,11 @@ impl<'p> SyncIo for TestIo<'p> { }); } - fn peer_info(&self, who: NodeIndex) -> String { - self.peers_info.get(&who) - .cloned() - .unwrap_or_else(|| who.to_string()) + fn peer_debug_info(&self, _who: NodeIndex) -> String { + "unknown".to_string() } - fn peer_session_info(&self, _peer_id: NodeIndex) -> Option { + fn peer_id(&self, _peer_id: NodeIndex) -> Option { None } } 
diff --git a/substrate/node/network/src/consensus.rs b/substrate/node/network/src/consensus.rs index 33814c7126..e4c111d625 100644 --- a/substrate/node/network/src/consensus.rs +++ b/substrate/node/network/src/consensus.rs @@ -284,18 +284,10 @@ impl Network for ConsensusNetwork

{ } }); - match process_task { - Some(task) => - if let Err(e) = Executor::spawn(&mut task_executor, Box::new(task)) { - debug!(target: "node-network", "Cannot spawn message processing: {:?}", e) - }, - None => warn!(target: "node-network", "Cannot process incoming messages: network appears to be down"), + if let Err(e) = Executor::spawn(&mut task_executor, Box::new(process_task)) { + debug!(target: "node-network", "Cannot spawn message processing: {:?}", e) } (InputAdapter { input: bft_recv }, sink) } } - -/// Error when the network appears to be down. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub struct NetworkDown;