Clean up the networking layer (#881)

This commit is contained in:
Pierre Krieger
2018-10-08 18:20:38 +02:00
committed by Gav Wood
parent 02f8897648
commit 0666759b16
18 changed files with 461 additions and 1519 deletions
+38 -33
View File
@@ -46,7 +46,7 @@ dependencies = [
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
"quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -1063,7 +1063,7 @@ dependencies = [
"multiaddr 0.3.0 (git+https://github.com/tomaka/libp2p-rs?rev=8111062f0177fd7423626f2db9560273644a4c4d)",
"stdweb 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-current-thread 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -1085,7 +1085,7 @@ dependencies = [
"quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rw-stream-sink 0.1.0 (git+https://github.com/tomaka/libp2p-rs?rev=8111062f0177fd7423626f2db9560273644a4c4d)",
"smallvec 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -1227,7 +1227,7 @@ dependencies = [
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
"libp2p-core 0.1.0 (git+https://github.com/tomaka/libp2p-rs?rev=8111062f0177fd7423626f2db9560273644a4c4d)",
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -1585,7 +1585,7 @@ dependencies = [
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"node-service 0.1.0",
"substrate-cli 0.3.0",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1608,7 +1608,7 @@ dependencies = [
"substrate-client 0.1.0",
"substrate-keyring 0.1.0",
"substrate-primitives 0.1.0",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1651,7 +1651,7 @@ dependencies = [
"substrate-bft 0.1.0",
"substrate-network 0.1.0",
"substrate-primitives 0.1.0",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -1730,7 +1730,7 @@ dependencies = [
"substrate-service-test 0.3.0",
"substrate-telemetry 0.3.0",
"substrate-test-client 0.1.0",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -2839,7 +2839,7 @@ dependencies = [
"substrate-executor 0.1.0",
"substrate-keyring 0.1.0",
"substrate-primitives 0.1.0",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -2870,7 +2870,7 @@ dependencies = [
"substrate-telemetry 0.3.0",
"sysinfo 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -2961,7 +2961,7 @@ dependencies = [
"substrate-keyring 0.1.0",
"substrate-network 0.1.0",
"substrate-primitives 0.1.0",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -3033,6 +3033,7 @@ dependencies = [
"substrate-network-libp2p 0.1.0",
"substrate-primitives 0.1.0",
"substrate-test-client 0.1.0",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -3053,7 +3054,7 @@ dependencies = [
"serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
"unsigned-varint 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3107,7 +3108,7 @@ dependencies = [
"substrate-primitives 0.1.0",
"substrate-test-client 0.1.0",
"substrate-transaction-pool 0.1.0",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -3159,7 +3160,7 @@ dependencies = [
"substrate-telemetry 0.3.0",
"substrate-transaction-pool 0.1.0",
"target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -3176,7 +3177,7 @@ dependencies = [
"substrate-primitives 0.1.0",
"substrate-service 0.3.0",
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -3455,25 +3456,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio"
version = "0.1.7"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-fs 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-fs 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-tcp 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-threadpool 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-uds 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -3497,8 +3502,8 @@ dependencies = [
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)",
"scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3506,11 +3511,11 @@ dependencies = [
[[package]]
name = "tokio-current-thread"
version = "0.1.0"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -3521,12 +3526,12 @@ dependencies = [
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-executor"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3534,7 +3539,7 @@ dependencies = [
[[package]]
name = "tokio-fs"
version = "0.1.0"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3578,7 +3583,7 @@ dependencies = [
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -3613,7 +3618,7 @@ dependencies = [
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -3624,7 +3629,7 @@ dependencies = [
"crossbeam-utils 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -4355,13 +4360,13 @@ dependencies = [
"checksum time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "d825be0eb33fda1a7e68012d51e9c7f451dc1a69391e7fdc197060bb8c56667b"
"checksum tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e9175261fbdb60781fcd388a4d6cc7e14764a2b629a7ad94abb439aed223a44f"
"checksum tk-listen 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "dec7ba6a80b7695fc2abb21af18bed445a362ffd80b64704771ce142d6d2151d"
"checksum tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8ee337e5f4e501fc32966fec6fe0ca0cc1c237b0b1b14a335f8bfe3c5f06e286"
"checksum tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "6e93c78d23cc61aa245a8acd2c4a79c4d7fa7fb5c3ca90d5737029f043a84895"
"checksum tokio-codec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "881e9645b81c2ce95fcb799ded2c29ffb9f25ef5bef909089a420e5961dd8ccb"
"checksum tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71"
"checksum tokio-current-thread 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9f785265962bde425bf3b77dd6abac6674b8c6d5e8831427383aa9c56c5210e1"
"checksum tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f90fcd90952f0a496d438a976afba8e5c205fb12123f813d8ab3aa1c8436638c"
"checksum tokio-dns-unofficial 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bb9bf62ca2c53bf2f2faec3e48a98b6d8c9577c27011cb0203a4beacdc8ab328"
"checksum tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "84823b932d566bc3c6aa644df4ca36cb38593c50b7db06011fd4e12e31e4047e"
"checksum tokio-fs 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "76766830bbf9a2d5bfb50c95350d56a2e79e2c80f675967fff448bc615899708"
"checksum tokio-executor 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "c117b6cf86bb730aab4834f10df96e4dd586eff2c3c27d3781348da49e255bde"
"checksum tokio-fs 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b5cbe4ca6e71cb0b62a66e4e6f53a8c06a6eefe46cc5f665ad6f274c9906f135"
"checksum tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8d6cc2de7725863c86ac71b0b9068476fec50834f055a243558ef1655bbd34cb"
"checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389"
"checksum tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b3cedc8e5af5131dc3423ffa4f877cce78ad25259a9a62de0613735a13ebc64b"
@@ -1,31 +0,0 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Connection filter trait.
use super::NodeId;
/// Filtered connection direction.
pub enum ConnectionDirection {
Inbound,
Outbound,
}
/// Connection filter. Each connection is checked against `connection_allowed`.
pub trait ConnectionFilter : Send + Sync {
/// Filter a connection. Returns `true` if connection should be allowed. `false` if rejected.
fn connection_allowed(&self, own_id: &NodeId, connecting_id: &NodeId, direction: ConnectionDirection) -> bool;
}
@@ -14,12 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use libp2p::core::{Multiaddr, ConnectionUpgrade, Endpoint};
use libp2p::tokio_codec::Framed;
use std::collections::VecDeque;
use std::io::Error as IoError;
use std::vec::IntoIter as VecIntoIter;
use std::{collections::VecDeque, io, vec::IntoIter as VecIntoIter};
use futures::{prelude::*, future, stream, task};
use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec::UviBytes;
@@ -30,24 +28,21 @@ use ProtocolId;
/// Note that "a single protocol" here refers to `par` for example. However
/// each protocol can have multiple different versions for networking purposes.
#[derive(Clone)]
pub struct RegisteredProtocol<TUserData> {
pub struct RegisteredProtocol {
/// Id of the protocol for API purposes.
id: ProtocolId,
/// Base name of the protocol as advertised on the network.
/// Ends with `/` so that we can append a version number behind.
base_name: Bytes,
/// List of protocol versions that we support, plus their packet count.
/// List of protocol versions that we support.
/// Ordered in descending order so that the best comes first.
/// The packet count is used to filter out invalid messages.
supported_versions: Vec<(u8, u8)>,
/// Custom data.
custom_data: TUserData,
supported_versions: Vec<u8>,
}
impl<TUserData> RegisteredProtocol<TUserData> {
impl RegisteredProtocol {
/// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be
/// passed inside the `RegisteredProtocolOutput`.
pub fn new(custom_data: TUserData, protocol: ProtocolId, versions: &[(u8, u8)])
pub fn new(protocol: ProtocolId, versions: &[u8])
-> Self {
let mut base_name = Bytes::from_static(b"/substrate/");
base_name.extend_from_slice(&protocol);
@@ -57,11 +52,10 @@ impl<TUserData> RegisteredProtocol<TUserData> {
base_name,
id: protocol,
supported_versions: {
let mut tmp: Vec<_> = versions.iter().rev().cloned().collect();
tmp.sort_unstable_by(|a, b| b.1.cmp(&a.1));
let mut tmp = versions.to_vec();
tmp.sort_unstable_by(|a, b| b.cmp(&a));
tmp
},
custom_data,
}
}
@@ -70,12 +64,6 @@ impl<TUserData> RegisteredProtocol<TUserData> {
pub fn id(&self) -> ProtocolId {
self.id
}
/// Returns the custom data that was passed to `new`.
#[inline]
pub fn custom_data(&self) -> &TUserData {
&self.custom_data
}
}
/// Output of a `RegisteredProtocol` upgrade.
@@ -88,8 +76,6 @@ pub struct RegisteredProtocolSubstream<TSubstream> {
requires_poll_complete: bool,
/// The underlying substream.
inner: stream::Fuse<Framed<TSubstream, UviBytes<Bytes>>>,
/// Maximum packet id.
packet_count: u8,
/// Id of the protocol.
protocol_id: ProtocolId,
/// Version of the protocol that was negotiated.
@@ -98,15 +84,6 @@ pub struct RegisteredProtocolSubstream<TSubstream> {
to_notify: Option<task::Task>,
}
/// Packet of data that can be sent or received.
#[derive(Debug, Clone)]
pub struct Packet {
/// Identifier of the packet.
pub id: u8,
/// The raw data.
pub data: Bytes,
}
impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
/// Returns the protocol id.
#[inline]
@@ -134,14 +111,10 @@ impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
}
/// Sends a message to the substream.
pub fn send_message(&mut self, Packet { id: packet_id, data }: Packet) {
if packet_id >= self.packet_count {
error!(target: "sub-libp2p", "Tried to send a packet with an invalid ID {}", packet_id);
return;
}
pub fn send_message(&mut self, data: Bytes) {
// TODO: remove the packet id system
let mut message = Bytes::with_capacity(1 + data.len());
message.extend_from_slice(&[packet_id]);
message.extend_from_slice(&[0]);
message.extend_from_slice(&data);
self.send_queue.push_back(message);
@@ -156,36 +129,13 @@ impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
task.notify();
}
}
/// Turns raw data into a packet and checks whether it is valid.
fn data_to_packet(&self, mut data: BytesMut) -> Result<Packet, ()> {
// The `data` should be prefixed by the packet ID, therefore an empty packet is invalid.
if data.is_empty() {
debug!(target: "sub-libp2p", "ignoring incoming packet because it was empty");
return Err(());
}
let packet = {
let id = data[0];
let data = data.split_off(1);
Packet { id, data: data.freeze() }
};
if packet.id >= self.packet_count {
debug!(target: "sub-libp2p", "ignoring incoming packet because packet_id {} is \
too large", packet.id);
return Err(())
}
Ok(packet)
}
}
impl<TSubstream> Stream for RegisteredProtocolSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite,
{
type Item = Packet;
type Error = IoError;
type Item = Bytes;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// If we are closing, close as soon as the Sink is closed.
@@ -215,10 +165,16 @@ where TSubstream: AsyncRead + AsyncWrite,
// Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever.
loop {
match self.inner.poll()? {
Async::Ready(Some(data)) =>
if let Ok(packet) = self.data_to_packet(data) {
return Ok(Async::Ready(Some(packet)))
},
Async::Ready(Some(mut data)) => {
// The `data` should be prefixed by the packet ID, therefore an empty
// packet is invalid.
// TODO: remove the packet id system
if data.is_empty() {
return Err(io::Error::new(io::ErrorKind::Other, "bad packet"));
}
let data = data.split_off(1);
return Ok(Async::Ready(Some(data.freeze())))
},
Async::Ready(None) =>
if !self.requires_poll_complete && self.send_queue.is_empty() {
return Ok(Async::Ready(None))
@@ -234,9 +190,8 @@ where TSubstream: AsyncRead + AsyncWrite,
}
}
impl<TSubstream, TUserData> ConnectionUpgrade<TSubstream> for RegisteredProtocol<TUserData>
impl<TSubstream> ConnectionUpgrade<TSubstream> for RegisteredProtocol
where TSubstream: AsyncRead + AsyncWrite,
TUserData: Clone,
{
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = u8; // Protocol version
@@ -244,7 +199,7 @@ where TSubstream: AsyncRead + AsyncWrite,
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
// Report each version as an individual protocol.
self.supported_versions.iter().map(|&(ver, _)| {
self.supported_versions.iter().map(|&ver| {
let num = ver.to_string();
let mut name = self.base_name.clone();
name.extend_from_slice(num.as_bytes());
@@ -253,7 +208,7 @@ where TSubstream: AsyncRead + AsyncWrite,
}
type Output = RegisteredProtocolSubstream<TSubstream>;
type Future = future::FutureResult<Self::Output, IoError>;
type Future = future::FutureResult<Self::Output, io::Error>;
#[allow(deprecated)]
fn upgrade(
@@ -263,13 +218,6 @@ where TSubstream: AsyncRead + AsyncWrite,
_: Endpoint,
_: &Multiaddr
) -> Self::Future {
let packet_count = self.supported_versions
.iter()
.find(|&(v, _)| *v == protocol_version)
.expect("negotiated protocol version that wasn't advertised ; \
programmer error")
.1;
let framed = Framed::new(socket, UviBytes::default());
future::ok(RegisteredProtocolSubstream {
@@ -277,7 +225,6 @@ where TSubstream: AsyncRead + AsyncWrite,
send_queue: VecDeque::new(),
requires_poll_complete: false,
inner: framed.fuse(),
packet_count,
protocol_id: self.id,
protocol_version,
to_notify: None,
@@ -287,9 +234,9 @@ where TSubstream: AsyncRead + AsyncWrite,
// Connection upgrade for all the protocols contained in it.
#[derive(Clone)]
pub struct RegisteredProtocols<TUserData>(pub Vec<RegisteredProtocol<TUserData>>);
pub struct RegisteredProtocols(pub Vec<RegisteredProtocol>);
impl<TUserData> RegisteredProtocols<TUserData> {
impl RegisteredProtocols {
/// Returns the number of protocols.
#[inline]
pub fn len(&self) -> usize {
@@ -298,7 +245,7 @@ impl<TUserData> RegisteredProtocols<TUserData> {
/// Finds a protocol in the list by its id.
pub fn find_protocol(&self, protocol: ProtocolId)
-> Option<&RegisteredProtocol<TUserData>> {
-> Option<&RegisteredProtocol> {
self.0.iter().find(|p| p.id == protocol)
}
@@ -308,19 +255,18 @@ impl<TUserData> RegisteredProtocols<TUserData> {
}
}
impl<TUserData> Default for RegisteredProtocols<TUserData> {
impl Default for RegisteredProtocols {
fn default() -> Self {
RegisteredProtocols(Vec::new())
}
}
impl<TSubstream, TUserData> ConnectionUpgrade<TSubstream> for RegisteredProtocols<TUserData>
impl<TSubstream> ConnectionUpgrade<TSubstream> for RegisteredProtocols
where TSubstream: AsyncRead + AsyncWrite,
TUserData: Clone,
{
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = (usize,
<RegisteredProtocol<TUserData> as ConnectionUpgrade<TSubstream>>::UpgradeIdentifier);
<RegisteredProtocol as ConnectionUpgrade<TSubstream>>::UpgradeIdentifier);
fn protocol_names(&self) -> Self::NamesIter {
// We concat the lists of `RegisteredProtocol::protocol_names` for
@@ -331,8 +277,8 @@ where TSubstream: AsyncRead + AsyncWrite,
).collect::<Vec<_>>().into_iter()
}
type Output = <RegisteredProtocol<TUserData> as ConnectionUpgrade<TSubstream>>::Output;
type Future = <RegisteredProtocol<TUserData> as ConnectionUpgrade<TSubstream>>::Future;
type Output = <RegisteredProtocol as ConnectionUpgrade<TSubstream>>::Output;
type Future = <RegisteredProtocol as ConnectionUpgrade<TSubstream>>::Future;
#[inline]
fn upgrade(
+6 -14
View File
@@ -44,29 +44,21 @@ extern crate log;
#[cfg(test)] #[macro_use]
extern crate assert_matches;
pub use connection_filter::{ConnectionFilter, ConnectionDirection};
pub use error::{Error, ErrorKind, DisconnectReason};
pub use libp2p::{Multiaddr, multiaddr::Protocol, PeerId};
pub use traits::*;
pub type TimerToken = usize;
// TODO: remove as it is unused ; however modifying `network` causes a clusterfuck of dependencies
// resolve errors at the moment
mod connection_filter;
mod custom_proto;
mod error;
mod node_handler;
mod secret;
mod service;
mod service_task;
mod swarm;
mod timeouts;
mod topology;
mod traits;
mod transport;
pub use service::NetworkService;
pub use custom_proto::RegisteredProtocol;
pub use error::{Error, ErrorKind, DisconnectReason};
pub use libp2p::{Multiaddr, multiaddr::Protocol, PeerId};
pub use service_task::{start_service, Service, ServiceEvent};
pub use traits::*; // TODO: expand to actual items
/// Check if node url is valid
pub fn validate_node_url(url: &str) -> Result<(), Error> {
@@ -77,7 +69,7 @@ pub fn validate_node_url(url: &str) -> Result<(), Error> {
}
/// Parses a string address and returns the component, if valid.
pub(crate) fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), Error> {
pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), Error> {
let mut addr: Multiaddr = addr_str.parse().map_err(|_| ErrorKind::AddressParse)?;
let who = match addr.pop() {
Some(Protocol::P2p(key)) =>
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use bytes::Bytes;
use custom_proto::{Packet, RegisteredProtocols, RegisteredProtocolSubstream};
use custom_proto::{RegisteredProtocols, RegisteredProtocolSubstream};
use futures::{prelude::*, task};
use libp2p::core::{ConnectionUpgrade, Endpoint, PeerId, PublicKey, upgrade};
use libp2p::core::nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent};
@@ -28,7 +28,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, Interval};
use {Multiaddr, PacketId, ProtocolId};
use {Multiaddr, ProtocolId};
/// Duration after which we consider that a ping failed.
const PING_TIMEOUT: Duration = Duration::from_secs(30);
@@ -48,9 +48,9 @@ const DELAY_TO_FIRST_IDENTIFY: Duration = Duration::from_secs(2);
///
/// The node will be pinged at a regular interval to determine whether it's still alive. We will
/// also regularly query the remote for identification information, for statistics purposes.
pub struct SubstrateNodeHandler<TSubstream, TUserData> {
pub struct SubstrateNodeHandler<TSubstream> {
/// List of registered custom protocols.
registered_custom: Arc<RegisteredProtocols<TUserData>>,
registered_custom: Arc<RegisteredProtocols>,
/// Substreams open for "custom" protocols (eg. dot).
custom_protocols_substreams: Vec<RegisteredProtocolSubstream<TSubstream>>,
@@ -140,8 +140,6 @@ pub enum SubstrateOutEvent<TSubstream> {
CustomMessage {
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Identifier of the packet.
packet_id: u8,
/// Data that has been received.
data: Bytes,
},
@@ -235,7 +233,6 @@ pub enum SubstrateInEvent {
/// Sends a message through a custom protocol substream.
SendCustomMessage {
protocol: ProtocolId,
packet_id: PacketId,
data: Vec<u8>,
},
@@ -258,13 +255,12 @@ macro_rules! listener_upgrade {
)
}
impl<TSubstream, TUserData> SubstrateNodeHandler<TSubstream, TUserData>
impl<TSubstream> SubstrateNodeHandler<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
TUserData: Clone + Send + 'static,
{
/// Creates a new node handler.
#[inline]
pub fn new(registered_custom: Arc<RegisteredProtocols<TUserData>>, endpoint: ConnectedPoint) -> Self {
pub fn new(registered_custom: Arc<RegisteredProtocols>, endpoint: ConnectedPoint) -> Self {
let registered_custom_len = registered_custom.len();
let queued_dial_upgrades = registered_custom.0
.iter()
@@ -298,9 +294,8 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
}
}
impl<TSubstream, TUserData> NodeHandler<TSubstream> for SubstrateNodeHandler<TSubstream, TUserData>
impl<TSubstream> NodeHandler<TSubstream> for SubstrateNodeHandler<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
TUserData: Clone + Send + 'static,
{
type InEvent = SubstrateInEvent;
type OutEvent = SubstrateOutEvent<TSubstream>;
@@ -383,8 +378,8 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
fn inject_event(&mut self, event: Self::InEvent) {
match event {
SubstrateInEvent::SendCustomMessage { protocol, packet_id, data } => {
self.send_custom_message(protocol, packet_id, data);
SubstrateInEvent::SendCustomMessage { protocol, data } => {
self.send_custom_message(protocol, data);
},
SubstrateInEvent::OpenKademlia => self.open_kademlia(),
SubstrateInEvent::Accept => {
@@ -449,15 +444,13 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
}
}
impl<TSubstream, TUserData> SubstrateNodeHandler<TSubstream, TUserData>
impl<TSubstream> SubstrateNodeHandler<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
TUserData: Clone + Send + 'static,
{
/// Sends a message on a custom protocol substream.
fn send_custom_message(
&mut self,
protocol: ProtocolId,
packet_id: PacketId,
data: Vec<u8>,
) {
debug_assert!(self.registered_custom.has_protocol(protocol),
@@ -471,7 +464,7 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
},
};
proto.send_message(Packet { id: packet_id, data: data.into() });
proto.send_message(data.into());
}
/// The node will try to open a Kademlia substream and produce a `KadOpen` event containing the
@@ -688,12 +681,11 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
let mut custom_proto = self.custom_protocols_substreams.swap_remove(n);
match custom_proto.poll() {
Ok(Async::NotReady) => self.custom_protocols_substreams.push(custom_proto),
Ok(Async::Ready(Some(Packet { id, data }))) => {
Ok(Async::Ready(Some(data))) => {
let protocol_id = custom_proto.protocol_id();
self.custom_protocols_substreams.push(custom_proto);
return Ok(Async::Ready(Some(SubstrateOutEvent::CustomMessage {
protocol_id,
packet_id: id,
data,
})));
},
@@ -1,597 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use fnv::FnvHashMap;
use parking_lot::Mutex;
use libp2p::core::{nodes::swarm::ConnectedPoint, Multiaddr, PeerId as PeerstorePeerId};
use libp2p::multiaddr::Protocol;
use {PacketId, SessionInfo, TimerToken};
use service_task::ServiceEvent;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::Arc;
use std::sync::mpsc as sync_mpsc;
use std::thread;
use std::time::Duration;
use futures::{prelude::*, Future, stream, Stream, select_all};
use futures::sync::{mpsc, oneshot};
use tokio::runtime::current_thread;
use {Error, ErrorKind, NetworkConfiguration, NetworkProtocolHandler, parse_str_addr};
use {NonReservedPeerMode, NetworkContext, Severity, NodeIndex, ProtocolId};
use custom_proto::{RegisteredProtocol, RegisteredProtocols};
use service_task::start_service;
use timeouts;
/// IO Service with networking.
pub struct NetworkService {
/// Information about peers.
peer_infos: Arc<Mutex<FnvHashMap<NodeIndex, PeerInfos>>>,
/// Use this channel to send a timeout request to the background thread's
/// events loop. After the timeout, elapsed, it will call `timeout` on the
/// `NetworkProtocolHandler`.
timeouts_register_tx: mpsc::UnboundedSender<(Duration, (Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, TimerToken))>,
/// Sender for messages to send to the background thread.
msg_tx: mpsc::UnboundedSender<MsgToBgThread>,
/// Sender for messages to the backgound service task, and handle for the background thread.
/// Dropping the sender should close the task and the thread.
bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>,
/// Original configuration of the service.
config: NetworkConfiguration,
/// List of registered protocols.
registered_custom: Arc<RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>>,
/// The external URL to report to the user.
external_url: Option<String>,
}
/// Known information about a peer.
struct PeerInfos {
/// Id of the peer.
id: PeerstorePeerId,
/// List of open custom protocols, and their version.
protocols: Vec<(ProtocolId, u8)>,
/// How we are connected to the remote.
endpoint: ConnectedPoint,
/// Latest known ping duration.
ping: Option<Duration>,
/// The client version of the remote, or `None` if not known.
client_version: Option<String>,
/// The local multiaddress used to communicate with the remote, or `None`
/// if not known.
// TODO: never filled ; also shouldn't be an `Option`
local_address: Option<Multiaddr>,
}
/// Message to send to the service task.
#[derive(Debug, Clone)]
enum MsgToBgThread {
/// Call `add_reserved_peer` on the network service.
AddReservedPeer(PeerstorePeerId, Multiaddr),
/// Call `remove_reserved_peer` on the network service.
RemoveReservedPeer(PeerstorePeerId, Multiaddr),
/// Call `set_non_reserved_mode` on the network service.
SetNonReserved(NonReservedPeerMode),
/// Call `send_custom_message` on the network service.
SendCustomMessage(NodeIndex, ProtocolId, PacketId, Vec<u8>),
/// Call `drop_peer` on the network service.
DropNode(NodeIndex),
/// Call `ban_peer` on the network service.
BanNode(NodeIndex),
}
impl NetworkService {
/// Starts the networking service.
///
/// Note that we could use an iterator for `protocols`, but having a
/// generic here is too much and crashes the Rust compiler.
pub fn new(
config: NetworkConfiguration,
protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>
) -> Result<NetworkService, Error> {
// Start by creating the protocols list.
let registered_custom = Arc::new(RegisteredProtocols(protocols.into_iter()
.map(|(handler, protocol, versions)|
RegisteredProtocol::new(handler.clone(), protocol, versions))
.collect()));
let (init_tx, init_rx) = sync_mpsc::channel();
let (close_tx, close_rx) = oneshot::channel();
let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded();
let peer_infos = Arc::new(Mutex::new(Default::default()));
let timeouts_register_tx_clone = timeouts_register_tx.clone();
let (msg_tx, msg_rx) = mpsc::unbounded();
let registered_custom_clone = registered_custom.clone();
let config_clone = config.clone();
let peer_infos_clone = peer_infos.clone();
let msg_tx_clone = msg_tx.clone();
// Initialize all the protocols now.
// TODO: remove this `initialize` method eventually, as it's only used for timers
for protocol in registered_custom.0.iter() {
protocol.custom_data().initialize(&NetworkContextImpl {
peer_infos: peer_infos.clone(),
registered_custom: registered_custom.clone(),
msg_tx: msg_tx.clone(),
timeouts_register_tx: timeouts_register_tx.clone(),
protocol: protocol.id(),
current_peer: None,
});
}
let join_handle = thread::spawn(move || {
// Tokio runtime that is going to run everything in this thread.
let mut runtime = match current_thread::Runtime::new() {
Ok(c) => c,
Err(err) => {
let _ = init_tx.send(Err(err.into()));
return
}
};
let fut = match init_thread(
config_clone,
registered_custom_clone,
peer_infos_clone,
timeouts_register_tx_clone,
timeouts_register_rx,
msg_tx_clone,
msg_rx
) {
Ok((future, external_url)) => {
debug!(target: "sub-libp2p", "Successfully started networking service");
let _ = init_tx.send(Ok(external_url));
future
},
Err(err) => {
let _ = init_tx.send(Err(err));
return
}
};
let fut = fut.map_err(|_| ())
.select(close_rx.then(|_| Ok(())))
.map(|_| ()).map_err(|_| ());
match runtime.block_on(fut) {
Ok(()) => debug!(target: "sub-libp2p", "libp2p future finished"),
Err(err) => error!(target: "sub-libp2p", "Error while running libp2p: {:?}", err),
}
});
let external_url = init_rx.recv().expect("libp2p background thread panicked")?;
Ok(NetworkService {
config,
peer_infos,
timeouts_register_tx,
msg_tx,
external_url,
bg_thread: Some((close_tx, join_handle)),
registered_custom,
})
}
/// Returns network configuration.
// TODO: is this method really necessary? we could remove the `config` field if not
pub fn config(&self) -> &NetworkConfiguration {
&self.config
}
pub fn external_url(&self) -> Option<String> {
// TODO: cleanup this in external API layers
self.external_url.clone()
}
/// Get a list of all connected peers by id.
pub fn connected_peers(&self) -> Vec<NodeIndex> {
self.peer_infos.lock().keys().cloned().collect()
}
/// Try to add a reserved peer.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let (peer_id, addr) = parse_str_addr(peer)?;
let _ = self.msg_tx.unbounded_send(MsgToBgThread::AddReservedPeer(peer_id, addr));
Ok(())
}
/// Try to remove a reserved peer.
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), Error> {
let (peer_id, addr) = parse_str_addr(peer)?;
let _ = self.msg_tx.unbounded_send(MsgToBgThread::RemoveReservedPeer(peer_id, addr));
Ok(())
}
/// Set the non-reserved peer mode.
pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode) {
let _ = self.msg_tx.unbounded_send(MsgToBgThread::SetNonReserved(mode));
}
/// Executes action in the network context
pub fn with_context<F>(&self, protocol: ProtocolId, action: F)
where F: FnOnce(&NetworkContext) {
self.with_context_eval(protocol, action);
}
/// Evaluates function in the network context
pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, action: F)
-> Option<T>
where F: FnOnce(&NetworkContext) -> T {
if !self.registered_custom.has_protocol(protocol) {
return None
}
Some(action(&NetworkContextImpl {
peer_infos: self.peer_infos.clone(),
registered_custom: self.registered_custom.clone(),
msg_tx: self.msg_tx.clone(),
timeouts_register_tx: self.timeouts_register_tx.clone(),
protocol: protocol.clone(),
current_peer: None,
}))
}
}
impl Drop for NetworkService {
fn drop(&mut self) {
if let Some((sender, join)) = self.bg_thread.take() {
drop(sender);
if let Err(e) = join.join() {
warn!(target: "sub-libp2p", "error while waiting on libp2p background thread: {:?}", e);
}
}
}
}
/// Builds the main `Future` for the network service.
///
/// Also returns a diagnostic external node address, to report to the user.
fn init_thread(
config: NetworkConfiguration,
registered_custom: Arc<RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>>,
peers: Arc<Mutex<FnvHashMap<NodeIndex, PeerInfos>>>,
timeouts_register_tx: mpsc::UnboundedSender<
(Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))
>,
timeouts_register_rx: mpsc::UnboundedReceiver<
(Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))
>,
msg_tx: mpsc::UnboundedSender<MsgToBgThread>,
mut msg_rx: mpsc::UnboundedReceiver<MsgToBgThread>,
) -> Result<(impl Future<Item = (), Error = IoError>, Option<String>), Error> {
// Build the timeouts system for the `register_timeout` function.
// (note: this has nothing to do with socket timeouts)
let timeouts = timeouts::build_timeouts_stream(timeouts_register_rx)
.for_each({
let peers = peers.clone();
let msg_tx = msg_tx.clone();
let registered_custom = registered_custom.clone();
let timeouts_register_tx = timeouts_register_tx.clone();
move |(handler, protocol_id, timer_token)| {
handler.timeout(&NetworkContextImpl {
peer_infos: peers.clone(),
registered_custom: registered_custom.clone(),
msg_tx: msg_tx.clone(),
protocol: protocol_id,
current_peer: None,
timeouts_register_tx: timeouts_register_tx.clone(),
}, timer_token);
Ok(())
}
})
.then(|val| {
warn!(target: "sub-libp2p", "Timeouts stream closed unexpectedly: {:?}", val);
val
});
// Start the main service.
let mut service = start_service(config, registered_custom.clone())?;
// Build the external URL to report to the user.
let external_url = service
.listeners()
.next()
.map(|addr| {
let mut addr = addr.clone();
addr.append(Protocol::P2p(service.peer_id().clone().into()));
addr.to_string()
});
let service_stream = stream::poll_fn(move || {
loop {
match msg_rx.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(MsgToBgThread::AddReservedPeer(peer_id, addr)))) => {
service.add_reserved_peer(peer_id, addr);
},
Ok(Async::Ready(Some(MsgToBgThread::RemoveReservedPeer(peer_id, addr)))) => {
service.remove_reserved_peer(peer_id, addr);
},
Ok(Async::Ready(Some(MsgToBgThread::SetNonReserved(mode)))) => {
service.set_non_reserved_mode(mode);
},
Ok(Async::Ready(Some(MsgToBgThread::SendCustomMessage(node_index, protocol_id, packet_id, data)))) => {
service.send_custom_message(node_index, protocol_id, packet_id, data);
},
Ok(Async::Ready(Some(MsgToBgThread::DropNode(node_index)))) => {
service.drop_node(node_index);
},
Ok(Async::Ready(Some(MsgToBgThread::BanNode(node_index)))) => {
service.ban_node(node_index);
},
Err(()) => unreachable!("An unbounded receiver never errors"),
}
}
service.poll()
})
.for_each(move |event| {
macro_rules! ctxt {
($protocol:expr, $node_index:expr) => (
NetworkContextImpl {
peer_infos: peers.clone(),
registered_custom: registered_custom.clone(),
msg_tx: msg_tx.clone(),
timeouts_register_tx: timeouts_register_tx.clone(),
protocol: $protocol,
current_peer: Some($node_index),
}
);
}
match event {
ServiceEvent::NewNode { node_index, peer_id, endpoint } => {
peers.lock().insert(node_index, PeerInfos {
id: peer_id,
protocols: Vec::new(),
endpoint,
ping: None,
client_version: None,
local_address: None, // TODO: fill
});
},
ServiceEvent::NodeClosed { node_index, closed_custom_protocols } => {
let old = peers.lock().remove(&node_index);
debug_assert!(old.is_some());
for protocol in closed_custom_protocols {
registered_custom.find_protocol(protocol)
.expect("we passed a list of protocols when building the service, and never \
modify that list ; therefore all the reported ids should always be valid")
.custom_data()
.disconnected(&ctxt!(protocol, node_index), &node_index);
}
},
ServiceEvent::ClosedCustomProtocols { node_index, protocols } => {
peers.lock().get_mut(&node_index)
.expect("peers is kept in sync with the state in the service")
.protocols
.retain(|&(ref p, _)| !protocols.iter().any(|pr| pr == p));
for protocol in protocols {
registered_custom.find_protocol(protocol)
.expect("we passed a list of protocols when building the service, and never \
modify that list ; therefore all the reported ids should always be valid")
.custom_data()
.disconnected(&ctxt!(protocol, node_index), &node_index);
}
},
ServiceEvent::PingDuration(node_index, ping) =>
peers.lock().get_mut(&node_index)
.expect("peers is kept in sync with the state in the service")
.ping = Some(ping),
ServiceEvent::NodeInfos { node_index, client_version } =>
peers.lock().get_mut(&node_index)
.expect("peers is kept in sync with the state in the service")
.client_version = Some(client_version),
ServiceEvent::OpenedCustomProtocol { node_index, protocol, version } => {
peers.lock().get_mut(&node_index)
.expect("peers is kept in sync with the state in the service")
.protocols
.push((protocol, version));
registered_custom.find_protocol(protocol)
.expect("we passed a list of protocols when building the service, and never \
modify that list ; therefore all the reported ids should always be valid")
.custom_data()
.connected(&ctxt!(protocol, node_index), &node_index)
},
ServiceEvent::ClosedCustomProtocol { node_index, protocol } => {
peers.lock().get_mut(&node_index)
.expect("peers is kept in sync with the state in the service")
.protocols
.retain(|&(ref p, _)| p != &protocol);
registered_custom.find_protocol(protocol)
.expect("we passed a list of protocols when building the service, and never \
modify that list ; therefore all the reported ids should always be valid")
.custom_data()
.disconnected(&ctxt!(protocol, node_index), &node_index)
},
ServiceEvent::CustomMessage { node_index, protocol_id, packet_id, data } => {
registered_custom.find_protocol(protocol_id)
.expect("we passed a list of protocols when building the service, and never \
modify that list ; therefore all the reported ids should always be valid")
.custom_data()
.read(&ctxt!(protocol_id, node_index), &node_index, packet_id, &data)
},
};
Ok(())
});
// Merge all futures into one.
let futures: Vec<Box<Future<Item = (), Error = IoError>>> = vec![
Box::new(service_stream),
Box::new(timeouts),
];
let final_future = select_all(futures)
.and_then(move |_| {
debug!(target: "sub-libp2p", "Networking ended");
Ok(())
})
.map_err(|(r, _, _)| r);
Ok((final_future, external_url))
}
#[derive(Clone)]
struct NetworkContextImpl {
peer_infos: Arc<Mutex<FnvHashMap<NodeIndex, PeerInfos>>>,
msg_tx: mpsc::UnboundedSender<MsgToBgThread>,
protocol: ProtocolId,
current_peer: Option<NodeIndex>,
registered_custom: Arc<RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>>,
/// Clone of `NetworkService::timeouts_register_tx`.
timeouts_register_tx: mpsc::UnboundedSender<(Duration, (Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, TimerToken))>,
}
impl NetworkContext for NetworkContextImpl {
fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>) {
self.send_protocol(self.protocol, peer, packet_id, data)
}
fn send_protocol(
&self,
protocol: ProtocolId,
peer: NodeIndex,
packet_id: PacketId,
data: Vec<u8>
) {
let msg = MsgToBgThread::SendCustomMessage(peer, protocol, packet_id, data);
let _ = self.msg_tx.unbounded_send(msg);
}
fn respond(&self, packet_id: PacketId, data: Vec<u8>) {
if let Some(peer) = self.current_peer {
self.send_protocol(self.protocol, peer, packet_id, data)
} else {
panic!("respond() called outside of a received message");
}
}
fn report_peer(&self, node_index: NodeIndex, reason: Severity) {
let peer_infos = self.peer_infos.lock();
if let Some(info) = peer_infos.get(&node_index) {
if let Some(ref client_version) = info.client_version {
info!(target: "sub-libp2p",
"Peer {:?} ({:?} {}) reported by client: {}",
info.id,
info.endpoint,
client_version,
reason
);
} else {
info!(target: "sub-libp2p", "Peer {:?} reported by client: {}", info.id, reason);
}
}
let _ = self.msg_tx.unbounded_send(match reason {
Severity::Bad(_) => MsgToBgThread::BanNode(node_index),
Severity::Useless(_) => MsgToBgThread::DropNode(node_index),
Severity::Timeout => MsgToBgThread::DropNode(node_index),
});
}
fn is_expired(&self) -> bool {
let peer_infos = self.peer_infos.lock();
let current_peer = self.current_peer.as_ref()
.expect("Called is_expired outside of a context");
!peer_infos.contains_key(current_peer)
}
fn register_timer(&self, token: usize, duration: Duration)
-> Result<(), Error> {
let handler = self.registered_custom
.find_protocol(self.protocol)
.ok_or(ErrorKind::BadProtocol)?
.custom_data()
.clone();
self.timeouts_register_tx
.unbounded_send((duration, (handler, self.protocol, token)))
.map_err(|err| ErrorKind::Io(IoError::new(IoErrorKind::Other, err)))?;
Ok(())
}
fn peer_client_version(&self, peer: NodeIndex) -> String {
// Devp2p returns "unknown" on unknown peer ID, so we do the same.
// TODO: implement more directly, without going through `session_info`
self.session_info(peer)
.map(|info| info.client_version)
.unwrap_or_else(|| "unknown".to_string())
}
fn session_info(&self, peer: NodeIndex) -> Option<SessionInfo> {
let peer_infos = self.peer_infos.lock();
let info = match peer_infos.get(&peer) {
Some(info) => info,
None => return None,
};
let protocol_id = self.protocol;
let protocol_version = match info.protocols.iter().find(|&(ref p, _)| p == &protocol_id) {
Some(&(_, vers)) => vers,
None => return None,
};
Some(SessionInfo {
id: info.id.clone(),
client_version: info.client_version.clone().take().unwrap_or(String::new()),
protocol_version: From::from(protocol_version),
capabilities: Vec::new(), // TODO: list of supported protocols ; hard
peer_capabilities: Vec::new(), // TODO: difference with `peer_capabilities`?
ping: info.ping,
originated: if let ConnectedPoint::Dialer { .. } = info.endpoint { true } else { false },
remote_address: String::new(), // TODO:
local_address: info.local_address.as_ref().map(|a| a.to_string())
.unwrap_or(String::new()),
})
}
fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option<u8> {
let peer_infos = self.peer_infos.lock();
let info = match peer_infos.get(&peer) {
Some(info) => info,
None => return None,
};
let protocol_version = match info.protocols.iter().find(|&(ref p, _)| p == &protocol) {
Some(&(_, vers)) => vers,
None => return None,
};
Some(protocol_version)
}
fn subprotocol_name(&self) -> ProtocolId {
self.protocol.clone()
}
}
#[cfg(test)]
mod tests {
use super::NetworkService;
#[test]
fn builds_and_finishes_in_finite_time() {
// Checks that merely starting the network doesn't end up in an infinite loop.
let _service = NetworkService::new(Default::default(), vec![]).unwrap();
}
}
+105 -106
View File
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use bytes::Bytes;
use custom_proto::RegisteredProtocols;
use custom_proto::{RegisteredProtocol, RegisteredProtocols};
use fnv::{FnvHashMap, FnvHashSet};
use futures::{prelude::*, task, Stream};
use futures::sync::{oneshot, mpsc};
@@ -36,8 +36,8 @@ use std::time::{Duration, Instant};
use swarm::{self, Swarm, SwarmEvent};
use topology::{DisconnectReason, NetTopology};
use tokio_timer::{Delay, Interval};
use {Error, ErrorKind, NetworkConfiguration, NetworkProtocolHandler, NodeIndex, parse_str_addr};
use {NonReservedPeerMode, PacketId, ProtocolId};
use {Error, ErrorKind, NetworkConfiguration, NodeIndex, parse_str_addr};
use {NonReservedPeerMode, ProtocolId};
// File where the network topology is stored.
const NODES_FILE: &str = "nodes.json";
@@ -47,16 +47,18 @@ const PEER_DISABLE_DURATION: Duration = Duration::from_secs(5 * 60);
/// Starts the substrate libp2p service.
///
/// Returns a stream that must be polled regularly in order for the networking to function.
pub fn start_service(
pub fn start_service<TProtos>(
config: NetworkConfiguration,
registered_custom: Arc<RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>>,
) -> Result<Service, Error> {
registered_custom: TProtos,
) -> Result<Service, Error>
where TProtos: IntoIterator<Item = RegisteredProtocol> {
// Private and public keys configuration.
let local_private_key = obtain_private_key(&config)?;
let local_public_key = local_private_key.to_public_key();
let local_peer_id = local_public_key.clone().into_peer_id();
// Build the swarm.
let registered_custom = RegisteredProtocols(registered_custom.into_iter().collect());
let mut swarm = swarm::start_swarm(registered_custom, local_private_key)?;
// Listen on multiaddresses.
@@ -170,18 +172,6 @@ pub fn start_service(
/// Event produced by the service.
pub enum ServiceEvent {
/// We have successfully connected to a new node.
NewNode {
/// Index that was attributed for this node. Will be used for all further interaction with
/// it.
node_index: NodeIndex,
/// Public key of the node as a peer id.
peer_id: PeerId,
/// Whether we dialed the node or if it came to us. Should be used only for statistics
/// purposes.
endpoint: ConnectedPoint,
},
/// Closed connection to a node.
///
/// It is guaranteed that this node has been opened with a `NewNode` event beforehand. However
@@ -193,17 +183,6 @@ pub enum ServiceEvent {
closed_custom_protocols: Vec<ProtocolId>,
},
/// Report the duration of the ping for the given node.
PingDuration(NodeIndex, Duration),
/// Report information about the node.
NodeInfos {
/// Index of the node.
node_index: NodeIndex,
/// The client version. Note that it can be anything and should not be trusted.
client_version: String,
},
/// A custom protocol substream has been opened with a node.
OpenedCustomProtocol {
/// Index of the node.
@@ -238,8 +217,6 @@ pub enum ServiceEvent {
node_index: NodeIndex,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Identifier of the packet.
packet_id: u8,
/// Data that has been received.
data: Bytes,
},
@@ -248,7 +225,7 @@ pub enum ServiceEvent {
/// Network service. Must be polled regularly in order for the networking to work.
pub struct Service {
/// Stream of events of the swarm.
swarm: Swarm<Arc<NetworkProtocolHandler + Send + Sync>>,
swarm: Swarm,
/// Maximum number of incoming non-reserved connections, taken from the config.
max_incoming_connections: usize,
@@ -256,10 +233,8 @@ pub struct Service {
/// Maximum number of outgoing non-reserved connections, taken from the config.
max_outgoing_connections: usize,
/// For each node we're connected to, its address if known.
///
/// This is used purely to report disconnections to the topology.
nodes_addresses: FnvHashMap<NodeIndex, Multiaddr>,
/// For each node we're connected to, how we're connected to it.
nodes_addresses: FnvHashMap<NodeIndex, ConnectedPoint>,
/// If true, only reserved peers can connect.
reserved_only: bool,
@@ -322,6 +297,12 @@ impl Service {
self.kad_system.local_peer_id()
}
/// Returns the list of all the peers we are connected to.
#[inline]
pub fn connected_peers<'a>(&'a self) -> impl Iterator<Item = NodeIndex> + 'a {
self.nodes_addresses.keys().cloned()
}
/// Try to add a reserved peer.
pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.reserved_peers.insert(peer_id.clone());
@@ -330,35 +311,59 @@ impl Service {
}
/// Try to remove a reserved peer.
// TODO: remove `_addr` parameter?
pub fn remove_reserved_peer(&mut self, peer_id: PeerId, _addr: Multiaddr) {
///
/// If we are in reserved mode and we were connected to a node with this peer ID, then this
/// method will disconnect it and return its index.
pub fn remove_reserved_peer(&mut self, peer_id: PeerId) -> Option<NodeIndex> {
self.reserved_peers.remove(&peer_id);
if self.reserved_only {
if let Some(node_index) = self.swarm.latest_node_by_peer_id(&peer_id) {
self.drop_node_inner(node_index, DisconnectReason::NoSlot, None);
return Some(node_index);
}
}
None
}
/// Start accepting all peers again if we weren't.
pub fn accept_unreserved_peers(&mut self) {
if self.reserved_only {
self.reserved_only = false;
self.connect_to_nodes();
}
}
/// Set the non-reserved peer mode.
pub fn set_non_reserved_mode(&mut self, mode: NonReservedPeerMode) {
self.reserved_only = mode == NonReservedPeerMode::Deny;
if self.reserved_only {
// Disconnect the nodes that are not reserved.
let to_disconnect: Vec<NodeIndex> = self.swarm
.nodes()
.filter(|&n| {
let peer_id = self.swarm.peer_id_of_node(n)
.expect("swarm.nodes() always returns valid node indices");
!self.reserved_peers.contains(peer_id)
})
.collect();
for node_index in to_disconnect {
self.drop_node_inner(node_index, DisconnectReason::NoSlot, None);
}
} else {
self.connect_to_nodes();
/// Start refusing non-reserved nodes. Returns the list of nodes that have been disconnected.
pub fn deny_unreserved_peers(&mut self) -> Vec<NodeIndex> {
self.reserved_only = true;
// Disconnect the nodes that are not reserved.
let to_disconnect: Vec<NodeIndex> = self.swarm
.nodes()
.filter(|&n| {
let peer_id = self.swarm.peer_id_of_node(n)
.expect("swarm.nodes() always returns valid node indices");
!self.reserved_peers.contains(peer_id)
})
.collect();
for &node_index in &to_disconnect {
self.drop_node_inner(node_index, DisconnectReason::NoSlot, None);
}
to_disconnect
}
/// Returns the `PeerId` of a node.
#[inline]
pub fn peer_id_of_node(&self, node_index: NodeIndex) -> Option<&PeerId> {
self.swarm.peer_id_of_node(node_index)
}
/// Returns the way we are connected to a node.
#[inline]
pub fn node_endpoint(&self, node_index: NodeIndex) -> Option<&ConnectedPoint> {
self.nodes_addresses.get(&node_index)
}
/// Sends a message to a peer using the custom protocol.
@@ -367,10 +372,9 @@ impl Service {
&mut self,
node_index: NodeIndex,
protocol: ProtocolId,
packet_id: PacketId,
data: Vec<u8>
) {
self.swarm.send_custom_message(node_index, protocol, packet_id, data)
self.swarm.send_custom_message(node_index, protocol, data)
}
/// Disconnects a peer and bans it for a little while.
@@ -378,6 +382,10 @@ impl Service {
/// Same as `drop_node`, except that the same peer will not be able to reconnect later.
#[inline]
pub fn ban_node(&mut self, node_index: NodeIndex) {
if let Some(peer_id) = self.swarm.peer_id_of_node(node_index) {
info!(target: "sub-libp2p", "Banned {:?}", peer_id);
}
self.drop_node_inner(node_index, DisconnectReason::Banned, Some(PEER_DISABLE_DURATION));
}
@@ -387,6 +395,10 @@ impl Service {
/// Corresponding closing events will be generated once the closing actually happens.
#[inline]
pub fn drop_node(&mut self, node_index: NodeIndex) {
if let Some(peer_id) = self.swarm.peer_id_of_node(node_index) {
info!(target: "sub-libp2p", "Dropped {:?}", peer_id);
}
self.drop_node_inner(node_index, DisconnectReason::Useless, None);
}
@@ -414,8 +426,8 @@ impl Service {
to_notify.notify();
}
if let Some(addr) = self.nodes_addresses.remove(&node_index) {
self.topology.report_disconnected(&addr, reason);
if let Some(ConnectedPoint::Dialer { address }) = self.nodes_addresses.remove(&node_index) {
self.topology.report_disconnected(&address, reason);
}
if let Some(disable_duration) = disable_duration {
@@ -472,10 +484,8 @@ impl Service {
continue;
}
// TODO: it is possible that we are connected to this peer, but the topology
// doesn't know about that because we don't know its multiaddress yet
// TODO: after some changes in libp2p, we can avoid this situation and also remove
// the `num_to_open` variable
// It is possible that we are connected to this peer, but the topology doesn't know
// about that because it is an incoming connection.
match self.swarm.ensure_connection(peer_id.clone(), addr.clone()) {
Ok(true) => (),
Ok(false) => num_to_open -= 1,
@@ -578,15 +588,13 @@ impl Service {
/// Handles the swarm opening a connection to the given peer.
///
/// Returns the `NewNode` event to produce.
///
/// > **Note**: Must be called from inside `poll()`, otherwise it will panic.
fn handle_connection(
&mut self,
node_index: NodeIndex,
peer_id: PeerId,
endpoint: ConnectedPoint
) -> Option<ServiceEvent> {
) {
// Reject connections to our own node, which can happen if the DHT contains `127.0.0.1`
// for example.
if &peer_id == self.kad_system.local_peer_id() {
@@ -595,7 +603,7 @@ impl Service {
if let ConnectedPoint::Dialer { ref address } = endpoint {
self.topology.report_failed_to_connect(address);
}
return None;
return;
}
// Reject non-reserved nodes if we're in reserved mode.
@@ -606,7 +614,7 @@ impl Service {
if let ConnectedPoint::Dialer { ref address } = endpoint {
self.topology.report_failed_to_connect(address);
}
return None;
return;
}
// Reject connections from disabled peers.
@@ -617,7 +625,7 @@ impl Service {
if let ConnectedPoint::Dialer { ref address } = endpoint {
self.topology.report_failed_to_connect(address);
}
return None;
return;
}
}
@@ -629,18 +637,17 @@ impl Service {
} else {
info!(target: "sub-libp2p", "Rejected incoming peer {:?} because we are full", peer_id);
assert_eq!(self.swarm.drop_node(node_index), Ok(Vec::new()));
return None;
return;
}
},
ConnectedPoint::Dialer { ref address } => {
if is_reserved || self.num_outgoing_connections() < self.max_outgoing_connections {
debug!(target: "sub-libp2p", "Connected to {:?} through {}", peer_id, address);
self.topology.report_connected(address, &peer_id);
self.nodes_addresses.insert(node_index, address.clone());
} else {
debug!(target: "sub-libp2p", "Rejected dialed peer {:?} because we are full", peer_id);
assert_eq!(self.swarm.drop_node(node_index), Ok(Vec::new()));
return None;
return;
}
},
};
@@ -649,19 +656,19 @@ impl Service {
error!(target: "sub-libp2p", "accept_node returned an error");
}
// We are finally sure that we're connected.
if let ConnectedPoint::Dialer { ref address } = endpoint {
self.topology.report_connected(address, &peer_id);
}
self.nodes_addresses.insert(node_index, endpoint.clone());
// If we're waiting for a Kademlia substream for this peer id, open one.
let kad_pending_ctrls = self.kad_pending_ctrls.lock();
if kad_pending_ctrls.contains_key(&peer_id) {
let res = self.swarm.open_kademlia(node_index);
debug_assert!(res.is_ok());
}
drop(kad_pending_ctrls);
Some(ServiceEvent::NewNode {
node_index,
peer_id,
endpoint
})
}
/// Processes an event received by the swarm.
@@ -674,22 +681,20 @@ impl Service {
event: SwarmEvent
) -> Option<ServiceEvent> {
match event {
SwarmEvent::NodePending { node_index, peer_id, endpoint } =>
if let Some(event) = self.handle_connection(node_index, peer_id, endpoint) {
Some(event)
} else {
None
},
SwarmEvent::NodePending { node_index, peer_id, endpoint } => {
self.handle_connection(node_index, peer_id, endpoint);
None
},
SwarmEvent::Reconnected { node_index, endpoint, closed_custom_protocols } => {
if let Some(addr) = self.nodes_addresses.remove(&node_index) {
self.topology.report_disconnected(&addr, DisconnectReason::FoundBetterAddr);
if let Some(ConnectedPoint::Dialer { address }) = self.nodes_addresses.remove(&node_index) {
self.topology.report_disconnected(&address, DisconnectReason::FoundBetterAddr);
}
if let ConnectedPoint::Dialer { address } = endpoint {
if let ConnectedPoint::Dialer { ref address } = endpoint {
let peer_id = self.swarm.peer_id_of_node(node_index)
.expect("the swarm always produces events containing valid node indices");
self.nodes_addresses.insert(node_index, address.clone());
self.topology.report_connected(&address, peer_id);
self.topology.report_connected(address, peer_id);
}
self.nodes_addresses.insert(node_index, endpoint);
Some(ServiceEvent::ClosedCustomProtocols {
node_index,
protocols: closed_custom_protocols,
@@ -697,8 +702,8 @@ impl Service {
},
SwarmEvent::NodeClosed { node_index, peer_id, closed_custom_protocols } => {
debug!(target: "sub-libp2p", "Connection to {:?} closed gracefully", peer_id);
if let Some(addr) = self.nodes_addresses.get(&node_index) {
self.topology.report_disconnected(addr, DisconnectReason::RemoteClosed);
if let Some(ConnectedPoint::Dialer { ref address }) = self.nodes_addresses.get(&node_index) {
self.topology.report_disconnected(address, DisconnectReason::RemoteClosed);
}
self.connect_to_nodes();
Some(ServiceEvent::NodeClosed {
@@ -715,8 +720,8 @@ impl Service {
SwarmEvent::UnresponsiveNode { node_index } => {
let closed_custom_protocols = self.swarm.drop_node(node_index)
.expect("the swarm always produces events containing valid node indices");
if let Some(addr) = self.nodes_addresses.remove(&node_index) {
self.topology.report_disconnected(&addr, DisconnectReason::Useless);
if let Some(ConnectedPoint::Dialer { address }) = self.nodes_addresses.remove(&node_index) {
self.topology.report_disconnected(&address, DisconnectReason::Useless);
}
Some(ServiceEvent::NodeClosed {
node_index,
@@ -730,27 +735,22 @@ impl Service {
let closed_custom_protocols = self.swarm.drop_node(node_index)
.expect("the swarm always produces events containing valid node indices");
self.topology.report_useless(&peer_id);
if let Some(addr) = self.nodes_addresses.remove(&node_index) {
self.topology.report_disconnected(&addr, DisconnectReason::Useless);
if let Some(ConnectedPoint::Dialer { address }) = self.nodes_addresses.remove(&node_index) {
self.topology.report_disconnected(&address, DisconnectReason::Useless);
}
Some(ServiceEvent::NodeClosed {
node_index,
closed_custom_protocols,
})
},
SwarmEvent::PingDuration(node_index, ping) =>
Some(ServiceEvent::PingDuration(node_index, ping)),
SwarmEvent::NodeInfos { node_index, client_version, listen_addrs } => {
SwarmEvent::NodeInfos { node_index, listen_addrs, .. } => {
let peer_id = self.swarm.peer_id_of_node(node_index)
.expect("the swarm always produces events containing valid node indices");
self.topology.add_self_reported_listen_addrs(
peer_id,
listen_addrs.into_iter()
);
Some(ServiceEvent::NodeInfos {
node_index,
client_version,
})
None
},
SwarmEvent::KadFindNode { searched, responder, .. } => {
let response = self.build_kademlia_response(&searched);
@@ -786,14 +786,13 @@ impl Service {
node_index,
protocol,
}),
SwarmEvent::CustomMessage { node_index, protocol_id, packet_id, data } => {
SwarmEvent::CustomMessage { node_index, protocol_id, data } => {
let peer_id = self.swarm.peer_id_of_node(node_index)
.expect("the swarm always produces events containing valid node indices");
self.kad_system.update_kbuckets(peer_id.clone());
Some(ServiceEvent::CustomMessage {
node_index,
protocol_id,
packet_id,
data,
})
},
+16 -27
View File
@@ -28,18 +28,17 @@ use libp2p::kad::{KadConnecController, KadFindNodeRespond};
use libp2p::secio;
use node_handler::{SubstrateOutEvent, SubstrateNodeHandler, SubstrateInEvent, IdentificationRequest};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::{mem, sync::Arc, time::Duration};
use std::{mem, sync::Arc};
use transport;
use {Error, NodeIndex, PacketId, ProtocolId};
use {Error, NodeIndex, ProtocolId};
/// Starts a swarm.
///
/// Returns a stream that must be polled regularly in order for the networking to function.
pub fn start_swarm<TUserData>(
registered_custom: Arc<RegisteredProtocols<TUserData>>,
pub fn start_swarm(
registered_custom: RegisteredProtocols,
local_private_key: secio::SecioKeyPair,
) -> Result<Swarm<TUserData>, Error>
where TUserData: Send + Sync + Clone + 'static {
) -> Result<Swarm, Error> {
// Private and public keys.
let local_public_key = local_private_key.to_public_key();
let local_peer_id = local_public_key.clone().into_peer_id();
@@ -48,7 +47,7 @@ where TUserData: Send + Sync + Clone + 'static {
let transport = transport::build_transport(local_private_key);
// Build the underlying libp2p swarm.
let swarm = Libp2pSwarm::with_handler_builder(transport, HandlerBuilder(registered_custom));
let swarm = Libp2pSwarm::with_handler_builder(transport, HandlerBuilder(Arc::new(registered_custom)));
Ok(Swarm {
swarm,
@@ -64,11 +63,10 @@ where TUserData: Send + Sync + Clone + 'static {
/// Dummy structure that exists because we need to be able to express the type. Otherwise we would
/// use a closure.
#[derive(Clone)]
struct HandlerBuilder<TUserData>(Arc<RegisteredProtocols<TUserData>>);
impl<TUserData> HandlerFactory for HandlerBuilder<TUserData>
where TUserData: Clone + Send + Sync + 'static
struct HandlerBuilder(Arc<RegisteredProtocols>);
impl HandlerFactory for HandlerBuilder
{
type Handler = SubstrateNodeHandler<Substream<Muxer>, TUserData>;
type Handler = SubstrateNodeHandler<Substream<Muxer>>;
#[inline]
fn new_handler(&self, addr: ConnectedPoint) -> Self::Handler {
@@ -122,9 +120,6 @@ pub enum SwarmEvent {
error: IoError,
},
/// Report the duration of the ping for the given node.
PingDuration(NodeIndex, Duration),
/// Report information about the node.
NodeInfos {
/// Index of the node.
@@ -159,8 +154,6 @@ pub enum SwarmEvent {
node_index: NodeIndex,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Identifier of the packet.
packet_id: u8,
/// Data that has been received.
data: Bytes,
},
@@ -209,13 +202,13 @@ pub enum SwarmEvent {
}
/// Network swarm. Must be polled regularly in order for the networking to work.
pub struct Swarm<TUserData> {
pub struct Swarm {
/// Stream of events of the swarm.
swarm: Libp2pSwarm<
Boxed<(PeerId, Muxer)>,
SubstrateInEvent,
SubstrateOutEvent<Substream<Muxer>>,
HandlerBuilder<TUserData>
HandlerBuilder
>,
/// Public key of the local node.
@@ -252,8 +245,7 @@ struct NodeInfo {
/// The muxer used by the transport.
type Muxer = muxing::StreamMuxerBox;
impl<TUserData> Swarm<TUserData>
where TUserData: Clone + Send + Sync + 'static {
impl Swarm {
/// Start listening on a multiaddr.
#[inline]
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
@@ -311,12 +303,11 @@ impl<TUserData> Swarm<TUserData>
&mut self,
node_index: NodeIndex,
protocol: ProtocolId,
packet_id: PacketId,
data: Vec<u8>
) {
if let Some(info) = self.nodes_info.get_mut(&node_index) {
if let Some(mut connected) = self.swarm.peer(info.peer_id.clone()).as_connected() {
connected.send_event(SubstrateInEvent::SendCustomMessage { protocol, packet_id, data });
connected.send_event(SubstrateInEvent::SendCustomMessage { protocol, data });
} else {
error!(target: "sub-libp2p", "Tried to send message to {:?}, but we're not \
connected to it", info.peer_id);
@@ -637,7 +628,7 @@ impl<TUserData> Swarm<TUserData>
},
SubstrateOutEvent::PingSuccess(ping) => {
trace!(target: "sub-libp2p", "Pong from {:?} in {:?}", peer_id, ping);
Some(SwarmEvent::PingDuration(node_index, ping))
None
},
SubstrateOutEvent::Identified { info, observed_addr } => {
self.add_observed_addr(&peer_id, &observed_addr);
@@ -689,11 +680,10 @@ impl<TUserData> Swarm<TUserData>
protocol: protocol_id,
})
},
SubstrateOutEvent::CustomMessage { protocol_id, packet_id, data } => {
SubstrateOutEvent::CustomMessage { protocol_id, data } => {
Some(SwarmEvent::CustomMessage {
node_index,
protocol_id,
packet_id,
data,
})
},
@@ -706,8 +696,7 @@ impl<TUserData> Swarm<TUserData>
}
}
impl<TUserData> Stream for Swarm<TUserData>
where TUserData: Clone + Send + Sync + 'static {
impl Stream for Swarm {
type Item = SwarmEvent;
type Error = IoError;
@@ -1,115 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use futures::{Async, future, Future, Poll, stream, Stream, sync::mpsc};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::marker::PhantomData;
use std::time::{Duration, Instant};
use tokio_timer::{self, Delay};
/// Builds the timeouts system.
///
/// The `timeouts_rx` should be a stream receiving newly-created timeout
/// requests. Returns a stream that produces items as their timeout elapses.
/// `T` can be anything you want, as it is transparently passed from the input
/// to the output. Timeouts continue to fire forever, as there is no way to
/// unregister them.
pub fn build_timeouts_stream<'a, T>(
timeouts_rx: mpsc::UnboundedReceiver<(Duration, T)>
) -> Box<Stream<Item = T, Error = IoError> + 'a>
where T: Clone + 'a {
let next_timeout = next_in_timeouts_stream(timeouts_rx);
// The `unfold` function is essentially a loop turned into a stream. The
// first parameter is the initial state, and the closure returns the new
// state and an item.
let stream = stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| {
// `timeouts` is a `Vec` of futures that produce an `Out`.
// `select_ok` panics if `timeouts` is empty anyway.
if timeouts.is_empty() {
return None
}
Some(future::select_ok(timeouts.into_iter())
.and_then(move |(item, mut timeouts)|
match item {
Out::NewTimeout((Some((duration, item)), next_timeouts)) => {
// Received a new timeout request on the channel.
let next_timeout = next_in_timeouts_stream(next_timeouts);
let timeout = Delay::new(Instant::now() + duration);
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
timeouts.push(future::Either::B(timeout));
timeouts.push(future::Either::A(next_timeout));
Ok((None, timeouts))
},
Out::NewTimeout((None, _)) =>
// The channel has been closed.
Ok((None, timeouts)),
Out::Timeout(duration, item) => {
// A timeout has happened.
let returned = item.clone();
let timeout = Delay::new(Instant::now() + duration);
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
timeouts.push(future::Either::B(timeout));
Ok((Some(returned), timeouts))
},
}
)
)
}).filter_map(|item| item);
// Note that we use a `Box` in order to speed up compilation time.
Box::new(stream) as Box<Stream<Item = _, Error = _>>
}
/// Local enum representing the output of the selection.
enum Out<A, B> {
NewTimeout(A),
Timeout(Duration, B),
}
/// Convenience function that calls `.into_future()` on the timeouts stream,
/// and applies some modifiers.
/// This function is necessary. Otherwise if we copy-paste its content we run
/// into errors because the type of the copy-pasted closures differs.
fn next_in_timeouts_stream<T, B>(
stream: mpsc::UnboundedReceiver<T>
) -> impl Future<Item = Out<(Option<T>, mpsc::UnboundedReceiver<T>), B>, Error = IoError> {
stream
.into_future()
.map(Out::NewTimeout)
.map_err(|_| unreachable!("an UnboundedReceiver can never error"))
}
/// Does the equivalent to `future.map(move |()| (duration, item)).map_err(|err| to_io_err(err))`.
struct TimeoutWrapper<A, F, T>(F, Duration, Option<T>, PhantomData<A>);
impl<A, F, T> Future for TimeoutWrapper<A, F, T>
where F: Future<Item = (), Error = tokio_timer::Error> {
type Item = Out<A, T>;
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.poll() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(IoError::new(IoErrorKind::Other, err.to_string())),
}
let out = Out::Timeout(self.1, self.2.take().expect("poll() called again after success"));
Ok(Async::Ready(out))
}
}
+1 -161
View File
@@ -14,18 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::fmt;
use std::cmp::Ordering;
use std::iter;
use std::net::Ipv4Addr;
use std::str;
use std::time::Duration;
use TimerToken;
use std::{fmt, iter, net::Ipv4Addr, str};
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
use error::Error;
/// Protocol handler level packet id
pub type PacketId = u8;
/// Protocol / handler id
pub type ProtocolId = [u8; 3];
@@ -38,66 +29,6 @@ pub type NodeIndex = usize;
/// secio secret key;
pub type Secret = [u8; 32];
/// Shared session information
#[derive(Debug, Clone)]
pub struct SessionInfo {
/// Peer public key
pub id: NodeId,
/// Peer client ID
pub client_version: String,
/// Peer RLPx protocol version
pub protocol_version: u32,
/// Session protocol capabilities
pub capabilities: Vec<SessionCapabilityInfo>,
/// Peer protocol capabilities
pub peer_capabilities: Vec<PeerCapabilityInfo>,
/// Peer ping delay
pub ping: Option<Duration>,
/// True if this session was originated by us.
pub originated: bool,
/// Remote endpoint address of the session
pub remote_address: String,
/// Local endpoint address of the session
pub local_address: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerCapabilityInfo {
pub protocol: ProtocolId,
pub version: u8,
}
impl ToString for PeerCapabilityInfo {
fn to_string(&self) -> String {
format!("{}/{}", str::from_utf8(&self.protocol[..]).unwrap_or("???"), self.version)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionCapabilityInfo {
pub protocol: [u8; 3],
pub version: u8,
pub packet_count: u8,
pub id_offset: u8,
}
impl PartialOrd for SessionCapabilityInfo {
fn partial_cmp(&self, other: &SessionCapabilityInfo) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for SessionCapabilityInfo {
fn cmp(&self, b: &SessionCapabilityInfo) -> Ordering {
// By protocol id first
if self.protocol != b.protocol {
return self.protocol.cmp(&b.protocol);
}
// By version
self.version.cmp(&b.version)
}
}
/// Network service configuration
#[derive(Debug, PartialEq, Clone)]
pub struct NetworkConfiguration {
@@ -188,97 +119,6 @@ impl<'a> fmt::Display for Severity<'a> {
}
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub trait NetworkContext {
/// Send a packet over the network to another peer.
fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>);
/// Send a packet over the network to another peer using specified protocol.
fn send_protocol(&self, protocol: ProtocolId, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>);
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
fn respond(&self, packet_id: PacketId, data: Vec<u8>);
/// Report peer. Depending on the report, peer may be disconnected and possibly banned.
fn report_peer(&self, peer: NodeIndex, reason: Severity);
/// Check if the session is still active.
fn is_expired(&self) -> bool;
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error>;
/// Returns peer identification string
fn peer_client_version(&self, peer: NodeIndex) -> String;
/// Returns information on p2p session
fn session_info(&self, peer: NodeIndex) -> Option<SessionInfo>;
/// Returns max version for a given protocol.
fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option<u8>;
/// Returns this object's subprotocol name.
fn subprotocol_name(&self) -> ProtocolId;
}
impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext {
fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>) {
(**self).send(peer, packet_id, data)
}
fn send_protocol(&self, protocol: ProtocolId, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>) {
(**self).send_protocol(protocol, peer, packet_id, data)
}
fn respond(&self, packet_id: PacketId, data: Vec<u8>) {
(**self).respond(packet_id, data)
}
fn report_peer(&self, peer: NodeIndex, reason: Severity) {
(**self).report_peer(peer, reason)
}
fn is_expired(&self) -> bool {
(**self).is_expired()
}
fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error> {
(**self).register_timer(token, delay)
}
fn peer_client_version(&self, peer: NodeIndex) -> String {
(**self).peer_client_version(peer)
}
fn session_info(&self, peer: NodeIndex) -> Option<SessionInfo> {
(**self).session_info(peer)
}
fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option<u8> {
(**self).protocol_version(protocol, peer)
}
fn subprotocol_name(&self) -> ProtocolId {
(**self).subprotocol_name()
}
}
/// Network IO protocol handler. This needs to be implemented for each new subprotocol.
/// All the handler function are called from within IO event loop.
/// `Message` is the type for message data.
pub trait NetworkProtocolHandler: Sync + Send {
/// Initialize the handler
fn initialize(&self, _io: &NetworkContext) {}
/// Called when new network packet received.
fn read(&self, io: &NetworkContext, peer: &NodeIndex, packet_id: u8, data: &[u8]);
/// Called when new peer is connected. Only called when peer supports the same protocol.
fn connected(&self, io: &NetworkContext, peer: &NodeIndex);
/// Called when a previously connected peer disconnects.
fn disconnected(&self, io: &NetworkContext, peer: &NodeIndex);
/// Timer function called after a timeout created with `NetworkContext::timeout`.
fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) {}
}
/// Non-reserved peer modes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReservedPeerMode {
@@ -1,124 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
extern crate parking_lot;
extern crate parity_bytes;
extern crate substrate_network_libp2p;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::thread;
use std::time::*;
use parking_lot::Mutex;
use parity_bytes::Bytes;
use substrate_network_libp2p::*;
use TimerToken;
pub struct TestProtocol {
drop_session: bool,
pub packet: Mutex<Bytes>,
pub got_timeout: AtomicBool,
pub got_disconnect: AtomicBool,
}
impl TestProtocol {
pub fn new(drop_session: bool) -> Self {
TestProtocol {
packet: Mutex::new(Vec::new()),
got_timeout: AtomicBool::new(false),
got_disconnect: AtomicBool::new(false),
drop_session: drop_session,
}
}
pub fn got_packet(&self) -> bool {
self.packet.lock()[..] == b"hello"[..]
}
pub fn got_timeout(&self) -> bool {
self.got_timeout.load(AtomicOrdering::Relaxed)
}
pub fn got_disconnect(&self) -> bool {
self.got_disconnect.load(AtomicOrdering::Relaxed)
}
}
impl NetworkProtocolHandler for TestProtocol {
fn initialize(&self, io: &NetworkContext) {
io.register_timer(0, Duration::from_millis(10)).unwrap();
}
fn read(&self, _io: &NetworkContext, _peer: &NodeIndex, packet_id: u8, data: &[u8]) {
assert_eq!(packet_id, 33);
self.packet.lock().extend(data);
}
fn connected(&self, io: &NetworkContext, peer: &NodeIndex) {
if self.drop_session {
io.report_peer(*peer, Severity::Bad("We are evil and just want to drop"))
} else {
io.respond(33, "hello".to_owned().into_bytes());
}
}
fn disconnected(&self, _io: &NetworkContext, _peer: &NodeIndex) {
self.got_disconnect.store(true, AtomicOrdering::Relaxed);
}
/// Timer function called after a timeout created with `NetworkContext::timeout`.
fn timeout(&self, _io: &NetworkContext, timer: TimerToken) {
assert_eq!(timer, 0);
self.got_timeout.store(true, AtomicOrdering::Relaxed);
}
}
#[test]
fn net_service() {
let _service = NetworkService::new(
NetworkConfiguration::new_local(),
vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])]
).expect("Error creating network service");
}
#[test]
#[ignore] // TODO: how is this test even supposed to work?
fn net_disconnect() {
let mut config1 = NetworkConfiguration::new_local();
config1.boot_nodes = vec![ ];
let handler1 = Arc::new(TestProtocol::new(false));
let service1 = NetworkService::new(config1, vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
let mut config2 = NetworkConfiguration::new_local();
config2.boot_nodes = vec![ service1.external_url().unwrap() ];
let handler2 = Arc::new(TestProtocol::new(true));
let _service2 = NetworkService::new(config2, vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
while !(handler1.got_disconnect() && handler2.got_disconnect()) {
thread::sleep(Duration::from_millis(50));
}
assert!(handler1.got_disconnect());
assert!(handler2.got_disconnect());
}
#[test]
fn net_timeout() {
let config = NetworkConfiguration::new_local();
let handler = Arc::new(TestProtocol::new(false));
let _service = NetworkService::new(config, vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
while !handler.got_timeout() {
thread::sleep(Duration::from_millis(50));
}
}
+1
View File
@@ -22,6 +22,7 @@ sr-primitives = { path = "../../core/sr-primitives" }
parity-codec = "2.0"
parity-codec-derive = "2.0"
substrate-network-libp2p = { path = "../../core/network-libp2p" }
tokio = "0.1.11"
env_logger = { version = "0.4", optional = true }
substrate-keyring = { path = "../../core/keyring", optional = true }
+29 -22
View File
@@ -14,7 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use network_libp2p::{NetworkContext, Severity, NodeIndex, SessionInfo};
use parking_lot::Mutex;
use network_libp2p::{Service, Severity, NodeIndex, PeerId, ProtocolId};
use std::sync::Arc;
/// IO interface for the syncing handler.
/// Provides peer connection management and an interface to the blockchain client.
@@ -24,49 +26,54 @@ pub trait SyncIo {
/// Send a packet to a peer.
fn send(&mut self, who: NodeIndex, data: Vec<u8>);
/// Returns peer identifier string
fn peer_info(&self, who: NodeIndex) -> String {
fn peer_debug_info(&self, who: NodeIndex) -> String {
who.to_string()
}
/// Returns information on p2p session
fn peer_session_info(&self, who: NodeIndex) -> Option<SessionInfo>;
/// Check if the session is expired
fn is_expired(&self) -> bool;
fn peer_id(&self, who: NodeIndex) -> Option<PeerId>;
}
/// Wraps `NetworkContext` and the blockchain client
/// Wraps the network service.
pub struct NetSyncIo<'s> {
network: &'s NetworkContext,
network: &'s Arc<Mutex<Service>>,
protocol: ProtocolId,
}
impl<'s> NetSyncIo<'s> {
/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
pub fn new(network: &'s NetworkContext) -> NetSyncIo<'s> {
/// Creates a new instance.
pub fn new(network: &'s Arc<Mutex<Service>>, protocol: ProtocolId) -> NetSyncIo<'s> {
NetSyncIo {
network: network,
network,
protocol,
}
}
}
impl<'s> SyncIo for NetSyncIo<'s> {
fn report_peer(&mut self, who: NodeIndex, reason: Severity) {
self.network.report_peer(who, reason);
info!("Purposefully dropping {} ; reason: {:?}", who, reason);
match reason {
Severity::Bad(_) => self.network.lock().ban_node(who),
Severity::Useless(_) => self.network.lock().drop_node(who),
Severity::Timeout => self.network.lock().drop_node(who),
}
}
fn send(&mut self, who: NodeIndex, data: Vec<u8>) {
self.network.send(who, 0, data)
self.network.lock().send_custom_message(who, self.protocol, data)
}
fn peer_session_info(&self, who: NodeIndex) -> Option<SessionInfo> {
self.network.session_info(who)
fn peer_id(&self, who: NodeIndex) -> Option<PeerId> {
let net = self.network.lock();
net.peer_id_of_node(who).cloned()
}
fn is_expired(&self) -> bool {
self.network.is_expired()
}
fn peer_info(&self, who: NodeIndex) -> String {
self.network.peer_client_version(who)
fn peer_debug_info(&self, who: NodeIndex) -> String {
let net = self.network.lock();
if let (Some(peer_id), Some(addr)) = (net.peer_id_of_node(who), net.node_endpoint(who)) {
format!("{:?} through {:?}", peer_id, addr)
} else {
"unknown".to_string()
}
}
}
+1
View File
@@ -32,6 +32,7 @@ extern crate parity_codec as codec;
extern crate futures;
extern crate rustc_hex;
extern crate rand;
extern crate tokio;
#[macro_use] extern crate log;
#[macro_use] extern crate bitflags;
#[macro_use] extern crate error_chain;
+5 -13
View File
@@ -41,8 +41,6 @@ const REQUEST_TIMEOUT_SEC: u64 = 40;
/// Current protocol version.
pub (crate) const CURRENT_VERSION: u32 = 1;
/// Current packet count.
pub (crate) const CURRENT_PACKET_COUNT: u8 = 1;
// Maximum allowed entries in `BlockResponse`
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
@@ -289,14 +287,14 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Called when a new peer is connected
pub fn on_peer_connected(&self, io: &mut SyncIo, who: NodeIndex) {
trace!(target: "sync", "Connected {}: {}", who, io.peer_info(who));
trace!(target: "sync", "Connected {}: {}", who, io.peer_debug_info(who));
self.handshaking_peers.write().insert(who, time::Instant::now());
self.send_status(io, who);
}
/// Called by peer when it is disconnecting
pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: NodeIndex) {
trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_info(peer));
trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_debug_info(peer));
// lock all the the peer lists so that add/remove peer events are in order
let mut sync = self.sync.write();
@@ -420,16 +418,12 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Called by peer to report status
fn on_status_message(&self, io: &mut SyncIo, who: NodeIndex, status: message::Status<B>) {
trace!(target: "sync", "New peer {} {:?}", who, status);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", who, io.peer_info(who));
return;
}
{
let mut peers = self.context_data.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
if peers.contains_key(&who) {
debug!(target: "sync", "Unexpected status packet from {}:{}", who, io.peer_info(who));
debug!(target: "sync", "Unexpected status packet from {}:{}", who, io.peer_debug_info(who));
return;
}
if status.genesis_hash != self.genesis_hash {
@@ -464,7 +458,7 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
};
peers.insert(who.clone(), peer);
handshaking_peers.remove(&who);
debug!(target: "sync", "Connected {} {}", who, io.peer_info(who));
debug!(target: "sync", "Connected {} {}", who, io.peer_debug_info(who));
}
let mut context = ProtocolContext::new(&self.context_data, io);
@@ -514,9 +508,7 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
.unzip();
if !to_send.is_empty() {
let node_id = io.peer_session_info(*who)
.map(|info| format!("{}@{:?}", info.remote_address, info.id));
let node_id = io.peer_id(*who).map(|id| id.to_base58());
if let Some(id) = node_id {
for hash in hashes {
propagated_to.entry(hash).or_insert_with(Vec::new).push(id.clone());
+206 -145
View File
@@ -16,14 +16,15 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::io;
use std::{io, thread};
use std::time::Duration;
use futures::sync::{oneshot, mpsc};
use network_libp2p::{NetworkProtocolHandler, NetworkContext, NodeIndex, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
use network_libp2p::{NetworkService, PeerId};
use futures::{self, Future, Stream, stream, sync::{oneshot, mpsc}};
use parking_lot::Mutex;
use network_libp2p::{ProtocolId, PeerId, NetworkConfiguration, ErrorKind};
use network_libp2p::{start_service, Service as NetworkService, ServiceEvent as NetworkServiceEvent};
use network_libp2p::{RegisteredProtocol, parse_str_addr, Protocol as Libp2pProtocol};
use io::NetSyncIo;
use protocol::{Protocol, ProtocolContext, Context, ProtocolStatus, PeerInfo as ProtocolPeerInfo};
use protocol::{self, Protocol, ProtocolContext, Context, ProtocolStatus};
use config::{ProtocolConfig};
use error::Error;
use chain::Client;
@@ -32,18 +33,14 @@ use specialization::Specialization;
use on_demand::OnDemandService;
use import_queue::AsyncImportQueue;
use runtime_primitives::traits::{Block as BlockT};
use tokio::{runtime::Runtime, timer::Interval};
/// Type that represents fetch completion future.
pub type FetchFuture = oneshot::Receiver<Vec<u8>>;
/// Type that represents bft messages stream.
pub type BftMessageStream<B> = mpsc::UnboundedReceiver<LocalizedBftMessage<B>>;
type TimerToken = usize;
const TICK_TOKEN: TimerToken = 0;
const TICK_TIMEOUT: Duration = Duration::from_millis(1000);
const PROPAGATE_TOKEN: TimerToken = 1;
const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000);
bitflags! {
@@ -76,8 +73,6 @@ impl ::codec::Decode for Roles {
pub trait SyncProvider<B: BlockT>: Send + Sync {
/// Get sync status
fn status(&self) -> ProtocolStatus<B>;
/// Get peers information
fn peers(&self) -> Vec<PeerInfo<B>>;
/// Get this node id if available.
fn node_id(&self) -> Option<String>;
}
@@ -113,28 +108,6 @@ pub trait ExecuteInContext<B: BlockT>: Send + Sync {
fn execute_in_context<F: Fn(&mut Context<B>)>(&self, closure: F);
}
/// Network protocol handler
struct ProtocolHandler<B: BlockT, S: Specialization<B>, H: ExHashT> {
protocol: Protocol<B, S, H>,
}
/// Peer connection information
#[derive(Debug)]
pub struct PeerInfo<B: BlockT> {
/// Public node id
pub id: PeerId,
/// Node client ID
pub client_version: String,
/// Capabilities
pub capabilities: Vec<String>,
/// Remote endpoint address
pub remote_address: String,
/// Local endpoint address
pub local_address: String,
/// Dot protocol info.
pub dot_info: Option<ProtocolPeerInfo<B>>,
}
/// Service initialization parameters.
pub struct Params<B: BlockT, S, H: ExHashT> {
/// Configuration.
@@ -154,50 +127,43 @@ pub struct Params<B: BlockT, S, H: ExHashT> {
/// Substrate network service. Handles network IO and manages connectivity.
pub struct Service<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> {
/// Network service
network: NetworkService,
/// Devp2p protocol handler
handler: Arc<ProtocolHandler<B, S, H>>,
/// Devp2p protocol ID.
network: Arc<Mutex<NetworkService>>,
/// Protocol handler
handler: Arc<Protocol<B, S, H>>,
/// Protocol ID.
protocol_id: ProtocolId,
/// Sender for messages to the backgound service task, and handle for the background thread.
/// Dropping the sender should close the task and the thread.
/// This is an `Option` because we need to extract it in the destructor.
bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>,
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
/// Creates and register protocol with the network service
pub fn new(params: Params<B, S, H>, protocol_id: ProtocolId) -> Result<Arc<Service<B, S, H>>, Error> {
let chain = params.chain.clone();
// TODO: non-insatnt finality.
// TODO: non-instant finality.
let import_queue = Arc::new(AsyncImportQueue::new(true));
let handler = Arc::new(ProtocolHandler {
protocol: Protocol::new(
params.config,
params.chain,
import_queue.clone(),
params.on_demand,
params.transaction_pool,
params.specialization,
)?,
});
let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)];
let protocols = vec![(handler.clone() as Arc<_>, protocol_id, &versions[..])];
let service = match NetworkService::new(params.network_config.clone(), protocols) {
Ok(service) => service,
Err(err) => {
match err.kind() {
ErrorKind::Io(ref e) if e.kind() == io::ErrorKind::AddrInUse =>
warn!("Network port is already in use, make sure that another instance of Substrate client is not running or change the port using the --port option."),
_ => warn!("Error starting network: {}", err),
};
return Err(err.into())
},
};
let handler = Arc::new(Protocol::new(
params.config,
params.chain,
import_queue.clone(),
params.on_demand,
params.transaction_pool,
params.specialization,
)?);
let versions = [(protocol::CURRENT_VERSION as u8)];
let registered = RegisteredProtocol::new(protocol_id, &versions[..]);
let (thread, network) = start_thread(params.network_config, handler.clone(), registered)?;
let sync = Arc::new(Service {
network: service,
network,
protocol_id,
handler,
bg_thread: Some(thread),
});
import_queue.start(
Arc::downgrade(sync.handler.protocol.sync()),
Arc::downgrade(sync.handler.sync()),
Arc::downgrade(&sync),
Arc::downgrade(&chain)
)?;
@@ -207,106 +173,57 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
/// Called when a new block is imported by the client.
pub fn on_block_imported(&self, hash: B::Hash, header: &B::Header) {
self.network.with_context(self.protocol_id, |context| {
self.handler.protocol.on_block_imported(&mut NetSyncIo::new(context), hash, header)
});
self.handler.on_block_imported(&mut NetSyncIo::new(&self.network, self.protocol_id), hash, header)
}
/// Called when new transactons are imported by the client.
pub fn trigger_repropagate(&self) {
self.network.with_context(self.protocol_id, |context| {
self.handler.protocol.propagate_extrinsics(&mut NetSyncIo::new(context));
});
self.handler.propagate_extrinsics(&mut NetSyncIo::new(&self.network, self.protocol_id));
}
/// Execute a closure with the chain-specific network specialization.
/// If the network is unavailable, this will return `None`.
pub fn with_spec<F, U>(&self, f: F) -> Option<U>
pub fn with_spec<F, U>(&self, f: F) -> U
where F: FnOnce(&mut S, &mut Context<B>) -> U
{
let mut res = None;
self.network.with_context(self.protocol_id, |context| {
res = Some(self.handler.protocol.with_spec(&mut NetSyncIo::new(context), f))
});
res
self.handler.with_spec(&mut NetSyncIo::new(&self.network, self.protocol_id), f)
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H:ExHashT> Drop for Service<B, S, H> {
fn drop(&mut self) {
self.handler.protocol.stop();
self.handler.stop();
if let Some((sender, join)) = self.bg_thread.take() {
let _ = sender.send(());
if let Err(e) = join.join() {
error!("Error while waiting on background thread: {:?}", e);
}
}
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> ExecuteInContext<B> for Service<B, S, H> {
fn execute_in_context<F: Fn(&mut ::protocol::Context<B>)>(&self, closure: F) {
self.network.with_context(self.protocol_id, |context| {
closure(&mut ProtocolContext::new(self.handler.protocol.context_data(), &mut NetSyncIo::new(context)))
});
closure(&mut ProtocolContext::new(self.handler.context_data(), &mut NetSyncIo::new(&self.network, self.protocol_id)))
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> SyncProvider<B> for Service<B, S, H> {
/// Get sync status
fn status(&self) -> ProtocolStatus<B> {
self.handler.protocol.status()
}
/// Get sync peers
fn peers(&self) -> Vec<PeerInfo<B>> {
self.network.with_context_eval(self.protocol_id, |ctx| {
let peer_ids = self.network.connected_peers();
peer_ids.into_iter().filter_map(|who| {
let session_info = match ctx.session_info(who) {
None => return None,
Some(info) => info,
};
Some(PeerInfo {
id: session_info.id,
client_version: session_info.client_version,
capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(),
remote_address: session_info.remote_address,
local_address: session_info.local_address,
dot_info: self.handler.protocol.peer_info(who),
})
}).collect()
}).unwrap_or_else(Vec::new)
self.handler.status()
}
fn node_id(&self) -> Option<String> {
self.network.external_url()
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> NetworkProtocolHandler for ProtocolHandler<B, S, H> {
fn initialize(&self, io: &NetworkContext) {
io.register_timer(TICK_TOKEN, TICK_TIMEOUT)
.expect("Error registering sync timer");
io.register_timer(PROPAGATE_TOKEN, PROPAGATE_TIMEOUT)
.expect("Error registering transaction propagation timer");
}
fn read(&self, io: &NetworkContext, peer: &NodeIndex, _packet_id: u8, data: &[u8]) {
self.protocol.handle_packet(&mut NetSyncIo::new(io), *peer, data);
}
fn connected(&self, io: &NetworkContext, peer: &NodeIndex) {
self.protocol.on_peer_connected(&mut NetSyncIo::new(io), *peer);
}
fn disconnected(&self, io: &NetworkContext, peer: &NodeIndex) {
self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer);
}
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
match timer {
TICK_TOKEN => self.protocol.tick(&mut NetSyncIo::new(io)),
PROPAGATE_TOKEN => self.protocol.propagate_extrinsics(&mut NetSyncIo::new(io)),
_ => {}
}
let network = self.network.lock();
let ret = network
.listeners()
.next()
.map(|addr| {
let mut addr = addr.clone();
addr.append(Libp2pProtocol::P2p(network.peer_id().clone().into()));
addr.to_string()
});
ret
}
}
@@ -317,26 +234,170 @@ pub trait ManageNetwork: Send + Sync {
/// Set to deny unreserved peers to connect
fn deny_unreserved_peers(&self);
/// Remove reservation for the peer
fn remove_reserved_peer(&self, peer: String) -> Result<(), String>;
fn remove_reserved_peer(&self, peer: PeerId);
/// Add reserved peer
fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> ManageNetwork for Service<B, S, H> {
fn accept_unreserved_peers(&self) {
self.network.set_non_reserved_mode(NonReservedPeerMode::Accept);
self.network.lock().accept_unreserved_peers();
}
fn deny_unreserved_peers(&self) {
self.network.set_non_reserved_mode(NonReservedPeerMode::Deny);
// This method can disconnect nodes, in which case we have to properly close them in the
// protocol.
let disconnected = self.network.lock().deny_unreserved_peers();
let mut net_sync = NetSyncIo::new(&self.network, self.protocol_id);
for node_index in disconnected {
self.handler.on_peer_disconnected(&mut net_sync, node_index)
}
}
fn remove_reserved_peer(&self, peer: String) -> Result<(), String> {
self.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
fn remove_reserved_peer(&self, peer: PeerId) {
// This method can disconnect a node, in which case we have to properly close it in the
// protocol.
let disconnected = self.network.lock().remove_reserved_peer(peer);
if let Some(node_index) = disconnected {
let mut net_sync = NetSyncIo::new(&self.network, self.protocol_id);
self.handler.on_peer_disconnected(&mut net_sync, node_index)
}
}
fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
let (addr, peer_id) = parse_str_addr(&peer).map_err(|e| format!("{:?}", e))?;
self.network.lock().add_reserved_peer(addr, peer_id);
Ok(())
}
}
/// Starts the background thread that handles the networking.
fn start_thread<B: BlockT + 'static, S: Specialization<B>, H: ExHashT>(
config: NetworkConfiguration,
protocol: Arc<Protocol<B, S, H>>,
registered: RegisteredProtocol,
) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService>>), Error> {
let protocol_id = registered.id();
// Start the main service.
let service = match start_service(config, Some(registered)) {
Ok(service) => Arc::new(Mutex::new(service)),
Err(err) => {
match err.kind() {
ErrorKind::Io(ref e) if e.kind() == io::ErrorKind::AddrInUse =>
warn!("Network port is already in use, make sure that another instance of Substrate client is not running or change the port using the --port option."),
_ => warn!("Error starting network: {}", err),
};
return Err(err.into())
},
};
let (close_tx, close_rx) = oneshot::channel();
let service_clone = service.clone();
let mut runtime = Runtime::new()?;
let thread = thread::Builder::new().name("network".to_string()).spawn(move || {
let fut = run_thread(service_clone, protocol, protocol_id)
.select(close_rx.then(|_| Ok(())))
.map(|(val, _)| val)
.map_err(|(err,_ )| err);
// Note that we use `block_on` and not `block_on_all` because we want to kill the thread
// instantly if `close_rx` receives something.
match runtime.block_on(fut) {
Ok(()) => debug!(target: "sub-libp2p", "Networking thread finished"),
Err(err) => error!(target: "sub-libp2p", "Error while running libp2p: {:?}", err),
};
})?;
Ok(((close_tx, thread), service))
}
/// Runs the background thread that handles the networking.
fn run_thread<B: BlockT + 'static, S: Specialization<B>, H: ExHashT>(
network_service: Arc<Mutex<NetworkService>>,
protocol: Arc<Protocol<B, S, H>>,
protocol_id: ProtocolId,
) -> impl Future<Item = (), Error = io::Error> {
// Interval for performing maintenance on the protocol handler.
let tick = Interval::new_interval(TICK_TIMEOUT)
.for_each({
let protocol = protocol.clone();
let network_service = network_service.clone();
move |_| {
protocol.tick(&mut NetSyncIo::new(&network_service, protocol_id));
Ok(())
}
})
.then(|res| {
match res {
Ok(()) => (),
Err(err) => error!("Error in the propagation timer: {:?}", err),
};
Ok(())
});
// Interval at which we gossip extrinsics over the network.
let propagate = Interval::new_interval(PROPAGATE_TIMEOUT)
.for_each({
let protocol = protocol.clone();
let network_service = network_service.clone();
move |_| {
protocol.propagate_extrinsics(&mut NetSyncIo::new(&network_service, protocol_id));
Ok(())
}
})
.then(|res| {
match res {
Ok(()) => (),
Err(err) => error!("Error in the propagation timer: {:?}", err),
};
Ok(())
});
// The network service produces events about what happens on the network. Let's process them.
let network_service2 = network_service.clone();
let network = stream::poll_fn(move || network_service2.lock().poll()).for_each(move |event| {
let mut net_sync = NetSyncIo::new(&network_service, protocol_id);
match event {
NetworkServiceEvent::NodeClosed { node_index, closed_custom_protocols } => {
if !closed_custom_protocols.is_empty() {
debug_assert_eq!(closed_custom_protocols, &[protocol_id]);
protocol.on_peer_disconnected(&mut net_sync, node_index);
}
}
NetworkServiceEvent::ClosedCustomProtocols { node_index, protocols } => {
if !protocols.is_empty() {
debug_assert_eq!(protocols, &[protocol_id]);
protocol.on_peer_disconnected(&mut net_sync, node_index);
}
}
NetworkServiceEvent::OpenedCustomProtocol { node_index, version, .. } => {
debug_assert_eq!(version, protocol::CURRENT_VERSION as u8);
protocol.on_peer_connected(&mut net_sync, node_index);
}
NetworkServiceEvent::ClosedCustomProtocol { node_index, .. } => {
protocol.on_peer_disconnected(&mut net_sync, node_index);
}
NetworkServiceEvent::CustomMessage { node_index, data, .. } => {
protocol.handle_packet(&mut net_sync, node_index, &data);
}
};
Ok(())
});
// Merge all futures into one.
let futures: Vec<Box<Future<Item = (), Error = io::Error> + Send>> = vec![
Box::new(tick) as Box<_>,
Box::new(propagate) as Box<_>,
Box::new(network) as Box<_>
];
futures::select_all(futures)
.and_then(move |_| {
debug!("Networking ended");
Ok(())
})
.map_err(|(r, _, _)| r)
}
+4 -12
View File
@@ -31,7 +31,7 @@ use protocol::{Context, Protocol};
use primitives::{Blake2Hasher};
use config::ProtocolConfig;
use service::TransactionPool;
use network_libp2p::{NodeIndex, SessionInfo, Severity};
use network_libp2p::{NodeIndex, PeerId, Severity};
use keyring::Keyring;
use codec::{Encode, Decode};
use import_queue::SyncImportQueue;
@@ -79,7 +79,6 @@ pub struct TestIo<'p> {
queue: &'p RwLock<VecDeque<TestPacket>>,
pub to_disconnect: HashSet<NodeIndex>,
packets: Vec<TestPacket>,
peers_info: HashMap<NodeIndex, String>,
_sender: Option<NodeIndex>,
}
@@ -90,7 +89,6 @@ impl<'p> TestIo<'p> where {
_sender: sender,
to_disconnect: HashSet::new(),
packets: Vec::new(),
peers_info: HashMap::new(),
}
}
}
@@ -106,10 +104,6 @@ impl<'p> SyncIo for TestIo<'p> {
self.to_disconnect.insert(who);
}
fn is_expired(&self) -> bool {
false
}
fn send(&mut self, who: NodeIndex, data: Vec<u8>) {
self.packets.push(TestPacket {
data: data,
@@ -117,13 +111,11 @@ impl<'p> SyncIo for TestIo<'p> {
});
}
fn peer_info(&self, who: NodeIndex) -> String {
self.peers_info.get(&who)
.cloned()
.unwrap_or_else(|| who.to_string())
fn peer_debug_info(&self, _who: NodeIndex) -> String {
"unknown".to_string()
}
fn peer_session_info(&self, _peer_id: NodeIndex) -> Option<SessionInfo> {
fn peer_id(&self, _peer_id: NodeIndex) -> Option<PeerId> {
None
}
}
+2 -10
View File
@@ -284,18 +284,10 @@ impl<P: AuthoringApi + Send + Sync + 'static> Network for ConsensusNetwork<P> {
}
});
match process_task {
Some(task) =>
if let Err(e) = Executor::spawn(&mut task_executor, Box::new(task)) {
debug!(target: "node-network", "Cannot spawn message processing: {:?}", e)
},
None => warn!(target: "node-network", "Cannot process incoming messages: network appears to be down"),
if let Err(e) = Executor::spawn(&mut task_executor, Box::new(process_task)) {
debug!(target: "node-network", "Cannot spawn message processing: {:?}", e)
}
(InputAdapter { input: bft_recv }, sink)
}
}
/// Error when the network appears to be down.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct NetworkDown;