Update to latest libp2p (#1386)

* Update to latest libp2p

* Fix indentations

* Add basic test

* Apply suggestions from code review

Co-Authored-By: tomaka <pierre.krieger1708@gmail.com>

* Remove Mutex from topology

* Remove unused method

* Fix concerns
This commit is contained in:
Pierre Krieger
2019-01-14 13:38:36 +01:00
committed by Gav Wood
parent 08402d26e6
commit a0d458aa06
15 changed files with 1975 additions and 3138 deletions
+290 -548
View File
File diff suppressed because it is too large Load Diff
+3 -1
View File
@@ -11,7 +11,7 @@ bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
futures = "0.1"
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "997d0163bc8a7e11559524ad8466bc3b1850c8ec", default-features = false, features = ["secio-rsa", "secio-secp256k1"] }
libp2p = { version = "0.2", default-features = false, features = ["secio-rsa", "secio-secp256k1"] }
parking_lot = "0.7.1"
libc = "0.2"
log = "0.4"
@@ -19,10 +19,12 @@ rand = "0.5.0"
serde = "1.0.70"
serde_derive = "1.0.70"
serde_json = "1.0.24"
smallvec = "0.6"
tokio = "0.1"
tokio-io = "0.1"
tokio-timer = "0.2"
unsigned-varint = { version = "0.2.1", features = ["codec"] }
void = "1.0"
[dev-dependencies]
assert_matches = "1.2"
@@ -0,0 +1,262 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::custom_proto::{CustomProtos, CustomProtosOut, RegisteredProtocols};
use crate::{NetworkConfiguration, ProtocolId};
use bytes::Bytes;
use futures::prelude::*;
use libp2p::core::{PeerId, ProtocolsHandler};
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
use libp2p::identify::{Identify, IdentifyEvent};
use libp2p::kad::{Kademlia, KademliaOut, KademliaTopology};
use libp2p::ping::{Ping, PingEvent};
use std::{cmp, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use void;
/// General behaviour of the network.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "CustomProtosOut", poll_method = "poll")]
pub struct Behaviour<TSubstream> {
/// Periodically ping nodes, and close the connection if it's unresponsive.
ping: Ping<TSubstream>,
/// Custom protocols (dot, bbq, sub, etc.).
custom_protocols: CustomProtos<TSubstream>,
/// Discovers nodes of the network. Defined below.
discovery: DiscoveryBehaviour<TSubstream>,
/// Periodically identifies the remote and responds to incoming requests.
identify: Identify<TSubstream>,
/// Queue of events to produce for the outside.
#[behaviour(ignore)]
events: Vec<CustomProtosOut>,
}
impl<TSubstream> Behaviour<TSubstream> {
/// Builds a new `Behaviour`.
// TODO: redundancy between config and local_peer_id (https://github.com/libp2p/rust-libp2p/issues/745)
pub fn new(config: &NetworkConfiguration, local_peer_id: PeerId, protocols: RegisteredProtocols) -> Self {
Behaviour {
ping: Ping::new(),
custom_protocols: CustomProtos::new(config, protocols),
discovery: DiscoveryBehaviour::new(local_peer_id),
identify: Identify::new(
// The agent and protocol versions; maybe we should use something better?
concat!("substrate/", env!("CARGO_PKG_VERSION")).to_owned(),
concat!("substrate/", env!("CARGO_PKG_VERSION")).to_owned()
),
events: Vec::new(),
}
}
/// Sends a message to a peer using the given custom protocol.
///
/// Has no effect if the custom protocol is not open with the given peer.
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
#[inline]
pub fn send_custom_message(&mut self, target: &PeerId, protocol_id: ProtocolId, data: impl Into<Bytes>) {
self.custom_protocols.send_packet(target, protocol_id, data)
}
/// Try to add a reserved peer.
pub fn add_reserved_peer(&mut self, peer_id: PeerId) {
self.custom_protocols.add_reserved_peer(peer_id)
}
/// Try to remove a reserved peer.
///
/// If we are in reserved mode and we were connected to a node with this peer ID, then this
/// method will disconnect it and return its index.
pub fn remove_reserved_peer(&mut self, peer_id: PeerId) {
self.custom_protocols.remove_reserved_peer(peer_id)
}
/// Start accepting all peers again if we weren't.
pub fn accept_unreserved_peers(&mut self) {
self.custom_protocols.accept_unreserved_peers()
}
/// Start refusing non-reserved nodes. Returns the list of nodes that have been disconnected.
pub fn deny_unreserved_peers(&mut self) {
self.custom_protocols.deny_unreserved_peers()
}
/// Disconnects a peer and bans it for a little while.
///
/// Same as `drop_node`, except that the same peer will not be able to reconnect later.
#[inline]
pub fn ban_node(&mut self, peer_id: PeerId) {
self.custom_protocols.ban_peer(peer_id)
}
/// Disconnects the custom protocols from a peer.
///
/// The peer will still be able to use Kademlia or other protocols, but will get disconnected
/// after a few seconds of inactivity.
///
/// This is asynchronous and does not instantly close the custom protocols.
/// Corresponding closing events will be generated once the closing actually happens.
///
/// Has no effect if we're not connected to the `PeerId`.
#[inline]
pub fn drop_node(&mut self, peer_id: &PeerId) {
self.custom_protocols.disconnect_peer(peer_id)
}
}
impl<TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TSubstream> {
fn inject_event(&mut self, event: void::Void) {
void::unreachable(event)
}
}
impl<TSubstream> NetworkBehaviourEventProcess<CustomProtosOut> for Behaviour<TSubstream> {
fn inject_event(&mut self, event: CustomProtosOut) {
self.events.push(event);
}
}
impl<TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSubstream> {
fn inject_event(&mut self, event: IdentifyEvent) {
match event {
IdentifyEvent::Identified { peer_id, info, .. } => {
trace!(target: "sub-libp2p", "Identified {:?} => {:?}", peer_id, info);
}
IdentifyEvent::Error { .. } => {}
}
}
}
impl<TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TSubstream> {
fn inject_event(&mut self, _: KademliaOut) {
// We only ever use Kademlia for discovering nodes, and nodes discovered by Kademlia are
// automatically added to the topology. Therefore we don't need to do anything.
}
}
impl<TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TSubstream> {
fn inject_event(&mut self, event: PingEvent) {
match event {
PingEvent::PingSuccess { peer, time } => {
trace!(target: "sub-libp2p", "Ping time with {:?}: {:?}", peer, time);
}
}
}
}
impl<TSubstream> Behaviour<TSubstream> {
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, CustomProtosOut>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
}
Async::NotReady
}
}
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
pub struct DiscoveryBehaviour<TSubstream> {
/// Kademlia requests and answers.
kademlia: Kademlia<TSubstream>,
/// Stream that fires when we need to perform the next random Kademlia query.
next_kad_random_query: Delay,
/// After `next_kad_random_query` triggers, the next one triggers after this duration.
duration_to_next_kad: Duration,
}
impl<TSubstream> DiscoveryBehaviour<TSubstream> {
fn new(local_peer_id: PeerId) -> Self {
DiscoveryBehaviour {
kademlia: Kademlia::without_init(local_peer_id),
next_kad_random_query: Delay::new(Instant::now()),
duration_to_next_kad: Duration::from_secs(1),
}
}
}
impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for DiscoveryBehaviour<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TTopology: KademliaTopology,
{
type ProtocolsHandler = <Kademlia<TSubstream> as NetworkBehaviour<TTopology>>::ProtocolsHandler;
type OutEvent = <Kademlia<TSubstream> as NetworkBehaviour<TTopology>>::OutEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
NetworkBehaviour::<TTopology>::new_handler(&mut self.kademlia)
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
NetworkBehaviour::<TTopology>::inject_connected(&mut self.kademlia, peer_id, endpoint)
}
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
NetworkBehaviour::<TTopology>::inject_disconnected(&mut self.kademlia, peer_id, endpoint)
}
fn inject_node_event(
&mut self,
peer_id: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
NetworkBehaviour::<TTopology>::inject_node_event(&mut self.kademlia, peer_id, event)
}
fn poll(
&mut self,
params: &mut PollParameters<TTopology>,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
// Poll Kademlia.
match self.kademlia.poll(params) {
Async::Ready(action) => return Async::Ready(action),
Async::NotReady => (),
}
// Poll the stream that fires when we need to start a random Kademlia query.
loop {
match self.next_kad_random_query.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(_)) => {
let random_peer_id = PeerId::random();
debug!(target: "sub-libp2p", "Starting random Kademlia request for {:?}",
random_peer_id);
self.kademlia.find_node(random_peer_id);
// Reset the `Delay` to the next random.
self.next_kad_random_query.reset(Instant::now() + self.duration_to_next_kad);
self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2,
Duration::from_secs(60));
},
Err(err) => {
warn!(target: "sub-libp2p", "Kad query timer errored: {:?}", err);
break
}
}
}
Async::NotReady
}
}
@@ -0,0 +1,471 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::custom_proto::handler::{CustomProtosHandler, CustomProtosHandlerOut, CustomProtosHandlerIn};
use crate::custom_proto::upgrade::RegisteredProtocols;
use crate::{NetworkConfiguration, NonReservedPeerMode, ProtocolId, topology::NetTopology};
use bytes::Bytes;
use fnv::{FnvHashMap, FnvHashSet};
use futures::prelude::*;
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::core::{protocols_handler::ProtocolsHandler, PeerId};
use smallvec::SmallVec;
use std::{io, marker::PhantomData, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
// Duration during which a peer is disabled.
const PEER_DISABLE_DURATION: Duration = Duration::from_secs(5 * 60);
/// Network behaviour that handles opening substreams for custom protocols with other nodes.
pub struct CustomProtos<TSubstream> {
/// List of protocols to open with peers. Never modified.
registered_protocols: RegisteredProtocols,
/// List of custom protocols that we have open with remotes.
open_protocols: Vec<(PeerId, ProtocolId)>,
/// List of peer handlers that were enabled, and whether we're dialing or listening.
///
/// Note that it is possible for a peer to be in the shutdown process, in which case it will
/// not be in this list but will be present in `open_protocols`.
/// It is also possible that we have *just* enabled a peer, in which case it will be in this
/// list but not in `open_protocols`.
enabled_peers: FnvHashMap<PeerId, ConnectedPoint>,
/// Maximum number of incoming non-reserved connections, taken from the config. Never modified.
max_incoming_connections: usize,
/// Maximum number of outgoing non-reserved connections, taken from the config. Never modified.
max_outgoing_connections: usize,
/// If true, only reserved peers can connect.
reserved_only: bool,
/// List of the IDs of the reserved peers. We always try to maintain a connection these peers.
reserved_peers: FnvHashSet<PeerId>,
/// List of the IDs of peers that are forbidden, and the moment their ban expires.
banned_peers: Vec<(PeerId, Instant)>,
/// When this delay expires, we need to synchronize our active connectons with the
/// network topology.
next_connect_to_nodes: Delay,
/// Events to produce from `poll()`.
events: SmallVec<[NetworkBehaviourAction<CustomProtosHandlerIn, CustomProtosOut>; 4]>,
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
}
/// Event that can be emitted by the `CustomProtos`.
#[derive(Debug)]
pub enum CustomProtosOut {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Version of the protocol that has been opened.
version: u8,
/// Id of the node we have opened a connection with.
peer_id: PeerId,
/// Endpoint used for this custom protocol.
endpoint: ConnectedPoint,
},
/// Closed a custom protocol with the remote.
CustomProtocolClosed {
/// Id of the peer we were connected to.
peer_id: PeerId,
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF).
result: io::Result<()>,
},
/// Receives a message on a custom protocol substream.
CustomMessage {
/// Id of the peer the message came from.
peer_id: PeerId,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Data that has been received.
data: Bytes,
},
}
impl<TSubstream> CustomProtos<TSubstream> {
/// Creates a `CustomProtos`.
pub fn new(config: &NetworkConfiguration, registered_protocols: RegisteredProtocols) -> Self {
let max_incoming_connections = config.in_peers as usize;
let max_outgoing_connections = config.out_peers as usize;
// Expected maximum number of connections.
let connec_cap = max_incoming_connections
.saturating_add(max_outgoing_connections)
.saturating_add(4); // We add an arbitrary number for reserved peers slots
// Expected maximum number of substreams.
let open_protos_cap = connec_cap.saturating_mul(registered_protocols.len());
CustomProtos {
registered_protocols,
max_incoming_connections,
max_outgoing_connections,
reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny,
reserved_peers: Default::default(),
banned_peers: Vec::new(),
open_protocols: Vec::with_capacity(open_protos_cap),
enabled_peers: FnvHashMap::with_capacity_and_hasher(connec_cap, Default::default()),
next_connect_to_nodes: Delay::new(Instant::now()),
events: SmallVec::new(),
marker: PhantomData,
}
}
/// Adds a reserved peer.
pub fn add_reserved_peer(&mut self, peer_id: PeerId) {
self.reserved_peers.insert(peer_id);
// Trigger a `connect_to_nodes` round.
self.next_connect_to_nodes = Delay::new(Instant::now());
}
/// Removes a reserved peer.
///
/// If we are in reserved mode and we were connected to a node with this peer ID, then this
/// method will disconnect it and return its index.
pub fn remove_reserved_peer(&mut self, peer_id: PeerId) {
self.reserved_peers.remove(&peer_id);
}
/// Start accepting all peers again if we weren't.
pub fn accept_unreserved_peers(&mut self) {
if !self.reserved_only {
return
}
self.reserved_only = false;
// Trigger a `connect_to_nodes` round.
self.next_connect_to_nodes = Delay::new(Instant::now());
}
/// Start refusing non-reserved nodes.
pub fn deny_unreserved_peers(&mut self) {
if self.reserved_only {
return
}
self.reserved_only = true;
// Disconnecting nodes that are connected to us and that aren't reserved
let reserved_peers = &mut self.reserved_peers;
let events = &mut self.events;
self.enabled_peers.retain(move |peer_id, _| {
if reserved_peers.contains(peer_id) {
return true
}
events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::Disable,
});
false
})
}
/// Disconnects the given peer if we are connected to it.
pub fn disconnect_peer(&mut self, peer: &PeerId) {
if self.enabled_peers.remove(peer).is_some() {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer.clone(),
event: CustomProtosHandlerIn::Disable,
});
}
}
/// Disconnects the given peer if we are connected to it and disables it for a little while.
pub fn ban_peer(&mut self, peer_id: PeerId) {
// Peer is already banned
if self.banned_peers.iter().any(|(p, _)| p == &peer_id) {
return
}
self.banned_peers.push((peer_id.clone(), Instant::now() + PEER_DISABLE_DURATION));
if self.enabled_peers.remove(&peer_id).is_some() {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id,
event: CustomProtosHandlerIn::Disable,
});
}
}
/// Sends a message to a peer using the given custom protocol.
///
/// Has no effect if the custom protocol is not open with the given peer.
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
pub fn send_packet(&mut self, target: &PeerId, protocol_id: ProtocolId, data: impl Into<Bytes>) {
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: target.clone(),
event: CustomProtosHandlerIn::SendCustomMessage {
protocol: protocol_id,
data: data.into(),
}
});
}
/// Updates the attempted connections to nodes.
///
/// Also updates `next_connect_to_nodes` with the earliest known moment when we need to
/// update connections again.
fn connect_to_nodes(&mut self, params: &mut PollParameters<NetTopology>) {
// Make sure we are connected or connecting to all the reserved nodes.
for reserved in self.reserved_peers.iter() {
// TODO: don't generate an event if we're already in a pending connection (https://github.com/libp2p/rust-libp2p/issues/697)
if !self.enabled_peers.contains_key(&reserved) {
self.events.push(NetworkBehaviourAction::DialPeer { peer_id: reserved.clone() });
}
}
// We're done with reserved node; return early if there's nothing more to do.
if self.reserved_only {
return
}
// Counter of number of connections to open, decreased when we open one.
let mut num_to_open = {
let num_outgoing_connections = self.enabled_peers
.iter()
.filter(|(_, endpoint)| endpoint.is_dialer())
.filter(|(p, _)| !self.reserved_peers.contains(p))
.count();
self.max_outgoing_connections - num_outgoing_connections
};
trace!(target: "sub-libp2p", "Connect-to-nodes round; attempting to fill {:?} slots",
num_to_open);
let local_peer_id = params.local_peer_id().clone();
let (to_try, will_change) = params.topology().addrs_to_attempt();
for (peer_id, _) in to_try {
if num_to_open == 0 {
break
}
if peer_id == &local_peer_id {
continue
}
if let Some((_, ban_end)) = self.banned_peers.iter().find(|(p, _)| p == peer_id) {
if *ban_end > Instant::now() {
continue
}
}
num_to_open -= 1;
self.events.push(NetworkBehaviourAction::DialPeer { peer_id: peer_id.clone() });
}
// Next round is when we expect the topology will change.
self.next_connect_to_nodes.reset(will_change);
}
}
impl<TSubstream> NetworkBehaviour<NetTopology> for CustomProtos<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type ProtocolsHandler = CustomProtosHandler<TSubstream>;
type OutEvent = CustomProtosOut;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
CustomProtosHandler::new(self.registered_protocols.clone())
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
// When a peer connects, its handler is initially in the disabled state. We make sure that
// the peer is allowed, and if so we put it in the enabled state.
let is_reserved = self.reserved_peers.contains(&peer_id);
if self.reserved_only && !is_reserved {
debug!(target: "sub-libp2p", "Ignoring {:?} because we're in reserved mode", peer_id);
return
}
// Check whether peer is banned.
if !is_reserved {
if let Some((_, expire)) = self.banned_peers.iter().find(|(p, _)| p == &peer_id) {
if *expire >= Instant::now() {
debug!(target: "sub-libp2p", "Ignoring banned peer {:?}", peer_id);
return
}
}
}
// Check the limits on the ingoing and outgoing connections.
match endpoint {
ConnectedPoint::Dialer { .. } => {
let num_outgoing = self.enabled_peers.iter()
.filter(|(_, e)| e.is_dialer())
.filter(|(p, _)| !self.reserved_peers.contains(p))
.count();
debug_assert!(num_outgoing <= self.max_outgoing_connections);
if num_outgoing == self.max_outgoing_connections {
return
}
}
ConnectedPoint::Listener { .. } => {
let num_ingoing = self.enabled_peers.iter()
.filter(|(_, e)| e.is_listener())
.filter(|(p, _)| !self.reserved_peers.contains(p))
.count();
debug_assert!(num_ingoing <= self.max_incoming_connections);
if num_ingoing == self.max_incoming_connections {
debug!(target: "sub-libp2p", "Ignoring incoming connection from {:?} because \
we're full", peer_id);
return
}
}
}
// If everything is fine, enable the node.
debug_assert!(!self.enabled_peers.contains_key(&peer_id));
// We ask the handler to actively open substreams only if we are the dialer; otherwise
// the two nodes will race to be the first to open the unique allowed substream.
if endpoint.is_dialer() {
trace!(target: "sub-libp2p", "Enabling custom protocols with {:?} (active)", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::EnableActive,
});
} else {
trace!(target: "sub-libp2p", "Enabling custom protocols with {:?} (passive)", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::EnablePassive,
});
}
self.enabled_peers.insert(peer_id, endpoint);
}
fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) {
while let Some(pos) = self.open_protocols.iter().position(|(p, _)| p == peer_id) {
let (_, protocol_id) = self.open_protocols.remove(pos);
let event = CustomProtosOut::CustomProtocolClosed {
protocol_id,
peer_id: peer_id.clone(),
result: Ok(()),
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
// Trigger a `connect_to_nodes` round.
self.next_connect_to_nodes = Delay::new(Instant::now());
}
self.enabled_peers.remove(peer_id);
}
fn inject_node_event(
&mut self,
source: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
match event {
CustomProtosHandlerOut::CustomProtocolClosed { protocol_id, result } => {
let pos = self.open_protocols.iter().position(|(s, p)|
s == &source && p == &protocol_id
);
if let Some(pos) = pos {
self.open_protocols.remove(pos);
} else {
debug_assert!(false, "Couldn't find protocol in open_protocols");
}
let event = CustomProtosOut::CustomProtocolClosed {
protocol_id,
result,
peer_id: source,
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
}
CustomProtosHandlerOut::CustomProtocolOpen { protocol_id, version } => {
debug_assert!(!self.open_protocols.iter().any(|(s, p)|
s == &source && p == &protocol_id
));
self.open_protocols.push((source.clone(), protocol_id));
if let Some(address) = self.enabled_peers.get(&source) {
let event = CustomProtosOut::CustomProtocolOpen {
protocol_id,
version,
peer_id: source,
endpoint: address.clone()
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
}
}
CustomProtosHandlerOut::CustomMessage { protocol_id, data } => {
let event = CustomProtosOut::CustomMessage {
peer_id: source,
protocol_id,
data,
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
}
}
}
fn poll(
&mut self,
params: &mut PollParameters<NetTopology>,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
loop {
match self.next_connect_to_nodes.poll() {
Ok(Async::Ready(())) => self.connect_to_nodes(params),
Ok(Async::NotReady) => break,
Err(err) => {
warn!(target: "sub-libp2p", "Connect-to-nodes timer errored: {:?}", err);
break
}
}
}
// Clean up `banned_peers`
self.banned_peers.retain(|(_, end)| *end < Instant::now());
self.banned_peers.shrink_to_fit();
if !self.events.is_empty() {
return Async::Ready(self.events.remove(0))
}
Async::NotReady
}
}
@@ -0,0 +1,327 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::ProtocolId;
use crate::custom_proto::upgrade::{RegisteredProtocol, RegisteredProtocols, RegisteredProtocolSubstream};
use bytes::Bytes;
use futures::prelude::*;
use libp2p::core::{
Endpoint, ProtocolsHandler, ProtocolsHandlerEvent,
protocols_handler::ProtocolsHandlerUpgrErr,
upgrade::{InboundUpgrade, OutboundUpgrade}
};
use smallvec::SmallVec;
use std::{fmt, io};
use tokio_io::{AsyncRead, AsyncWrite};
use void::Void;
/// Protocol handler that tries to maintain one substream per registered custom protocol.
///
/// The handler initially starts in the "Disable" state. It can then be enabled by sending an
/// `Enable` message.
/// The handler can then be enabled and disabled at any time with the `Enable` and `Disable`
/// messages.
pub struct CustomProtosHandler<TSubstream> {
/// List of all the protocols we support.
protocols: RegisteredProtocols,
/// See the documentation of `State`.
state: State,
/// The active substreams. There should always ever be only one substream per protocol.
substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>,
/// Queue of events to send to the outside.
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol, (), CustomProtosHandlerOut>; 16]>,
}
/// State of the handler.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum State {
/// Normal functionning.
Normal,
/// We are disabled. We close existing substreams and refuse incoming connections, but don't
/// shut down the entire handler.
Disabled,
/// We are trying to shut down the existing node and thus should refuse any incoming
/// connection.
ShuttingDown,
}
/// Event that can be received by a `CustomProtosHandler`.
#[derive(Debug)]
pub enum CustomProtosHandlerIn {
/// The node should start using custom protocols and actively open substreams.
EnableActive,
/// The node should listen to custom protocols but not open substreams.
EnablePassive,
/// The node should stop using custom protocols.
Disable,
/// Sends a message through a custom protocol substream.
SendCustomMessage {
/// The protocol to use.
protocol: ProtocolId,
/// The data to send.
data: Bytes,
},
}
/// Event that can be emitted by a `CustomProtosHandler`.
#[derive(Debug)]
pub enum CustomProtosHandlerOut {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Version of the protocol that has been opened.
version: u8,
},
/// Closed a custom protocol with the remote.
CustomProtocolClosed {
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF).
result: io::Result<()>,
},
/// Receives a message on a custom protocol substream.
CustomMessage {
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Data that has been received.
data: Bytes,
},
}
impl<TSubstream> CustomProtosHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Builds a new `CustomProtosHandler`.
pub fn new(protocols: RegisteredProtocols) -> Self {
CustomProtosHandler {
protocols,
state: State::Disabled,
substreams: SmallVec::new(),
events_queue: SmallVec::new(),
}
}
/// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`.
fn inject_fully_negotiated(
&mut self,
proto: RegisteredProtocolSubstream<TSubstream>,
_: Endpoint,
) {
match self.state {
State::Disabled | State::ShuttingDown => return,
State::Normal => ()
}
if self.substreams.iter().any(|p| p.protocol_id() == proto.protocol_id()) {
// Skipping protocol that's already open.
return
}
let event = CustomProtosHandlerOut::CustomProtocolOpen {
protocol_id: proto.protocol_id(),
version: proto.protocol_version(),
};
self.substreams.push(proto);
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
}
}
impl<TSubstream> ProtocolsHandler for CustomProtosHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type InEvent = CustomProtosHandlerIn;
type OutEvent = CustomProtosHandlerOut;
type Substream = TSubstream;
type Error = Void;
type InboundProtocol = RegisteredProtocols;
type OutboundProtocol = RegisteredProtocol;
type OutboundOpenInfo = ();
#[inline]
fn listen_protocol(&self) -> Self::InboundProtocol {
self.protocols.clone()
}
fn inject_fully_negotiated_inbound(
&mut self,
proto: <Self::InboundProtocol as InboundUpgrade<TSubstream>>::Output
) {
self.inject_fully_negotiated(proto, Endpoint::Listener);
}
#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
proto: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
_: Self::OutboundOpenInfo
) {
self.inject_fully_negotiated(proto, Endpoint::Dialer);
}
fn inject_event(&mut self, message: CustomProtosHandlerIn) {
match message {
CustomProtosHandlerIn::Disable => {
match self.state {
State::Normal => self.state = State::Disabled,
State::Disabled | State::ShuttingDown => (),
}
for substream in self.substreams.iter_mut() {
substream.shutdown();
}
},
CustomProtosHandlerIn::EnableActive | CustomProtosHandlerIn::EnablePassive => {
match self.state {
State::Disabled => self.state = State::Normal,
State::Normal | State::ShuttingDown => (),
}
// Try open one substream for each registered protocol.
if let CustomProtosHandlerIn::EnableActive = message {
for protocol in self.protocols.0.iter() {
if self.substreams.iter().any(|p| p.protocol_id() == protocol.id()) {
// Skipping protocol that's already open.
continue
}
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: protocol.clone(),
info: (),
});
}
}
},
CustomProtosHandlerIn::SendCustomMessage { protocol, data } => {
debug_assert!(self.protocols.has_protocol(protocol),
"invalid protocol id requested in the API of the libp2p networking");
let proto = match self.substreams.iter_mut().find(|p| p.protocol_id() == protocol) {
Some(proto) => proto,
None => {
// We are processing a message event before we could report to the outside
// that we disconnected from the protocol. This is not an error.
trace!(target: "sub-libp2p", "Tried to send message through closed \
protocol");
return
},
};
proto.send_message(data);
},
}
}
#[inline]
fn inject_inbound_closed(&mut self) {}
#[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<io::Error>) {
warn!(target: "sub-libp2p", "Error while opening custom protocol: {:?}", err);
}
#[inline]
fn connection_keep_alive(&self) -> bool {
// Right now if the remote doesn't support one of the custom protocols, we shut down the
// entire connection. This is a hack-ish solution to the problem where we connect to nodes
// that support libp2p but not the testnet that we want.
self.substreams.len() == self.protocols.len()
}
fn shutdown(&mut self) {
match self.state {
State::Normal | State::Disabled => self.state = State::ShuttingDown,
State::ShuttingDown => (),
}
for substream in self.substreams.iter_mut() {
substream.shutdown();
}
}
fn poll(
&mut self,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error,
> {
if !self.events_queue.is_empty() {
let event = self.events_queue.remove(0);
return Ok(Async::Ready(event))
}
if self.state == State::ShuttingDown && self.substreams.is_empty() {
return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))
}
for n in (0..self.substreams.len()).rev() {
let mut substream = self.substreams.swap_remove(n);
match substream.poll() {
Ok(Async::Ready(Some(data))) => {
let event = CustomProtosHandlerOut::CustomMessage {
protocol_id: substream.protocol_id(),
data
};
self.substreams.push(substream);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event)))
},
Ok(Async::NotReady) =>
self.substreams.push(substream),
Ok(Async::Ready(None)) => {
let event = CustomProtosHandlerOut::CustomProtocolClosed {
protocol_id: substream.protocol_id(),
result: Ok(())
};
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event)))
},
Err(err) => {
let event = CustomProtosHandlerOut::CustomProtocolClosed {
protocol_id: substream.protocol_id(),
result: Err(err)
};
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event)))
},
}
}
Ok(Async::NotReady)
}
}
impl<TSubstream> fmt::Debug for CustomProtosHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("CustomProtosHandler")
.field("protocols", &self.protocols.len())
.field("state", &self.state)
.field("substreams", &self.substreams.len())
.finish()
}
}
@@ -0,0 +1,22 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
pub use self::behaviour::{CustomProtos, CustomProtosOut};
pub use self::upgrade::{RegisteredProtocol, RegisteredProtocols};
mod behaviour;
mod handler;
mod upgrade;
@@ -15,10 +15,10 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use bytes::Bytes;
use libp2p::core::{ConnectionUpgrade, Endpoint};
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
use libp2p::tokio_codec::Framed;
use std::{collections::VecDeque, io, vec::IntoIter as VecIntoIter};
use futures::{prelude::*, future, stream, task};
use futures::{prelude::*, future, stream};
use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec::UviBytes;
use ProtocolId;
@@ -80,8 +80,6 @@ pub struct RegisteredProtocolSubstream<TSubstream> {
protocol_id: ProtocolId,
/// Version of the protocol that was negotiated.
protocol_version: u8,
/// Task to notify when something is changed and we need to be polled.
to_notify: Option<task::Task>,
}
impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
@@ -105,9 +103,6 @@ impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
/// After calling this, the stream is guaranteed to finish soon-ish.
pub fn shutdown(&mut self) {
self.is_closing = true;
if let Some(task) = self.to_notify.take() {
task.notify();
}
}
/// Sends a message to the substream.
@@ -119,10 +114,6 @@ impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
warn!(target: "sub-libp2p", "Queue of packets to send over substream is pretty \
large: {}", self.send_queue.len());
}
if let Some(task) = self.to_notify.take() {
task.notify();
}
}
}
@@ -135,7 +126,7 @@ where TSubstream: AsyncRead + AsyncWrite,
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// If we are closing, close as soon as the Sink is closed.
if self.is_closing {
return Ok(self.inner.close()?.map(|()| None));
return Ok(self.inner.close()?.map(|()| None))
}
// Flushing the local queue.
@@ -143,7 +134,7 @@ where TSubstream: AsyncRead + AsyncWrite,
match self.inner.start_send(packet)? {
AsyncSink::NotReady(packet) => {
self.send_queue.push_front(packet);
break;
break
},
AsyncSink::Ready => self.requires_poll_complete = true,
}
@@ -158,51 +149,64 @@ where TSubstream: AsyncRead + AsyncWrite,
// Receiving incoming packets.
// Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever.
loop {
match self.inner.poll()? {
Async::Ready(Some(data)) =>
return Ok(Async::Ready(Some(data.freeze()))),
Async::Ready(None) =>
if !self.requires_poll_complete && self.send_queue.is_empty() {
return Ok(Async::Ready(None))
} else {
break
},
Async::NotReady => break,
}
match self.inner.poll()? {
Async::Ready(Some(data)) => Ok(Async::Ready(Some(data.freeze()))),
Async::Ready(None) =>
if !self.requires_poll_complete && self.send_queue.is_empty() {
Ok(Async::Ready(None))
} else {
Ok(Async::NotReady)
},
Async::NotReady => Ok(Async::NotReady),
}
self.to_notify = Some(task::current());
Ok(Async::NotReady)
}
}
impl<TSubstream> ConnectionUpgrade<TSubstream> for RegisteredProtocol
where TSubstream: AsyncRead + AsyncWrite,
{
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = u8; // Protocol version
impl UpgradeInfo for RegisteredProtocol {
type Info = RegisteredProtocolName;
type InfoIter = VecIntoIter<Self::Info>;
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
fn protocol_info(&self) -> Self::InfoIter {
// Report each version as an individual protocol.
self.supported_versions.iter().map(|&ver| {
let num = ver.to_string();
self.supported_versions.iter().map(|&version| {
let num = version.to_string();
let mut name = self.base_name.clone();
name.extend_from_slice(num.as_bytes());
(name, ver)
RegisteredProtocolName {
name,
version,
}
}).collect::<Vec<_>>().into_iter()
}
}
/// Implementation of `ProtocolName` for a custom protocol.
#[derive(Debug, Clone)]
pub struct RegisteredProtocolName {
/// Protocol name, as advertised on the wire.
name: Bytes,
/// Version number. Stored in string form in `name`, but duplicated here for easier retrieval.
version: u8,
}
impl ProtocolName for RegisteredProtocolName {
fn protocol_name(&self) -> &[u8] {
&self.name
}
}
impl<TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = RegisteredProtocolSubstream<TSubstream>;
type Future = future::FutureResult<Self::Output, io::Error>;
type Error = io::Error;
#[allow(deprecated)]
fn upgrade(
fn upgrade_inbound(
self,
socket: TSubstream,
protocol_version: Self::UpgradeIdentifier,
_: Endpoint
info: Self::Info,
) -> Self::Future {
let framed = Framed::new(socket, UviBytes::default());
@@ -212,12 +216,28 @@ where TSubstream: AsyncRead + AsyncWrite,
requires_poll_complete: false,
inner: framed.fuse(),
protocol_id: self.id,
protocol_version,
to_notify: None,
protocol_version: info.version,
})
}
}
impl<TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = <Self as InboundUpgrade<TSubstream>>::Output;
type Future = <Self as InboundUpgrade<TSubstream>>::Future;
type Error = <Self as InboundUpgrade<TSubstream>>::Error;
fn upgrade_outbound(
self,
socket: TSubstream,
info: Self::Info,
) -> Self::Future {
// Upgrades are symmetrical.
self.upgrade_inbound(socket, info)
}
}
// Connection upgrade for all the protocols contained in it.
#[derive(Clone)]
pub struct RegisteredProtocols(pub Vec<RegisteredProtocol>);
@@ -229,12 +249,6 @@ impl RegisteredProtocols {
self.0.len()
}
/// Finds a protocol in the list by its id.
pub fn find_protocol(&self, protocol: ProtocolId)
-> Option<&RegisteredProtocol> {
self.0.iter().find(|p| p.id == protocol)
}
/// Returns true if the given protocol is in the list.
pub fn has_protocol(&self, protocol: ProtocolId) -> bool {
self.0.iter().any(|p| p.id == protocol)
@@ -247,36 +261,75 @@ impl Default for RegisteredProtocols {
}
}
impl<TSubstream> ConnectionUpgrade<TSubstream> for RegisteredProtocols
where TSubstream: AsyncRead + AsyncWrite,
{
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = (usize,
<RegisteredProtocol as ConnectionUpgrade<TSubstream>>::UpgradeIdentifier);
impl UpgradeInfo for RegisteredProtocols {
type Info = RegisteredProtocolsName;
type InfoIter = VecIntoIter<Self::Info>;
fn protocol_names(&self) -> Self::NamesIter {
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
// We concat the lists of `RegisteredProtocol::protocol_names` for
// each protocol.
self.0.iter().enumerate().flat_map(|(n, proto)|
ConnectionUpgrade::<TSubstream>::protocol_names(proto)
.map(move |(name, id)| (name, (n, id)))
UpgradeInfo::protocol_info(proto)
.map(move |inner| {
RegisteredProtocolsName {
inner,
index: n,
}
})
).collect::<Vec<_>>().into_iter()
}
}
type Output = <RegisteredProtocol as ConnectionUpgrade<TSubstream>>::Output;
type Future = <RegisteredProtocol as ConnectionUpgrade<TSubstream>>::Future;
/// Implementation of `ProtocolName` for several custom protocols.
#[derive(Debug, Clone)]
pub struct RegisteredProtocolsName {
/// Inner registered protocol.
inner: RegisteredProtocolName,
/// Index of the protocol in the list of registered custom protocols.
index: usize,
}
#[inline]
fn upgrade(
self,
socket: TSubstream,
upgrade_identifier: Self::UpgradeIdentifier,
endpoint: Endpoint
) -> Self::Future {
let (protocol_index, inner_proto_id) = upgrade_identifier;
self.0.into_iter()
.nth(protocol_index)
.expect("invalid protocol index ; programmer logic error")
.upgrade(socket, inner_proto_id, endpoint)
impl ProtocolName for RegisteredProtocolsName {
fn protocol_name(&self) -> &[u8] {
self.inner.protocol_name()
}
}
impl<TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocols
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = <RegisteredProtocol as InboundUpgrade<TSubstream>>::Output;
type Future = <RegisteredProtocol as InboundUpgrade<TSubstream>>::Future;
type Error = io::Error;
#[inline]
fn upgrade_inbound(
self,
socket: TSubstream,
info: Self::Info,
) -> Self::Future {
self.0.into_iter()
.nth(info.index)
.expect("invalid protocol index ; programmer logic error")
.upgrade_inbound(socket, info.inner)
}
}
impl<TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocols
where TSubstream: AsyncRead + AsyncWrite,
{
type Output = <Self as InboundUpgrade<TSubstream>>::Output;
type Future = <Self as InboundUpgrade<TSubstream>>::Future;
type Error = <Self as InboundUpgrade<TSubstream>>::Error;
#[inline]
fn upgrade_outbound(
self,
socket: TSubstream,
info: Self::Info,
) -> Self::Future {
// Upgrades are symmetrical.
self.upgrade_inbound(socket, info)
}
}
+4 -3
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Substrate libp2p implementation of the ethcore network library
//! Networking layer of Substrate.
#![recursion_limit = "128"]
@@ -31,6 +31,8 @@ extern crate rand;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate smallvec;
extern crate void;
extern crate bytes;
extern crate unsigned_varint;
@@ -41,12 +43,11 @@ extern crate log;
#[cfg(test)] #[macro_use]
extern crate assert_matches;
mod behaviour;
mod custom_proto;
mod error;
mod node_handler;
mod secret;
mod service_task;
mod swarm;
mod topology;
mod traits;
mod transport;
@@ -1,863 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use bytes::Bytes;
use custom_proto::{RegisteredProtocols, RegisteredProtocolSubstream};
use futures::{prelude::*, task};
use libp2p::core::{ConnectionUpgrade, Endpoint, PeerId, PublicKey, upgrade};
use libp2p::core::nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent};
use libp2p::kad::{KadConnecConfig, KadFindNodeRespond, KadIncomingRequest, KadConnecController};
use libp2p::{identify, ping};
use parking_lot::Mutex;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, Interval};
use {Multiaddr, ProtocolId};
/// Duration after which we consider that a ping failed.
const PING_TIMEOUT: Duration = Duration::from_secs(30);
/// After a ping succeeded, wait this long before the next ping.
const DELAY_TO_NEXT_PING: Duration = Duration::from_secs(15);
/// Period at which we identify the remote.
const PERIOD_IDENTIFY: Duration = Duration::from_secs(5 * 60);
/// Delay between the moment we connect and the first time we ping.
const DELAY_TO_FIRST_PING: Duration = Duration::from_secs(5);
/// Delay between the moment we connect and the first time we identify.
const DELAY_TO_FIRST_IDENTIFY: Duration = Duration::from_secs(2);
/// This struct handles the open substreams of a specific node.
///
/// It doesn't handle opening the substreams, but only what to do with substreams that have been
/// opened.
///
/// The node will be pinged at a regular interval to determine whether it's still alive. We will
/// also regularly query the remote for identification information, for statistics purposes.
pub struct SubstrateNodeHandler<TSubstream> {
/// List of registered custom protocols.
registered_custom: Arc<RegisteredProtocols>,
/// Substreams open for "custom" protocols (eg. dot).
custom_protocols_substreams: Vec<RegisteredProtocolSubstream<TSubstream>>,
/// Substream open for Kademlia, if any.
kademlia_substream: Option<(KadConnecController, Box<Stream<Item = KadIncomingRequest, Error = IoError> + Send>)>,
/// If true, we need to send back a `KadOpen` event on the stream (if Kademlia is open).
need_report_kad_open: bool,
/// Substream open for sending pings, if any.
ping_out_substream: Option<ping::protocol::PingDialer<TSubstream, Instant>>,
/// Active pinging attempt with the moment it expires.
active_ping_out: Option<Delay>,
/// Substreams open for receiving pings.
ping_in_substreams: Vec<ping::protocol::PingListener<TSubstream>>,
/// Future that fires when we need to ping the node again.
///
/// Every time we receive a pong, we reset the timer to the next time.
next_ping: Delay,
/// Substreams for sending back our identify info to the remote.
///
/// This is in an `Arc` in order to avoid borrowing issues with the future.
identify_send_back: Arc<Mutex<Vec<Box<Future<Item = (), Error = IoError> + Send>>>>,
/// Stream that fires when we need to identify the node again.
next_identify: Interval,
/// Substreams being upgraded on the listening side.
upgrades_in_progress_listen: Vec<Box<Future<Item = FinalUpgrade<TSubstream>, Error = IoError> + Send>>,
/// Substreams being upgraded on the dialing side. Contrary to `upgrades_in_progress_listen`,
/// these have a known purpose.
upgrades_in_progress_dial: Vec<(UpgradePurpose, Box<Future<Item = FinalUpgrade<TSubstream>, Error = IoError> + Send>)>,
/// The substreams we want to open.
queued_dial_upgrades: Vec<UpgradePurpose>,
/// Number of outbound substreams the outside should open for us.
num_out_user_must_open: usize,
/// The node has started its shutdown process.
is_shutting_down: bool,
/// Task to notify if we add an element to one of the lists from the public API.
to_notify: Option<task::Task>,
}
/// Purpose of an upgrade in progress on the dialing side.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum UpgradePurpose {
Custom(ProtocolId),
Kad,
Identify,
Ping,
}
/// Event that can happen on the `SubstrateNodeHandler`.
pub enum SubstrateOutEvent<TSubstream> {
/// The node has been determined to be unresponsive.
Unresponsive,
/// The node works but we can't do anything useful with it.
Useless,
/// Started pinging the remote. This can be used to print a diagnostic message in the logs.
PingStart,
/// The node has successfully responded to a ping.
PingSuccess(Duration),
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Version of the protocol that has been opened.
version: u8,
},
/// Closed a custom protocol with the remote.
CustomProtocolClosed {
/// Identifier of the protocol.
protocol_id: ProtocolId,
/// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF).
result: Result<(), IoError>,
},
/// Receives a message on a custom protocol substream.
CustomMessage {
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Data that has been received.
data: Bytes,
},
/// We obtained identification information from the remote
Identified {
/// Information of the remote.
info: identify::IdentifyInfo,
/// Address the remote observes us as.
observed_addr: Multiaddr,
},
/// The remote wants us to send back identification information.
///
/// The `IdentificationRequest` object should be used to send the information.
IdentificationRequest(IdentificationRequest<TSubstream>),
/// Opened a Kademlia substream with the node.
KadOpen(KadConnecController),
/// The remote wants us to answer a Kademlia `FIND_NODE` request.
///
/// The `responder` should be used to answer that query.
// TODO: this API with the "responder" is bad, but changing it requires modifications in libp2p
KadFindNode {
/// The value being searched.
searched: PeerId,
/// Object to use to respond to the request.
responder: KadFindNodeRespond,
},
/// The Kademlia substream has been closed.
///
/// The parameter contains the reason why it has been closed. `Ok` means that it's been closed
/// gracefully.
KadClosed(Result<(), IoError>),
/// An error happened while upgrading a substream.
///
/// This can be used to print a diagnostic message.
SubstreamUpgradeFail(IoError),
}
/// The remote wants us to send back information.
pub struct IdentificationRequest<TSubstream> {
/// Where to store the future that sends back the information.
identify_send_back: Arc<Mutex<Vec<Box<Future<Item = (), Error = IoError> + Send>>>>,
/// Object that sends back the information.
sender: identify::IdentifySender<TSubstream>,
/// Protocol names that we support, to send back.
protocols: Vec<String>,
}
impl<TSubstream> IdentificationRequest<TSubstream> {
/// Responds to the request.
///
/// - `local_key` must contain our local public key.
/// - `listen_addrs` must contain the list of addresses we're listening on (preferably after
/// NAT traversal).
/// - `remote_addr` must be the address of the remote from our local point of view.
///
pub fn respond(
self,
local_key: PublicKey,
listen_addrs: Vec<Multiaddr>,
remote_addr: &Multiaddr
) where TSubstream: AsyncRead + AsyncWrite + Send + 'static {
// TODO: what to return for `protocol_version` and `agent_version`?
let sender = self.sender.send(
identify::IdentifyInfo {
public_key: local_key,
protocol_version: concat!("substrate/", env!("CARGO_PKG_VERSION")).to_owned(),
agent_version: concat!("substrate/", env!("CARGO_PKG_VERSION")).to_owned(),
listen_addrs,
protocols: self.protocols,
},
remote_addr
);
self.identify_send_back.lock().push(sender);
}
}
/// Event that can be received by a `SubstrateNodeHandler`.
#[derive(Debug, Clone)]
pub enum SubstrateInEvent {
/// Before anything happens on the node, we wait for an `Accept` event. This is used to deny
/// nodes based on their peer ID.
Accept,
/// Sends a message through a custom protocol substream.
SendCustomMessage {
protocol: ProtocolId,
data: Vec<u8>,
},
/// Requests to open a Kademlia substream.
// TODO: document better
OpenKademlia,
}
/// Ideally we would have a method on `SubstrateNodeHandler` that builds this type, but in practice it's a
/// bit tedious to express, even with the `impl Trait` syntax.
/// Therefore we simply use a macro instead.
macro_rules! listener_upgrade {
($self:expr) => (
upgrade::or(upgrade::or(upgrade::or(
upgrade::map((*$self.registered_custom).clone(), move |c| FinalUpgrade::Custom(c)),
upgrade::map(KadConnecConfig::new(), move |(c, s)| FinalUpgrade::Kad(c, s))),
upgrade::map(ping::protocol::Ping::default(), move |p| FinalUpgrade::from(p))),
upgrade::map(identify::IdentifyProtocolConfig, move |i| FinalUpgrade::from(i)))
// TODO: meh for cloning a Vec here
)
}
impl<TSubstream> SubstrateNodeHandler<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
{
/// Creates a new node handler.
#[inline]
pub fn new(registered_custom: Arc<RegisteredProtocols>) -> Self {
let registered_custom_len = registered_custom.len();
let queued_dial_upgrades = registered_custom.0
.iter()
.map(|proto| UpgradePurpose::Custom(proto.id()))
.collect();
SubstrateNodeHandler {
custom_protocols_substreams: Vec::with_capacity(registered_custom_len),
kademlia_substream: None,
need_report_kad_open: false,
identify_send_back: Arc::new(Mutex::new(Vec::with_capacity(1))),
ping_in_substreams: Vec::with_capacity(1),
ping_out_substream: None,
active_ping_out: None,
registered_custom,
upgrades_in_progress_listen: Vec::with_capacity(registered_custom_len + 3),
upgrades_in_progress_dial: Vec::with_capacity(registered_custom_len + 3),
next_ping: Delay::new(Instant::now() + DELAY_TO_FIRST_PING),
next_identify: Interval::new(Instant::now() + DELAY_TO_FIRST_IDENTIFY, PERIOD_IDENTIFY),
queued_dial_upgrades,
num_out_user_must_open: registered_custom_len,
is_shutting_down: false,
to_notify: None,
}
}
}
impl<TSubstream> NodeHandler for SubstrateNodeHandler<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
{
type InEvent = SubstrateInEvent;
type OutEvent = SubstrateOutEvent<TSubstream>;
type OutboundOpenInfo = ();
type Substream = TSubstream;
fn inject_substream(&mut self, substream: TSubstream, endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>) {
// For listeners, propose all the possible upgrades.
if endpoint == NodeHandlerEndpoint::Listener {
let listener_upgrade = listener_upgrade!(self);
let upgrade = upgrade::apply(substream, listener_upgrade, Endpoint::Listener);
self.upgrades_in_progress_listen.push(Box::new(upgrade) as Box<_>);
// Since we pushed to `upgrades_in_progress_listen`, we have to notify the task.
if let Some(task) = self.to_notify.take() {
task.notify();
}
return;
}
// If we're the dialer, we have to decide which upgrade we want.
let purpose = if self.queued_dial_upgrades.is_empty() {
// Since we sometimes remove elements from `queued_dial_upgrades` before they succeed
// but after the outbound substream has started opening, it is possible that the queue
// is empty when we receive a substream. This is not an error.
// Example: we want to open a Kademlia substream, we start opening one, but in the
// meanwhile the remote opens a Kademlia substream. When we receive the new substream,
// we don't need it anymore.
return;
} else {
self.queued_dial_upgrades.remove(0)
};
match purpose {
UpgradePurpose::Custom(id) => {
let wanted = if let Some(proto) = self.registered_custom.find_protocol(id) {
// TODO: meh for cloning
upgrade::map(proto.clone(), move |c| FinalUpgrade::Custom(c))
} else {
error!(target: "sub-libp2p", "Logic error: wrong custom protocol id for \
opened substream");
return;
};
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer);
self.upgrades_in_progress_dial.push((purpose, Box::new(upgrade) as Box<_>));
}
UpgradePurpose::Kad => {
let wanted = upgrade::map(KadConnecConfig::new(), move |(c, s)| FinalUpgrade::Kad(c, s));
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer);
self.upgrades_in_progress_dial.push((purpose, Box::new(upgrade) as Box<_>));
}
UpgradePurpose::Identify => {
let wanted = upgrade::map(identify::IdentifyProtocolConfig, move |i| FinalUpgrade::from(i));
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer);
self.upgrades_in_progress_dial.push((purpose, Box::new(upgrade) as Box<_>));
}
UpgradePurpose::Ping => {
let wanted = upgrade::map(ping::protocol::Ping::default(), move |p| FinalUpgrade::from(p));
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer);
self.upgrades_in_progress_dial.push((purpose, Box::new(upgrade) as Box<_>));
}
};
// Since we pushed to `upgrades_in_progress_dial`, we have to notify the task.
if let Some(task) = self.to_notify.take() {
task.notify();
}
}
#[inline]
fn inject_inbound_closed(&mut self) {
}
#[inline]
fn inject_outbound_closed(&mut self, _: Self::OutboundOpenInfo) {
}
fn inject_event(&mut self, event: Self::InEvent) {
match event {
SubstrateInEvent::SendCustomMessage { protocol, data } => {
self.send_custom_message(protocol, data);
},
SubstrateInEvent::OpenKademlia => self.open_kademlia(),
SubstrateInEvent::Accept => {
// TODO: implement
},
}
}
fn shutdown(&mut self) {
// TODO: close gracefully
self.is_shutting_down = true;
for custom_proto in &mut self.custom_protocols_substreams {
custom_proto.shutdown();
}
if let Some(to_notify) = self.to_notify.take() {
to_notify.notify();
}
}
fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>>, IoError> {
if self.is_shutting_down {
// TODO: finish only when everything is closed
return Ok(Async::Ready(None));
}
match self.poll_upgrades_in_progress()? {
Async::Ready(value) => return Ok(Async::Ready(value.map(NodeHandlerEvent::Custom))),
Async::NotReady => (),
};
match self.poll_custom_protocols()? {
Async::Ready(value) => return Ok(Async::Ready(value.map(NodeHandlerEvent::Custom))),
Async::NotReady => (),
};
match self.poll_kademlia()? {
Async::Ready(value) => return Ok(Async::Ready(value.map(NodeHandlerEvent::Custom))),
Async::NotReady => (),
};
match self.poll_ping()? {
Async::Ready(value) => return Ok(Async::Ready(value.map(NodeHandlerEvent::Custom))),
Async::NotReady => (),
};
match self.poll_identify()? {
Async::Ready(value) => return Ok(Async::Ready(value.map(NodeHandlerEvent::Custom))),
Async::NotReady => (),
};
// Request new outbound substreams from the user if necessary.
if self.num_out_user_must_open >= 1 {
self.num_out_user_must_open -= 1;
return Ok(Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(()))));
}
// Nothing happened. Register our task to be notified and return.
self.to_notify = Some(task::current());
Ok(Async::NotReady)
}
}
impl<TSubstream> SubstrateNodeHandler<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
{
/// Sends a message on a custom protocol substream.
fn send_custom_message(
&mut self,
protocol: ProtocolId,
data: Vec<u8>,
) {
debug_assert!(self.registered_custom.has_protocol(protocol),
"invalid protocol id requested in the API of the libp2p networking");
let proto = match self.custom_protocols_substreams.iter_mut().find(|p| p.protocol_id() == protocol) {
Some(proto) => proto,
None => {
// We are processing a message event before we could report to the outside that
// we disconnected from the protocol. This is not an error.
return
},
};
proto.send_message(data.into());
}
/// The node will try to open a Kademlia substream and produce a `KadOpen` event containing the
/// controller. If a Kademlia substream is already open, produces the event immediately.
fn open_kademlia(&mut self) {
if self.kademlia_substream.is_some() {
self.need_report_kad_open = true;
if let Some(to_notify) = self.to_notify.take() {
to_notify.notify();
}
} else if self.has_upgrade_purpose(&UpgradePurpose::Kad) {
// We are currently upgrading a substream to Kademlia ; nothing more to do except wait.
} else {
// Opening a new substream for Kademlia.
self.queued_dial_upgrades.push(UpgradePurpose::Kad);
self.num_out_user_must_open += 1;
if let Some(to_notify) = self.to_notify.take() {
to_notify.notify();
}
}
}
/// Returns true if we are currently upgrading to the given protocol.
fn has_upgrade_purpose(&self, purpose: &UpgradePurpose) -> bool {
self.upgrades_in_progress_dial.iter().any(|&(ref p, _)| p == purpose) ||
self.queued_dial_upgrades.iter().any(|p| p == purpose)
}
/// Cancels a dialing upgrade in progress.
///
/// Useful when the listener opened the protocol we wanted.
fn cancel_dial_upgrade(&mut self, purpose: &UpgradePurpose) {
self.upgrades_in_progress_dial.retain(|&(purp, _)| &purp != purpose);
self.queued_dial_upgrades.retain(|u| u != purpose);
}
/// Returns the names of the protocols that we supporitt.
fn supported_protocol_names(&self) -> Vec<String> {
let list = listener_upgrade!(self);
ConnectionUpgrade::<TSubstream>::protocol_names(&list)
.filter_map(|(n, _)| String::from_utf8(n.to_vec()).ok())
.collect()
}
/// Inject a fully negotiated substream into the state.
///
/// Optionally produces an event to dispatch.
fn inject_fully_negotiated(
&mut self,
upgrade: FinalUpgrade<TSubstream>
) -> Option<SubstrateOutEvent<TSubstream>> {
match upgrade {
FinalUpgrade::IdentifyListener(sender) =>
Some(SubstrateOutEvent::IdentificationRequest(IdentificationRequest {
sender,
identify_send_back: self.identify_send_back.clone(),
protocols: self.supported_protocol_names(),
})),
FinalUpgrade::IdentifyDialer(info, observed_addr) => {
self.cancel_dial_upgrade(&UpgradePurpose::Identify);
Some(SubstrateOutEvent::Identified { info, observed_addr })
},
FinalUpgrade::PingDialer(ping_dialer) => {
self.cancel_dial_upgrade(&UpgradePurpose::Ping);
// We always open the ping substream for a reason, which is to immediately ping.
self.ping_out_substream = Some(ping_dialer);
self.active_ping_out = None;
if self.ping_remote() {
Some(SubstrateOutEvent::PingStart)
} else {
None
}
},
FinalUpgrade::PingListener(ping_listener) => {
self.ping_in_substreams.push(ping_listener);
None
},
FinalUpgrade::Kad(controller, stream) => {
// Remove all upgrades in the progress for Kademlia.
self.cancel_dial_upgrade(&UpgradePurpose::Kad);
// Refuse the substream if we already have Kademlia substream open.
if self.kademlia_substream.is_none() {
self.kademlia_substream = Some((controller.clone(), stream));
Some(SubstrateOutEvent::KadOpen(controller))
} else {
None
}
},
FinalUpgrade::Custom(proto) => {
self.cancel_dial_upgrade(&UpgradePurpose::Custom(proto.protocol_id()));
if self.custom_protocols_substreams.iter().any(|p| p.protocol_id() == proto.protocol_id()) {
// Skipping protocol that's already open.
return None;
}
let event = SubstrateOutEvent::CustomProtocolOpen {
protocol_id: proto.protocol_id(),
version: proto.protocol_version(),
};
self.custom_protocols_substreams.push(proto);
Some(event)
},
}
}
/// Start the process of identifying the remote.
fn identify_remote(&mut self) {
if !self.has_upgrade_purpose(&UpgradePurpose::Identify) {
self.queued_dial_upgrades.push(UpgradePurpose::Identify);
self.num_out_user_must_open += 1;
if let Some(to_notify) = self.to_notify.take() {
to_notify.notify();
}
}
}
/// Start the process of pinging the remote.
///
/// Doesn't do anything if a ping attempt is already in progress.
///
/// Returns true if this actually starts a ping, false is this just opens a substream or does
/// nothing.
fn ping_remote(&mut self) -> bool {
// Ignore if we are already actively pinging.
if self.active_ping_out.is_some() {
return false;
}
// If we have a ping open, ping it!
if let Some(ref mut pinger) = self.ping_out_substream {
let now = Instant::now();
pinger.ping(now);
let future = Delay::new(now + PING_TIMEOUT);
self.active_ping_out = Some(future);
if let Some(to_notify) = self.to_notify.take() {
to_notify.notify();
}
return true;
}
// Otherwise, ensure we have an upgrade for a ping substream in queue.
if !self.has_upgrade_purpose(&UpgradePurpose::Ping) {
self.queued_dial_upgrades.push(UpgradePurpose::Ping);
self.num_out_user_must_open += 1;
// We also start the unresponsiveness counter when opening the substream, as a
// peer may not respond to our opening request.
let future = Delay::new(Instant::now() + PING_TIMEOUT);
self.active_ping_out = Some(future);
if let Some(to_notify) = self.to_notify.take() {
to_notify.notify();
}
}
false
}
/// Polls the upgrades in progress.
fn poll_upgrades_in_progress(&mut self) -> Poll<Option<SubstrateOutEvent<TSubstream>>, IoError> {
// Continue negotiation of newly-opened substreams on the listening side.
// We remove each element from `upgrades_in_progress_listen` one by one and add them back
// if not ready.
for n in (0 .. self.upgrades_in_progress_listen.len()).rev() {
let mut in_progress = self.upgrades_in_progress_listen.swap_remove(n);
match in_progress.poll() {
Ok(Async::Ready(upgrade)) => {
if let Some(event) = self.inject_fully_negotiated(upgrade) {
return Ok(Async::Ready(Some(event)));
}
},
Ok(Async::NotReady) => {
self.upgrades_in_progress_listen.push(in_progress);
},
Err(err) => {
return Ok(Async::Ready(Some(SubstrateOutEvent::SubstreamUpgradeFail(err))));
},
}
}
// Continue negotiation of newly-opened substreams.
// We remove each element from `upgrades_in_progress_dial` one by one and add them back if
// not ready.
for n in (0 .. self.upgrades_in_progress_dial.len()).rev() {
let (purpose, mut in_progress) = self.upgrades_in_progress_dial.swap_remove(n);
match in_progress.poll() {
Ok(Async::Ready(upgrade)) => {
if let Some(event) = self.inject_fully_negotiated(upgrade) {
return Ok(Async::Ready(Some(event)));
}
},
Ok(Async::NotReady) =>
self.upgrades_in_progress_dial.push((purpose, in_progress)),
Err(err) => {
// TODO: dispatch depending on actual error ; right now we assume that
// error == not supported, which is not necessarily true in theory
if let UpgradePurpose::Custom(_) = purpose {
return Ok(Async::Ready(Some(SubstrateOutEvent::Useless)));
} else {
let msg = format!("While upgrading to {:?}: {:?}", purpose, err);
let err = IoError::new(IoErrorKind::Other, msg);
return Ok(Async::Ready(Some(SubstrateOutEvent::SubstreamUpgradeFail(err))));
}
},
}
}
Ok(Async::NotReady)
}
/// Polls the upgrades in progress.
fn poll_custom_protocols(&mut self) -> Poll<Option<SubstrateOutEvent<TSubstream>>, IoError> {
// Poll for messages on the custom protocol stream.
for n in (0 .. self.custom_protocols_substreams.len()).rev() {
let mut custom_proto = self.custom_protocols_substreams.swap_remove(n);
match custom_proto.poll() {
Ok(Async::NotReady) => self.custom_protocols_substreams.push(custom_proto),
Ok(Async::Ready(Some(data))) => {
let protocol_id = custom_proto.protocol_id();
self.custom_protocols_substreams.push(custom_proto);
return Ok(Async::Ready(Some(SubstrateOutEvent::CustomMessage {
protocol_id,
data,
})));
},
Ok(Async::Ready(None)) => {
// Trying to reopen the protocol.
self.queued_dial_upgrades.push(UpgradePurpose::Custom(custom_proto.protocol_id()));
self.num_out_user_must_open += 1;
return Ok(Async::Ready(Some(SubstrateOutEvent::CustomProtocolClosed {
protocol_id: custom_proto.protocol_id(),
result: Ok(()),
})))
},
Err(err) => {
// Trying to reopen the protocol.
self.queued_dial_upgrades.push(UpgradePurpose::Custom(custom_proto.protocol_id()));
self.num_out_user_must_open += 1;
return Ok(Async::Ready(Some(SubstrateOutEvent::CustomProtocolClosed {
protocol_id: custom_proto.protocol_id(),
result: Err(err),
})))
},
}
}
Ok(Async::NotReady)
}
/// Polls the open Kademlia substream, if any.
fn poll_kademlia(&mut self) -> Poll<Option<SubstrateOutEvent<TSubstream>>, IoError> {
// Produce a `KadOpen` event if necessary.
if self.need_report_kad_open {
self.need_report_kad_open = false;
if let Some((ref kad_ctrl, _)) = self.kademlia_substream {
return Ok(Async::Ready(Some(SubstrateOutEvent::KadOpen(kad_ctrl.clone()))));
}
}
// Poll for Kademlia events.
if let Some((controller, mut stream)) = self.kademlia_substream.take() {
loop {
match stream.poll() {
Ok(Async::Ready(Some(KadIncomingRequest::FindNode { searched, responder }))) => {
self.kademlia_substream = Some((controller, stream));
return Ok(Async::Ready(Some(SubstrateOutEvent::KadFindNode { searched, responder })));
},
// We don't care about Kademlia pings, they are unused.
Ok(Async::Ready(Some(KadIncomingRequest::PingPong))) => {},
// Other Kademlia messages are unimplemented.
Ok(Async::Ready(Some(KadIncomingRequest::GetProviders { .. }))) => {},
Ok(Async::Ready(Some(KadIncomingRequest::AddProvider { .. }))) => {},
Ok(Async::NotReady) => {
self.kademlia_substream = Some((controller, stream));
break;
},
Ok(Async::Ready(None)) => return Ok(Async::Ready(Some(SubstrateOutEvent::KadClosed(Ok(()))))),
Err(err) => return Ok(Async::Ready(Some(SubstrateOutEvent::KadClosed(Err(err))))),
}
}
}
Ok(Async::NotReady)
}
/// Polls the ping substreams.
fn poll_ping(&mut self) -> Poll<Option<SubstrateOutEvent<TSubstream>>, IoError> {
// Poll the future that fires when we need to ping the node again.
match self.next_ping.poll() {
Ok(Async::NotReady) => {},
Ok(Async::Ready(())) => {
// We reset `next_ping` to a very long time in the future so that we can poll
// it again without having an accident.
self.next_ping.reset(Instant::now() + Duration::from_secs(5 * 60));
if self.ping_remote() {
return Ok(Async::Ready(Some(SubstrateOutEvent::PingStart)));
}
},
Err(err) => {
warn!(target: "sub-libp2p", "Ping timer errored: {:?}", err);
return Err(IoError::new(IoErrorKind::Other, err));
}
}
// Poll for answering pings.
for n in (0 .. self.ping_in_substreams.len()).rev() {
let mut ping = self.ping_in_substreams.swap_remove(n);
match ping.poll() {
Ok(Async::Ready(())) => {},
Ok(Async::NotReady) => self.ping_in_substreams.push(ping),
Err(err) => warn!(target: "sub-libp2p", "Remote ping substream errored: {:?}", err),
}
}
// Poll the ping substream.
if let Some(mut ping_dialer) = self.ping_out_substream.take() {
match ping_dialer.poll() {
Ok(Async::Ready(Some(started))) => {
self.active_ping_out = None;
self.next_ping.reset(Instant::now() + DELAY_TO_NEXT_PING);
return Ok(Async::Ready(Some(SubstrateOutEvent::PingSuccess(started.elapsed()))));
},
Ok(Async::Ready(None)) => {
// Try re-open ping if it got closed.
self.queued_dial_upgrades.push(UpgradePurpose::Ping);
self.num_out_user_must_open += 1;
},
Ok(Async::NotReady) => self.ping_out_substream = Some(ping_dialer),
Err(_) => {},
}
}
// Poll the active ping attempt.
if let Some(mut deadline) = self.active_ping_out.take() {
match deadline.poll() {
Ok(Async::Ready(())) =>
return Ok(Async::Ready(Some(SubstrateOutEvent::Unresponsive))),
Ok(Async::NotReady) => self.active_ping_out = Some(deadline),
Err(err) => {
warn!(target: "sub-libp2p", "Active ping deadline errored: {:?}", err);
return Err(IoError::new(IoErrorKind::Other, err));
},
}
}
Ok(Async::NotReady)
}
/// Polls the identify substreams.
fn poll_identify(&mut self) -> Poll<Option<SubstrateOutEvent<TSubstream>>, IoError> {
// Poll the future that fires when we need to identify the node again.
loop {
match self.next_identify.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(_))) => self.identify_remote(),
Ok(Async::Ready(None)) => {
warn!(target: "sub-libp2p", "Identify timer closed unexpectedly");
return Ok(Async::Ready(None));
}
Err(err) => {
warn!(target: "sub-libp2p", "Identify timer errored: {:?}", err);
return Err(IoError::new(IoErrorKind::Other, err));
}
}
}
// Poll for sending identify information to the remote.
let mut identify_send_back = self.identify_send_back.lock();
for n in (0 .. identify_send_back.len()).rev() {
let mut id_send_back = identify_send_back.swap_remove(n);
match id_send_back.poll() {
Ok(Async::Ready(())) => {},
Ok(Async::NotReady) => identify_send_back.push(id_send_back),
Err(err) => warn!(target: "sub-libp2p", "Sending back identify info errored: {:?}", err),
}
}
Ok(Async::NotReady)
}
}
/// Enum of all the possible protocols our service handles.
enum FinalUpgrade<TSubstream> {
Kad(KadConnecController, Box<Stream<Item = KadIncomingRequest, Error = IoError> + Send>),
IdentifyListener(identify::IdentifySender<TSubstream>),
IdentifyDialer(identify::IdentifyInfo, Multiaddr),
PingDialer(ping::protocol::PingDialer<TSubstream, Instant>),
PingListener(ping::protocol::PingListener<TSubstream>),
Custom(RegisteredProtocolSubstream<TSubstream>),
}
impl<TSubstream> From<ping::protocol::PingOutput<TSubstream, Instant>> for FinalUpgrade<TSubstream> {
fn from(out: ping::protocol::PingOutput<TSubstream, Instant>) -> Self {
match out {
ping::protocol::PingOutput::Ponger(ponger) => FinalUpgrade::PingListener(ponger),
ping::protocol::PingOutput::Pinger(pinger) => FinalUpgrade::PingDialer(pinger),
}
}
}
impl<TSubstream> From<identify::IdentifyOutput<TSubstream>> for FinalUpgrade<TSubstream> {
fn from(out: identify::IdentifyOutput<TSubstream>) -> Self {
match out {
identify::IdentifyOutput::RemoteInfo { info, observed_addr } =>
FinalUpgrade::IdentifyDialer(info, observed_addr),
identify::IdentifyOutput::Sender { sender } =>
FinalUpgrade::IdentifyListener(sender),
}
}
}
File diff suppressed because it is too large Load Diff
-672
View File
@@ -1,672 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use bytes::Bytes;
use custom_proto::RegisteredProtocols;
use fnv::FnvHashMap;
use futures::{prelude::*, Stream};
use libp2p::{Multiaddr, multiaddr::Protocol, PeerId};
use libp2p::core::{muxing, Endpoint, PublicKey};
use libp2p::core::nodes::{ConnectedPoint, RawSwarm, RawSwarmEvent, Peer as SwarmPeer, Substream};
use libp2p::core::transport::boxed::Boxed;
use libp2p::kad::{KadConnecController, KadFindNodeRespond};
use libp2p::secio;
use node_handler::{SubstrateOutEvent, SubstrateNodeHandler, SubstrateInEvent, IdentificationRequest};
use std::{io, mem, sync::Arc};
use transport;
use {Error, NodeIndex, ProtocolId};
/// Starts a swarm.
///
/// Returns a stream that must be polled regularly in order for the networking to function.
pub fn start_swarm(
registered_custom: RegisteredProtocols,
local_private_key: secio::SecioKeyPair,
) -> Result<Swarm, Error> {
// Private and public keys.
let local_public_key = local_private_key.to_public_key();
let local_peer_id = local_public_key.clone().into_peer_id();
// Build the transport layer. This is what allows us to listen or to reach nodes.
let transport = transport::build_transport(local_private_key);
// Build the underlying libp2p swarm.
let swarm = RawSwarm::new(transport);
Ok(Swarm {
swarm,
registered_custom: Arc::new(registered_custom),
local_public_key,
local_peer_id,
listening_addrs: Vec::new(),
node_by_peer: Default::default(),
nodes_info: Default::default(),
next_node_index: 0,
})
}
/// Event produced by the swarm.
pub enum SwarmEvent {
/// We have successfully connected to a node.
///
/// The node is in pending node, and should be accepted by calling `accept_node(node_index)`
/// or denied by calling `drop_node(node_index)`.
NodePending {
/// Index of the node.
node_index: NodeIndex,
/// Public key of the node as a peer id.
peer_id: PeerId,
/// Whether we dialed the node or if it came to us.
endpoint: ConnectedPoint,
},
/// The connection to a peer has changed.
Reconnected {
/// Index of the node.
node_index: NodeIndex,
/// The new endpoint.
endpoint: ConnectedPoint,
/// List of custom protocols that were closed in the process.
closed_custom_protocols: Vec<ProtocolId>,
},
/// Closed connection to a node, either gracefully or because of an error.
///
/// It is guaranteed that this node has been opened with a `NewNode` event beforehand. However
/// not all `ClosedCustomProtocol` events have been dispatched.
NodeClosed {
/// Index of the node.
node_index: NodeIndex,
/// Peer id we were connected to.
peer_id: PeerId,
/// List of custom protocols that were still open.
closed_custom_protocols: Vec<ProtocolId>,
},
/// Failed to dial an address.
DialFail {
/// Address that failed.
address: Multiaddr,
/// Reason why we failed.
error: io::Error,
},
/// Report information about the node.
NodeInfos {
/// Index of the node.
node_index: NodeIndex,
/// The client version. Note that it can be anything and should not be trusted.
client_version: String,
/// Multiaddresses the node is listening on.
listen_addrs: Vec<Multiaddr>,
},
/// A custom protocol substream has been opened with a node.
OpenedCustomProtocol {
/// Index of the node.
node_index: NodeIndex,
/// Protocol that has been opened.
protocol: ProtocolId,
/// Version of the protocol that was opened.
version: u8,
},
/// A custom protocol substream has been closed.
ClosedCustomProtocol {
/// Index of the node.
node_index: NodeIndex,
/// Protocol that has been closed.
protocol: ProtocolId,
},
/// Receives a message on a custom protocol stream.
CustomMessage {
/// Index of the node.
node_index: NodeIndex,
/// Protocol which generated the message.
protocol_id: ProtocolId,
/// Data that has been received.
data: Bytes,
},
/// The node has been determined to be unresponsive.
UnresponsiveNode {
/// Index of the node.
node_index: NodeIndex,
},
/// The node works but we can't do anything useful with it.
UselessNode {
/// Index of the node.
node_index: NodeIndex,
},
/// Opened a Kademlia substream with the node.
// TODO: the controller API is bad, but we need to make changes in libp2p to improve that
KadOpen {
/// Index of the node.
node_index: NodeIndex,
/// The Kademlia controller. Allows making queries.
controller: KadConnecController,
},
/// The remote wants us to answer a Kademlia `FIND_NODE` request.
///
/// The `responder` should be used to answer that query.
// TODO: this API with the "responder" is bad, but changing it requires modifications in libp2p
KadFindNode {
/// Index of the node that wants an answer.
node_index: NodeIndex,
/// The value being searched.
searched: PeerId,
/// Object to use to respond to the request.
responder: KadFindNodeRespond,
},
/// A Kademlia substream has been closed.
KadClosed {
/// Index of the node.
node_index: NodeIndex,
/// Reason why it has been closed. `Ok` means that it's been closed gracefully.
result: Result<(), io::Error>,
},
}
/// Network swarm. Must be polled regularly in order for the networking to work.
pub struct Swarm {
/// Stream of events of the swarm.
swarm: RawSwarm<
Boxed<(PeerId, Muxer)>,
SubstrateInEvent,
SubstrateOutEvent<Substream<Muxer>>,
SubstrateNodeHandler<Substream<Muxer>>
>,
/// List of registered protocols. Used when we open or receive a new connection.
registered_custom: Arc<RegisteredProtocols>,
/// Public key of the local node.
local_public_key: PublicKey,
/// Peer id of the local node.
local_peer_id: PeerId,
/// Addresses we know we're listening on. Only includes NAT traversed addresses.
listening_addrs: Vec<Multiaddr>,
/// For each peer id, the corresponding node index.
node_by_peer: FnvHashMap<PeerId, NodeIndex>,
/// All the nodes tasks. Must be maintained consistent with `node_by_peer`.
nodes_info: FnvHashMap<NodeIndex, NodeInfo>,
/// Next key to use when we insert a new entry in `nodes_info`.
next_node_index: NodeIndex,
}
/// Local information about a peer.
struct NodeInfo {
/// The peer id. Must be maintained consistent with the rest of the state.
peer_id: PeerId,
/// Whether we opened the connection or the remote opened it.
endpoint: Endpoint,
/// List of custom protocol substreams that are open.
open_protocols: Vec<ProtocolId>,
}
/// The muxer used by the transport.
type Muxer = muxing::StreamMuxerBox;
impl Swarm {
/// Start listening on a multiaddr.
#[inline]
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
match self.swarm.listen_on(addr) {
Ok(mut addr) => {
addr.append(Protocol::P2p(self.local_peer_id.clone().into()));
info!(target: "sub-libp2p", "Local node address is: {}", addr);
Ok(addr)
},
Err(addr) => Err(addr)
}
}
/// Returns an iterator that produces the list of addresses we're listening on.
#[inline]
pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
self.swarm.listeners()
}
/// Adds an external address. Sent to other nodes when they query it.
#[inline]
pub fn add_external_address(&mut self, addr: Multiaddr) {
self.listening_addrs.push(addr);
}
/// Returns an iterator to our known external addresses.
#[inline]
pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
self.listening_addrs.iter()
}
/// Returns all the nodes that are currently active.
#[inline]
pub fn nodes<'a>(&'a self) -> impl Iterator<Item = NodeIndex> + 'a {
self.nodes_info.keys().cloned()
}
/// Returns the latest node connected to this peer ID.
#[inline]
pub fn latest_node_by_peer_id(&self, peer_id: &PeerId) -> Option<NodeIndex> {
self.node_by_peer.get(peer_id).map(|&i| i)
}
/// Endpoint of the node.
///
/// Returns `None` if the index is invalid.
#[inline]
pub fn node_endpoint(&self, node_index: NodeIndex) -> Option<Endpoint> {
self.nodes_info.get(&node_index).map(|i| i.endpoint)
}
/// Sends a message to a peer using the custom protocol.
// TODO: report invalid node index or protocol?
pub fn send_custom_message(
&mut self,
node_index: NodeIndex,
protocol: ProtocolId,
data: Vec<u8>
) {
if let Some(info) = self.nodes_info.get_mut(&node_index) {
if let Some(mut connected) = self.swarm.peer(info.peer_id.clone()).as_connected() {
connected.send_event(SubstrateInEvent::SendCustomMessage { protocol, data });
} else {
error!(target: "sub-libp2p", "Tried to send message to {:?}, but we're not \
connected to it", info.peer_id);
}
} else {
error!(target: "sub-libp2p", "Tried to send message to invalid node index {:?}",
node_index);
}
}
/// Returns the peer id of a node we're connected to.
#[inline]
pub fn peer_id_of_node(&self, node_index: NodeIndex) -> Option<&PeerId> {
self.nodes_info.get(&node_index).map(|i| &i.peer_id)
}
/// If we're not already dialing the given peer, start dialing it and return false.
/// If we're dialing, adds the address to the queue of addresses to try (if not already) and
/// return false.
/// If we're already connected, do nothing and return true.
///
/// Returns an error if the address is not supported.
pub fn ensure_connection(&mut self, peer_id: PeerId, addr: Multiaddr) -> Result<bool, ()> {
match self.swarm.peer(peer_id.clone()) {
SwarmPeer::Connected(_) => Ok(true),
SwarmPeer::PendingConnect(mut peer) => {
peer.append_multiaddr_attempt(addr);
Ok(false)
},
SwarmPeer::NotConnected(peer) => {
trace!(target: "sub-libp2p", "Starting to connect to {:?} through {}",
peer_id, addr);
match peer.connect(addr, SubstrateNodeHandler::new(self.registered_custom.clone())) {
Ok(_) => Ok(false),
Err(_) => Err(()),
}
},
}
}
/// Start dialing an address, not knowing which peer ID to expect.
#[inline]
pub fn dial(&mut self, addr: Multiaddr) -> Result<(), Multiaddr> {
self.swarm.dial(addr, SubstrateNodeHandler::new(self.registered_custom.clone()))
}
/// After receiving a `NodePending` event, you should call either `accept_node` or `drop_node`
/// with the specified index.
///
/// Returns an error if the node index is invalid, or if it was already accepted.
pub fn accept_node(&mut self, node_index: NodeIndex) -> Result<(), ()> {
// TODO: detect if already accepted?
let peer_id = match self.nodes_info.get(&node_index) {
Some(info) => &info.peer_id,
None => return Err(())
};
match self.swarm.peer(peer_id.clone()) {
SwarmPeer::Connected(mut peer) => {
peer.send_event(SubstrateInEvent::Accept);
Ok(())
},
SwarmPeer::PendingConnect(_) | SwarmPeer::NotConnected(_) => {
error!(target: "sub-libp2p", "State inconsistency detected in accept_node ; \
nodes_info is not in sync with the underlying swarm");
Err(())
},
}
}
/// Disconnects a peer.
///
/// If the peer is connected, this disconnects it.
/// If the peer hasn't been accepted yet, this immediately drops it.
///
/// Returns the list of custom protocol substreams that were opened.
#[inline]
pub fn drop_node(&mut self, node_index: NodeIndex) -> Result<Vec<ProtocolId>, ()> {
let info = match self.nodes_info.remove(&node_index) {
Some(i) => i,
None => {
error!(target: "sub-libp2p", "Trying to close non-existing node #{}", node_index);
return Err(());
},
};
let idx_in_hashmap = self.node_by_peer.remove(&info.peer_id);
debug_assert_eq!(idx_in_hashmap, Some(node_index));
if let Some(connected) = self.swarm.peer(info.peer_id.clone()).as_connected() {
connected.close();
} else {
error!(target: "sub-libp2p", "State inconsistency: node_by_peer and nodes_info are \
not in sync with the underlying swarm");
}
Ok(info.open_protocols)
}
/// Opens a Kademlia substream with the given node. A `KadOpen` event will later be produced
/// for the given node.
///
/// If a Kademlia substream is already open, also produces a `KadOpen` event.
///
/// Returns an error if the node index is invalid.
pub fn open_kademlia(&mut self, node_index: NodeIndex) -> Result<(), ()> {
if let Some(info) = self.nodes_info.get_mut(&node_index) {
if let Some(mut connected) = self.swarm.peer(info.peer_id.clone()).as_connected() {
connected.send_event(SubstrateInEvent::OpenKademlia);
Ok(())
} else {
error!(target: "sub-libp2p", "Tried to open Kademlia with {:?}, but we're not \
connected to it", info.peer_id);
Err(())
}
} else {
error!(target: "sub-libp2p", "Tried to open Kademlia with invalid node index {:?}",
node_index);
Err(())
}
}
/// Adds an address the given peer observes us as.
fn add_observed_addr(&mut self, peer_id: &PeerId, observed_addr: &Multiaddr) {
for mut addr in self.swarm.nat_traversal(observed_addr) {
// Ignore addresses we already know about.
if self.listening_addrs.iter().any(|a| a == &addr) {
continue;
}
debug!(target: "sub-libp2p",
"NAT traversal: {:?} observes us as {}; registering {} as one of our own addresses",
peer_id,
observed_addr,
addr
);
self.listening_addrs.push(addr.clone());
addr.append(Protocol::P2p(self.local_peer_id.clone().into()));
info!(target: "sub-libp2p", "New external node address: {}", addr);
}
}
/// Responds to an answer to send back identification information.
fn respond_to_identify_request(
&mut self,
requester: &PeerId,
responder: IdentificationRequest<Substream<Muxer>>
) {
let peer = match self.swarm.peer(requester.clone()).as_connected() {
Some(p) => p,
None => {
debug!(target: "sub-libp2p", "Ignoring identify request from {:?} because we are \
disconnected", requester);
return;
}
};
let observed_addr = match peer.endpoint() {
&ConnectedPoint::Dialer { ref address } => address,
&ConnectedPoint::Listener { ref send_back_addr, .. } => send_back_addr,
};
trace!(target: "sub-libp2p", "Responding to identify request from {:?}", requester);
responder.respond(
self.local_public_key.clone(),
self.listening_addrs.clone(),
&observed_addr,
);
}
/// Processes an event obtained by a node in the swarm.
///
/// Optionally returns an event that the service must emit.
///
/// > **Note**: The event **must** have been produced by the swarm, otherwise state
/// > inconsistencies will likely happen.
fn handle_node_event(
&mut self,
peer_id: PeerId,
event: SubstrateOutEvent<Substream<Muxer>>
) -> Option<SwarmEvent> {
// Obtain the peer id and whether the node has been closed earlier.
// If the node has been closed, do not generate any additional event about it.
let node_index = *self.node_by_peer.get(&peer_id)
.expect("node_by_peer is always kept in sync with the underlying swarm");
match event {
SubstrateOutEvent::Unresponsive => {
debug!(target: "sub-libp2p", "Node {:?} is unresponsive", peer_id);
Some(SwarmEvent::UnresponsiveNode { node_index })
},
SubstrateOutEvent::Useless => {
debug!(target: "sub-libp2p", "Node {:?} is useless", peer_id);
Some(SwarmEvent::UselessNode { node_index })
},
SubstrateOutEvent::PingStart => {
trace!(target: "sub-libp2p", "Pinging {:?}", peer_id);
None
},
SubstrateOutEvent::PingSuccess(ping) => {
trace!(target: "sub-libp2p", "Pong from {:?} in {:?}", peer_id, ping);
None
},
SubstrateOutEvent::Identified { info, observed_addr } => {
self.add_observed_addr(&peer_id, &observed_addr);
trace!(target: "sub-libp2p", "Client version of {:?}: {:?}", peer_id, info.agent_version);
if !info.agent_version.contains("substrate") {
info!(target: "sub-libp2p", "Connected to non-substrate node {:?}: {}",
peer_id, info.agent_version);
}
Some(SwarmEvent::NodeInfos {
node_index,
client_version: info.agent_version,
listen_addrs: info.listen_addrs,
})
},
SubstrateOutEvent::IdentificationRequest(request) => {
self.respond_to_identify_request(&peer_id, request);
None
},
SubstrateOutEvent::KadFindNode { searched, responder } => {
Some(SwarmEvent::KadFindNode { node_index, searched, responder })
},
SubstrateOutEvent::KadOpen(ctrl) => {
trace!(target: "sub-libp2p", "Opened Kademlia substream with {:?}", peer_id);
Some(SwarmEvent::KadOpen { node_index, controller: ctrl })
},
SubstrateOutEvent::KadClosed(result) => {
trace!(target: "sub-libp2p", "Closed Kademlia substream with {:?}: {:?}", peer_id, result);
Some(SwarmEvent::KadClosed { node_index, result })
},
SubstrateOutEvent::CustomProtocolOpen { protocol_id, version } => {
trace!(target: "sub-libp2p", "Opened custom protocol with {:?}", peer_id);
self.nodes_info.get_mut(&node_index)
.expect("nodes_info is kept in sync with the underlying swarm")
.open_protocols.push(protocol_id);
Some(SwarmEvent::OpenedCustomProtocol {
node_index,
protocol: protocol_id,
version,
})
},
SubstrateOutEvent::CustomProtocolClosed { protocol_id, result } => {
trace!(target: "sub-libp2p", "Closed custom protocol with {:?}: {:?}", peer_id, result);
self.nodes_info.get_mut(&node_index)
.expect("nodes_info is kept in sync with the underlying swarm")
.open_protocols.retain(|p| p != &protocol_id);
Some(SwarmEvent::ClosedCustomProtocol {
node_index,
protocol: protocol_id,
})
},
SubstrateOutEvent::CustomMessage { protocol_id, data } => {
Some(SwarmEvent::CustomMessage {
node_index,
protocol_id,
data,
})
},
SubstrateOutEvent::SubstreamUpgradeFail(err) => {
debug!(target: "sub-libp2p", "Error while negotiating final protocol \
with {:?}: {:?}", peer_id, err);
None
},
}
}
}
impl Stream for Swarm {
type Item = SwarmEvent;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
let (peer_id, node_event) = match self.swarm.poll() {
Async::Ready(RawSwarmEvent::Connected { peer_id, endpoint }) => {
let node_index = self.next_node_index.clone();
self.next_node_index += 1;
self.node_by_peer.insert(peer_id.clone(), node_index);
self.nodes_info.insert(node_index, NodeInfo {
peer_id: peer_id.clone(),
endpoint: match endpoint {
ConnectedPoint::Listener { .. } => Endpoint::Listener,
ConnectedPoint::Dialer { .. } => Endpoint::Dialer,
},
open_protocols: Vec::new(),
});
return Ok(Async::Ready(Some(SwarmEvent::NodePending {
node_index,
peer_id,
endpoint
})));
}
Async::Ready(RawSwarmEvent::Replaced { peer_id, endpoint, .. }) => {
let node_index = *self.node_by_peer.get(&peer_id)
.expect("node_by_peer is always kept in sync with the inner swarm");
let infos = self.nodes_info.get_mut(&node_index)
.expect("nodes_info is always kept in sync with the swarm");
debug_assert_eq!(infos.peer_id, peer_id);
infos.endpoint = match endpoint {
ConnectedPoint::Listener { .. } => Endpoint::Listener,
ConnectedPoint::Dialer { .. } => Endpoint::Dialer,
};
let closed_custom_protocols = mem::replace(&mut infos.open_protocols, Vec::new());
return Ok(Async::Ready(Some(SwarmEvent::Reconnected {
node_index,
endpoint,
closed_custom_protocols,
})));
},
Async::Ready(RawSwarmEvent::NodeClosed { peer_id, .. }) => {
debug!(target: "sub-libp2p", "Connection to {:?} closed gracefully", peer_id);
let node_index = self.node_by_peer.remove(&peer_id)
.expect("node_by_peer is always kept in sync with the inner swarm");
let infos = self.nodes_info.remove(&node_index)
.expect("nodes_info is always kept in sync with the inner swarm");
debug_assert_eq!(infos.peer_id, peer_id);
return Ok(Async::Ready(Some(SwarmEvent::NodeClosed {
node_index,
peer_id,
closed_custom_protocols: infos.open_protocols,
})));
},
Async::Ready(RawSwarmEvent::NodeError { peer_id, error, .. }) => {
debug!(target: "sub-libp2p", "Closing {:?} because of error: {:?}", peer_id, error);
let node_index = self.node_by_peer.remove(&peer_id)
.expect("node_by_peer is always kept in sync with the inner swarm");
let infos = self.nodes_info.remove(&node_index)
.expect("nodes_info is always kept in sync with the inner swarm");
debug_assert_eq!(infos.peer_id, peer_id);
return Ok(Async::Ready(Some(SwarmEvent::NodeClosed {
node_index,
peer_id,
closed_custom_protocols: infos.open_protocols,
})));
},
Async::Ready(RawSwarmEvent::DialError { multiaddr, error, .. }) =>
return Ok(Async::Ready(Some(SwarmEvent::DialFail {
address: multiaddr,
error,
}))),
Async::Ready(RawSwarmEvent::UnknownPeerDialError { multiaddr, error, .. }) =>
return Ok(Async::Ready(Some(SwarmEvent::DialFail {
address: multiaddr,
error,
}))),
Async::Ready(RawSwarmEvent::ListenerClosed { listen_addr, result, .. }) => {
warn!(target: "sub-libp2p", "Listener closed for {}: {:?}", listen_addr, result);
continue;
},
Async::Ready(RawSwarmEvent::NodeEvent { peer_id, event }) => (peer_id, event),
Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => {
trace!(target: "sub-libp2p", "Incoming connection with {} on listener {}",
incoming.send_back_addr(), incoming.listen_addr());
incoming.accept(SubstrateNodeHandler::new(self.registered_custom.clone()));
continue;
},
Async::Ready(RawSwarmEvent::IncomingConnectionError { listen_addr, send_back_addr, error }) => {
trace!(target: "sub-libp2p", "Incoming connection with {} on listener {} \
errored: {:?}", send_back_addr, listen_addr, error);
continue;
},
Async::NotReady => return Ok(Async::NotReady),
};
if let Some(event) = self.handle_node_event(peer_id, node_event) {
return Ok(Async::Ready(Some(event)));
}
}
}
}
+239 -222
View File
@@ -15,10 +15,11 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?
use fnv::FnvHashMap;
use parking_lot::Mutex;
use libp2p::{Multiaddr, PeerId};
use libp2p::{Multiaddr, PeerId, identify::IdentifyTopology, multihash::Multihash};
use libp2p::core::{PublicKey, swarm::ConnectedPoint, topology::DisconnectReason, topology::Topology};
use libp2p::kad::{KBucketsPeerId, KadConnectionType, KademliaTopology};
use serde_json;
use std::{cmp, fs};
use std::{cmp, fs, iter, vec};
use std::io::{Read, Cursor, Error as IoError, ErrorKind as IoErrorKind, Write, BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant, SystemTime};
@@ -46,8 +47,6 @@ const KADEMLIA_DISCOVERY_EXPIRATION: Duration = Duration::from_secs(2 * 3600);
const EXPIRATION_PUSH_BACK_CONNEC: Duration = Duration::from_secs(2 * 3600);
/// Initial score that a bootstrap node receives when registered.
const BOOTSTRAP_NODE_SCORE: u32 = 100;
/// Score modifier to apply on a peer that has been determined to be useless.
const USELESS_PEER_SCORE_CHANGE: i32 = -9;
/// Time to live of a boostrap node. This only applies if you start the node later *without*
/// that bootstrap node configured anymore.
const BOOTSTRAP_NODE_EXPIRATION: Duration = Duration::from_secs(24 * 3600);
@@ -63,15 +62,16 @@ const MAX_BACKOFF: Duration = Duration::from_secs(30 * 60);
/// Stores information about the topology of the network.
#[derive(Debug)]
pub struct NetTopology {
/// The actual storage. Never contains a key for `local_peer_id`.
store: FnvHashMap<PeerId, PeerInfo>,
/// Optional path to the file that caches the serialized version of `store`.
cache_path: Option<PathBuf>,
}
impl Default for NetTopology {
#[inline]
fn default() -> NetTopology {
NetTopology::memory()
}
/// Public key of the local node.
local_public_key: PublicKey,
/// PeerId of the local node. Derived from `local_public_key`.
local_peer_id: PeerId,
/// Known addresses for the local node to report to the network.
external_addresses: Vec<Multiaddr>,
}
impl NetTopology {
@@ -79,10 +79,14 @@ impl NetTopology {
///
/// `flush_to_disk()` will be a no-op.
#[inline]
pub fn memory() -> NetTopology {
pub fn memory(local_public_key: PublicKey) -> NetTopology {
let local_peer_id = local_public_key.clone().into_peer_id();
NetTopology {
store: Default::default(),
cache_path: None,
local_peer_id,
local_public_key,
external_addresses: Vec::new(),
}
}
@@ -92,19 +96,24 @@ impl NetTopology {
/// or contains garbage data, the execution still continues.
///
/// Calling `flush_to_disk()` in the future writes to the given path.
pub fn from_file<P: AsRef<Path>>(path: P) -> NetTopology {
pub fn from_file<P: AsRef<Path>>(local_public_key: PublicKey, path: P) -> NetTopology {
let path = path.as_ref();
let local_peer_id = local_public_key.clone().into_peer_id();
debug!(target: "sub-libp2p", "Initializing peer store for JSON file {:?}", path);
let store = try_load(path, &local_peer_id);
NetTopology {
store: try_load(path),
store,
cache_path: Some(path.to_owned()),
local_peer_id,
local_public_key,
external_addresses: Vec::new(),
}
}
/// Writes the topology into the path passed to `from_file`.
///
/// No-op if the object was created with `memory()`.
pub fn flush_to_disk(&self) -> Result<(), IoError> {
pub fn flush_to_disk(&mut self) -> Result<(), IoError> {
let path = match self.cache_path {
Some(ref p) => p,
None => return Ok(())
@@ -112,10 +121,10 @@ impl NetTopology {
let file = fs::File::create(path)?;
// TODO: the capacity of the BufWriter is kind of arbitrary ; decide better
serialize(BufWriter::with_capacity(1024 * 1024, file), &self.store)
serialize(BufWriter::with_capacity(1024 * 1024, file), &mut self.store)
}
/// Returns the number of peers in the topology.
/// Returns the number of peers in the topology, excluding the local peer.
#[inline]
pub fn num_peers(&self) -> usize {
self.store.len()
@@ -127,40 +136,19 @@ impl NetTopology {
pub fn cleanup(&mut self) {
let now_systime = SystemTime::now();
self.store.retain(|_, peer| {
peer.addrs.retain(|a| {
a.expires > now_systime || a.is_connected()
});
let new_addrs = peer.addrs
.drain(..)
.filter(|a| a.expires > now_systime || a.is_connected())
.collect();
peer.addrs = new_addrs;
!peer.addrs.is_empty()
});
}
/// Returns the known potential addresses of a peer, ordered by score. Excludes backed-off
/// addresses.
///
/// The boolean associated to each address indicates whether we're connected to it.
pub fn addrs_of_peer(&self, peer: &PeerId) -> impl Iterator<Item = (&Multiaddr, bool)> {
let peer = if let Some(peer) = self.store.get(peer) {
peer
} else {
// TODO: use an EitherIterator or something
return Vec::new().into_iter();
};
let now_st = SystemTime::now();
let now_is = Instant::now();
let mut list = peer.addrs.iter().filter_map(move |addr| {
let (score, connected) = addr.score_and_is_connected();
if (addr.expires >= now_st && score > 0 && addr.back_off_until < now_is) || connected {
Some((score, connected, &addr.addr))
} else {
None
}
}).collect::<Vec<_>>();
list.sort_by(|a, b| a.0.cmp(&b.0));
// TODO: meh, optimize
let l = list.into_iter().map(|(_, connec, addr)| (addr, connec)).collect::<Vec<_>>();
l.into_iter()
/// Add the external addresses that are known for the local node.
pub fn add_external_addrs<TIter>(&mut self, addrs: TIter)
where TIter: Iterator<Item = Multiaddr> {
self.external_addresses.extend(addrs);
}
/// Returns a list of all the known addresses of peers, ordered by the
@@ -170,7 +158,7 @@ impl NetTopology {
/// by itself over time. The `Instant` that is returned corresponds to
/// the earlier known time when a new entry will be added automatically to
/// the list.
pub fn addrs_to_attempt(&self) -> (impl Iterator<Item = (&PeerId, &Multiaddr)>, Instant) {
pub fn addrs_to_attempt(&mut self) -> (impl Iterator<Item = (&PeerId, &Multiaddr)>, Instant) {
// TODO: optimize
let now = Instant::now();
let now_systime = SystemTime::now();
@@ -179,20 +167,20 @@ impl NetTopology {
let mut peer_addrs = Vec::new();
'peer_loop: for (peer, info) in &self.store {
'peer_loop: for (peer, info) in &mut self.store {
peer_addrs.clear();
for addr in &info.addrs {
for addr in &mut info.addrs {
let (score, is_connected) = addr.score_and_is_connected();
if is_connected {
continue 'peer_loop;
continue 'peer_loop
}
if score == 0 || addr.expires < now_systime {
continue;
continue
}
if addr.back_off_until > now {
instant = cmp::min(instant, addr.back_off_until);
continue;
continue
}
peer_addrs.push(((peer, &addr.addr), score));
@@ -217,15 +205,19 @@ impl NetTopology {
let peer = peer_access(&mut self.store, peer);
let mut found = false;
peer.addrs.retain(|a| {
if a.expires < now_systime && !a.is_connected() {
return false;
}
if a.addr == addr {
found = true;
}
true
});
let new_addrs = peer.addrs
.drain(..)
.filter_map(|a| {
if a.expires < now_systime && !a.is_connected() {
return None
}
if a.addr == addr {
found = true;
}
Some(a)
})
.collect();
peer.addrs = new_addrs;
if !found {
peer.addrs.push(Addr {
@@ -233,50 +225,15 @@ impl NetTopology {
expires: now_systime + BOOTSTRAP_NODE_EXPIRATION,
back_off_until: now,
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
score: Mutex::new(AddrScore {
score: AddrScore {
connected_since: None,
score: BOOTSTRAP_NODE_SCORE,
latest_score_update: now,
}),
},
});
}
}
/// Adds addresses that a node says it is listening on.
///
/// The addresses are most likely to be valid.
///
/// Returns `true` if the topology has changed in some way. Returns `false` if calling this
/// method was a no-op.
#[inline]
pub fn add_self_reported_listen_addrs<I>(
&mut self,
peer_id: &PeerId,
addrs: I,
) -> bool
where I: Iterator<Item = Multiaddr> {
self.add_discovered_addrs(peer_id, addrs.map(|a| (a, true)))
}
/// Adds addresses discovered through the Kademlia DHT.
///
/// The addresses are not necessarily valid and should expire after a TTL.
///
/// For each address, incorporates a boolean. If true, that means we have some sort of hint
/// that this address can be reached.
///
/// Returns `true` if the topology has changed in some way. Returns `false` if calling this
/// method was a no-op.
#[inline]
pub fn add_kademlia_discovered_addrs<I>(
&mut self,
peer_id: &PeerId,
addrs: I,
) -> bool
where I: Iterator<Item = (Multiaddr, bool)> {
self.add_discovered_addrs(peer_id, addrs)
}
/// Inner implementaiton of the `add_*_discovered_addrs` methods.
/// Returns `true` if the topology has changed in some way. Returns `false` if calling this
/// method was a no-op.
@@ -292,15 +249,19 @@ impl NetTopology {
let peer = peer_access(&mut self.store, peer_id);
peer.addrs.retain(|a| {
if a.expires < now_systime && !a.is_connected() {
return false;
}
if let Some(pos) = addrs.iter().position(|&(ref addr, _)| addr == &a.addr) {
addrs.remove(pos);
}
true
});
let new_addrs = peer.addrs
.drain(..)
.filter_map(|a| {
if a.expires < now_systime && !a.is_connected() {
return None
}
if let Some(pos) = addrs.iter().position(|&(ref addr, _)| addr == &a.addr) {
addrs.remove(pos);
}
Some(a)
})
.collect();
peer.addrs = new_addrs;
let mut anything_changed = false;
@@ -322,7 +283,7 @@ impl NetTopology {
// Enforce `MAX_ADDRESSES_PER_PEER` before inserting, or skip this entry.
while peer.addrs.len() >= MAX_ADDRESSES_PER_PEER {
let pos = peer.addrs.iter().position(|addr| addr.score() <= initial_score);
let pos = peer.addrs.iter_mut().position(|addr| addr.score() <= initial_score);
if let Some(pos) = pos {
let _ = peer.addrs.remove(pos);
} else {
@@ -336,23 +297,109 @@ impl NetTopology {
expires: now_systime + KADEMLIA_DISCOVERY_EXPIRATION,
back_off_until: now,
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
score: Mutex::new(AddrScore {
score: AddrScore {
connected_since: None,
score: initial_score,
latest_score_update: now,
}),
},
});
}
anything_changed
}
}
impl KademliaTopology for NetTopology {
type ClosestPeersIter = vec::IntoIter<PeerId>;
type GetProvidersIter = iter::Empty<PeerId>;
fn add_kad_discovered_address(&mut self, peer: PeerId, addr: Multiaddr, ty: KadConnectionType) {
self.add_discovered_addrs(&peer, iter::once((addr, ty == KadConnectionType::Connected)));
}
fn closest_peers(&mut self, target: &Multihash, _max: usize) -> Self::ClosestPeersIter {
// TODO: very inefficient
let mut peers = self.store.keys().cloned().collect::<Vec<_>>();
peers.push(self.local_peer_id.clone());
peers.sort_by(|a, b| {
b.as_ref().distance_with(target).cmp(&a.as_ref().distance_with(target))
});
peers.into_iter()
}
fn add_provider(&mut self, _: Multihash, _: PeerId) {
// We don't implement ADD_PROVIDER/GET_PROVIDERS
}
fn get_providers(&mut self, _: &Multihash) -> Self::GetProvidersIter {
// We don't implement ADD_PROVIDER/GET_PROVIDERS
iter::empty()
}
}
impl IdentifyTopology for NetTopology {
#[inline]
fn add_identify_discovered_addrs<TIter>(&mut self, peer: &PeerId, addrs: TIter)
where
TIter: Iterator<Item = Multiaddr>
{
// These are addresses that peers indicate for themselves.
// The typical use case is:
// - A peer connects to one of our listening points.
// - We send an identify request to it, and it answers with a list of addresses.
// - If later it disconnects, we can try to dial it back through one of these addresses.
self.add_discovered_addrs(peer, addrs.map(move |a| (a, true)));
}
}
impl Topology for NetTopology {
#[inline]
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
if peer == &self.local_peer_id {
return self.external_addresses.clone()
}
let peer = if let Some(peer) = self.store.get_mut(peer) {
peer
} else {
return Vec::new()
};
let now_st = SystemTime::now();
let now_is = Instant::now();
let mut list = peer.addrs.iter_mut().filter_map(move |addr| {
let (score, connected) = addr.score_and_is_connected();
if (addr.expires >= now_st && score > 0 && addr.back_off_until < now_is) || connected {
Some((score, &addr.addr))
} else {
None
}
}).collect::<Vec<_>>();
list.sort_by(|a, b| a.0.cmp(&b.0));
// TODO: meh, optimize
list.into_iter().map(|(_, addr)| addr.clone()).collect::<Vec<_>>()
}
fn add_local_external_addrs<TIter>(&mut self, addrs: TIter)
where TIter: Iterator<Item = Multiaddr> {
self.add_external_addrs(addrs)
}
fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
}
fn local_public_key(&self) -> &PublicKey {
&self.local_public_key
}
fn set_connected(&mut self, peer: &PeerId, endpoint: &ConnectedPoint) {
let addr = match endpoint {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Listener { .. } => return
};
/// Indicates the peer store that we're connected to this given address.
///
/// This increases the score of the address that we connected to. Since we assume that only
/// one peer can be reached with any specific address, we also remove all addresses from other
/// peers that match the one we connected to.
pub fn report_connected(&mut self, addr: &Multiaddr, peer: &PeerId) {
let now = Instant::now();
// Just making sure that we have an entry for this peer in `store`, but don't use it.
@@ -364,20 +411,19 @@ impl NetTopology {
addr.connected_now(CONNECTED_MINIMUM_SCORE);
addr.back_off_until = now;
addr.next_back_off = FIRST_CONNECT_FAIL_BACKOFF;
continue;
continue
}
// TODO: a else block would be better, but we get borrowck errors
info_in_store.addrs.push(Addr {
addr: addr.clone(),
expires: SystemTime::now() + EXPIRATION_PUSH_BACK_CONNEC,
back_off_until: now,
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
score: Mutex::new(AddrScore {
score: AddrScore {
connected_since: Some(now),
latest_score_update: now,
score: CONNECTED_MINIMUM_SCORE,
}),
},
});
} else {
@@ -391,17 +437,16 @@ impl NetTopology {
}
}
/// Indicates the peer store that we're disconnected from an address.
///
/// There's no need to indicate a peer ID, as each address can only have one peer ID.
/// If we were indeed connected to this addr, then we can find out which peer ID it is.
pub fn report_disconnected(&mut self, addr: &Multiaddr, reason: DisconnectReason) {
fn set_disconnected(&mut self, _: &PeerId, endpoint: &ConnectedPoint, reason: DisconnectReason) {
let addr = match endpoint {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Listener { .. } => return
};
let score_diff = match reason {
DisconnectReason::NoSlot => -1,
DisconnectReason::FoundBetterAddr => -5,
DisconnectReason::RemoteClosed => -5,
DisconnectReason::Useless => -5,
DisconnectReason::Banned => -5,
DisconnectReason::Replaced => -3,
DisconnectReason::Graceful => -1,
DisconnectReason::Error => -5,
};
for info in self.store.values_mut() {
@@ -414,58 +459,26 @@ impl NetTopology {
if a.expires < expires_push_back {
a.expires = expires_push_back;
}
return;
return
}
}
}
}
/// Indicates the peer store that we failed to connect to an address.
///
/// We don't care about which peer is supposed to be behind that address. If we failed to dial
/// it for a specific peer, we would also fail to dial it for all peers that have this
/// address.
pub fn report_failed_to_connect(&mut self, addr: &Multiaddr) {
fn set_unreachable(&mut self, addr: &Multiaddr) {
for info in self.store.values_mut() {
for a in info.addrs.iter_mut() {
if &a.addr == addr {
a.adjust_score(SCORE_DIFF_ON_FAILED_TO_CONNECT);
trace!(target: "sub-libp2p", "Back off for {} = {:?}", addr, a.next_back_off);
a.back_off_until = Instant::now() + a.next_back_off;
a.next_back_off = cmp::min(a.next_back_off * FAIL_BACKOFF_MULTIPLIER, MAX_BACKOFF);
if &a.addr != addr {
continue
}
a.adjust_score(SCORE_DIFF_ON_FAILED_TO_CONNECT);
trace!(target: "sub-libp2p", "Back off for {} = {:?}", addr, a.next_back_off);
a.back_off_until = Instant::now() + a.next_back_off;
a.next_back_off = cmp::min(a.next_back_off * FAIL_BACKOFF_MULTIPLIER, MAX_BACKOFF);
}
}
}
/// Indicates the peer store that the given peer is useless.
///
/// This decreases the scores of the addresses of that peer.
pub fn report_useless(&mut self, peer: &PeerId) {
for (peer_in_store, info_in_store) in self.store.iter_mut() {
if peer == peer_in_store {
for addr in info_in_store.addrs.iter_mut() {
addr.adjust_score(USELESS_PEER_SCORE_CHANGE);
}
}
}
}
}
/// Reason why we disconnected from a peer.
#[derive(Debug)]
pub enum DisconnectReason {
/// No slot available locally anymore for this peer.
NoSlot,
/// A better way to connect to this peer has been found, therefore we disconnect from
/// the old one.
FoundBetterAddr,
/// The remote closed the connection.
RemoteClosed,
/// This node is considered useless for our needs. This includes time outs.
Useless,
/// The peer has been banned.
Banned,
}
fn peer_access<'a>(store: &'a mut FnvHashMap<PeerId, PeerInfo>, peer: &PeerId) -> &'a mut PeerInfo {
@@ -488,17 +501,17 @@ struct Addr {
next_back_off: Duration,
/// Don't try to connect to this node until `Instant`.
back_off_until: Instant,
score: Mutex<AddrScore>,
score: AddrScore,
}
impl Clone for Addr {
fn clone(&self) -> Addr {
Addr {
addr: self.addr.clone(),
expires: self.expires.clone(),
next_back_off: self.next_back_off.clone(),
back_off_until: self.back_off_until.clone(),
score: Mutex::new(self.score.lock().clone()),
expires: self.expires,
next_back_off: self.next_back_off,
back_off_until: self.back_off_until,
score: self.score.clone(),
}
}
}
@@ -516,58 +529,52 @@ struct AddrScore {
impl Addr {
/// Sets the addr to connected. If the score is lower than the given value, raises it to this
/// value.
fn connected_now(&self, raise_to_min: u32) {
let mut score = self.score.lock();
fn connected_now(&mut self, raise_to_min: u32) {
let now = Instant::now();
Addr::flush(&mut score, now);
score.connected_since = Some(now);
if score.score < raise_to_min {
score.score = raise_to_min;
Addr::flush(&mut self.score, now);
self.score.connected_since = Some(now);
if self.score.score < raise_to_min {
self.score.score = raise_to_min;
}
}
/// Applies a modification to the score.
fn adjust_score(&self, score_diff: i32) {
let mut score = self.score.lock();
Addr::flush(&mut score, Instant::now());
fn adjust_score(&mut self, score_diff: i32) {
Addr::flush(&mut self.score, Instant::now());
if score_diff >= 0 {
score.score = cmp::min(MAX_SCORE, score.score + score_diff as u32);
self.score.score = cmp::min(MAX_SCORE, self.score.score + score_diff as u32);
} else {
score.score = score.score.saturating_sub(-score_diff as u32);
self.score.score = self.score.score.saturating_sub(-score_diff as u32);
}
}
/// Sets the addr to disconnected and applies a modification to the score.
fn disconnected_now(&self, score_diff: i32) {
let mut score = self.score.lock();
Addr::flush(&mut score, Instant::now());
score.connected_since = None;
fn disconnected_now(&mut self, score_diff: i32) {
Addr::flush(&mut self.score, Instant::now());
self.score.connected_since = None;
if score_diff >= 0 {
score.score = cmp::min(MAX_SCORE, score.score + score_diff as u32);
self.score.score = cmp::min(MAX_SCORE, self.score.score + score_diff as u32);
} else {
score.score = score.score.saturating_sub(-score_diff as u32);
self.score.score = self.score.score.saturating_sub(-score_diff as u32);
}
}
/// Returns true if we are connected to this addr.
fn is_connected(&self) -> bool {
let score = self.score.lock();
score.connected_since.is_some()
self.score.connected_since.is_some()
}
/// Returns the score, and true if we are connected to this addr.
fn score_and_is_connected(&self) -> (u32, bool) {
let mut score = self.score.lock();
Addr::flush(&mut score, Instant::now());
let is_connected = score.connected_since.is_some();
(score.score, is_connected)
fn score_and_is_connected(&mut self) -> (u32, bool) {
Addr::flush(&mut self.score, Instant::now());
let is_connected = self.score.connected_since.is_some();
(self.score.score, is_connected)
}
/// Updates `score` and `latest_score_update`, and returns the score.
fn score(&self) -> u32 {
let mut score = self.score.lock();
Addr::flush(&mut score, Instant::now());
score.score
fn score(&mut self) -> u32 {
Addr::flush(&mut self.score, Instant::now());
self.score.score
}
fn flush(score: &mut AddrScore, now: Instant) {
@@ -588,8 +595,8 @@ impl Addr {
/// Divides a `Duration` with a `Duration`. This exists in the stdlib but isn't stable yet.
// TODO: remove this function once stable
fn div_dur_with_dur(a: Duration, b: Duration) -> u32 {
let a_ms = a.as_secs() * 1_000_000 + (a.subsec_nanos() / 1_000) as u64;
let b_ms = b.as_secs() * 1_000_000 + (b.subsec_nanos() / 1_000) as u64;
let a_ms = a.as_secs() * 1_000_000 + u64::from(a.subsec_micros());
let b_ms = b.as_secs() * 1_000_000 + u64::from(b.subsec_micros());
(a_ms / b_ms) as u32
}
@@ -607,8 +614,8 @@ struct SerializedAddr {
score: u32,
}
impl<'a> From<&'a Addr> for SerializedAddr {
fn from(addr: &'a Addr) -> SerializedAddr {
impl<'a> From<&'a mut Addr> for SerializedAddr {
fn from(addr: &'a mut Addr) -> SerializedAddr {
SerializedAddr {
addr: addr.addr.to_string(),
expires: addr.expires,
@@ -618,9 +625,10 @@ impl<'a> From<&'a Addr> for SerializedAddr {
}
/// Attempts to load storage from a file.
/// Ignores any entry equal to `local_peer_id`.
/// Deletes the file and returns an empty map if the file doesn't exist, cannot be opened
/// or is corrupted.
fn try_load(path: impl AsRef<Path>) -> FnvHashMap<PeerId, PeerInfo> {
fn try_load(path: impl AsRef<Path>, local_peer_id: &PeerId) -> FnvHashMap<PeerId, PeerInfo> {
let path = path.as_ref();
if !path.exists() {
debug!(target: "sub-libp2p", "Peer storage file {:?} doesn't exist", path);
@@ -664,7 +672,8 @@ fn try_load(path: impl AsRef<Path>) -> FnvHashMap<PeerId, PeerInfo> {
let data = Cursor::new(first_byte).chain(file);
match serde_json::from_reader::<_, serde_json::Value>(data) {
Ok(serde_json::Value::Null) => Default::default(),
Ok(serde_json::Value::Object(map)) => deserialize_tolerant(map.into_iter()),
Ok(serde_json::Value::Object(map)) =>
deserialize_tolerant(map.into_iter(), local_peer_id),
Ok(_) | Err(_) => {
// The `Ok(_)` case means that the file doesn't contain a map.
let _ = fs::remove_file(path);
@@ -676,9 +685,10 @@ fn try_load(path: impl AsRef<Path>) -> FnvHashMap<PeerId, PeerInfo> {
/// Attempts to turn a deserialized version of the storage into the final version.
///
/// Skips entries that are invalid.
/// Skips entries that are invalid or equal to `local_peer_id`.
fn deserialize_tolerant(
iter: impl Iterator<Item = (String, serde_json::Value)>
iter: impl Iterator<Item = (String, serde_json::Value)>,
local_peer_id: &PeerId
) -> FnvHashMap<PeerId, PeerInfo> {
let now = Instant::now();
let now_systime = SystemTime::now();
@@ -690,6 +700,10 @@ fn deserialize_tolerant(
Err(_) => continue,
};
if &peer == local_peer_id {
continue
}
let info: SerializedPeerInfo = match serde_json::from_value(info) {
Ok(i) => i,
Err(_) => continue,
@@ -711,16 +725,16 @@ fn deserialize_tolerant(
expires: addr.expires,
next_back_off: FIRST_CONNECT_FAIL_BACKOFF,
back_off_until: now,
score: Mutex::new(AddrScore {
score: AddrScore {
connected_since: None,
score: addr.score,
latest_score_update: now,
}),
},
});
}
if addrs.is_empty() {
continue;
continue
}
out.insert(peer, PeerInfo { addrs });
@@ -732,18 +746,21 @@ fn deserialize_tolerant(
/// Attempts to turn a deserialized version of the storage into the final version.
///
/// Skips entries that are invalid or expired.
fn serialize<W: Write>(out: W, map: &FnvHashMap<PeerId, PeerInfo>) -> Result<(), IoError> {
fn serialize<W: Write>(out: W, map: &mut FnvHashMap<PeerId, PeerInfo>) -> Result<(), IoError> {
let now = SystemTime::now();
let array: FnvHashMap<_, _> = map.iter().filter_map(|(peer, info)| {
let array: FnvHashMap<_, _> = map.iter_mut().filter_map(|(peer, info)| {
if info.addrs.is_empty() {
return None
}
let peer = peer.to_base58();
let info = SerializedPeerInfo {
addrs: info.addrs.iter()
.filter(|a| a.expires > now || a.is_connected())
.map(Into::into)
addrs: info.addrs.iter_mut()
.filter_map(|a| if a.expires > now || a.is_connected() {
Some(a.into())
} else {
None
})
.collect(),
};
+16 -16
View File
@@ -14,33 +14,33 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use libp2p::{self, PeerId, Transport, mplex, secio, yamux};
use libp2p::core::{either, upgrade, transport::boxed::Boxed, muxing::StreamMuxerBox};
use libp2p::transport_timeout::TransportTimeout;
use std::time::Duration;
use std::usize;
use futures::prelude::*;
use libp2p::{self, InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport, mplex, secio, yamux};
use libp2p::core::{self, transport::boxed::Boxed, muxing::StreamMuxerBox};
use std::{io, time::Duration, usize};
/// Builds the transport that serves as a common ground for all connections.
pub fn build_transport(
local_private_key: secio::SecioKeyPair
) -> Boxed<(PeerId, StreamMuxerBox)> {
) -> Boxed<(PeerId, StreamMuxerBox), io::Error> {
let mut mplex_config = mplex::MplexConfig::new();
mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block);
mplex_config.max_buffer_len(usize::MAX);
let base = libp2p::CommonTransport::new()
// TODO: rework the transport creation (https://github.com/libp2p/rust-libp2p/issues/783)
libp2p::tcp::TcpConfig::new()
.with_upgrade(secio::SecioConfig::new(local_private_key))
.and_then(move |out, endpoint| {
let upgrade = upgrade::or(
upgrade::map(yamux::Config::default(), either::EitherOutput::First),
upgrade::map(mplex_config, either::EitherOutput::Second),
);
let peer_id = out.remote_key.into_peer_id();
let upgrade = upgrade::map(upgrade, move |muxer| (peer_id, muxer));
upgrade::apply(out.stream, upgrade, endpoint.into())
})
.map(|(id, muxer), _| (id, StreamMuxerBox::new(muxer)));
let peer_id2 = peer_id.clone();
let upgrade = core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex_config)
.map_inbound(move |muxer| (peer_id, muxer))
.map_outbound(move |muxer| (peer_id2, muxer));
TransportTimeout::new(base, Duration::from_secs(20))
core::upgrade::apply(out.stream, upgrade, endpoint)
.map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer)))
})
.with_timeout(Duration::from_secs(20))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed()
}
@@ -0,0 +1,83 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
#[macro_use]
extern crate futures;
#[macro_use]
extern crate substrate_network_libp2p;
extern crate tokio;
use futures::{future, prelude::*};
use std::{io, iter};
use substrate_network_libp2p::ServiceEvent;
/// Builds two services. The second one has the first one as its bootstrap node.
/// This is to be used only for testing, and a panic will happen if something goes wrong.
fn build_two_nodes() -> (substrate_network_libp2p::Service, substrate_network_libp2p::Service) {
let service1 = {
let config = substrate_network_libp2p::NetworkConfiguration {
listen_addresses: vec![multiaddr![Ip4([127, 0, 0, 1]), Tcp(0u16)]],
..substrate_network_libp2p::NetworkConfiguration ::default()
};
let proto = substrate_network_libp2p::RegisteredProtocol::new(*b"tst", &[1]);
substrate_network_libp2p::start_service(config, iter::once(proto)).unwrap()
};
let service2 = {
let mut bootnode = service1.listeners().next().unwrap().clone();
bootnode.append(libp2p::multiaddr::Protocol::P2p(service1.peer_id().clone().into()));
let config = substrate_network_libp2p::NetworkConfiguration {
listen_addresses: vec![multiaddr![Ip4([127, 0, 0, 1]), Tcp(0u16)]],
boot_nodes: vec![bootnode.to_string()],
..substrate_network_libp2p::NetworkConfiguration::default()
};
let proto = substrate_network_libp2p::RegisteredProtocol::new(*b"tst", &[1]);
substrate_network_libp2p::start_service(config, iter::once(proto)).unwrap()
};
(service1, service2)
}
#[test]
fn basic_two_nodes_connectivity() {
let (mut service1, mut service2) = build_two_nodes();
let fut1 = future::poll_fn(move || -> io::Result<_> {
match try_ready!(service1.poll()) {
Some(ServiceEvent::OpenedCustomProtocol { protocol, version, .. }) => {
assert_eq!(protocol, *b"tst");
assert_eq!(version, 1);
Ok(Async::Ready(()))
},
_ => panic!(),
}
});
let fut2 = future::poll_fn(move || -> io::Result<_> {
match try_ready!(service2.poll()) {
Some(ServiceEvent::OpenedCustomProtocol { protocol, version, .. }) => {
assert_eq!(protocol, *b"tst");
assert_eq!(version, 1);
Ok(Async::Ready(()))
},
_ => panic!(),
}
});
let combined = fut1.select(fut2).map_err(|(err, _)| err);
tokio::runtime::Runtime::new().unwrap().block_on_all(combined).unwrap();
}
+2 -20
View File
@@ -260,23 +260,11 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> ManageNetwork
}
fn deny_unreserved_peers(&self) {
// This method can disconnect nodes, in which case we have to properly close them in the
// protocol.
let disconnected = self.network.lock().deny_unreserved_peers();
let mut net_sync = NetSyncIo::new(&self.network, self.protocol_id);
for node_index in disconnected {
self.handler.on_peer_disconnected(&mut net_sync, node_index)
}
self.network.lock().deny_unreserved_peers();
}
fn remove_reserved_peer(&self, peer: PeerId) {
// This method can disconnect a node, in which case we have to properly close it in the
// protocol.
let disconnected = self.network.lock().remove_reserved_peer(peer);
if let Some(node_index) = disconnected {
let mut net_sync = NetSyncIo::new(&self.network, self.protocol_id);
self.handler.on_peer_disconnected(&mut net_sync, node_index)
}
self.network.lock().remove_reserved_peer(peer);
}
fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
@@ -388,12 +376,6 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
let mut net_sync = NetSyncIo::new(&network_service, protocol_id);
match event {
NetworkServiceEvent::NodeClosed { node_index, closed_custom_protocols } => {
if !closed_custom_protocols.is_empty() {
debug_assert_eq!(closed_custom_protocols, &[protocol_id]);
protocol.on_peer_disconnected(&mut net_sync, node_index);
}
}
NetworkServiceEvent::ClosedCustomProtocols { node_index, protocols } => {
if !protocols.is_empty() {
debug_assert_eq!(protocols, &[protocol_id]);