Switch to the master branch of libp2p (#427)

* Switch to the master branch of libp2p

* Fixed having to clear manually

* Fix TTL of bootstrap nodes

* Speed up compilation time

* Update libp2p

* Remove obsolete comment
This commit is contained in:
Pierre Krieger
2018-07-28 11:42:26 +02:00
committed by Gav Wood
parent 93fb824dfc
commit b1a3552aa0
7 changed files with 191 additions and 214 deletions
@@ -11,7 +11,7 @@ bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
futures = "0.1"
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "fad12c89ea2b6f1f6420557db6e9305fb03f9f67", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
libp2p = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ethkey = { git = "https://github.com/paritytech/parity.git" }
ethereum-types = "0.3"
@@ -20,10 +20,10 @@ parking_lot = "0.5"
libc = "0.2"
log = "0.3"
rand = "0.5.0"
tokio-core = "0.1"
tokio = "0.1"
tokio-io = "0.1"
tokio-timer = "0.2"
varint = { git = "https://github.com/libp2p/rust-libp2p" }
varint = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2" }
[dev-dependencies]
assert_matches = "1.2"
@@ -19,9 +19,8 @@
extern crate parking_lot;
extern crate fnv;
#[macro_use]
extern crate futures;
extern crate tokio_core;
extern crate tokio;
extern crate tokio_io;
extern crate tokio_timer;
extern crate ethkey;
@@ -658,10 +658,6 @@ impl NetworkState {
peer_info.id,
peer_info.kad_connec.is_alive(),
peer_info.protocols.iter().filter(|c| c.1.is_alive()).count());
// TODO: we manually clear the connections as a work-around for
// networking bugs ; normally it should automatically drop
for c in peer_info.protocols.iter() { c.1.clear(); }
peer_info.kad_connec.clear();
let old = connections.peer_by_nodeid.remove(&peer_info.id);
debug_assert_eq!(old, Some(who));
}
@@ -852,11 +848,11 @@ fn parse_and_add_to_node_store(
NodeStore::Memory(ref node_store) =>
node_store
.peer_or_create(&who)
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
.set_addr_ttl(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
NodeStore::Json(ref node_store) =>
node_store
.peer_or_create(&who)
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
.set_addr_ttl(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
}
Ok(who)
@@ -41,7 +41,7 @@ use std::thread;
use std::time::{Duration, Instant};
use futures::{future, Future, Stream, IntoFuture};
use futures::sync::{mpsc, oneshot};
use tokio_core::reactor::{Core, Handle};
use tokio::runtime::current_thread;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Interval, Deadline};
@@ -118,7 +118,7 @@ impl NetworkService {
local_peer_id: local_peer_id.clone(),
kbuckets_timeout: Duration::from_secs(600),
request_timeout: Duration::from_secs(10),
known_initial_peers: network_state.known_peers().collect(),
known_initial_peers: network_state.known_peers(),
});
let shared = Arc::new(Shared {
@@ -191,8 +191,8 @@ impl NetworkService {
let shared = self.shared.clone();
let join_handle = thread::spawn(move || {
// Tokio core that is going to run everything in this thread.
let mut core = match Core::new() {
// Tokio runtime that is going to run everything in this thread.
let mut runtime = match current_thread::Runtime::new() {
Ok(c) => c,
Err(err) => {
let _ = init_tx.send(Err(err.into()));
@@ -200,7 +200,7 @@ impl NetworkService {
}
};
let fut = match init_thread(core.handle(), shared,
let fut = match init_thread(shared,
timeouts_register_rx, close_rx) {
Ok(future) => {
debug!(target: "sub-libp2p", "Successfully started networking service");
@@ -213,7 +213,7 @@ impl NetworkService {
}
};
match core.run(fut) {
match runtime.block_on(fut) {
Ok(()) => debug!(target: "sub-libp2p", "libp2p future finished"),
Err(err) => error!(target: "sub-libp2p", "error while running libp2p: {:?}", err),
}
@@ -395,7 +395,6 @@ impl NetworkContext for NetworkContextImpl {
/// - `timeouts_register_rx` should receive newly-registered timeouts.
/// - `close_rx` should be triggered when we want to close the network.
fn init_thread(
core: Handle,
shared: Arc<Shared>,
timeouts_register_rx: mpsc::UnboundedReceiver<
(Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))
@@ -405,7 +404,6 @@ fn init_thread(
// Build the transport layer.
let transport = {
let base = transport::build_transport(
core.clone(),
transport::UnencryptedAllowed::Denied,
shared.network_state.local_private_key().clone()
);
@@ -535,7 +533,7 @@ fn init_thread(
// Build the timeouts system for the `register_timeout` function.
// (note: this has nothing to do with socket timeouts)
let timeouts = timeouts::build_timeouts_stream(core.clone(), timeouts_register_rx)
let timeouts = timeouts::build_timeouts_stream(timeouts_register_rx)
.for_each({
let shared = shared.clone();
move |(handler, protocol_id, timer_token)| {
@@ -630,7 +628,7 @@ fn listener_handle<'a, C>(
match shared.network_state.ping_connection(node_id.clone()) {
Ok((_, ping_connec)) => {
trace!(target: "sub-libp2p", "Successfully opened ping substream with {:?}", node_id);
let fut = ping_connec.set_until(pinger, future);
let fut = ping_connec.tie_or_passthrough(pinger, future);
Box::new(fut) as Box<_>
},
Err(err) => Box::new(future::err(err)) as Box<_>
@@ -687,7 +685,7 @@ fn handle_kademlia_connection(
val
});
Ok(kad_connec.set_until(controller, future))
Ok(kad_connec.tie_or_passthrough(controller, future))
}
/// When a remote performs a `FIND_NODE` Kademlia request for `searched`,
@@ -823,7 +821,7 @@ fn handle_custom_connection(
});
let val = (custom_proto_out.outgoing, custom_proto_out.protocol_version);
let final_fut = unique_connec.set_until(val, fut)
let final_fut = unique_connec.tie_or_stop(val, fut)
.then(move |val| {
// Makes sure that `dc_guard` is kept alive until here.
drop(dc_guard);
@@ -950,7 +948,7 @@ fn perform_kademlia_query<T, To, St, C>(
let random_peer_id = random_key.into_peer_id();
trace!(target: "sub-libp2p", "Start kademlia discovery for {:?}", random_peer_id);
shared.clone()
let future = shared.clone()
.kad_system
.find_node(random_peer_id, {
let shared = shared.clone();
@@ -974,7 +972,10 @@ fn perform_kademlia_query<T, To, St, C>(
)
.into_future()
.map_err(|(err, _)| err)
.map(|_| ())
.map(|_| ());
// Note that we use a `Box` in order to speed up compilation.
Box::new(future) as Box<Future<Item = _, Error = _>>
}
/// Connects to additional nodes, if necessary.
@@ -1163,8 +1164,7 @@ fn open_peer_custom_proto<T, To, St, C>(
);
}
// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
unique_connec.dial(&swarm_controller, &addr, with_err);
},
Err(err) => {
trace!(target: "sub-libp2p",
@@ -1200,11 +1200,14 @@ fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>,
})
});
shared.network_state
let future = shared.network_state
.kad_connection(who.clone())
.into_future()
.map(move |(_, k)| k.get_or_dial(&swarm_controller, &addr, transport))
.flatten()
.map(move |(_, k)| k.dial(&swarm_controller, &addr, transport))
.flatten();
// Note that we use a Box in order to speed up compilation.
Box::new(future) as Box<Future<Item = _, Error = _>>
}
/// Processes the information about a node.
@@ -1305,7 +1308,7 @@ fn ping_all<T, St, C>(
let addr = Multiaddr::from(AddrComponent::P2P(who.clone().into_bytes()));
let fut = pinger
.get_or_dial(&swarm_controller, &addr, transport.clone())
.dial(&swarm_controller, &addr, transport.clone())
.and_then(move |mut p| {
trace!(target: "sub-libp2p", "Pinging peer #{} aka. {:?}", peer, who);
p.ping()
@@ -1334,7 +1337,7 @@ fn ping_all<T, St, C>(
ping_futures.push(fut);
}
future::loop_fn(ping_futures, |ping_futures| {
let future = future::loop_fn(ping_futures, |ping_futures| {
if ping_futures.is_empty() {
let fut = future::ok(future::Loop::Break(()));
return future::Either::A(fut)
@@ -1344,7 +1347,10 @@ fn ping_all<T, St, C>(
.map(|((), _, rest)| future::Loop::Continue(rest))
.map_err(|(err, _, _)| err);
future::Either::B(fut)
})
});
// Note that we use a Box in order to speed up compilation.
Box::new(future) as Box<Future<Item = _, Error = _>>
}
/// Expects a multiaddr of the format `/p2p/<node_id>` and returns the node ID.
@@ -15,10 +15,10 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?
use futures::{Async, future, Future, Poll, stream, Stream, sync::mpsc};
use std::io::Error as IoError;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::marker::PhantomData;
use std::time::{Duration, Instant};
use tokio_core::reactor::{Handle, Timeout};
use tokio_timer::{self, Delay};
/// Builds the timeouts system.
///
@@ -27,21 +27,18 @@ use tokio_core::reactor::{Handle, Timeout};
/// `T` can be anything you want, as it is transparently passed from the input
/// to the output. Timeouts continue to fire forever, as there is no way to
/// unregister them.
pub fn build_timeouts_stream<T>(
core: Handle,
pub fn build_timeouts_stream<'a, T>(
timeouts_rx: mpsc::UnboundedReceiver<(Duration, T)>
) -> impl Stream<Item = T, Error = IoError>
where T: Clone {
) -> Box<Stream<Item = T, Error = IoError> + 'a>
where T: Clone + 'a {
let next_timeout = next_in_timeouts_stream(timeouts_rx);
// The `unfold` function is essentially a loop turned into a stream. The
// first parameter is the initial state, and the closure returns the new
// state and an item.
stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| {
let stream = stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| {
// `timeouts` is a `Vec` of futures that produce an `Out`.
let core = core.clone();
// `select_ok` panics if `timeouts` is empty anyway.
if timeouts.is_empty() {
return None
@@ -53,8 +50,7 @@ pub fn build_timeouts_stream<T>(
Out::NewTimeout((Some((duration, item)), next_timeouts)) => {
// Received a new timeout request on the channel.
let next_timeout = next_in_timeouts_stream(next_timeouts);
let at = Instant::now() + duration;
let timeout = Timeout::new_at(at, &core)?;
let timeout = Delay::new(Instant::now() + duration);
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
timeouts.push(future::Either::B(timeout));
timeouts.push(future::Either::A(next_timeout));
@@ -66,8 +62,7 @@ pub fn build_timeouts_stream<T>(
Out::Timeout(duration, item) => {
// A timeout has happened.
let returned = item.clone();
let at = Instant::now() + duration;
let timeout = Timeout::new_at(at, &core)?;
let timeout = Delay::new(Instant::now() + duration);
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
timeouts.push(future::Either::B(timeout));
Ok((Some(returned), timeouts))
@@ -75,7 +70,10 @@ pub fn build_timeouts_stream<T>(
}
)
)
}).filter_map(|item| item)
}).filter_map(|item| item);
// Note that we use a `Box` in order to speed up compilation time.
Box::new(stream) as Box<Stream<Item = _, Error = _>>
}
/// Local enum representing the output of the selection.
@@ -97,15 +95,20 @@ fn next_in_timeouts_stream<T, B>(
.map_err(|_| unreachable!("an UnboundedReceiver can never error"))
}
/// Does the equivalent to `future.map(move |()| (duration, item))`.
/// Does the equivalent to `future.map(move |()| (duration, item)).map_err(|err| to_io_err(err))`.
struct TimeoutWrapper<A, F, T>(F, Duration, Option<T>, PhantomData<A>);
impl<A, F, T> Future for TimeoutWrapper<A, F, T>
where F: Future<Item = ()> {
where F: Future<Item = (), Error = tokio_timer::Error> {
type Item = Out<A, T>;
type Error = F::Error;
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let _ready: () = try_ready!(self.0.poll());
match self.0.poll() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(IoError::new(IoErrorKind::Other, err.to_string())),
}
let out = Out::Timeout(self.1, self.2.take().expect("poll() called again after success"));
Ok(Async::Ready(out))
}
@@ -18,16 +18,14 @@ use libp2p::{self, Transport, mplex, secio, yamux};
use libp2p::core::{MuxedTransport, either, upgrade};
use libp2p::transport_timeout::TransportTimeout;
use std::time::Duration;
use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
/// Builds the transport that serves as a common ground for all connections.
pub fn build_transport(
core: Handle,
unencrypted_allowed: UnencryptedAllowed,
local_private_key: secio::SecioKeyPair
) -> impl MuxedTransport<Output = impl AsyncRead + AsyncWrite> + Clone {
let base = libp2p::CommonTransport::new(core)
let base = libp2p::CommonTransport::new()
.with_upgrade({
let secio = secio::SecioConfig {
key: local_private_key,