Update networking code to libp2p 0.14 (#4383)

* Entirely update substrate-telemetry to futures 0.3

* Add a Closed error

* Update to libp2p 0.14

* More work

* More work

* More work

* More work

* Fix warnings

* Remove unwrap()

* Work on tests fixing

* Fix network tests

* Fix external network tests

* Update libp2p and restore Yamux in discovery test

* Ignore DNS if initializatio nfails

* Restore variables ordering

* Forgot browser-utils

* Fix downfall after merge

* Fix tests
This commit is contained in:
Pierre Krieger
2020-01-09 19:01:23 +01:00
committed by Gavin Wood
parent 6e572a9477
commit ca997cf1e4
29 changed files with 842 additions and 812 deletions
+23 -10
View File
@@ -60,11 +60,12 @@
use futures::{prelude::*, channel::mpsc};
use libp2p::{Multiaddr, wasm_ext};
use log::warn;
use log::{error, warn};
use parking_lot::Mutex;
use serde::{Serialize, Deserialize};
use std::{pin::Pin, sync::Arc, task::{Context, Poll}, time::{Duration, Instant}};
pub use libp2p::wasm_ext::ExtTransport;
pub use slog_scope::with_logger;
pub use slog;
@@ -129,8 +130,8 @@ pub struct Telemetry {
/// where we extract the telemetry registration so that it continues running during the shutdown
/// process.
struct TelemetryInner {
/// Worker for the telemetry.
worker: worker::TelemetryWorker,
/// Worker for the telemetry. `None` if it failed to initialize.
worker: Option<worker::TelemetryWorker>,
/// Receives log entries for them to be dispatched to the worker.
receiver: mpsc::Receiver<async_record::AsyncRecord>,
}
@@ -162,9 +163,17 @@ pub fn init_telemetry(config: TelemetryConfig) -> Telemetry {
slog_scope::set_global_logger(root)
};
let worker = match worker::TelemetryWorker::new(endpoints, config.wasm_external_transport) {
Ok(w) => Some(w),
Err(err) => {
error!(target: "telemetry", "Failed to initialize telemetry worker: {:?}", err);
None
}
};
Telemetry {
inner: Arc::new(Mutex::new(TelemetryInner {
worker: worker::TelemetryWorker::new(endpoints, config.wasm_external_transport),
worker,
receiver,
})),
_guard: Arc::new(guard),
@@ -209,15 +218,19 @@ impl Stream for Telemetry {
// The polling pattern is: poll the worker so that it processes its queue, then add one
// message from the receiver (if possible), then poll the worker again, and so on.
loop {
while let Poll::Ready(event) = inner.worker.poll(cx) {
// Right now we only have one possible event. This line is here in order to not
// forget to handle any possible new event type.
let worker::TelemetryWorkerEvent::Connected = event;
has_connected = true;
if let Some(worker) = inner.worker.as_mut() {
while let Poll::Ready(event) = worker.poll(cx) {
// Right now we only have one possible event. This line is here in order to not
// forget to handle any possible new event type.
let worker::TelemetryWorkerEvent::Connected = event;
has_connected = true;
}
}
if let Poll::Ready(Some(log_entry)) = Stream::poll_next(Pin::new(&mut inner.receiver), cx) {
log_entry.as_record_values(|rec, val| { let _ = inner.worker.log(rec, val); });
if let Some(worker) = inner.worker.as_mut() {
log_entry.as_record_values(|rec, val| { let _ = worker.log(rec, val); });
}
} else {
break;
}
+97 -73
View File
@@ -27,8 +27,8 @@
//!
use bytes::BytesMut;
use futures::compat::Compat01As03Sink;
use libp2p::{core::transport::OptionalTransport, core::ConnectedPoint, Multiaddr, Transport, wasm_ext};
use futures::{prelude::*, ready};
use libp2p::{core::transport::OptionalTransport, Multiaddr, Transport, wasm_ext};
use log::{trace, warn, error};
use slog::Drain;
use std::{io, pin::Pin, task::Context, task::Poll, time};
@@ -54,33 +54,16 @@ pub struct TelemetryWorker {
nodes: Vec<(node::Node<WsTrans>, u8)>,
}
/// The pile of libp2p transports.
#[cfg(not(target_os = "unknown"))]
type WsTrans = libp2p::core::transport::timeout::TransportTimeout<
libp2p::core::transport::map::Map<
libp2p::core::transport::OrTransport<
libp2p::core::transport::map::Map<
OptionalTransport<wasm_ext::ExtTransport>,
fn(wasm_ext::Connection, ConnectedPoint) -> StreamSink<wasm_ext::Connection>
>,
libp2p::websocket::framed::WsConfig<libp2p::dns::DnsConfig<libp2p::tcp::TcpConfig>>
>,
fn(libp2p::core::either::EitherOutput<StreamSink<wasm_ext::Connection>,
libp2p::websocket::framed::BytesConnection<libp2p::tcp::TcpTransStream>>, ConnectedPoint)
-> Compat01As03Sink<libp2p::core::either::EitherOutput<StreamSink<wasm_ext::Connection>,
libp2p::websocket::framed::BytesConnection<libp2p::tcp::TcpTransStream>>, BytesMut>
>
>;
#[cfg(target_os = "unknown")]
type WsTrans = libp2p::core::transport::timeout::TransportTimeout<
libp2p::core::transport::map::Map<
libp2p::core::transport::map::Map<
OptionalTransport<wasm_ext::ExtTransport>,
fn(wasm_ext::Connection, ConnectedPoint) -> StreamSink<wasm_ext::Connection>
>,
fn(StreamSink<wasm_ext::Connection>, ConnectedPoint)
-> Compat01As03Sink<StreamSink<wasm_ext::Connection>, BytesMut>
>
trait StreamAndSink<I>: Stream + Sink<I> {}
impl<T: ?Sized + Stream + Sink<I>, I> StreamAndSink<I> for T {}
type WsTrans = libp2p::core::transport::boxed::Boxed<
Pin<Box<dyn StreamAndSink<
BytesMut,
Item = Result<BytesMut, io::Error>,
Error = io::Error
> + Send>>,
io::Error
>;
impl TelemetryWorker {
@@ -92,31 +75,48 @@ impl TelemetryWorker {
pub fn new(
endpoints: impl IntoIterator<Item = (Multiaddr, u8)>,
wasm_external_transport: impl Into<Option<wasm_ext::ExtTransport>>
) -> Self {
) -> Result<Self, io::Error> {
let transport = match wasm_external_transport.into() {
Some(t) => OptionalTransport::some(t),
None => OptionalTransport::none()
}.map((|inner, _| StreamSink(inner)) as fn(_, _) -> _);
}.map((|inner, _| StreamSink::from(inner)) as fn(_, _) -> _);
// The main transport is the `wasm_external_transport`, but if we're on desktop we add
// support for TCP+WebSocket+DNS as a fallback. In practice, you're not expected to pass
// an external transport on desktop and the fallback is used all the time.
#[cfg(not(target_os = "unknown"))]
let transport = transport.or_transport({
let inner = libp2p::dns::DnsConfig::new(libp2p::tcp::TcpConfig::new());
let inner = libp2p::dns::DnsConfig::new(libp2p::tcp::TcpConfig::new())?;
libp2p::websocket::framed::WsConfig::new(inner)
.and_then(|connec, _| {
let connec = connec
.with(|item: BytesMut| {
let item = libp2p::websocket::framed::OutgoingData::Binary(item);
future::ready(Ok::<_, io::Error>(item))
})
.try_filter(|item| future::ready(item.is_data()))
.map_ok(|data| BytesMut::from(data.as_ref()));
future::ready(Ok::<_, io::Error>(connec))
})
});
let transport = transport
.map((|inner, _| Compat01As03Sink::new(inner)) as fn(_, _) -> _)
.timeout(CONNECT_TIMEOUT);
.timeout(CONNECT_TIMEOUT)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.map(|out, _| {
let out = out
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.sink_map_err(|err| io::Error::new(io::ErrorKind::Other, err));
Box::pin(out) as Pin<Box<_>>
})
.boxed();
TelemetryWorker {
Ok(TelemetryWorker {
nodes: endpoints.into_iter().map(|(addr, verbosity)| {
let node = node::Node::new(transport.clone(), addr);
(node, verbosity)
}).collect()
}
})
}
/// Polls the worker for events that happened.
@@ -174,7 +174,7 @@ impl TelemetryWorker {
}
// `send_message` returns an error if we're not connected, which we silently ignore.
let _ = node.send_message(serialized.clone());
let _ = node.send_message(&serialized.clone()[..]);
}
Ok(())
@@ -186,50 +186,74 @@ impl TelemetryWorker {
///
/// For some context, we put this object around the `wasm_ext::ExtTransport` in order to make sure
/// that each telemetry message maps to one single call to `write` in the WASM FFI.
struct StreamSink<T>(T);
#[pin_project::pin_project]
struct StreamSink<T>(#[pin] T, Option<BytesMut>);
impl<T: tokio_io::AsyncRead> futures01::Stream for StreamSink<T> {
type Item = BytesMut;
type Error = io::Error;
fn poll(&mut self) -> futures01::Poll<Option<Self::Item>, Self::Error> {
let mut buf = [0; 128];
Ok(self.0.poll_read(&mut buf)?
.map(|n|
if n == 0 {
None
} else {
let buf: BytesMut = buf[..n].into();
Some(buf)
}
))
impl<T> From<T> for StreamSink<T> {
fn from(inner: T) -> StreamSink<T> {
StreamSink(inner, None)
}
}
impl<T: tokio_io::AsyncWrite> futures01::Sink for StreamSink<T> {
type SinkItem = BytesMut;
type SinkError = io::Error;
impl<T: AsyncRead> Stream for StreamSink<T> {
type Item = Result<BytesMut, io::Error>;
fn start_send(&mut self, item: Self::SinkItem)
-> Result<futures01::AsyncSink<Self::SinkItem>, io::Error> {
match self.0.write(&item[..]) {
Ok(n) if n == item.len() => Ok(futures01::AsyncSink::Ready),
Ok(_) => {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let mut buf = [0; 128];
match ready!(AsyncRead::poll_read(this.0, cx, &mut buf)) {
Ok(0) => Poll::Ready(None),
Ok(n) => {
let buf: BytesMut = buf[..n].into();
Poll::Ready(Some(Ok(buf)))
},
Err(err) => Poll::Ready(Some(Err(err))),
}
}
}
impl<T: AsyncWrite> StreamSink<T> {
fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
let this = self.project();
if let Some(buffer) = this.1 {
if ready!(this.0.poll_write(cx, &buffer[..]))? != buffer.len() {
error!(target: "telemetry",
"Detected some internal buffering happening in the telemetry");
Err(io::Error::new(io::ErrorKind::Other, "Internal buffering detected"))
},
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock =>
Ok(futures01::AsyncSink::NotReady(item)),
Err(err) => Err(err),
let err = io::Error::new(io::ErrorKind::Other, "Internal buffering detected");
return Poll::Ready(Err(err));
}
}
}
fn poll_complete(&mut self) -> futures01::Poll<(), io::Error> {
match self.0.flush() {
Ok(()) => Ok(futures01::Async::Ready(())),
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(futures01::Async::NotReady),
Err(err) => Err(err),
}
*this.1 = None;
Poll::Ready(Ok(()))
}
}
impl<T: AsyncWrite> Sink<BytesMut> for StreamSink<T> {
type Error = io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
ready!(StreamSink::poll_flush_buffer(self, cx))?;
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> {
let this = self.project();
debug_assert!(this.1.is_none());
*this.1 = Some(item);
Ok(())
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().poll_flush_buffer(cx))?;
let this = self.project();
AsyncWrite::poll_flush(this.0, cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().poll_flush_buffer(cx))?;
let this = self.project();
AsyncWrite::poll_close(this.0, cx)
}
}
+16 -7
View File
@@ -17,7 +17,7 @@
//! Contains the `Node` struct, which handles communications with a single telemetry endpoint.
use bytes::BytesMut;
use futures::{prelude::*, compat::{Future01CompatExt as _, Compat01As03}};
use futures::prelude::*;
use futures_timer::Delay;
use libp2p::Multiaddr;
use libp2p::core::transport::Transport;
@@ -42,7 +42,7 @@ enum NodeSocket<TTrans: Transport> {
/// We're connected to the node. This is the normal state.
Connected(NodeSocketConnected<TTrans>),
/// We are currently dialing the node.
Dialing(Compat01As03<TTrans::Dial>),
Dialing(TTrans::Dial),
/// A new connection should be started as soon as possible.
ReconnectNow,
/// Waiting before attempting to dial again.
@@ -76,6 +76,9 @@ pub enum NodeEvent<TSinkErr> {
pub enum ConnectionError<TSinkErr> {
/// The connection timed-out.
Timeout,
/// Reading from the socket returned and end-of-file, indicating that the socket has been
/// closed.
Closed,
/// The sink errored.
Sink(TSinkErr),
}
@@ -106,7 +109,7 @@ where TTrans: Clone + Unpin, TTrans::Dial: Unpin,
/// Sends a WebSocket frame to the node. Returns an error if we are not connected to the node.
///
/// After calling this method, you should call `poll` in order for it to be properly processed.
pub fn send_message(&mut self, payload: Vec<u8>) -> Result<(), ()> {
pub fn send_message(&mut self, payload: impl Into<BytesMut>) -> Result<(), ()> {
if let NodeSocket::Connected(NodeSocketConnected { pending, .. }) = &mut self.socket {
if pending.len() <= MAX_PENDING {
trace!(target: "telemetry", "Adding log entry to queue for {:?}", self.addr);
@@ -163,7 +166,7 @@ where TTrans: Clone + Unpin, TTrans::Dial: Unpin,
NodeSocket::ReconnectNow => match self.transport.clone().dial(self.addr.clone()) {
Ok(d) => {
debug!(target: "telemetry", "Started dialing {}", self.addr);
socket = NodeSocket::Dialing(d.compat());
socket = NodeSocket::Dialing(d);
}
Err(err) => {
warn!(target: "telemetry", "Error while dialing {}: {:?}", self.addr, err);
@@ -212,10 +215,13 @@ where TTrans::Output: Sink<BytesMut, Error = TSinkErr>
) -> Poll<Result<futures::never::Never, ConnectionError<TSinkErr>>> {
while let Some(item) = self.pending.pop_front() {
if let Poll::Ready(_) = Sink::poll_ready(Pin::new(&mut self.sink), cx) {
if let Poll::Ready(result) = Sink::poll_ready(Pin::new(&mut self.sink), cx) {
if let Err(err) = result {
return Poll::Ready(Err(ConnectionError::Sink(err)))
}
let item_len = item.len();
if let Err(err) = Sink::start_send(Pin::new(&mut self.sink), item) {
self.timeout = None;
return Poll::Ready(Err(ConnectionError::Sink(err)))
}
trace!(
@@ -270,7 +276,10 @@ where TTrans::Output: Sink<BytesMut, Error = TSinkErr>
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(Err(ConnectionError::Sink(err)))
},
Poll::Pending | Poll::Ready(None) => {},
Poll::Ready(None) => {
return Poll::Ready(Err(ConnectionError::Closed))
},
Poll::Pending => {},
}
Poll::Pending