Switch to libp2p master (#931)

This commit is contained in:
Pierre Krieger
2018-11-05 19:22:26 +01:00
committed by Gav Wood
parent 94e2589f1e
commit fcae7ac582
7 changed files with 455 additions and 340 deletions
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use bytes::Bytes;
use libp2p::core::{Multiaddr, ConnectionUpgrade, Endpoint};
use libp2p::core::{ConnectionUpgrade, Endpoint};
use libp2p::tokio_codec::Framed;
use std::{collections::VecDeque, io, vec::IntoIter as VecIntoIter};
use futures::{prelude::*, future, stream, task};
@@ -203,8 +203,7 @@ where TSubstream: AsyncRead + AsyncWrite,
self,
socket: TSubstream,
protocol_version: Self::UpgradeIdentifier,
_: Endpoint,
_: &Multiaddr
_: Endpoint
) -> Self::Future {
let framed = Framed::new(socket, UviBytes::default());
@@ -273,13 +272,12 @@ where TSubstream: AsyncRead + AsyncWrite,
self,
socket: TSubstream,
upgrade_identifier: Self::UpgradeIdentifier,
endpoint: Endpoint,
remote_addr: &Multiaddr
endpoint: Endpoint
) -> Self::Future {
let (protocol_index, inner_proto_id) = upgrade_identifier;
self.0.into_iter()
.nth(protocol_index)
.expect("invalid protocol index ; programmer logic error")
.upgrade(socket, inner_proto_id, endpoint, remote_addr)
.upgrade(socket, inner_proto_id, endpoint)
}
}
@@ -19,7 +19,6 @@ use custom_proto::{RegisteredProtocols, RegisteredProtocolSubstream};
use futures::{prelude::*, task};
use libp2p::core::{ConnectionUpgrade, Endpoint, PeerId, PublicKey, upgrade};
use libp2p::core::nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent};
use libp2p::core::nodes::swarm::ConnectedPoint;
use libp2p::kad::{KadConnecConfig, KadFindNodeRespond, KadIncomingRequest, KadConnecController};
use libp2p::{identify, ping};
use parking_lot::Mutex;
@@ -54,9 +53,6 @@ pub struct SubstrateNodeHandler<TSubstream> {
/// Substreams open for "custom" protocols (eg. dot).
custom_protocols_substreams: Vec<RegisteredProtocolSubstream<TSubstream>>,
/// Address of the node.
address: Multiaddr,
/// Substream open for Kademlia, if any.
kademlia_substream: Option<(KadConnecController, Box<Stream<Item = KadIncomingRequest, Error = IoError> + Send>)>,
/// If true, we need to send back a `KadOpen` event on the stream (if Kademlia is open).
@@ -260,20 +256,14 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
{
/// Creates a new node handler.
#[inline]
pub fn new(registered_custom: Arc<RegisteredProtocols>, endpoint: ConnectedPoint) -> Self {
pub fn new(registered_custom: Arc<RegisteredProtocols>) -> Self {
let registered_custom_len = registered_custom.len();
let queued_dial_upgrades = registered_custom.0
.iter()
.map(|proto| UpgradePurpose::Custom(proto.id()))
.collect();
let address = match endpoint {
ConnectedPoint::Dialer { address } => address.clone(),
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
};
SubstrateNodeHandler {
address,
custom_protocols_substreams: Vec::with_capacity(registered_custom_len),
kademlia_substream: None,
need_report_kad_open: false,
@@ -294,18 +284,19 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
}
}
impl<TSubstream> NodeHandler<TSubstream> for SubstrateNodeHandler<TSubstream>
impl<TSubstream> NodeHandler for SubstrateNodeHandler<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
{
type InEvent = SubstrateInEvent;
type OutEvent = SubstrateOutEvent<TSubstream>;
type OutboundOpenInfo = ();
type Substream = TSubstream;
fn inject_substream(&mut self, substream: TSubstream, endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>) {
// For listeners, propose all the possible upgrades.
if endpoint == NodeHandlerEndpoint::Listener {
let listener_upgrade = listener_upgrade!(self);
let upgrade = upgrade::apply(substream, listener_upgrade, Endpoint::Listener, &self.address);
let upgrade = upgrade::apply(substream, listener_upgrade, Endpoint::Listener);
self.upgrades_in_progress_listen.push(Box::new(upgrade) as Box<_>);
// Since we pushed to `upgrades_in_progress_listen`, we have to notify the task.
if let Some(task) = self.to_notify.take() {
@@ -338,26 +329,22 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
return;
};
// TODO: shouldn't be &self.address ; requires a change in libp2p
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer, &self.address);
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer);
self.upgrades_in_progress_dial.push((purpose, Box::new(upgrade) as Box<_>));
}
UpgradePurpose::Kad => {
let wanted = upgrade::map(KadConnecConfig::new(), move |(c, s)| FinalUpgrade::Kad(c, s));
// TODO: shouldn't be &self.address ; requires a change in libp2p
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer, &self.address);
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer);
self.upgrades_in_progress_dial.push((purpose, Box::new(upgrade) as Box<_>));
}
UpgradePurpose::Identify => {
let wanted = upgrade::map(identify::IdentifyProtocolConfig, move |i| FinalUpgrade::from(i));
// TODO: shouldn't be &self.address ; requires a change in libp2p
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer, &self.address);
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer);
self.upgrades_in_progress_dial.push((purpose, Box::new(upgrade) as Box<_>));
}
UpgradePurpose::Ping => {
let wanted = upgrade::map(ping::Ping::default(), move |p| FinalUpgrade::from(p));
// TODO: shouldn't be &self.address ; requires a change in libp2p
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer, &self.address);
let upgrade = upgrade::apply(substream, wanted, Endpoint::Dialer);
self.upgrades_in_progress_dial.push((purpose, Box::new(upgrade) as Box<_>));
}
};
@@ -733,6 +720,9 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static,
},
// We don't care about Kademlia pings, they are unused.
Ok(Async::Ready(Some(KadIncomingRequest::PingPong))) => {},
// Other Kademlia messages are unimplemented.
Ok(Async::Ready(Some(KadIncomingRequest::GetProviders { .. }))) => {},
Ok(Async::Ready(Some(KadIncomingRequest::AddProvider { .. }))) => {},
Ok(Async::NotReady) => {
self.kademlia_substream = Some((controller, stream));
break;
@@ -21,7 +21,7 @@ use futures::{prelude::*, task, Stream};
use futures::sync::{oneshot, mpsc};
use libp2p::{Multiaddr, PeerId};
use libp2p::core::{Endpoint, PublicKey};
use libp2p::core::nodes::swarm::ConnectedPoint;
use libp2p::core::nodes::ConnectedPoint;
use libp2p::kad::{KadSystem, KadSystemConfig, KadConnecController, KadPeer};
use libp2p::kad::{KadConnectionType, KadQueryEvent};
use parking_lot::Mutex;
+108 -151
View File
@@ -20,15 +20,12 @@ use fnv::FnvHashMap;
use futures::{prelude::*, Stream};
use libp2p::{Multiaddr, multiaddr::Protocol, PeerId};
use libp2p::core::{muxing, Endpoint, PublicKey};
use libp2p::core::nodes::node::Substream;
use libp2p::core::nodes::swarm::{ConnectedPoint, Swarm as Libp2pSwarm, HandlerFactory};
use libp2p::core::nodes::swarm::{SwarmEvent as Libp2pSwarmEvent, Peer as SwarmPeer};
use libp2p::core::nodes::{ConnectedPoint, RawSwarm, RawSwarmEvent, Peer as SwarmPeer, Substream};
use libp2p::core::transport::boxed::Boxed;
use libp2p::kad::{KadConnecController, KadFindNodeRespond};
use libp2p::secio;
use node_handler::{SubstrateOutEvent, SubstrateNodeHandler, SubstrateInEvent, IdentificationRequest};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::{mem, sync::Arc};
use std::{io, mem, sync::Arc};
use transport;
use {Error, NodeIndex, ProtocolId};
@@ -47,10 +44,11 @@ pub fn start_swarm(
let transport = transport::build_transport(local_private_key);
// Build the underlying libp2p swarm.
let swarm = Libp2pSwarm::with_handler_builder(transport, HandlerBuilder(Arc::new(registered_custom)));
let swarm = RawSwarm::new(transport);
Ok(Swarm {
swarm,
registered_custom: Arc::new(registered_custom),
local_public_key,
local_peer_id,
listening_addrs: Vec::new(),
@@ -60,20 +58,6 @@ pub fn start_swarm(
})
}
/// Dummy structure that exists because we need to be able to express the type. Otherwise we would
/// use a closure.
#[derive(Clone)]
struct HandlerBuilder(Arc<RegisteredProtocols>);
impl HandlerFactory for HandlerBuilder
{
type Handler = SubstrateNodeHandler<Substream<Muxer>>;
#[inline]
fn new_handler(&self, addr: ConnectedPoint) -> Self::Handler {
SubstrateNodeHandler::new(self.0.clone(), addr)
}
}
/// Event produced by the swarm.
pub enum SwarmEvent {
/// We have successfully connected to a node.
@@ -117,7 +101,7 @@ pub enum SwarmEvent {
/// Address that failed.
address: Multiaddr,
/// Reason why we failed.
error: IoError,
error: io::Error,
},
/// Report information about the node.
@@ -197,20 +181,23 @@ pub enum SwarmEvent {
/// Index of the node.
node_index: NodeIndex,
/// Reason why it has been closed. `Ok` means that it's been closed gracefully.
result: Result<(), IoError>,
result: Result<(), io::Error>,
},
}
/// Network swarm. Must be polled regularly in order for the networking to work.
pub struct Swarm {
/// Stream of events of the swarm.
swarm: Libp2pSwarm<
swarm: RawSwarm<
Boxed<(PeerId, Muxer)>,
SubstrateInEvent,
SubstrateOutEvent<Substream<Muxer>>,
HandlerBuilder
SubstrateNodeHandler<Substream<Muxer>>
>,
/// List of registered protocols. Used when we open or receive a new connection.
registered_custom: Arc<RegisteredProtocols>,
/// Public key of the local node.
local_public_key: PublicKey,
@@ -340,7 +327,7 @@ impl Swarm {
SwarmPeer::NotConnected(peer) => {
trace!(target: "sub-libp2p", "Starting to connect to {:?} through {}",
peer_id, addr);
match peer.connect(addr) {
match peer.connect(addr, SubstrateNodeHandler::new(self.registered_custom.clone())) {
Ok(_) => Ok(false),
Err(_) => Err(()),
}
@@ -351,7 +338,7 @@ impl Swarm {
/// Start dialing an address, not knowing which peer ID to expect.
#[inline]
pub fn dial(&mut self, addr: Multiaddr) -> Result<(), Multiaddr> {
self.swarm.dial(addr)
self.swarm.dial(addr, SubstrateNodeHandler::new(self.registered_custom.clone()))
}
/// After receiving a `NodePending` event, you should call either `accept_node` or `drop_node`
@@ -479,124 +466,6 @@ impl Swarm {
);
}
/// Processes an event received by the swarm.
///
/// Optionally returns an event to report back to the outside.
///
/// > **Note**: Must be called from inside `poll()`, otherwise it will panic. This method
/// > shouldn't be made public because of this requirement.
fn process_network_event(
&mut self,
event: Libp2pSwarmEvent<Boxed<(PeerId, Muxer)>, SubstrateOutEvent<Substream<Muxer>>>
) -> Option<SwarmEvent> {
match event {
Libp2pSwarmEvent::Connected { peer_id, endpoint } => {
let node_index = self.next_node_index.clone();
self.next_node_index += 1;
self.node_by_peer.insert(peer_id.clone(), node_index);
self.nodes_info.insert(node_index, NodeInfo {
peer_id: peer_id.clone(),
endpoint: match endpoint {
ConnectedPoint::Listener { .. } => Endpoint::Listener,
ConnectedPoint::Dialer { .. } => Endpoint::Dialer,
},
open_protocols: Vec::new(),
});
return Some(SwarmEvent::NodePending {
node_index,
peer_id,
endpoint
});
}
Libp2pSwarmEvent::Replaced { peer_id, endpoint, .. } => {
let node_index = *self.node_by_peer.get(&peer_id)
.expect("node_by_peer is always kept in sync with the inner swarm");
let infos = self.nodes_info.get_mut(&node_index)
.expect("nodes_info is always kept in sync with the swarm");
debug_assert_eq!(infos.peer_id, peer_id);
infos.endpoint = match endpoint {
ConnectedPoint::Listener { .. } => Endpoint::Listener,
ConnectedPoint::Dialer { .. } => Endpoint::Dialer,
};
let closed_custom_protocols = mem::replace(&mut infos.open_protocols, Vec::new());
return Some(SwarmEvent::Reconnected {
node_index,
endpoint,
closed_custom_protocols,
});
},
Libp2pSwarmEvent::NodeClosed { peer_id, .. } => {
debug!(target: "sub-libp2p", "Connection to {:?} closed gracefully", peer_id);
let node_index = self.node_by_peer.remove(&peer_id)
.expect("node_by_peer is always kept in sync with the inner swarm");
let infos = self.nodes_info.remove(&node_index)
.expect("nodes_info is always kept in sync with the inner swarm");
debug_assert_eq!(infos.peer_id, peer_id);
return Some(SwarmEvent::NodeClosed {
node_index,
peer_id,
closed_custom_protocols: infos.open_protocols,
});
},
Libp2pSwarmEvent::NodeError { peer_id, error, .. } => {
debug!(target: "sub-libp2p", "Closing {:?} because of error: {:?}", peer_id, error);
let node_index = self.node_by_peer.remove(&peer_id)
.expect("node_by_peer is always kept in sync with the inner swarm");
let infos = self.nodes_info.remove(&node_index)
.expect("nodes_info is always kept in sync with the inner swarm");
debug_assert_eq!(infos.peer_id, peer_id);
return Some(SwarmEvent::NodeClosed {
node_index,
peer_id,
closed_custom_protocols: infos.open_protocols,
});
},
Libp2pSwarmEvent::DialError { multiaddr, error, .. } =>
return Some(SwarmEvent::DialFail {
address: multiaddr,
error,
}),
Libp2pSwarmEvent::UnknownPeerDialError { multiaddr, error } =>
return Some(SwarmEvent::DialFail {
address: multiaddr,
error,
}),
Libp2pSwarmEvent::PublicKeyMismatch {
actual_peer_id,
multiaddr,
expected_peer_id,
..
} => {
debug!(target: "sub-libp2p", "When dialing {:?} through {}, public key mismatch, \
actual = {:?}", expected_peer_id, multiaddr, actual_peer_id);
return Some(SwarmEvent::DialFail {
address: multiaddr,
error: IoError::new(IoErrorKind::Other, "Public key mismatch"),
});
},
Libp2pSwarmEvent::ListenerClosed { listen_addr, result, .. } => {
warn!(target: "sub-libp2p", "Listener closed for {}: {:?}", listen_addr, result);
if self.swarm.listeners().count() == 0 {
warn!(target: "sub-libp2p", "No listener left");
}
},
Libp2pSwarmEvent::NodeEvent { peer_id, event } =>
if let Some(event) = self.handle_node_event(peer_id, event) {
return Some(event);
},
Libp2pSwarmEvent::IncomingConnection { listen_addr, send_back_addr } =>
trace!(target: "sub-libp2p", "Incoming connection with {} on listener {}",
send_back_addr, listen_addr),
Libp2pSwarmEvent::IncomingConnectionError { listen_addr, send_back_addr, error } =>
trace!(target: "sub-libp2p", "Incoming connection with {} on listener {} \
errored: {:?}", send_back_addr, listen_addr, error),
}
None
}
/// Processes an event obtained by a node in the swarm.
///
/// Optionally returns an event that the service must emit.
@@ -698,17 +567,105 @@ impl Swarm {
impl Stream for Swarm {
type Item = SwarmEvent;
type Error = IoError;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
match self.swarm.poll() {
Async::Ready(Some(event)) =>
if let Some(event) = self.process_network_event(event) {
return Ok(Async::Ready(Some(event)));
}
let (peer_id, node_event) = match self.swarm.poll() {
Async::Ready(RawSwarmEvent::Connected { peer_id, endpoint }) => {
let node_index = self.next_node_index.clone();
self.next_node_index += 1;
self.node_by_peer.insert(peer_id.clone(), node_index);
self.nodes_info.insert(node_index, NodeInfo {
peer_id: peer_id.clone(),
endpoint: match endpoint {
ConnectedPoint::Listener { .. } => Endpoint::Listener,
ConnectedPoint::Dialer { .. } => Endpoint::Dialer,
},
open_protocols: Vec::new(),
});
return Ok(Async::Ready(Some(SwarmEvent::NodePending {
node_index,
peer_id,
endpoint
})));
}
Async::Ready(RawSwarmEvent::Replaced { peer_id, endpoint, .. }) => {
let node_index = *self.node_by_peer.get(&peer_id)
.expect("node_by_peer is always kept in sync with the inner swarm");
let infos = self.nodes_info.get_mut(&node_index)
.expect("nodes_info is always kept in sync with the swarm");
debug_assert_eq!(infos.peer_id, peer_id);
infos.endpoint = match endpoint {
ConnectedPoint::Listener { .. } => Endpoint::Listener,
ConnectedPoint::Dialer { .. } => Endpoint::Dialer,
};
let closed_custom_protocols = mem::replace(&mut infos.open_protocols, Vec::new());
return Ok(Async::Ready(Some(SwarmEvent::Reconnected {
node_index,
endpoint,
closed_custom_protocols,
})));
},
Async::Ready(RawSwarmEvent::NodeClosed { peer_id, .. }) => {
debug!(target: "sub-libp2p", "Connection to {:?} closed gracefully", peer_id);
let node_index = self.node_by_peer.remove(&peer_id)
.expect("node_by_peer is always kept in sync with the inner swarm");
let infos = self.nodes_info.remove(&node_index)
.expect("nodes_info is always kept in sync with the inner swarm");
debug_assert_eq!(infos.peer_id, peer_id);
return Ok(Async::Ready(Some(SwarmEvent::NodeClosed {
node_index,
peer_id,
closed_custom_protocols: infos.open_protocols,
})));
},
Async::Ready(RawSwarmEvent::NodeError { peer_id, error, .. }) => {
debug!(target: "sub-libp2p", "Closing {:?} because of error: {:?}", peer_id, error);
let node_index = self.node_by_peer.remove(&peer_id)
.expect("node_by_peer is always kept in sync with the inner swarm");
let infos = self.nodes_info.remove(&node_index)
.expect("nodes_info is always kept in sync with the inner swarm");
debug_assert_eq!(infos.peer_id, peer_id);
return Ok(Async::Ready(Some(SwarmEvent::NodeClosed {
node_index,
peer_id,
closed_custom_protocols: infos.open_protocols,
})));
},
Async::Ready(RawSwarmEvent::DialError { multiaddr, error, .. }) =>
return Ok(Async::Ready(Some(SwarmEvent::DialFail {
address: multiaddr,
error,
}))),
Async::Ready(RawSwarmEvent::UnknownPeerDialError { multiaddr, error, .. }) =>
return Ok(Async::Ready(Some(SwarmEvent::DialFail {
address: multiaddr,
error,
}))),
Async::Ready(RawSwarmEvent::ListenerClosed { listen_addr, result, .. }) => {
warn!(target: "sub-libp2p", "Listener closed for {}: {:?}", listen_addr, result);
continue;
},
Async::Ready(RawSwarmEvent::NodeEvent { peer_id, event }) => (peer_id, event),
Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => {
trace!(target: "sub-libp2p", "Incoming connection with {} on listener {}",
incoming.send_back_addr(), incoming.listen_addr());
incoming.accept(SubstrateNodeHandler::new(self.registered_custom.clone()));
continue;
},
Async::Ready(RawSwarmEvent::IncomingConnectionError { listen_addr, send_back_addr, error }) => {
trace!(target: "sub-libp2p", "Incoming connection with {} on listener {} \
errored: {:?}", send_back_addr, listen_addr, error);
continue;
},
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => unreachable!("The Swarm stream never ends"),
};
if let Some(event) = self.handle_node_event(peer_id, node_event) {
return Ok(Async::Ready(Some(event)));
}
}
}
@@ -30,14 +30,14 @@ pub fn build_transport(
let base = libp2p::CommonTransport::new()
.with_upgrade(secio::SecioConfig::new(local_private_key))
.and_then(move |out, endpoint, client_addr| {
.and_then(move |out, endpoint| {
let upgrade = upgrade::or(
upgrade::map(yamux::Config::default(), either::EitherOutput::First),
upgrade::map(mplex_config, either::EitherOutput::Second),
);
let peer_id = out.remote_key.into_peer_id();
let upgrade = upgrade::map(upgrade, move |muxer| (peer_id, muxer));
upgrade::apply(out.stream, upgrade, endpoint, client_addr)
upgrade::apply(out.stream, upgrade, endpoint.into())
})
.map(|(id, muxer), _| (id, StreamMuxerBox::new(muxer)));