Merge network-libp2p into network (#2843)

* Move network-libp2p into network

* Merge libp2p_service into service

* Don't expose RegisteredProtocol in the API

* Extract DiscoveryBehaviour from Behaviour

* Restore libp2p tests

* Add a test for discovery

* Line width

* Remove bandwidth check

* Fix gitlab
This commit is contained in:
Pierre Krieger
2019-06-13 18:21:31 +02:00
committed by Gavin Wood
parent f4afdd2f0b
commit 12bbc2ffd9
28 changed files with 1072 additions and 1326 deletions
+255
View File
@@ -0,0 +1,255 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::{debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour};
use futures::prelude::*;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, protocols_handler::IntoProtocolsHandler, PublicKey};
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
#[cfg(not(target_os = "unknown"))]
use libp2p::core::swarm::toggle::Toggle;
#[cfg(not(target_os = "unknown"))]
use libp2p::mdns::{Mdns, MdnsEvent};
use log::warn;
use std::iter;
use void;
/// General behaviour of the network.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "TBehaviourEv", poll_method = "poll")]
pub struct Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
/// Main protocol that handles everything except the discovery and the technicalities.
user_protocol: UserBehaviourWrap<TBehaviour>,
/// Periodically pings and identifies the nodes we are connected to, and store information in a
/// cache.
debug_info: debug_info::DebugInfoBehaviour<TSubstream>,
/// Discovers nodes of the network. Defined below.
discovery: DiscoveryBehaviour<TSubstream>,
/// Discovers nodes on the local network.
#[cfg(not(target_os = "unknown"))]
mdns: Toggle<Mdns<TSubstream>>,
/// Queue of events to produce for the outside.
#[behaviour(ignore)]
events: Vec<TBehaviourEv>,
}
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
/// Builds a new `Behaviour`.
pub fn new(
user_protocol: TBehaviour,
user_agent: String,
local_public_key: PublicKey,
known_addresses: Vec<(PeerId, Multiaddr)>,
enable_mdns: bool,
) -> Self {
let debug_info = debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone());
if enable_mdns {
#[cfg(target_os = "unknown")]
warn!(target: "sub-libp2p", "mDNS is not available on this platform");
}
Behaviour {
user_protocol: UserBehaviourWrap(user_protocol),
debug_info,
discovery: DiscoveryBehaviour::new(local_public_key, known_addresses),
#[cfg(not(target_os = "unknown"))]
mdns: if enable_mdns {
match Mdns::new() {
Ok(mdns) => Some(mdns).into(),
Err(err) => {
warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err);
None.into()
}
}
} else {
None.into()
},
events: Vec::new(),
}
}
/// Returns the list of nodes that we know exist in the network.
pub fn known_peers(&mut self) -> impl Iterator<Item = &PeerId> {
self.discovery.known_peers()
}
/// Adds a hard-coded address for the given peer, that never expires.
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.discovery.add_known_address(peer_id, addr)
}
/// Borrows `self` and returns a struct giving access to the information about a node.
///
/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
/// we're connected to, meaning that if `None` is returned then we're not connected to that
/// node.
pub fn node(&self, peer_id: &PeerId) -> Option<debug_info::Node> {
self.debug_info.node(peer_id)
}
/// Returns a shared reference to the user protocol.
pub fn user_protocol(&self) -> &TBehaviour {
&self.user_protocol.0
}
/// Returns a mutable reference to the user protocol.
pub fn user_protocol_mut(&mut self) -> &mut TBehaviour {
&mut self.user_protocol.0
}
}
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<void::Void> for
Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
fn inject_event(&mut self, event: void::Void) {
void::unreachable(event)
}
}
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<UserEventWrap<TBehaviourEv>> for
Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
fn inject_event(&mut self, event: UserEventWrap<TBehaviourEv>) {
self.events.push(event.0);
}
}
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<debug_info::DebugInfoEvent>
for Behaviour<TBehaviour, TBehaviourEv, TSubstream>
where TBehaviour: DiscoveryNetBehaviour {
fn inject_event(&mut self, event: debug_info::DebugInfoEvent) {
let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event;
if !info.protocol_version.contains("substrate") {
warn!(target: "sub-libp2p", "Connected to a non-Substrate node: {:?}", info);
}
if info.listen_addrs.len() > 30 {
warn!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \
it is identified by {:?} and {:?}", peer_id, info.protocol_version,
info.agent_version
);
info.listen_addrs.truncate(30);
}
for addr in &info.listen_addrs {
self.discovery.add_self_reported_address(&peer_id, addr.clone());
}
self.user_protocol.0.add_discovered_nodes(iter::once(peer_id.clone()));
}
}
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<DiscoveryOut>
for Behaviour<TBehaviour, TBehaviourEv, TSubstream>
where TBehaviour: DiscoveryNetBehaviour {
fn inject_event(&mut self, out: DiscoveryOut) {
match out {
DiscoveryOut::Discovered(peer_id) => {
self.user_protocol.0.add_discovered_nodes(iter::once(peer_id));
}
}
}
}
#[cfg(not(target_os = "unknown"))]
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<MdnsEvent> for
Behaviour<TBehaviour, TBehaviourEv, TSubstream>
where TBehaviour: DiscoveryNetBehaviour {
fn inject_event(&mut self, event: MdnsEvent) {
match event {
MdnsEvent::Discovered(list) => {
self.user_protocol.0.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id));
},
MdnsEvent::Expired(_) => {}
}
}
}
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, TBehaviourEv>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
}
Async::NotReady
}
}
/// Because of limitations with the network behaviour custom derive and trait impl duplication, we
/// have to wrap the user protocol into a struct.
pub struct UserBehaviourWrap<TInner>(TInner);
/// Event produced by `UserBehaviourWrap`.
pub struct UserEventWrap<TInner>(TInner);
impl<TInner: NetworkBehaviour> NetworkBehaviour for UserBehaviourWrap<TInner> {
type ProtocolsHandler = TInner::ProtocolsHandler;
type OutEvent = UserEventWrap<TInner::OutEvent>;
fn new_handler(&mut self) -> Self::ProtocolsHandler { self.0.new_handler() }
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.0.addresses_of_peer(peer_id)
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
self.0.inject_connected(peer_id, endpoint)
}
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
self.0.inject_disconnected(peer_id, endpoint)
}
fn inject_node_event(
&mut self,
peer_id: PeerId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
) {
self.0.inject_node_event(peer_id, event)
}
fn poll(
&mut self,
params: &mut PollParameters
) -> Async<
NetworkBehaviourAction<
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
Self::OutEvent
>
> {
match self.0.poll(params) {
Async::NotReady => Async::NotReady,
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) =>
Async::Ready(NetworkBehaviourAction::GenerateEvent(UserEventWrap(ev))),
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
Async::Ready(NetworkBehaviourAction::DialAddress { address }),
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }),
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
}
}
fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
self.0.inject_replaced(peer_id, closed_endpoint, new_endpoint)
}
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) {
self.0.inject_addr_reach_failure(peer_id, addr, error)
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
self.0.inject_dial_failure(peer_id)
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
self.0.inject_new_listen_addr(addr)
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
self.0.inject_expired_listen_addr(addr)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.0.inject_new_external_addr(addr)
}
}
+289 -4
View File
@@ -17,16 +17,22 @@
//! Configuration for the networking layer of Substrate.
pub use crate::protocol::ProtocolConfig;
pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, NodeKeyConfig, ProtocolId, Secret};
use crate::ProtocolId;
use crate::chain::{Client, FinalityProofProvider};
use crate::on_demand_layer::OnDemand;
use crate::service::{ExHashT, TransactionPool};
use bitflags::bitflags;
use consensus::import_queue::ImportQueue;
use crate::chain::{Client, FinalityProofProvider};
use parity_codec;
use crate::on_demand_layer::OnDemand;
use runtime_primitives::traits::{Block as BlockT};
use crate::service::{ExHashT, TransactionPool};
use std::sync::Arc;
use libp2p::identity::{Keypair, secp256k1, ed25519};
use libp2p::wasm_ext;
use libp2p::{Multiaddr, multiaddr::Protocol};
use std::error::Error;
use std::{io::{self, Write}, iter, fs, net::Ipv4Addr, path::{Path, PathBuf}};
use zeroize::Zeroize;
/// Service initialization parameters.
pub struct Params<B: BlockT, S, H: ExHashT> {
@@ -87,3 +93,282 @@ impl parity_codec::Decode for Roles {
Self::from_bits(input.read_byte()?)
}
}
/// Network service configuration.
#[derive(Clone)]
pub struct NetworkConfiguration {
/// Directory path to store general network configuration. None means nothing will be saved.
pub config_path: Option<String>,
/// Directory path to store network-specific configuration. None means nothing will be saved.
pub net_config_path: Option<String>,
/// Multiaddresses to listen for incoming connections.
pub listen_addresses: Vec<Multiaddr>,
/// Multiaddresses to advertise. Detected automatically if empty.
pub public_addresses: Vec<Multiaddr>,
/// List of initial node addresses
pub boot_nodes: Vec<String>,
/// The node key configuration, which determines the node's network identity keypair.
pub node_key: NodeKeyConfig,
/// Maximum allowed number of incoming connections.
pub in_peers: u32,
/// Number of outgoing connections we're trying to maintain.
pub out_peers: u32,
/// List of reserved node addresses.
pub reserved_nodes: Vec<String>,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
/// Client identifier. Sent over the wire for debugging purposes.
pub client_version: String,
/// Name of the node. Sent over the wire for debugging purposes.
pub node_name: String,
/// If true, the network will use mDNS to discover other libp2p nodes on the local network
/// and connect to them if they support the same chain.
pub enable_mdns: bool,
/// Optional external implementation of a libp2p transport. Used in WASM contexts where we need
/// some binding between the networking provided by the operating system or environment and
/// libp2p.
///
/// This parameter exists whatever the target platform is, but it is expected to be set to
/// `Some` only when compiling for WASM.
pub wasm_external_transport: Option<wasm_ext::ExtTransport>,
}
impl Default for NetworkConfiguration {
fn default() -> Self {
NetworkConfiguration {
config_path: None,
net_config_path: None,
listen_addresses: Vec::new(),
public_addresses: Vec::new(),
boot_nodes: Vec::new(),
node_key: NodeKeyConfig::Ed25519(Secret::New),
in_peers: 25,
out_peers: 75,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
client_version: "unknown".into(),
node_name: "unknown".into(),
enable_mdns: false,
wasm_external_transport: None,
}
}
}
impl NetworkConfiguration {
/// Create a new instance of default settings.
pub fn new() -> Self {
Self::default()
}
/// Create new default configuration for localhost-only connection with random port (useful for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_addresses = vec![
iter::once(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
.chain(iter::once(Protocol::Tcp(0)))
.collect()
];
config
}
}
/// The policy for connections to non-reserved peers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReservedPeerMode {
/// Accept them. This is the default.
Accept,
/// Deny them.
Deny,
}
impl NonReservedPeerMode {
/// Attempt to parse the peer mode from a string.
pub fn parse(s: &str) -> Option<Self> {
match s {
"accept" => Some(NonReservedPeerMode::Accept),
"deny" => Some(NonReservedPeerMode::Deny),
_ => None,
}
}
}
/// The configuration of a node's secret key, describing the type of key
/// and how it is obtained. A node's identity keypair is the result of
/// the evaluation of the node key configuration.
#[derive(Clone)]
pub enum NodeKeyConfig {
/// A Secp256k1 secret key configuration.
Secp256k1(Secret<secp256k1::SecretKey>),
/// A Ed25519 secret key configuration.
Ed25519(Secret<ed25519::SecretKey>)
}
/// The options for obtaining a Secp256k1 secret key.
pub type Secp256k1Secret = Secret<secp256k1::SecretKey>;
/// The options for obtaining a Ed25519 secret key.
pub type Ed25519Secret = Secret<ed25519::SecretKey>;
/// The configuration options for obtaining a secret key `K`.
#[derive(Clone)]
pub enum Secret<K> {
/// Use the given secret key `K`.
Input(K),
/// Read the secret key from a file. If the file does not exist,
/// it is created with a newly generated secret key `K`. The format
/// of the file is determined by `K`:
///
/// * `secp256k1::SecretKey`: An unencoded 32 bytes Secp256k1 secret key.
/// * `ed25519::SecretKey`: An unencoded 32 bytes Ed25519 secret key.
File(PathBuf),
/// Always generate a new secret key `K`.
New
}
impl NodeKeyConfig {
/// Evaluate a `NodeKeyConfig` to obtain an identity `Keypair`:
///
/// * If the secret is configured as input, the corresponding keypair is returned.
///
/// * If the secret is configured as a file, it is read from that file, if it exists.
/// Otherwise a new secret is generated and stored. In either case, the
/// keypair obtained from the secret is returned.
///
/// * If the secret is configured to be new, it is generated and the corresponding
/// keypair is returned.
pub fn into_keypair(self) -> io::Result<Keypair> {
use NodeKeyConfig::*;
match self {
Secp256k1(Secret::New) =>
Ok(Keypair::generate_secp256k1()),
Secp256k1(Secret::Input(k)) =>
Ok(Keypair::Secp256k1(k.into())),
Secp256k1(Secret::File(f)) =>
get_secret(f,
|mut b| secp256k1::SecretKey::from_bytes(&mut b),
secp256k1::SecretKey::generate,
|b| b.to_bytes().to_vec())
.map(secp256k1::Keypair::from)
.map(Keypair::Secp256k1),
Ed25519(Secret::New) =>
Ok(Keypair::generate_ed25519()),
Ed25519(Secret::Input(k)) =>
Ok(Keypair::Ed25519(k.into())),
Ed25519(Secret::File(f)) =>
get_secret(f,
|mut b| ed25519::SecretKey::from_bytes(&mut b),
ed25519::SecretKey::generate,
|b| b.as_ref().to_vec())
.map(ed25519::Keypair::from)
.map(Keypair::Ed25519),
}
}
}
/// Load a secret key from a file, if it exists, or generate a
/// new secret key and write it to that file. In either case,
/// the secret key is returned.
fn get_secret<P, F, G, E, W, K>(file: P, parse: F, generate: G, serialize: W) -> io::Result<K>
where
P: AsRef<Path>,
F: for<'r> FnOnce(&'r mut [u8]) -> Result<K, E>,
G: FnOnce() -> K,
E: Error + Send + Sync + 'static,
W: Fn(&K) -> Vec<u8>,
{
std::fs::read(&file)
.and_then(|mut sk_bytes|
parse(&mut sk_bytes)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)))
.or_else(|e| {
if e.kind() == io::ErrorKind::NotFound {
file.as_ref().parent().map_or(Ok(()), fs::create_dir_all)?;
let sk = generate();
let mut sk_vec = serialize(&sk);
write_secret_file(file, &sk_vec)?;
sk_vec.zeroize();
Ok(sk)
} else {
Err(e)
}
})
}
/// Write secret bytes to a file.
fn write_secret_file<P>(path: P, sk_bytes: &[u8]) -> io::Result<()>
where
P: AsRef<Path>
{
let mut file = open_secret_file(&path)?;
file.write_all(sk_bytes)
}
/// Opens a file containing a secret key in write mode.
#[cfg(unix)]
fn open_secret_file<P>(path: P) -> io::Result<fs::File>
where
P: AsRef<Path>
{
use std::os::unix::fs::OpenOptionsExt;
fs::OpenOptions::new()
.write(true)
.create_new(true)
.mode(0o600)
.open(path)
}
/// Opens a file containing a secret key in write mode.
#[cfg(not(unix))]
fn open_secret_file<P>(path: P) -> Result<fs::File, io::Error>
where
P: AsRef<Path>
{
fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(path)
}
#[cfg(test)]
mod tests {
use super::*;
use tempdir::TempDir;
fn secret_bytes(kp: &Keypair) -> Vec<u8> {
match kp {
Keypair::Ed25519(p) => p.secret().as_ref().iter().cloned().collect(),
Keypair::Secp256k1(p) => p.secret().to_bytes().to_vec(),
_ => panic!("Unexpected keypair.")
}
}
#[test]
fn test_secret_file() {
let tmp = TempDir::new("x").unwrap();
std::fs::remove_dir(tmp.path()).unwrap(); // should be recreated
let file = tmp.path().join("x").to_path_buf();
let kp1 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Ed25519(Secret::File(file.clone())).into_keypair().unwrap();
assert!(file.is_file() && secret_bytes(&kp1) == secret_bytes(&kp2))
}
#[test]
fn test_secret_input() {
let sk = secp256k1::SecretKey::generate();
let kp1 = NodeKeyConfig::Secp256k1(Secret::Input(sk.clone())).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Secp256k1(Secret::Input(sk)).into_keypair().unwrap();
assert!(secret_bytes(&kp1) == secret_bytes(&kp2));
}
#[test]
fn test_secret_new() {
let kp1 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap();
let kp2 = NodeKeyConfig::Ed25519(Secret::New).into_keypair().unwrap();
assert!(secret_bytes(&kp1) != secret_bytes(&kp2));
}
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,657 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol};
use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream};
use futures::prelude::*;
use libp2p::core::{
ConnectedPoint, PeerId, Endpoint, ProtocolsHandler, ProtocolsHandlerEvent,
protocols_handler::IntoProtocolsHandler,
protocols_handler::KeepAlive,
protocols_handler::ProtocolsHandlerUpgrErr,
protocols_handler::SubstreamProtocol,
upgrade::{InboundUpgrade, OutboundUpgrade}
};
use log::{debug, error};
use smallvec::{smallvec, SmallVec};
use std::{borrow::Cow, error, fmt, io, marker::PhantomData, mem, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, clock::Clock};
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
/// Every time a connection with a remote starts, an instance of this struct is created and
/// sent to a background task dedicated to this connection. Once the connection is established,
/// it is turned into a `CustomProtoHandler`. It then handles all communications that are specific
/// to Substrate on that single connection.
///
/// Note that there can be multiple instance of this struct simultaneously for same peer. However
/// if that happens, only one main instance can communicate with the outer layers of the code. In
/// other words, the outer layers of the code only ever see one handler.
///
/// ## State of the handler
///
/// There are six possible states for the handler:
///
/// - Enabled and open, which is a normal operation.
/// - Enabled and closed, in which case it will try to open substreams.
/// - Disabled and open, in which case it will try to close substreams.
/// - Disabled and closed, in which case the handler is idle. The connection will be
/// garbage-collected after a few seconds if nothing more happens.
/// - Initializing and open.
/// - Initializing and closed, which is the state the handler starts in.
///
/// The Init/Enabled/Disabled state is entirely controlled by the user by sending `Enable` or
/// `Disable` messages to the handler. The handler itself never transitions automatically between
/// these states. For example, if the handler reports a network misbehaviour, it will close the
/// substreams but it is the role of the user to send a `Disabled` event if it wants the connection
/// to close. Otherwise, the handler will try to reopen substreams.
/// The handler starts in the "Initializing" state and must be transitionned to Enabled or Disabled
/// as soon as possible.
///
/// The Open/Closed state is decided by the handler and is reported with the `CustomProtocolOpen`
/// and `CustomProtocolClosed` events. The `CustomMessage` event can only be generated if the
/// handler is open.
///
/// ## How it works
///
/// When the handler is created, it is initially in the `Init` state and waits for either a
/// `Disable` or an `Enable` message from the outer layer. At any time, the outer layer is free to
/// toggle the handler between the disabled and enabled states.
///
/// When the handler switches to "enabled", it opens a substream and negotiates the protocol named
/// `/substrate/xxx`, where `xxx` is chosen by the user and depends on the chain.
///
/// For backwards compatibility reasons, when we switch to "enabled" for the first time (while we
/// are still in "init" mode) and we are the connection listener, we don't open a substream.
///
/// In order the handle the situation where both the remote and us get enabled at the same time,
/// we tolerate multiple substreams open at the same time. Messages are transmitted on an arbitrary
/// substream. The endpoints don't try to agree on a single substream.
///
/// We consider that we are now "closed" if the remote closes all the existing substreams.
/// Re-opening it can then be performed by closing all active substream and re-opening one.
///
pub struct CustomProtoHandlerProto<TMessage, TSubstream> {
/// Configuration for the protocol upgrade to negotiate.
protocol: RegisteredProtocol<TMessage>,
/// Marker to pin the generic type.
marker: PhantomData<TSubstream>,
}
impl<TMessage, TSubstream> CustomProtoHandlerProto<TMessage, TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TMessage: CustomMessage,
{
/// Builds a new `CustomProtoHandlerProto`.
pub fn new(protocol: RegisteredProtocol<TMessage>) -> Self {
CustomProtoHandlerProto {
protocol,
marker: PhantomData,
}
}
}
impl<TMessage, TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<TMessage, TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TMessage: CustomMessage,
{
type Handler = CustomProtoHandler<TMessage, TSubstream>;
fn inbound_protocol(&self) -> RegisteredProtocol<TMessage> {
self.protocol.clone()
}
fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
let clock = Clock::new();
CustomProtoHandler {
protocol: self.protocol,
endpoint: connected_point.to_endpoint(),
remote_peer_id: remote_peer_id.clone(),
state: ProtocolState::Init {
substreams: SmallVec::new(),
init_deadline: Delay::new(clock.now() + Duration::from_secs(5))
},
events_queue: SmallVec::new(),
clock,
}
}
}
/// The actual handler once the connection has been established.
pub struct CustomProtoHandler<TMessage, TSubstream> {
/// Configuration for the protocol upgrade to negotiate.
protocol: RegisteredProtocol<TMessage>,
/// State of the communications with the remote.
state: ProtocolState<TMessage, TSubstream>,
/// Identifier of the node we're talking to. Used only for logging purposes and shouldn't have
/// any influence on the behaviour.
remote_peer_id: PeerId,
/// Whether we are the connection dialer or listener. Used to determine who, between the local
/// node and the remote node, has priority.
endpoint: Endpoint,
/// Queue of events to send to the outside.
///
/// This queue must only ever be modified to insert elements at the back, or remove the first
/// element.
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, (), CustomProtoHandlerOut<TMessage>>; 16]>,
/// `Clock` instance that uses the current execution context's source of time.
clock: Clock,
}
/// State of the handler.
enum ProtocolState<TMessage, TSubstream> {
/// Waiting for the behaviour to tell the handler whether it is enabled or disabled.
Init {
/// List of substreams opened by the remote but that haven't been processed yet.
substreams: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 6]>,
/// Deadline after which the initialization is abnormally long.
init_deadline: Delay,
},
/// Handler is opening a substream in order to activate itself.
/// If we are in this state, we haven't sent any `CustomProtocolOpen` yet.
Opening {
/// Deadline after which the opening is abnormally long.
deadline: Delay,
},
/// Normal operating mode. Contains the substreams that are open.
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
Normal {
/// The substreams where bidirectional communications happen.
substreams: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 4]>,
/// Contains substreams which are being shut down.
shutdown: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 4]>,
},
/// We are disabled. Contains substreams that are being closed.
/// If we are in this state, either we have sent a `CustomProtocolClosed` message to the
/// outside or we have never sent any `CustomProtocolOpen` in the first place.
Disabled {
/// List of substreams to shut down.
shutdown: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 6]>,
/// If true, we should reactivate the handler after all the substreams in `shutdown` have
/// been closed.
///
/// Since we don't want to mix old and new substreams, we wait for all old substreams to
/// be closed before opening any new one.
reenable: bool,
},
/// In this state, we don't care about anything anymore and need to kill the connection as soon
/// as possible.
KillAsap,
/// We sometimes temporarily switch to this state during processing. If we are in this state
/// at the beginning of a method, that means something bad happened in the source code.
Poisoned,
}
/// Event that can be received by a `CustomProtoHandler`.
#[derive(Debug)]
pub enum CustomProtoHandlerIn<TMessage> {
/// The node should start using custom protocols.
Enable,
/// The node should stop using custom protocols.
Disable,
/// Sends a message through a custom protocol substream.
SendCustomMessage {
/// The message to send.
message: TMessage,
},
}
/// Event that can be emitted by a `CustomProtoHandler`.
#[derive(Debug)]
pub enum CustomProtoHandlerOut<TMessage> {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Version of the protocol that has been opened.
version: u8,
},
/// Closed a custom protocol with the remote.
CustomProtocolClosed {
/// Reason why the substream closed, for diagnostic purposes.
reason: Cow<'static, str>,
},
/// Receives a message on a custom protocol substream.
CustomMessage {
/// Message that has been received.
message: TMessage,
},
/// A substream to the remote is clogged. The send buffer is very large, and we should print
/// a diagnostic message and/or avoid sending more data.
Clogged {
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<TMessage>,
},
/// An error has happened on the protocol level with this node.
ProtocolError {
/// If true the error is severe, such as a protocol violation.
is_severe: bool,
/// The error that happened.
error: Box<dyn error::Error + Send + Sync>,
},
}
impl<TMessage, TSubstream> CustomProtoHandler<TMessage, TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TMessage: CustomMessage,
{
/// Enables the handler.
fn enable(&mut self) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
self.remote_peer_id);
ProtocolState::Poisoned
}
ProtocolState::Init { substreams: incoming, .. } => {
if incoming.is_empty() {
if let Endpoint::Dialer = self.endpoint {
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: (),
});
}
ProtocolState::Opening {
deadline: Delay::new(self.clock.now() + Duration::from_secs(60))
}
} else {
let event = CustomProtoHandlerOut::CustomProtocolOpen {
version: incoming[0].protocol_version()
};
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal {
substreams: incoming.into_iter().collect(),
shutdown: SmallVec::new()
}
}
}
st @ ProtocolState::KillAsap => st,
st @ ProtocolState::Opening { .. } => st,
st @ ProtocolState::Normal { .. } => st,
ProtocolState::Disabled { shutdown, .. } => {
ProtocolState::Disabled { shutdown, reenable: true }
}
}
}
/// Disables the handler.
fn disable(&mut self) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
self.remote_peer_id);
ProtocolState::Poisoned
}
ProtocolState::Init { substreams: mut shutdown, .. } => {
for s in &mut shutdown {
s.shutdown();
}
ProtocolState::Disabled { shutdown, reenable: false }
}
ProtocolState::Opening { .. } | ProtocolState::Normal { .. } =>
// At the moment, if we get disabled while things were working, we kill the entire
// connection in order to force a reset of the state.
// This is obviously an extremely shameful way to do things, but at the time of
// the writing of this comment, the networking works very poorly and a solution
// needs to be found.
ProtocolState::KillAsap,
ProtocolState::Disabled { shutdown, .. } =>
ProtocolState::Disabled { shutdown, reenable: false },
ProtocolState::KillAsap => ProtocolState::KillAsap,
};
}
/// Polls the state for events. Optionally returns an event to produce.
#[must_use]
fn poll_state(&mut self)
-> Option<ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, (), CustomProtoHandlerOut<TMessage>>> {
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
self.remote_peer_id);
self.state = ProtocolState::Poisoned;
None
}
ProtocolState::Init { substreams, mut init_deadline } => {
match init_deadline.poll() {
Ok(Async::Ready(())) => {
init_deadline.reset(self.clock.now() + Duration::from_secs(60));
error!(target: "sub-libp2p", "Handler initialization process is too long \
with {:?}", self.remote_peer_id)
},
Ok(Async::NotReady) => {}
Err(_) => error!(target: "sub-libp2p", "Tokio timer has errored")
}
self.state = ProtocolState::Init { substreams, init_deadline };
None
}
ProtocolState::Opening { mut deadline } => {
match deadline.poll() {
Ok(Async::Ready(())) => {
deadline.reset(self.clock.now() + Duration::from_secs(60));
let event = CustomProtoHandlerOut::ProtocolError {
is_severe: true,
error: "Timeout when opening protocol".to_string().into(),
};
self.state = ProtocolState::Opening { deadline };
Some(ProtocolsHandlerEvent::Custom(event))
},
Ok(Async::NotReady) => {
self.state = ProtocolState::Opening { deadline };
None
},
Err(_) => {
error!(target: "sub-libp2p", "Tokio timer has errored");
deadline.reset(self.clock.now() + Duration::from_secs(60));
self.state = ProtocolState::Opening { deadline };
None
},
}
}
ProtocolState::Normal { mut substreams, mut shutdown } => {
for n in (0..substreams.len()).rev() {
let mut substream = substreams.swap_remove(n);
match substream.poll() {
Ok(Async::NotReady) => substreams.push(substream),
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
let event = CustomProtoHandlerOut::CustomMessage {
message
};
substreams.push(substream);
self.state = ProtocolState::Normal { substreams, shutdown };
return Some(ProtocolsHandlerEvent::Custom(event));
},
Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => {
let event = CustomProtoHandlerOut::Clogged {
messages,
};
substreams.push(substream);
self.state = ProtocolState::Normal { substreams, shutdown };
return Some(ProtocolsHandlerEvent::Custom(event));
}
Ok(Async::Ready(None)) => {
shutdown.push(substream);
if substreams.is_empty() {
let event = CustomProtoHandlerOut::CustomProtocolClosed {
reason: "All substreams have been closed by the remote".into(),
};
self.state = ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
reenable: true
};
return Some(ProtocolsHandlerEvent::Custom(event));
}
}
Err(err) => {
if substreams.is_empty() {
let event = CustomProtoHandlerOut::CustomProtocolClosed {
reason: format!("Error on the last substream: {:?}", err).into(),
};
self.state = ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
reenable: true
};
return Some(ProtocolsHandlerEvent::Custom(event));
} else {
debug!(target: "sub-libp2p", "Error on extra substream: {:?}", err);
}
}
}
}
// This code is reached is none if and only if none of the substreams are in a ready state.
self.state = ProtocolState::Normal { substreams, shutdown };
None
}
ProtocolState::Disabled { mut shutdown, reenable } => {
shutdown_list(&mut shutdown);
// If `reenable` is `true`, that means we should open the substreams system again
// after all the substreams are closed.
if reenable && shutdown.is_empty() {
self.state = ProtocolState::Opening {
deadline: Delay::new(self.clock.now() + Duration::from_secs(60))
};
Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: (),
})
} else {
self.state = ProtocolState::Disabled { shutdown, reenable };
None
}
}
ProtocolState::KillAsap => None,
}
}
/// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`.
fn inject_fully_negotiated(
&mut self,
mut substream: RegisteredProtocolSubstream<TMessage, TSubstream>
) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
self.remote_peer_id);
ProtocolState::Poisoned
}
ProtocolState::Init { mut substreams, init_deadline } => {
if substream.endpoint() == Endpoint::Dialer {
error!(target: "sub-libp2p", "Opened dialing substream with {:?} before \
initialization", self.remote_peer_id);
}
substreams.push(substream);
ProtocolState::Init { substreams, init_deadline }
}
ProtocolState::Opening { .. } => {
let event = CustomProtoHandlerOut::CustomProtocolOpen {
version: substream.protocol_version()
};
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal {
substreams: smallvec![substream],
shutdown: SmallVec::new()
}
}
ProtocolState::Normal { substreams: mut existing, shutdown } => {
existing.push(substream);
ProtocolState::Normal { substreams: existing, shutdown }
}
ProtocolState::Disabled { mut shutdown, .. } => {
substream.shutdown();
shutdown.push(substream);
ProtocolState::Disabled { shutdown, reenable: false }
}
ProtocolState::KillAsap => ProtocolState::KillAsap,
};
}
/// Sends a message to the remote.
fn send_message(&mut self, message: TMessage) {
match self.state {
ProtocolState::Normal { ref mut substreams, .. } =>
substreams[0].send_message(message),
_ => debug!(target: "sub-libp2p", "Tried to send message over closed protocol \
with {:?}", self.remote_peer_id)
}
}
}
impl<TMessage, TSubstream> ProtocolsHandler for CustomProtoHandler<TMessage, TSubstream>
where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
type InEvent = CustomProtoHandlerIn<TMessage>;
type OutEvent = CustomProtoHandlerOut<TMessage>;
type Substream = TSubstream;
type Error = ConnectionKillError;
type InboundProtocol = RegisteredProtocol<TMessage>;
type OutboundProtocol = RegisteredProtocol<TMessage>;
type OutboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(self.protocol.clone())
}
fn inject_fully_negotiated_inbound(
&mut self,
proto: <Self::InboundProtocol as InboundUpgrade<TSubstream>>::Output
) {
self.inject_fully_negotiated(proto);
}
fn inject_fully_negotiated_outbound(
&mut self,
proto: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
_: Self::OutboundOpenInfo
) {
self.inject_fully_negotiated(proto);
}
fn inject_event(&mut self, message: CustomProtoHandlerIn<TMessage>) {
match message {
CustomProtoHandlerIn::Disable => self.disable(),
CustomProtoHandlerIn::Enable => self.enable(),
CustomProtoHandlerIn::SendCustomMessage { message } =>
self.send_message(message),
}
}
#[inline]
fn inject_dial_upgrade_error(&mut self, _: (), err: ProtocolsHandlerUpgrErr<io::Error>) {
let is_severe = match err {
ProtocolsHandlerUpgrErr::Upgrade(_) => true,
_ => false,
};
self.events_queue.push(ProtocolsHandlerEvent::Custom(CustomProtoHandlerOut::ProtocolError {
is_severe,
error: Box::new(err),
}));
}
fn connection_keep_alive(&self) -> KeepAlive {
match self.state {
ProtocolState::Init { .. } | ProtocolState::Opening { .. } |
ProtocolState::Normal { .. } => KeepAlive::Yes,
ProtocolState::Disabled { .. } | ProtocolState::Poisoned |
ProtocolState::KillAsap => KeepAlive::No,
}
}
fn poll(
&mut self,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error,
> {
// Flush the events queue if necessary.
if !self.events_queue.is_empty() {
let event = self.events_queue.remove(0);
return Ok(Async::Ready(event))
}
// Kill the connection if needed.
if let ProtocolState::KillAsap = self.state {
return Err(ConnectionKillError);
}
// Process all the substreams.
if let Some(event) = self.poll_state() {
return Ok(Async::Ready(event))
}
Ok(Async::NotReady)
}
}
impl<TMessage, TSubstream> fmt::Debug for CustomProtoHandler<TMessage, TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("CustomProtoHandler")
.finish()
}
}
/// Given a list of substreams, tries to shut them down. The substreams that have been successfully
/// shut down are removed from the list.
fn shutdown_list<TMessage, TSubstream>
(list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<TMessage, TSubstream>>>)
where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
'outer: for n in (0..list.len()).rev() {
let mut substream = list.swap_remove(n);
loop {
match substream.poll() {
Ok(Async::Ready(Some(_))) => {}
Ok(Async::NotReady) => break,
Err(_) | Ok(Async::Ready(None)) => continue 'outer,
}
}
list.push(substream);
}
}
/// Error returned when switching from normal to disabled.
#[derive(Debug)]
pub struct ConnectionKillError;
impl error::Error for ConnectionKillError {
}
impl fmt::Display for ConnectionKillError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Connection kill when switching from normal to disabled")
}
}
@@ -0,0 +1,22 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
pub use self::behaviour::{CustomProto, CustomProtoOut};
pub use self::upgrade::CustomMessage;
mod behaviour;
mod handler;
mod upgrade;
@@ -0,0 +1,331 @@
// Copyright 2018-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::ProtocolId;
use bytes::Bytes;
use libp2p::core::{Negotiated, Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
use libp2p::tokio_codec::Framed;
use log::warn;
use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter};
use futures::{prelude::*, future, stream};
use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec::UviBytes;
/// Connection upgrade for a single protocol.
///
/// Note that "a single protocol" here refers to `par` for example. However
/// each protocol can have multiple different versions for networking purposes.
pub struct RegisteredProtocol<TMessage> {
/// Id of the protocol for API purposes.
id: ProtocolId,
/// Base name of the protocol as advertised on the network.
/// Ends with `/` so that we can append a version number behind.
base_name: Bytes,
/// List of protocol versions that we support.
/// Ordered in descending order so that the best comes first.
supported_versions: Vec<u8>,
/// Marker to pin the generic.
marker: PhantomData<TMessage>,
}
impl<TMessage> RegisteredProtocol<TMessage> {
/// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be
/// passed inside the `RegisteredProtocolOutput`.
pub fn new(protocol: impl Into<ProtocolId>, versions: &[u8])
-> Self {
let protocol = protocol.into();
let mut base_name = Bytes::from_static(b"/substrate/");
base_name.extend_from_slice(protocol.as_bytes());
base_name.extend_from_slice(b"/");
RegisteredProtocol {
base_name,
id: protocol,
supported_versions: {
let mut tmp = versions.to_vec();
tmp.sort_unstable_by(|a, b| b.cmp(&a));
tmp
},
marker: PhantomData,
}
}
}
impl<TMessage> Clone for RegisteredProtocol<TMessage> {
fn clone(&self) -> Self {
RegisteredProtocol {
id: self.id.clone(),
base_name: self.base_name.clone(),
supported_versions: self.supported_versions.clone(),
marker: PhantomData,
}
}
}
/// Output of a `RegisteredProtocol` upgrade.
pub struct RegisteredProtocolSubstream<TMessage, TSubstream> {
/// If true, we are in the process of closing the sink.
is_closing: bool,
/// Whether the local node opened this substream (dialer), or we received this substream from
/// the remote (listener).
endpoint: Endpoint,
/// Buffer of packets to send.
send_queue: VecDeque<Vec<u8>>,
/// If true, we should call `poll_complete` on the inner sink.
requires_poll_complete: bool,
/// The underlying substream.
inner: stream::Fuse<Framed<Negotiated<TSubstream>, UviBytes<Vec<u8>>>>,
/// Version of the protocol that was negotiated.
protocol_version: u8,
/// If true, we have sent a "remote is clogged" event recently and shouldn't send another one
/// unless the buffer empties then fills itself again.
clogged_fuse: bool,
/// Marker to pin the generic.
marker: PhantomData<TMessage>,
}
impl<TMessage, TSubstream> RegisteredProtocolSubstream<TMessage, TSubstream> {
/// Returns the version of the protocol that was negotiated.
pub fn protocol_version(&self) -> u8 {
self.protocol_version
}
/// Returns whether the local node opened this substream (dialer), or we received this
/// substream from the remote (listener).
pub fn endpoint(&self) -> Endpoint {
self.endpoint
}
/// Starts a graceful shutdown process on this substream.
///
/// Note that "graceful" means that we sent a closing message. We don't wait for any
/// confirmation from the remote.
///
/// After calling this, the stream is guaranteed to finish soon-ish.
pub fn shutdown(&mut self) {
self.is_closing = true;
self.send_queue.clear();
}
/// Sends a message to the substream.
pub fn send_message(&mut self, data: TMessage)
where TMessage: CustomMessage {
if self.is_closing {
return
}
self.send_queue.push_back(data.into_bytes());
}
}
/// Implemented on messages that can be sent or received on the network.
pub trait CustomMessage {
/// Turns a message into the raw bytes to send over the network.
fn into_bytes(self) -> Vec<u8>;
/// Tries to parse `bytes` received from the network into a message.
fn from_bytes(bytes: &[u8]) -> Result<Self, ()>
where Self: Sized;
}
// This trait implementation exist mostly for testing convenience. This should eventually be
// removed.
impl CustomMessage for Vec<u8> {
fn into_bytes(self) -> Vec<u8> {
self
}
fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
Ok(bytes.to_vec())
}
}
/// Event produced by the `RegisteredProtocolSubstream`.
#[derive(Debug, Clone)]
pub enum RegisteredProtocolEvent<TMessage> {
/// Received a message from the remote.
Message(TMessage),
/// Diagnostic event indicating that the connection is clogged and we should avoid sending too
/// many messages to it.
Clogged {
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<TMessage>,
},
}
impl<TMessage, TSubstream> Stream for RegisteredProtocolSubstream<TMessage, TSubstream>
where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
type Item = RegisteredProtocolEvent<TMessage>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Flushing the local queue.
while let Some(packet) = self.send_queue.pop_front() {
match self.inner.start_send(packet)? {
AsyncSink::NotReady(packet) => {
self.send_queue.push_front(packet);
break
},
AsyncSink::Ready => self.requires_poll_complete = true,
}
}
// If we are closing, close as soon as the Sink is closed.
if self.is_closing {
return Ok(self.inner.close()?.map(|()| None))
}
// Indicating that the remote is clogged if that's the case.
if self.send_queue.len() >= 2048 {
if !self.clogged_fuse {
// Note: this fuse is important not just for preventing us from flooding the logs;
// if you remove the fuse, then we will always return early from this function and
// thus never read any message from the network.
self.clogged_fuse = true;
return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged {
messages: self.send_queue.iter()
.map(|m| CustomMessage::from_bytes(&m))
.filter_map(Result::ok)
.collect(),
})))
}
} else {
self.clogged_fuse = false;
}
// Flushing if necessary.
if self.requires_poll_complete {
if let Async::Ready(()) = self.inner.poll_complete()? {
self.requires_poll_complete = false;
}
}
// Receiving incoming packets.
// Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever.
match self.inner.poll()? {
Async::Ready(Some(data)) => {
let message = <TMessage as CustomMessage>::from_bytes(&data)
.map_err(|()| {
warn!(target: "sub-libp2p", "Couldn't decode packet sent by the remote: {:?}", data);
io::ErrorKind::InvalidData
})?;
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message))))
}
Async::Ready(None) =>
if !self.requires_poll_complete && self.send_queue.is_empty() {
Ok(Async::Ready(None))
} else {
Ok(Async::NotReady)
}
Async::NotReady => Ok(Async::NotReady),
}
}
}
impl<TMessage> UpgradeInfo for RegisteredProtocol<TMessage> {
type Info = RegisteredProtocolName;
type InfoIter = VecIntoIter<Self::Info>;
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
// Report each version as an individual protocol.
self.supported_versions.iter().map(|&version| {
let num = version.to_string();
let mut name = self.base_name.clone();
name.extend_from_slice(num.as_bytes());
RegisteredProtocolName {
name,
version,
}
}).collect::<Vec<_>>().into_iter()
}
}
/// Implementation of `ProtocolName` for a custom protocol.
#[derive(Debug, Clone)]
pub struct RegisteredProtocolName {
/// Protocol name, as advertised on the wire.
name: Bytes,
/// Version number. Stored in string form in `name`, but duplicated here for easier retrieval.
version: u8,
}
impl ProtocolName for RegisteredProtocolName {
fn protocol_name(&self) -> &[u8] {
&self.name
}
}
impl<TMessage, TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol<TMessage>
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = RegisteredProtocolSubstream<TMessage, TSubstream>;
type Future = future::FutureResult<Self::Output, io::Error>;
type Error = io::Error;
fn upgrade_inbound(
self,
socket: Negotiated<TSubstream>,
info: Self::Info,
) -> Self::Future {
let framed = {
let mut codec = UviBytes::default();
codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets.
Framed::new(socket, codec)
};
future::ok(RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Listener,
send_queue: VecDeque::new(),
requires_poll_complete: false,
inner: framed.fuse(),
protocol_version: info.version,
clogged_fuse: false,
marker: PhantomData,
})
}
}
impl<TMessage, TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol<TMessage>
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = <Self as InboundUpgrade<TSubstream>>::Output;
type Future = <Self as InboundUpgrade<TSubstream>>::Future;
type Error = <Self as InboundUpgrade<TSubstream>>::Error;
fn upgrade_outbound(
self,
socket: Negotiated<TSubstream>,
info: Self::Info,
) -> Self::Future {
let framed = Framed::new(socket, UviBytes::default());
future::ok(RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Dialer,
send_queue: VecDeque::new(),
requires_poll_complete: false,
inner: framed.fuse(),
protocol_version: info.version,
clogged_fuse: false,
marker: PhantomData,
})
}
}
+324
View File
@@ -0,0 +1,324 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::Multiaddr;
use libp2p::core::{either::EitherOutput, PeerId, PublicKey};
use libp2p::core::protocols_handler::{IntoProtocolsHandler, IntoProtocolsHandlerSelect, ProtocolsHandler};
use libp2p::core::nodes::ConnectedPoint;
use libp2p::core::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::identify::{Identify, IdentifyEvent, protocol::IdentifyInfo};
use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess};
use log::{debug, trace, error};
use std::collections::hash_map::Entry;
use std::time::{Duration, Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Interval;
/// Time after we disconnect from a node before we purge its information from the cache.
const CACHE_EXPIRE: Duration = Duration::from_secs(10 * 60);
/// Interval at which we perform garbage collection on the node info.
const GARBAGE_COLLECT_INTERVAL: Duration = Duration::from_secs(2 * 60);
/// Implementation of `NetworkBehaviour` that holds information about nodes in cache for diagnostic
/// purposes.
pub struct DebugInfoBehaviour<TSubstream> {
/// Periodically ping nodes, and close the connection if it's unresponsive.
ping: Ping<TSubstream>,
/// Periodically identifies the remote and responds to incoming requests.
identify: Identify<TSubstream>,
/// Information that we know about all nodes.
nodes_info: FnvHashMap<PeerId, NodeInfo>,
/// Interval at which we perform garbage collection in `nodes_info`.
garbage_collect: Interval,
}
/// Information about a node we're connected to.
#[derive(Debug)]
struct NodeInfo {
/// When we will remove the entry about this node from the list, or `None` if we're connected
/// to the node.
info_expire: Option<Instant>,
/// How we're connected to the node.
endpoint: ConnectedPoint,
/// Version reported by the remote, or `None` if unknown.
client_version: Option<String>,
/// Latest ping time with this node.
latest_ping: Option<Duration>,
}
impl<TSubstream> DebugInfoBehaviour<TSubstream> {
/// Builds a new `DebugInfoBehaviour`.
pub fn new(
user_agent: String,
local_public_key: PublicKey,
) -> Self {
let identify = {
let proto_version = "/substrate/1.0".to_string();
Identify::new(proto_version, user_agent, local_public_key.clone())
};
DebugInfoBehaviour {
ping: Ping::new(PingConfig::new()),
identify,
nodes_info: FnvHashMap::default(),
garbage_collect: Interval::new_interval(GARBAGE_COLLECT_INTERVAL),
}
}
/// Borrows `self` and returns a struct giving access to the information about a node.
///
/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
/// we're connected to, meaning that if `None` is returned then we're not connected to that
/// node.
pub fn node(&self, peer_id: &PeerId) -> Option<Node> {
self.nodes_info.get(peer_id).map(Node)
}
/// Inserts a ping time in the cache. Has no effect if we don't have any entry for that node,
/// which shouldn't happen.
fn handle_ping_report(&mut self, peer_id: &PeerId, ping_time: Duration) {
trace!(target: "sub-libp2p", "Ping time with {:?}: {:?}", peer_id, ping_time);
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
entry.latest_ping = Some(ping_time);
} else {
error!(target: "sub-libp2p",
"Received ping from node we're not connected to {:?}", peer_id);
}
}
/// Inserts an identify record in the cache. Has no effect if we don't have any entry for that
/// node, which shouldn't happen.
fn handle_identify_report(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
trace!(target: "sub-libp2p", "Identified {:?} => {:?}", peer_id, info);
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
entry.client_version = Some(info.agent_version.clone());
} else {
error!(target: "sub-libp2p",
"Received pong from node we're not connected to {:?}", peer_id);
}
}
}
/// Gives access to the information about a node.
pub struct Node<'a>(&'a NodeInfo);
impl<'a> Node<'a> {
/// Returns the endpoint we are connected to or were last connected to.
pub fn endpoint(&self) -> &'a ConnectedPoint {
&self.0.endpoint
}
/// Returns the latest version information we know of.
pub fn client_version(&self) -> Option<&'a str> {
self.0.client_version.as_ref().map(|s| &s[..])
}
/// Returns the latest ping time we know of for this node. `None` if we never successfully
/// pinged this node.
pub fn latest_ping(&self) -> Option<Duration> {
self.0.latest_ping
}
}
/// Event that can be emitted by the behaviour.
#[derive(Debug)]
pub enum DebugInfoEvent {
/// We have obtained debug information from a peer, including the addresses it is listening
/// on.
Identified {
/// Id of the peer that has been identified.
peer_id: PeerId,
/// Information about the peer.
info: IdentifyInfo,
},
}
impl<TSubstream> NetworkBehaviour for DebugInfoBehaviour<TSubstream>
where TSubstream: AsyncRead + AsyncWrite {
type ProtocolsHandler = IntoProtocolsHandlerSelect<
<Ping<TSubstream> as NetworkBehaviour>::ProtocolsHandler,
<Identify<TSubstream> as NetworkBehaviour>::ProtocolsHandler
>;
type OutEvent = DebugInfoEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
IntoProtocolsHandler::select(self.ping.new_handler(), self.identify.new_handler())
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
let mut list = self.ping.addresses_of_peer(peer_id);
list.extend_from_slice(&self.identify.addresses_of_peer(peer_id));
list
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
self.ping.inject_connected(peer_id.clone(), endpoint.clone());
self.identify.inject_connected(peer_id.clone(), endpoint.clone());
match self.nodes_info.entry(peer_id) {
Entry::Vacant(e) => {
e.insert(NodeInfo {
info_expire: None,
endpoint,
client_version: None,
latest_ping: None,
});
}
Entry::Occupied(e) => {
let e = e.into_mut();
if e.info_expire.as_ref().map(|exp| *exp < Instant::now()).unwrap_or(false) {
e.client_version = None;
e.latest_ping = None;
}
e.info_expire = None;
e.endpoint = endpoint;
}
}
}
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
self.ping.inject_disconnected(peer_id, endpoint.clone());
self.identify.inject_disconnected(peer_id, endpoint);
if let Some(entry) = self.nodes_info.get_mut(peer_id) {
entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
} else {
error!(target: "sub-libp2p",
"Disconnected from node we were not connected to {:?}", peer_id);
}
}
fn inject_node_event(
&mut self,
peer_id: PeerId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
) {
match event {
EitherOutput::First(event) => self.ping.inject_node_event(peer_id, event),
EitherOutput::Second(event) => self.identify.inject_node_event(peer_id, event),
}
}
fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
self.ping.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone());
self.identify.inject_replaced(peer_id.clone(), closed_endpoint, new_endpoint.clone());
if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
entry.endpoint = new_endpoint;
} else {
error!(target: "sub-libp2p",
"Disconnected from node we were not connected to {:?}", peer_id);
}
}
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) {
self.ping.inject_addr_reach_failure(peer_id, addr, error);
self.identify.inject_addr_reach_failure(peer_id, addr, error);
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
self.ping.inject_dial_failure(peer_id);
self.identify.inject_dial_failure(peer_id);
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
self.ping.inject_new_listen_addr(addr);
self.identify.inject_new_listen_addr(addr);
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
self.ping.inject_expired_listen_addr(addr);
self.identify.inject_expired_listen_addr(addr);
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.ping.inject_new_external_addr(addr);
self.identify.inject_new_external_addr(addr);
}
fn poll(
&mut self,
params: &mut PollParameters
) -> Async<
NetworkBehaviourAction<
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
Self::OutEvent
>
> {
loop {
match self.ping.poll(params) {
Async::NotReady => break,
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => {
if let PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } = ev {
self.handle_ping_report(&peer, rtt)
}
},
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Async::Ready(NetworkBehaviourAction::DialAddress { address }),
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
return Async::Ready(NetworkBehaviourAction::SendEvent {
peer_id,
event: EitherOutput::First(event)
}),
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
}
}
loop {
match self.identify.poll(params) {
Async::NotReady => break,
Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
match event {
IdentifyEvent::Identified { peer_id, info, .. } => {
self.handle_identify_report(&peer_id, &info);
let event = DebugInfoEvent::Identified { peer_id, info };
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
IdentifyEvent::Error { .. } => {}
IdentifyEvent::SendBack { result: Err(ref err), ref peer_id } =>
debug!(target: "sub-libp2p", "Error when sending back identify info \
to {:?} => {}", peer_id, err),
IdentifyEvent::SendBack { .. } => {}
}
},
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Async::Ready(NetworkBehaviourAction::DialAddress { address }),
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
return Async::Ready(NetworkBehaviourAction::SendEvent {
peer_id,
event: EitherOutput::Second(event)
}),
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
}
}
while let Ok(Async::Ready(Some(_))) = self.garbage_collect.poll() {
self.nodes_info.retain(|_, node| {
node.info_expire.as_ref().map(|exp| *exp >= Instant::now()).unwrap_or(true)
});
}
Async::NotReady
}
}
+302
View File
@@ -0,0 +1,302 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use futures::prelude::*;
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p::core::swarm::PollParameters;
use libp2p::kad::{Kademlia, KademliaOut};
use libp2p::multiaddr::Protocol;
use log::{debug, info, trace, warn};
use std::{cmp, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, clock::Clock};
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
pub struct DiscoveryBehaviour<TSubstream> {
/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and
/// reserved nodes.
user_defined: Vec<(PeerId, Multiaddr)>,
/// Kademlia requests and answers.
kademlia: Kademlia<TSubstream>,
/// Stream that fires when we need to perform the next random Kademlia query.
next_kad_random_query: Delay,
/// After `next_kad_random_query` triggers, the next one triggers after this duration.
duration_to_next_kad: Duration,
/// `Clock` instance that uses the current execution context's source of time.
clock: Clock,
/// Identity of our local node.
local_peer_id: PeerId,
}
impl<TSubstream> DiscoveryBehaviour<TSubstream> {
/// Builds a new `DiscoveryBehaviour`.
///
/// `user_defined` is a list of known address for nodes that never expire.
pub fn new(local_public_key: PublicKey, user_defined: Vec<(PeerId, Multiaddr)>) -> Self {
let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id());
for (peer_id, addr) in &user_defined {
kademlia.add_address(peer_id, addr.clone());
}
let clock = Clock::new();
DiscoveryBehaviour {
user_defined,
kademlia,
next_kad_random_query: Delay::new(clock.now()),
duration_to_next_kad: Duration::from_secs(1),
clock,
local_peer_id: local_public_key.into_peer_id(),
}
}
/// Returns the list of nodes that we know exist in the network.
pub fn known_peers(&mut self) -> impl Iterator<Item = &PeerId> {
self.kademlia.kbuckets_entries()
}
/// Adds a hard-coded address for the given peer, that never expires.
///
/// This adds an entry to the parameter that was passed to `new`.
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
if self.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) {
self.user_defined.push((peer_id, addr));
}
}
/// Call this method when a node reports an address for itself.
pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
self.kademlia.add_address(peer_id, addr);
}
}
/// Event generated by the `DiscoveryBehaviour`.
pub enum DiscoveryOut {
/// We have discovered a node. Can be called multiple times with the same identity.
Discovered(PeerId),
}
impl<TSubstream> NetworkBehaviour for DiscoveryBehaviour<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type ProtocolsHandler = <Kademlia<TSubstream> as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = DiscoveryOut;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
NetworkBehaviour::new_handler(&mut self.kademlia)
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
let mut list = self.user_defined.iter()
.filter_map(|(p, a)| if p == peer_id { Some(a.clone()) } else { None })
.collect::<Vec<_>>();
list.extend(self.kademlia.addresses_of_peer(peer_id));
trace!(target: "sub-libp2p", "Addresses of {:?} are {:?}", peer_id, list);
if list.is_empty() {
if self.kademlia.kbuckets_entries().any(|p| p == peer_id) {
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer in k-buckets), \
and no address was found", peer_id);
} else {
debug!(target: "sub-libp2p", "Requested dialing to {:?} (peer not in k-buckets), \
and no address was found", peer_id);
}
}
list
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id, endpoint)
}
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id, endpoint)
}
fn inject_replaced(&mut self, peer_id: PeerId, closed: ConnectedPoint, opened: ConnectedPoint) {
NetworkBehaviour::inject_replaced(&mut self.kademlia, peer_id, closed, opened)
}
fn inject_node_event(
&mut self,
peer_id: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
NetworkBehaviour::inject_node_event(&mut self.kademlia, peer_id, event)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
let new_addr = addr.clone()
.with(Protocol::P2p(self.local_peer_id.clone().into()));
info!(target: "sub-libp2p", "Discovered new external address for our node: {}", new_addr);
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
info!(target: "sub-libp2p", "No longer listening on {}", addr);
}
fn poll(
&mut self,
params: &mut PollParameters,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
// Poll Kademlia.
match self.kademlia.poll(params) {
Async::NotReady => (),
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => {
match ev {
KademliaOut::Discovered { .. } => {}
KademliaOut::KBucketAdded { peer_id, .. } => {
let ev = DiscoveryOut::Discovered(peer_id);
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaOut::FindNodeResult { key, closer_peers } => {
trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results",
key, closer_peers.len());
if closer_peers.is_empty() {
warn!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
results");
}
}
// We never start any other type of query.
KademliaOut::GetProvidersResult { .. } => {}
KademliaOut::GetValueResult(_) => {}
KademliaOut::PutValueResult(_) => {}
}
},
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Async::Ready(NetworkBehaviourAction::DialAddress { address }),
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }),
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
}
// Poll the stream that fires when we need to start a random Kademlia query.
loop {
match self.next_kad_random_query.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(_)) => {
let random_peer_id = PeerId::random();
debug!(target: "sub-libp2p", "Libp2p <= Starting random Kademlia request for \
{:?}", random_peer_id);
self.kademlia.find_node(random_peer_id);
// Reset the `Delay` to the next random.
self.next_kad_random_query.reset(self.clock.now() + self.duration_to_next_kad);
self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2,
Duration::from_secs(60));
},
Err(err) => {
warn!(target: "sub-libp2p", "Kademlia query timer errored: {:?}", err);
break
}
}
}
Async::NotReady
}
}
#[cfg(test)]
mod tests {
use futures::prelude::*;
use libp2p::identity::Keypair;
use libp2p::Multiaddr;
use libp2p::core::{upgrade, Swarm};
use libp2p::core::transport::{Transport, MemoryTransport};
use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt};
use std::collections::HashSet;
use super::{DiscoveryBehaviour, DiscoveryOut};
#[test]
fn discovery_working() {
let mut user_defined = Vec::new();
// Build swarms whose behaviour is `DiscoveryBehaviour`.
let mut swarms = (0..25).map(|_| {
let keypair = Keypair::generate_ed25519();
let transport = MemoryTransport
.with_upgrade(libp2p::secio::SecioConfig::new(keypair.clone()))
.and_then(move |out, endpoint| {
let peer_id = out.remote_key.into_peer_id();
let peer_id2 = peer_id.clone();
let upgrade = libp2p::yamux::Config::default()
.map_inbound(move |muxer| (peer_id, muxer))
.map_outbound(move |muxer| (peer_id2, muxer));
upgrade::apply(out.stream, upgrade, endpoint)
});
let behaviour = DiscoveryBehaviour::new(keypair.public(), user_defined.clone());
let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
if user_defined.is_empty() {
user_defined.push((keypair.public().into_peer_id(), listen_addr.clone()));
}
Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap();
(swarm, listen_addr)
}).collect::<Vec<_>>();
// Build a `Vec<HashSet<PeerId>>` with the list of nodes remaining to be discovered.
let mut to_discover = (0..swarms.len()).map(|n| {
(0..swarms.len()).filter(|p| *p != n)
.map(|p| Swarm::local_peer_id(&swarms[p].0).clone())
.collect::<HashSet<_>>()
}).collect::<Vec<_>>();
let fut = futures::future::poll_fn(move || -> Result<_, ()> {
loop {
let mut keep_polling = false;
for swarm_n in 0..swarms.len() {
if let Async::Ready(Some(DiscoveryOut::Discovered(other))) =
swarms[swarm_n].0.poll().unwrap() {
if to_discover[swarm_n].remove(&other) {
keep_polling = true;
// Call `add_self_reported_address` to simulate identify happening.
let addr = swarms.iter()
.find(|s| *Swarm::local_peer_id(&s.0) == other)
.unwrap()
.1.clone();
swarms[swarm_n].0.add_self_reported_address(&other, addr);
}
}
}
if !keep_polling {
break;
}
}
if to_discover.iter().all(|l| l.is_empty()) {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
});
tokio::runtime::Runtime::new().unwrap().block_on(fut).unwrap();
}
}
+183 -10
View File
@@ -23,11 +23,17 @@
//! **Important**: This crate is unstable and the API and usage may change.
//!
mod service;
mod behaviour;
mod chain;
mod custom_proto;
mod debug_info;
mod discovery;
mod on_demand_layer;
#[macro_use]
mod protocol;
mod chain;
mod on_demand_layer;
mod service;
mod transport;
pub mod config;
pub mod error;
@@ -39,18 +45,185 @@ pub use service::{
NetworkService, NetworkWorker, FetchFuture, TransactionPool, ManageNetwork,
NetworkMsg, SyncProvider, ExHashT, ReportHandle,
};
pub use config::{NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret};
pub use protocol::{ProtocolStatus, PeerInfo, Context, consensus_gossip, message, specialization};
pub use protocol::sync::{Status as SyncStatus, SyncState};
pub use network_libp2p::{
identity, multiaddr,
ProtocolId, Multiaddr,
NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer, NetworkStatePeerEndpoint,
NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret,
build_multiaddr, PeerId, PublicKey
};
pub use libp2p::{Multiaddr, multiaddr, build_multiaddr};
pub use libp2p::{identity, PeerId, core::PublicKey};
pub use message::{generic as generic_message, RequestId, Status as StatusMessage};
pub use error::Error;
pub use protocol::on_demand::AlwaysBadChecker;
pub use on_demand_layer::{OnDemand, RemoteResponse};
#[doc(hidden)]
pub use runtime_primitives::traits::Block as BlockT;
use libp2p::core::nodes::ConnectedPoint;
use serde::{Deserialize, Serialize};
use slog_derive::SerdeValue;
use std::{collections::{HashMap, HashSet}, fmt, time::Duration};
/// Extension trait for `NetworkBehaviour` that also accepts discovering nodes.
pub trait DiscoveryNetBehaviour {
/// Notify the protocol that we have learned about the existence of nodes.
///
/// Can (or most likely will) be called multiple times with the same `PeerId`s.
///
/// Also note that there is no notification for expired nodes. The implementer must add a TTL
/// system, or remove nodes that will fail to reach.
fn add_discovered_nodes(&mut self, nodes: impl Iterator<Item = PeerId>);
}
/// Name of a protocol, transmitted on the wire. Should be unique for each chain.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>);
impl<'a> From<&'a [u8]> for ProtocolId {
fn from(bytes: &'a [u8]) -> ProtocolId {
ProtocolId(bytes.into())
}
}
impl ProtocolId {
/// Exposes the `ProtocolId` as bytes.
pub fn as_bytes(&self) -> &[u8] {
self.0.as_ref()
}
}
/// Parses a string address and returns the component, if valid.
pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> {
let mut addr: Multiaddr = addr_str.parse()?;
let who = match addr.pop() {
Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key)
.map_err(|_| ParseErr::InvalidPeerId)?,
_ => return Err(ParseErr::PeerIdMissing),
};
Ok((who, addr))
}
/// Error that can be generated by `parse_str_addr`.
#[derive(Debug)]
pub enum ParseErr {
/// Error while parsing the multiaddress.
MultiaddrParse(multiaddr::Error),
/// Multihash of the peer ID is invalid.
InvalidPeerId,
/// The peer ID is missing from the address.
PeerIdMissing,
}
impl fmt::Display for ParseErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ParseErr::MultiaddrParse(err) => write!(f, "{}", err),
ParseErr::InvalidPeerId => write!(f, "Peer id at the end of the address is invalid"),
ParseErr::PeerIdMissing => write!(f, "Peer id is missing from the address"),
}
}
}
impl std::error::Error for ParseErr {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ParseErr::MultiaddrParse(err) => Some(err),
ParseErr::InvalidPeerId => None,
ParseErr::PeerIdMissing => None,
}
}
}
impl From<multiaddr::Error> for ParseErr {
fn from(err: multiaddr::Error) -> ParseErr {
ParseErr::MultiaddrParse(err)
}
}
/// Returns general information about the networking.
///
/// Meant for general diagnostic purposes.
///
/// **Warning**: This API is not stable.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, SerdeValue)]
#[serde(rename_all = "camelCase")]
pub struct NetworkState {
/// PeerId of the local node.
pub peer_id: String,
/// List of addresses the node is currently listening on.
pub listened_addresses: HashSet<Multiaddr>,
/// List of addresses the node knows it can be reached as.
pub external_addresses: HashSet<Multiaddr>,
/// List of node we're connected to.
pub connected_peers: HashMap<String, NetworkStatePeer>,
/// List of node that we know of but that we're not connected to.
pub not_connected_peers: HashMap<String, NetworkStateNotConnectedPeer>,
/// Downloaded bytes per second averaged over the past few seconds.
pub average_download_per_sec: u64,
/// Uploaded bytes per second averaged over the past few seconds.
pub average_upload_per_sec: u64,
/// State of the peerset manager.
pub peerset: serde_json::Value,
}
/// Part of the `NetworkState` struct. Unstable.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NetworkStatePeer {
/// How we are connected to the node.
pub endpoint: NetworkStatePeerEndpoint,
/// Node information, as provided by the node itself. Can be empty if not known yet.
pub version_string: Option<String>,
/// Latest ping duration with this node.
pub latest_ping_time: Option<Duration>,
/// If true, the peer is "enabled", which means that we try to open Substrate-related protocols
/// with this peer. If false, we stick to Kademlia and/or other network-only protocols.
pub enabled: bool,
/// If true, the peer is "open", which means that we have a Substrate-related protocol
/// with this peer.
pub open: bool,
/// List of addresses known for this node.
pub known_addresses: HashSet<Multiaddr>,
}
/// Part of the `NetworkState` struct. Unstable.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NetworkStateNotConnectedPeer {
/// List of addresses known for this node.
pub known_addresses: HashSet<Multiaddr>,
/// Node information, as provided by the node itself, if we were ever connected to this node.
pub version_string: Option<String>,
/// Latest ping duration with this node, if we were ever connected to this node.
pub latest_ping_time: Option<Duration>,
}
/// Part of the `NetworkState` struct. Unstable.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum NetworkStatePeerEndpoint {
/// We are dialing the given address.
Dialing(Multiaddr),
/// We are listening.
Listening {
/// Address we're listening on that received the connection.
listen_addr: Multiaddr,
/// Address data is sent back to.
send_back_addr: Multiaddr,
},
}
impl From<ConnectedPoint> for NetworkStatePeerEndpoint {
fn from(endpoint: ConnectedPoint) -> Self {
match endpoint {
ConnectedPoint::Dialer { address } =>
NetworkStatePeerEndpoint::Dialing(address),
ConnectedPoint::Listener { listen_addr, send_back_addr } =>
NetworkStatePeerEndpoint::Listening {
listen_addr,
send_back_addr
}
}
}
}
+5 -5
View File
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use futures::prelude::*;
use network_libp2p::PeerId;
use libp2p::PeerId;
use primitives::storage::StorageKey;
use consensus::{import_queue::IncomingBlock, import_queue::Origin, BlockOrigin};
use runtime_primitives::{generic::BlockId, ConsensusEngineId, Justification};
@@ -618,15 +618,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
/// Called when a new peer is connected
pub fn on_peer_connected(&mut self, network_out: &mut dyn NetworkOut<B>, who: PeerId, debug_info: String) {
trace!(target: "sync", "Connecting {}: {}", who, debug_info);
pub fn on_peer_connected(&mut self, network_out: &mut dyn NetworkOut<B>, who: PeerId) {
trace!(target: "sync", "Connecting {}", who);
self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: time::Instant::now() });
self.send_status(network_out, who);
}
/// Called by peer when it is disconnecting
pub fn on_peer_disconnected(&mut self, mut network_out: &mut dyn NetworkOut<B>, peer: PeerId, debug_info: String) {
trace!(target: "sync", "Disconnecting {}: {}", peer, debug_info);
pub fn on_peer_disconnected(&mut self, mut network_out: &mut dyn NetworkOut<B>, peer: PeerId) {
trace!(target: "sync", "Disconnecting {}", peer);
// lock all the the peer lists so that add/remove peer events are in order
let removed = {
self.handshaking_peers.remove(&peer);
@@ -24,7 +24,7 @@ use std::time;
use log::{trace, debug};
use futures::sync::mpsc;
use lru_cache::LruCache;
use network_libp2p::PeerId;
use libp2p::PeerId;
use runtime_primitives::traits::{Block as BlockT, Hash, HashFor};
use runtime_primitives::ConsensusEngineId;
pub use crate::message::generic::{Message, ConsensusMessage};
@@ -125,8 +125,8 @@ pub struct RemoteReadResponse {
/// Generic types.
pub mod generic {
use crate::custom_proto::CustomMessage;
use parity_codec::{Encode, Decode};
use network_libp2p::CustomMessage;
use runtime_primitives::Justification;
use crate::config::Roles;
use super::{
@@ -27,7 +27,7 @@ use client::light::fetcher::{FetchChecker, RemoteHeaderRequest,
RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof,
RemoteReadChildRequest, RemoteBodyRequest};
use crate::message::{self, BlockAttributes, Direction, FromBlock, RequestId};
use network_libp2p::PeerId;
use libp2p::PeerId;
use crate::config::Roles;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
@@ -644,7 +644,7 @@ pub mod tests {
RemoteReadChildRequest, RemoteChangesRequest, RemoteBodyRequest};
use crate::config::Roles;
use crate::message::{self, BlockAttributes, Direction, FromBlock, RequestId};
use network_libp2p::PeerId;
use libp2p::PeerId;
use super::{REQUEST_TIMEOUT, OnDemandCore, OnDemandNetwork, RequestData};
use test_client::runtime::{changes_trie_config, Block, Extrinsic, Header};
@@ -16,9 +16,9 @@
//! Specializations of the substrate network protocol to allow more complex forms of communication.
use crate::PeerId;
use runtime_primitives::traits::Block as BlockT;
use crate::protocol::Context;
use libp2p::PeerId;
use runtime_primitives::traits::Block as BlockT;
/// A specialization of the substrate network protocol. Handles events and sends messages.
pub trait NetworkSpecialization<B: BlockT>: Send + Sync + 'static {
+1 -1
View File
@@ -35,7 +35,7 @@ use std::ops::Range;
use std::collections::{HashMap, VecDeque};
use log::{debug, trace, warn, info, error};
use crate::protocol::PeerInfo as ProtocolPeerInfo;
use network_libp2p::PeerId;
use libp2p::PeerId;
use client::{BlockStatus, ClientInfo};
use consensus::{BlockOrigin, import_queue::{IncomingBlock, SharedFinalityProofRequestBuilder}};
use client::error::Error as ClientError;
@@ -20,7 +20,7 @@ use std::ops::Range;
use std::collections::{HashMap, BTreeMap};
use std::collections::hash_map::Entry;
use log::trace;
use network_libp2p::PeerId;
use libp2p::PeerId;
use runtime_primitives::traits::{Block as BlockT, NumberFor, One};
use crate::message;
@@ -20,7 +20,7 @@ use log::{trace, warn};
use client::error::Error as ClientError;
use consensus::import_queue::SharedFinalityProofRequestBuilder;
use fork_tree::ForkTree;
use network_libp2p::PeerId;
use libp2p::PeerId;
use runtime_primitives::Justification;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use crate::protocol::message;
+179 -31
View File
@@ -15,16 +15,20 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::io;
use std::{fs, io, path::Path};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use log::{warn, debug, error, info};
use libp2p::core::swarm::NetworkBehaviour;
use libp2p::core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
use futures::{prelude::*, sync::oneshot, sync::mpsc};
use parking_lot::{Mutex, RwLock};
use network_libp2p::{start_service, parse_str_addr, Service as Libp2pNetService, ServiceEvent as Libp2pNetServiceEvent};
use network_libp2p::{RegisteredProtocol, NetworkState};
use crate::custom_proto::{CustomProto, CustomProtoOut};
use crate::{behaviour::Behaviour, parse_str_addr, ProtocolId};
use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer};
use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode, config::NetworkConfiguration};
use peerset::PeersetHandle;
use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder};
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
@@ -40,12 +44,14 @@ use crate::config::Params;
use crate::error::Error;
use crate::protocol::specialization::NetworkSpecialization;
mod tests;
/// Interval at which we send status updates on the SyncProvider status stream.
const STATUS_INTERVAL: Duration = Duration::from_millis(5000);
/// Interval at which we update the `peers` field on the main thread.
const CONNECTED_PEERS_INTERVAL: Duration = Duration::from_millis(500);
pub use network_libp2p::PeerId;
pub use libp2p::PeerId;
/// Type that represents fetch completion future.
pub type FetchFuture = oneshot::Receiver<Vec<u8>>;
@@ -187,7 +193,9 @@ pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>> {
/// Channel for networking messages processed by the background thread.
network_chan: mpsc::UnboundedSender<NetworkMsg<B>>,
/// Network service
network: Arc<Mutex<Libp2pNetService<Message<B>>>>,
network: Arc<Mutex<Swarm<B>>>,
/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
bandwidth: Arc<transport::BandwidthSinks>,
/// Peerset manager (PSM); manages the reputation of nodes and indicates the network which
/// nodes it should be connected to or not.
peerset: PeersetHandle,
@@ -227,19 +235,20 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
params.specialization,
)?;
let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect();
let registered = RegisteredProtocol::new(params.protocol_id, &versions);
// Start the main service.
let (network, peerset) = match start_service(params.network_config, registered) {
Ok((network, peerset)) => (Arc::new(Mutex::new(network)), peerset),
Err(err) => {
warn!("Error starting network: {}", err);
return Err(err.into())
},
};
let (network, bandwidth, peerset) =
match start_service::<B, _>(params.network_config, params.protocol_id, &versions) {
Ok((network, bandwidth, peerset)) => (Arc::new(Mutex::new(network)), bandwidth, peerset),
Err(err) => {
warn!("Error starting network: {}", err);
return Err(err.into())
},
};
let service = Arc::new(NetworkService {
status_sinks: status_sinks.clone(),
bandwidth,
is_offline: is_offline.clone(),
is_major_syncing: is_major_syncing.clone(),
network_chan,
@@ -278,20 +287,18 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
impl<B: BlockT + 'static, S: NetworkSpecialization<B>> NetworkService<B, S> {
/// Returns the downloaded bytes per second averaged over the past few seconds.
#[inline]
pub fn average_download_per_sec(&self) -> u64 {
self.network.lock().average_download_per_sec()
self.bandwidth.average_download_per_sec()
}
/// Returns the uploaded bytes per second averaged over the past few seconds.
#[inline]
pub fn average_upload_per_sec(&self) -> u64 {
self.network.lock().average_upload_per_sec()
self.bandwidth.average_upload_per_sec()
}
/// Returns the network identity of the node.
pub fn local_peer_id(&self) -> PeerId {
self.network.lock().peer_id().clone()
Swarm::<B>::local_peer_id(&*self.network.lock()).clone()
}
/// Called when a new block is imported by the client.
@@ -404,7 +411,58 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> SyncProvider<B> for Netwo
}
fn network_state(&self) -> NetworkState {
self.network.lock().state()
let mut swarm = self.network.lock();
let open = swarm.user_protocol().open_peers().cloned().collect::<Vec<_>>();
let connected_peers = {
let swarm = &mut *swarm;
open.iter().filter_map(move |peer_id| {
let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, peer_id)
.into_iter().collect();
let endpoint = if let Some(e) = swarm.node(peer_id).map(|i| i.endpoint()) {
e.clone().into()
} else {
error!(target: "sub-libp2p", "Found state inconsistency between custom protocol \
and debug information about {:?}", peer_id);
return None
};
Some((peer_id.to_base58(), NetworkStatePeer {
endpoint,
version_string: swarm.node(peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(),
latest_ping_time: swarm.node(peer_id).and_then(|i| i.latest_ping()),
enabled: swarm.user_protocol().is_enabled(&peer_id),
open: swarm.user_protocol().is_open(&peer_id),
known_addresses,
}))
}).collect()
};
let not_connected_peers = {
let swarm = &mut *swarm;
let list = swarm.known_peers().filter(|p| open.iter().all(|n| n != *p))
.cloned().collect::<Vec<_>>();
list.into_iter().map(move |peer_id| {
(peer_id.to_base58(), NetworkStateNotConnectedPeer {
version_string: swarm.node(&peer_id).and_then(|i| i.client_version().map(|s| s.to_owned())).clone(),
latest_ping_time: swarm.node(&peer_id).and_then(|i| i.latest_ping()),
known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id)
.into_iter().collect(),
})
}).collect()
};
NetworkState {
peer_id: Swarm::<B>::local_peer_id(&swarm).to_base58(),
listened_addresses: Swarm::<B>::listeners(&swarm).cloned().collect(),
external_addresses: Swarm::<B>::external_addresses(&swarm).cloned().collect(),
average_download_per_sec: self.bandwidth.average_download_per_sec(),
average_upload_per_sec: self.bandwidth.average_upload_per_sec(),
connected_peers,
not_connected_peers,
peerset: swarm.user_protocol_mut().peerset_debug_info(),
}
}
fn peers_debug_info(&self) -> Vec<(PeerId, PeerInfo<B>)> {
@@ -533,7 +591,7 @@ pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: Ex
protocol: Protocol<B, S, H>,
/// The network service that can be extracted and shared through the codebase.
service: Arc<NetworkService<B, S>>,
network_service: Arc<Mutex<Libp2pNetService<Message<B>>>>,
network_service: Arc<Mutex<Swarm<B>>>,
peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>,
import_queue: Box<dyn ImportQueue<B>>,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
@@ -556,16 +614,16 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Implementation of `protocol::NetworkOut` using the available local variables.
struct Context<'a, B: BlockT>(&'a mut Libp2pNetService<Message<B>>, &'a PeersetHandle);
struct Context<'a, B: BlockT>(&'a mut Swarm<B>, &'a PeersetHandle);
impl<'a, B: BlockT> NetworkOut<B> for Context<'a, B> {
fn report_peer(&mut self, who: PeerId, reputation: i32) {
self.1.report_peer(who, reputation)
}
fn disconnect_peer(&mut self, who: PeerId) {
self.0.drop_node(&who)
self.0.user_protocol_mut().disconnect_peer(&who)
}
fn send_message(&mut self, who: PeerId, message: Message<B>) {
self.0.send_custom_message(&who, message)
self.0.user_protocol_mut().send_packet(&who, message)
}
}
@@ -598,11 +656,11 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
match self.network_port.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(NetworkMsg::Outgoing(who, outgoing_message)))) =>
self.network_service.lock().send_custom_message(&who, outgoing_message),
self.network_service.lock().user_protocol_mut().send_packet(&who, outgoing_message),
Ok(Async::Ready(Some(NetworkMsg::ReportPeer(who, reputation)))) =>
self.peerset.report_peer(who, reputation),
Ok(Async::Ready(Some(NetworkMsg::DisconnectPeer(who)))) =>
self.network_service.lock().drop_node(&who),
self.network_service.lock().user_protocol_mut().disconnect_peer(&who),
#[cfg(any(test, feature = "test-helpers"))]
Ok(Async::Ready(Some(NetworkMsg::Synchronized))) => {}
@@ -672,19 +730,19 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
let outcome = match poll_value {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(Libp2pNetServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => {
Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolOpen { peer_id, version, .. }))) => {
debug_assert!(
version <= protocol::CURRENT_VERSION as u8
&& version >= protocol::MIN_VERSION as u8
);
self.protocol.on_peer_connected(&mut network_out, peer_id, debug_info);
self.protocol.on_peer_connected(&mut network_out, peer_id);
CustomMessageOutcome::None
}
Ok(Async::Ready(Some(Libp2pNetServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => {
self.protocol.on_peer_disconnected(&mut network_out, peer_id, debug_info);
Ok(Async::Ready(Some(CustomProtoOut::CustomProtocolClosed { peer_id, .. }))) => {
self.protocol.on_peer_disconnected(&mut network_out, peer_id);
CustomMessageOutcome::None
},
Ok(Async::Ready(Some(Libp2pNetServiceEvent::CustomMessage { peer_id, message, .. }))) =>
Ok(Async::Ready(Some(CustomProtoOut::CustomMessage { peer_id, message }))) =>
self.protocol.on_custom_message(
&mut network_out,
&*self.transaction_pool,
@@ -692,7 +750,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
message,
self.finality_proof_provider.as_ref().map(|p| &**p)
),
Ok(Async::Ready(Some(Libp2pNetServiceEvent::Clogged { peer_id, messages, .. }))) => {
Ok(Async::Ready(Some(CustomProtoOut::Clogged { peer_id, messages, .. }))) => {
debug!(target: "sync", "{} clogging messages:", messages.len());
for msg in messages.into_iter().take(5) {
debug!(target: "sync", "{:?}", msg);
@@ -724,3 +782,93 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
Ok(Async::NotReady)
}
}
/// The libp2p swarm, customized for our needs.
type Swarm<B> = libp2p::core::Swarm<
Boxed<(PeerId, StreamMuxerBox), io::Error>,
Behaviour<CustomProto<Message<B>, Substream<StreamMuxerBox>>, CustomProtoOut<Message<B>>, Substream<StreamMuxerBox>>
>;
/// Starts the substrate libp2p service.
///
/// Returns a stream that must be polled regularly in order for the networking to function.
fn start_service<B: BlockT, Pid: Into<ProtocolId>>(
config: NetworkConfiguration,
protocol_id: Pid,
versions: &[u8],
) -> Result<(Swarm<B>, Arc<transport::BandwidthSinks>, peerset::PeersetHandle), io::Error> {
if let Some(ref path) = config.net_config_path {
fs::create_dir_all(Path::new(path))?;
}
// List of multiaddresses that we know in the network.
let mut known_addresses = Vec::new();
let mut bootnodes = Vec::new();
let mut reserved_nodes = Vec::new();
// Process the bootnodes.
for bootnode in config.boot_nodes.iter() {
match parse_str_addr(bootnode) {
Ok((peer_id, addr)) => {
bootnodes.push(peer_id.clone());
known_addresses.push((peer_id, addr));
},
Err(_) => warn!(target: "sub-libp2p", "Not a valid bootnode address: {}", bootnode),
}
}
// Initialize the reserved peers.
for reserved in config.reserved_nodes.iter() {
if let Ok((peer_id, addr)) = parse_str_addr(reserved) {
reserved_nodes.push(peer_id.clone());
known_addresses.push((peer_id, addr));
} else {
warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved);
}
}
// Build the peerset.
let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset::PeersetConfig {
in_peers: config.in_peers,
out_peers: config.out_peers,
bootnodes,
reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny,
reserved_nodes,
});
// Private and public keys configuration.
if let NodeKeyConfig::Secp256k1(_) = config.node_key {
warn!(target: "sub-libp2p", "Secp256k1 keys are deprecated in favour of ed25519");
}
let local_identity = config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
let local_peer_id = local_public.clone().into_peer_id();
info!(target: "sub-libp2p", "Local node identity is: {}", local_peer_id.to_base58());
// Build the swarm.
let (mut swarm, bandwidth) = {
let user_agent = format!("{} ({})", config.client_version, config.node_name);
let proto = CustomProto::new(protocol_id, versions, peerset);
let behaviour = Behaviour::new(proto, user_agent, local_public, known_addresses, config.enable_mdns);
let (transport, bandwidth) = transport::build_transport(
local_identity,
config.wasm_external_transport
);
(Swarm::<B>::new(transport, behaviour, local_peer_id.clone()), bandwidth)
};
// Listen on multiaddresses.
for addr in &config.listen_addresses {
if let Err(err) = Swarm::<B>::listen_on(&mut swarm, addr.clone()) {
warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
}
}
// Add external addresses.
for addr in &config.public_addresses {
Swarm::<B>::add_external_address(&mut swarm, addr.clone());
}
Ok((swarm, bandwidth, peerset_handle))
}
+363
View File
@@ -0,0 +1,363 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
#![cfg(test)]
use futures::{future, stream, prelude::*, try_ready};
use libp2p::core::swarm::ExpandedSwarm;
use rand::seq::SliceRandom;
use runtime_primitives::traits::Block as BlockT;
use std::{io, time::Duration, time::Instant};
use test_client::runtime::Block;
use crate::protocol::message::generic::Message;
use crate::{Multiaddr, multiaddr::Protocol, build_multiaddr};
use crate::custom_proto::CustomProtoOut;
use super::{start_service, Swarm};
/// Builds two services. The second one and further have the first one as its bootstrap node.
/// This is to be used only for testing, and a panic will happen if something goes wrong.
fn build_nodes<B: BlockT>(num: usize, base_port: u16) -> Vec<Swarm<B>> {
let mut result: Vec<Swarm<B>> = Vec::with_capacity(num);
let mut first_addr = None::<Multiaddr>;
for index in 0 .. num {
let mut boot_nodes = Vec::new();
if let Some(first_addr) = first_addr.as_ref() {
boot_nodes.push(first_addr.clone()
.with(Protocol::P2p(ExpandedSwarm::local_peer_id(&result[0]).clone().into()))
.to_string());
}
let config = crate::config::NetworkConfiguration {
listen_addresses: vec![build_multiaddr![Ip4([127, 0, 0, 1]), Tcp(base_port + index as u16)]],
boot_nodes,
..crate::config::NetworkConfiguration::default()
};
if first_addr.is_none() {
first_addr = Some(config.listen_addresses.iter().next().unwrap().clone());
}
result.push(start_service::<B, _>(config, &b"tst"[..], &[1]).unwrap().0);
}
result
}
#[test]
fn basic_two_nodes_connectivity() {
let (mut service1, mut service2) = {
let mut l = build_nodes::<Block>(2, 50400).into_iter();
let a = l.next().unwrap();
let b = l.next().unwrap();
(a, b)
};
let fut1 = future::poll_fn(move || -> io::Result<_> {
match try_ready!(service1.poll()) {
Some(CustomProtoOut::CustomProtocolOpen { version, .. }) => {
assert_eq!(version, 1);
Ok(Async::Ready(()))
},
_ => panic!(),
}
});
let fut2 = future::poll_fn(move || -> io::Result<_> {
match try_ready!(service2.poll()) {
Some(CustomProtoOut::CustomProtocolOpen { version, .. }) => {
assert_eq!(version, 1);
Ok(Async::Ready(()))
},
_ => panic!(),
}
});
let combined = fut1.select(fut2).map_err(|(err, _)| err);
tokio::runtime::Runtime::new().unwrap().block_on_all(combined).unwrap();
}
#[test]
fn two_nodes_transfer_lots_of_packets() {
// We spawn two nodes, then make the first one send lots of packets to the second one. The test
// ends when the second one has received all of them.
// Note that if we go too high, we will reach the limit to the number of simultaneous
// substreams allowed by the multiplexer.
const NUM_PACKETS: u32 = 5000;
let (mut service1, mut service2) = {
let mut l = build_nodes::<Block>(2, 50450).into_iter();
let a = l.next().unwrap();
let b = l.next().unwrap();
(a, b)
};
let fut1 = future::poll_fn(move || -> io::Result<_> {
loop {
match try_ready!(service1.poll()) {
Some(CustomProtoOut::CustomProtocolOpen { peer_id, .. }) => {
for n in 0 .. NUM_PACKETS {
service1.user_protocol_mut().send_packet(
&peer_id,
Message::ChainSpecific(vec![(n % 256) as u8])
);
}
},
_ => panic!(),
}
}
});
let mut packet_counter = 0u32;
let fut2 = future::poll_fn(move || -> io::Result<_> {
loop {
match try_ready!(service2.poll()) {
Some(CustomProtoOut::CustomProtocolOpen { .. }) => {},
Some(CustomProtoOut::CustomMessage { message: Message::ChainSpecific(message), .. }) => {
assert_eq!(message.len(), 1);
packet_counter += 1;
if packet_counter == NUM_PACKETS {
return Ok(Async::Ready(()))
}
}
_ => panic!(),
}
}
});
let combined = fut1.select(fut2).map_err(|(err, _)| err);
tokio::runtime::Runtime::new().unwrap().block_on(combined).unwrap();
}
#[test]
fn many_nodes_connectivity() {
// Creates many nodes, then make sure that they are all connected to each other.
// Note: if you increase this number, keep in mind that there's a limit to the number of
// simultaneous connections which will make the test fail if it is reached. This can be
// increased in the `NetworkConfiguration`.
const NUM_NODES: usize = 25;
let mut futures = build_nodes::<Block>(NUM_NODES, 50500)
.into_iter()
.map(move |mut node| {
let mut num_connecs = 0;
stream::poll_fn(move || -> io::Result<_> {
loop {
match try_ready!(node.poll()) {
Some(CustomProtoOut::CustomProtocolOpen { .. }) => {
num_connecs += 1;
assert!(num_connecs < NUM_NODES);
if num_connecs == NUM_NODES - 1 {
return Ok(Async::Ready(Some(true)))
}
}
Some(CustomProtoOut::CustomProtocolClosed { .. }) => {
let was_success = num_connecs == NUM_NODES - 1;
num_connecs -= 1;
if was_success && num_connecs < NUM_NODES - 1 {
return Ok(Async::Ready(Some(false)))
}
}
_ => panic!(),
}
}
})
})
.collect::<Vec<_>>();
let mut successes = 0;
let combined = future::poll_fn(move || -> io::Result<_> {
for node in futures.iter_mut() {
match node.poll()? {
Async::Ready(Some(true)) => successes += 1,
Async::Ready(Some(false)) => successes -= 1,
Async::Ready(None) => unreachable!(),
Async::NotReady => ()
}
}
if successes == NUM_NODES {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
});
tokio::runtime::Runtime::new().unwrap().block_on(combined).unwrap();
}
#[test]
fn basic_two_nodes_requests_in_parallel() {
let (mut service1, mut service2) = {
let mut l = build_nodes::<Block>(2, 50550).into_iter();
let a = l.next().unwrap();
let b = l.next().unwrap();
(a, b)
};
// Generate random messages with or without a request id.
let mut to_send = {
let mut to_send = Vec::new();
for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode.
let msg = (0..10).map(|_| rand::random::<u8>()).collect::<Vec<_>>();
to_send.push(Message::ChainSpecific(msg));
}
to_send
};
// Clone `to_send` in `to_receive`. Below we will remove from `to_receive` the messages we
// receive, until the list is empty.
let mut to_receive = to_send.clone();
to_send.shuffle(&mut rand::thread_rng());
let fut1 = future::poll_fn(move || -> io::Result<_> {
loop {
match try_ready!(service1.poll()) {
Some(CustomProtoOut::CustomProtocolOpen { peer_id, .. }) => {
for msg in to_send.drain(..) {
service1.user_protocol_mut().send_packet(&peer_id, msg);
}
},
_ => panic!(),
}
}
});
let fut2 = future::poll_fn(move || -> io::Result<_> {
loop {
match try_ready!(service2.poll()) {
Some(CustomProtoOut::CustomProtocolOpen { .. }) => {},
Some(CustomProtoOut::CustomMessage { message, .. }) => {
let pos = to_receive.iter().position(|m| *m == message).unwrap();
to_receive.remove(pos);
if to_receive.is_empty() {
return Ok(Async::Ready(()))
}
}
_ => panic!(),
}
}
});
let combined = fut1.select(fut2).map_err(|(err, _)| err);
tokio::runtime::Runtime::new().unwrap().block_on_all(combined).unwrap();
}
#[test]
fn reconnect_after_disconnect() {
// We connect two nodes together, then force a disconnect (through the API of the `Service`),
// check that the disconnect worked, and finally check whether they successfully reconnect.
let (mut service1, mut service2) = {
let mut l = build_nodes::<Block>(2, 50350).into_iter();
let a = l.next().unwrap();
let b = l.next().unwrap();
(a, b)
};
// We use the `current_thread` runtime because it doesn't require us to have `'static` futures.
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
// For this test, the services can be in the following states.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ServiceState { NotConnected, FirstConnec, Disconnected, ConnectedAgain }
let mut service1_state = ServiceState::NotConnected;
let mut service2_state = ServiceState::NotConnected;
// Run the events loops.
runtime.block_on(future::poll_fn(|| -> Result<_, io::Error> {
loop {
let mut service1_not_ready = false;
match service1.poll().unwrap() {
Async::Ready(Some(CustomProtoOut::CustomProtocolOpen { .. })) => {
match service1_state {
ServiceState::NotConnected => {
service1_state = ServiceState::FirstConnec;
if service2_state == ServiceState::FirstConnec {
service1.user_protocol_mut().disconnect_peer(ExpandedSwarm::local_peer_id(&service2));
}
},
ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain,
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
}
},
Async::Ready(Some(CustomProtoOut::CustomProtocolClosed { .. })) => {
match service1_state {
ServiceState::FirstConnec => service1_state = ServiceState::Disconnected,
ServiceState::ConnectedAgain| ServiceState::NotConnected |
ServiceState::Disconnected => panic!(),
}
},
Async::NotReady => service1_not_ready = true,
_ => panic!()
}
match service2.poll().unwrap() {
Async::Ready(Some(CustomProtoOut::CustomProtocolOpen { .. })) => {
match service2_state {
ServiceState::NotConnected => {
service2_state = ServiceState::FirstConnec;
if service1_state == ServiceState::FirstConnec {
service1.user_protocol_mut().disconnect_peer(ExpandedSwarm::local_peer_id(&service2));
}
},
ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain,
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
}
},
Async::Ready(Some(CustomProtoOut::CustomProtocolClosed { .. })) => {
match service2_state {
ServiceState::FirstConnec => service2_state = ServiceState::Disconnected,
ServiceState::ConnectedAgain| ServiceState::NotConnected |
ServiceState::Disconnected => panic!(),
}
},
Async::NotReady if service1_not_ready => break,
Async::NotReady => {}
_ => panic!()
}
}
if service1_state == ServiceState::ConnectedAgain && service2_state == ServiceState::ConnectedAgain {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
})).unwrap();
// Do a second 3-seconds run to make sure we don't get disconnected immediately again.
let mut delay = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(3));
runtime.block_on(future::poll_fn(|| -> Result<_, io::Error> {
match service1.poll().unwrap() {
Async::NotReady => {},
_ => panic!()
}
match service2.poll().unwrap() {
Async::NotReady => {},
_ => panic!()
}
if let Async::Ready(()) = delay.poll().unwrap() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
})).unwrap();
}
+11 -11
View File
@@ -42,7 +42,7 @@ use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImpor
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient, TopicNotification};
use futures::{prelude::*, sync::{mpsc, oneshot}};
use crate::message::Message;
use network_libp2p::PeerId;
use libp2p::PeerId;
use parking_lot::{Mutex, RwLock};
use primitives::{H256, sr25519::Public as AuthorityId, Blake2Hasher};
use crate::protocol::{Context, Protocol, ProtocolConfig, ProtocolStatus, CustomMessageOutcome, NetworkOut};
@@ -296,10 +296,10 @@ pub struct Peer<D, S: NetworkSpecialization<Block>> {
type MessageFilter = dyn Fn(&NetworkMsg<Block>) -> bool;
pub enum FromNetworkMsg<B: BlockT> {
/// A peer connected, with debug info.
PeerConnected(PeerId, String),
/// A peer disconnected, with debug info.
PeerDisconnected(PeerId, String),
/// A peer connected.
PeerConnected(PeerId),
/// A peer disconnected.
PeerDisconnected(PeerId),
/// A custom message from another peer.
CustomMessage(PeerId, Message<B>),
/// Synchronization request.
@@ -504,12 +504,12 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
/// Called on connection to other indicated peer.
fn on_connect(&self, other: &Self) {
self.net_proto_channel.send_from_net(FromNetworkMsg::PeerConnected(other.peer_id.clone(), String::new()));
self.net_proto_channel.send_from_net(FromNetworkMsg::PeerConnected(other.peer_id.clone()));
}
/// Called on disconnect from other indicated peer.
fn on_disconnect(&self, other: &Self) {
self.net_proto_channel.send_from_net(FromNetworkMsg::PeerDisconnected(other.peer_id.clone(), String::new()));
self.net_proto_channel.send_from_net(FromNetworkMsg::PeerDisconnected(other.peer_id.clone()));
}
/// Receive a message from another peer. Return a set of peers to disconnect.
@@ -828,12 +828,12 @@ pub trait TestNetFactory: Sized {
tokio::runtime::current_thread::run(futures::future::poll_fn(move || {
while let Async::Ready(msg) = network_to_protocol_rx.poll().unwrap() {
let outcome = match msg {
Some(FromNetworkMsg::PeerConnected(peer_id, debug_msg)) => {
protocol.on_peer_connected(&mut Ctxt(&network_sender), peer_id, debug_msg);
Some(FromNetworkMsg::PeerConnected(peer_id)) => {
protocol.on_peer_connected(&mut Ctxt(&network_sender), peer_id);
CustomMessageOutcome::None
},
Some(FromNetworkMsg::PeerDisconnected(peer_id, debug_msg)) => {
protocol.on_peer_disconnected(&mut Ctxt(&network_sender), peer_id, debug_msg);
Some(FromNetworkMsg::PeerDisconnected(peer_id)) => {
protocol.on_peer_disconnected(&mut Ctxt(&network_sender), peer_id);
CustomMessageOutcome::None
},
Some(FromNetworkMsg::CustomMessage(peer_id, message)) =>
+75
View File
@@ -0,0 +1,75 @@
// Copyright 2018-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use futures::prelude::*;
use libp2p::{
InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport,
mplex, identity, secio, yamux, bandwidth, wasm_ext
};
#[cfg(not(target_os = "unknown"))]
use libp2p::{tcp, dns, websocket};
use libp2p::core::{self, transport::boxed::Boxed, transport::OptionalTransport, muxing::StreamMuxerBox};
use std::{io, sync::Arc, time::Duration, usize};
pub use self::bandwidth::BandwidthSinks;
/// Builds the transport that serves as a common ground for all connections.
///
/// Returns a `BandwidthSinks` object that allows querying the average bandwidth produced by all
/// the connections spawned with this transport.
pub fn build_transport(
keypair: identity::Keypair,
wasm_external_transport: Option<wasm_ext::ExtTransport>
) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc<bandwidth::BandwidthSinks>) {
let mut mplex_config = mplex::MplexConfig::new();
mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block);
mplex_config.max_buffer_len(usize::MAX);
let transport = if let Some(t) = wasm_external_transport {
OptionalTransport::some(t)
} else {
OptionalTransport::none()
};
#[cfg(not(target_os = "unknown"))]
let transport = {
let desktop_trans = tcp::TcpConfig::new();
let desktop_trans = websocket::WsConfig::new(desktop_trans.clone())
.or_transport(desktop_trans);
transport.or_transport(dns::DnsConfig::new(desktop_trans))
};
let (transport, sinks) = bandwidth::BandwidthLogging::new(transport, Duration::from_secs(5));
// TODO: rework the transport creation (https://github.com/libp2p/rust-libp2p/issues/783)
let transport = transport
.with_upgrade(secio::SecioConfig::new(keypair))
.and_then(move |out, endpoint| {
let peer_id = out.remote_key.into_peer_id();
let peer_id2 = peer_id.clone();
let upgrade = core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex_config)
.map_inbound(move |muxer| (peer_id, muxer))
.map_outbound(move |muxer| (peer_id2, muxer));
core::upgrade::apply(out.stream, upgrade, endpoint)
.map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer)))
})
.with_timeout(Duration::from_secs(20))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();
(transport, sinks)
}