diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 9418f373fa..021eca7758 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -1014,6 +1014,15 @@ dependencies = [ "futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "futures-timer" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "futures-util-preview" version = "0.3.0-alpha.17" @@ -4808,6 +4817,8 @@ version = "2.0.0" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4816,9 +4827,7 @@ dependencies = [ "slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-scope 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -6018,6 +6027,7 @@ dependencies = [ "checksum futures-io-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "082e402605fcb8b1ae1e5ba7d7fdfd3e31ef510e2a8367dd92927bb41ae41b3a" "checksum futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "bf25f91c8a9a1f64c451e91b43ba269ed359b9f52d35ed4b3ce3f9c842435867" "checksum futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "4309a25a1069a1f3c10647b227b9afe6722b67a030d3f00a9cbdc171fc038de4" +"checksum futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb4a32e84935678650944c6ebd0d912db46405d37bf94f1a058435c5080abcb1" "checksum futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "af8198c48b222f02326940ce2b3aa9e6e91a32886eeaad7ca3b8e4c70daa3f4e" "checksum gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)" = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" "checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 60f93b2da2..46a4e80d51 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -36,6 +36,7 @@ use parking_lot::Mutex; use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT}; use exit_future::Signal; use futures::prelude::*; +use futures03::stream::{StreamExt as _, TryStreamExt as _}; use keystore::Store as Keystore; use network::NetworkState; use log::{info, warn, debug, error}; @@ -446,6 +447,8 @@ impl Service { wasm_external_transport: config.telemetry_external_transport.take(), }); let future = telemetry.clone() + .map(|ev| Ok::<_, ()>(ev)) + .compat() .for_each(move |event| { // Safe-guard in case we add more events in the future. let tel::TelemetryEvent::Connected = event; diff --git a/substrate/core/telemetry/Cargo.toml b/substrate/core/telemetry/Cargo.toml index 01c4d21a30..ca95fe94e5 100644 --- a/substrate/core/telemetry/Cargo.toml +++ b/substrate/core/telemetry/Cargo.toml @@ -8,7 +8,9 @@ edition = "2018" [dependencies] bytes = "0.4" parking_lot = "0.8.0" -futures = "0.1" +futures01 = { package = "futures", version = "0.1" } +futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures-timer = "0.2.1" libp2p = { version = "0.10.0", default-features = false, features = ["libp2p-websocket"] } log = "0.4" rand = "0.6" @@ -17,8 +19,4 @@ slog = { version = "^2", features = ["nested-values"] } slog-json = { version = "^2", features = ["nested-values"] } slog-scope = "^4" tokio-io = "0.1" -tokio-timer = "0.2" void = "1.0" - -[dev-dependencies] -tokio = "0.1" diff --git a/substrate/core/telemetry/src/lib.rs b/substrate/core/telemetry/src/lib.rs index b9d9954851..88d515e538 100644 --- a/substrate/core/telemetry/src/lib.rs +++ b/substrate/core/telemetry/src/lib.rs @@ -48,7 +48,7 @@ //! //! // The `telemetry` object implements `Stream` and must be processed. //! std::thread::spawn(move || { -//! tokio::run(telemetry.for_each(|_| Ok(()))); +//! futures::executor::block_on(telemetry.for_each(|_| future::ready(()))); //! }); //! //! // Sends a message on the telemetry. @@ -58,13 +58,12 @@ //! ``` //! -use futures::{prelude::*, task::AtomicTask}; +use futures::{prelude::*, task::AtomicWaker}; use libp2p::{Multiaddr, wasm_ext}; use log::warn; use parking_lot::Mutex; use serde::{Serialize, Deserialize}; -use std::sync::{Arc, Weak}; -use std::time::{Duration, Instant}; +use std::{pin::Pin, sync::{Arc, Weak}, task::{Context, Poll}, time::{Duration, Instant}}; pub use slog_scope::with_logger; pub use slog; @@ -131,8 +130,8 @@ pub struct Telemetry { struct TelemetryInner { /// Worker for the telemetry. worker: Mutex, - /// Task to wake up when we add a log entry to the worker. - polling_task: AtomicTask, + /// Waker to wake up when we add a log entry to the worker. + polling_waker: AtomicWaker, } /// Implements `slog::Drain`. @@ -156,7 +155,7 @@ pub fn init_telemetry(config: TelemetryConfig) -> Telemetry { let inner = Arc::new(TelemetryInner { worker: Mutex::new(worker::TelemetryWorker::new(endpoints, config.wasm_external_transport)), - polling_task: AtomicTask::new(), + polling_waker: AtomicWaker::new(), }); let guard = { @@ -181,13 +180,12 @@ pub enum TelemetryEvent { impl Stream for Telemetry { type Item = TelemetryEvent; - type Error = (); - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let before = Instant::now(); let mut has_connected = false; - while let Async::Ready(event) = self.inner.worker.lock().poll() { + while let Poll::Ready(event) = self.inner.worker.lock().poll(cx) { // Right now we only have one possible event. This line is here in order to not // forget to handle any possible new event type. let worker::TelemetryWorkerEvent::Connected = event; @@ -199,10 +197,10 @@ impl Stream for Telemetry { } if has_connected { - Ok(Async::Ready(Some(TelemetryEvent::Connected))) + Poll::Ready(Some(TelemetryEvent::Connected)) } else { - self.inner.polling_task.register(); - Ok(Async::NotReady) + self.inner.polling_waker.register(cx.waker()); + Poll::Pending } } } @@ -215,7 +213,7 @@ impl slog::Drain for TelemetryDrain { if let Some(inner) = self.inner.0.upgrade() { let before = Instant::now(); let result = inner.worker.lock().log(record, values); - inner.polling_task.notify(); + inner.polling_waker.wake(); if before.elapsed() > Duration::from_millis(50) { warn!(target: "telemetry", "Writing a telemetry log took more than 50ms"); } diff --git a/substrate/core/telemetry/src/worker.rs b/substrate/core/telemetry/src/worker.rs index 87a3deb6ef..e132709378 100644 --- a/substrate/core/telemetry/src/worker.rs +++ b/substrate/core/telemetry/src/worker.rs @@ -27,12 +27,11 @@ //! use bytes::BytesMut; -use futures::prelude::*; +use futures::compat::Compat01As03Sink; use libp2p::{core::transport::OptionalTransport, core::ConnectedPoint, Multiaddr, Transport, wasm_ext}; use log::{trace, warn, error}; use slog::Drain; -use std::{io, time}; -use tokio_io::AsyncWrite; +use std::{io, pin::Pin, task::Context, task::Poll, time}; mod node; @@ -58,19 +57,29 @@ pub struct TelemetryWorker { /// The pile of libp2p transports. #[cfg(not(target_os = "unknown"))] type WsTrans = libp2p::core::transport::timeout::TransportTimeout< - libp2p::core::transport::OrTransport< - libp2p::core::transport::map::Map< - OptionalTransport, - fn(wasm_ext::Connection, ConnectedPoint) -> StreamSink + libp2p::core::transport::map::Map< + libp2p::core::transport::OrTransport< + libp2p::core::transport::map::Map< + OptionalTransport, + fn(wasm_ext::Connection, ConnectedPoint) -> StreamSink + >, + libp2p::websocket::framed::WsConfig> >, - libp2p::websocket::framed::WsConfig> + fn(libp2p::core::either::EitherOutput, + libp2p::websocket::framed::BytesConnection>, ConnectedPoint) + -> Compat01As03Sink, + libp2p::websocket::framed::BytesConnection>, BytesMut> > >; #[cfg(target_os = "unknown")] type WsTrans = libp2p::core::transport::timeout::TransportTimeout< libp2p::core::transport::map::Map< - OptionalTransport, - fn(wasm_ext::Connection, ConnectedPoint) -> StreamSink + libp2p::core::transport::map::Map< + OptionalTransport, + fn(wasm_ext::Connection, ConnectedPoint) -> StreamSink + >, + fn(StreamSink, ConnectedPoint) + -> Compat01As03Sink, BytesMut> > >; @@ -98,7 +107,9 @@ impl TelemetryWorker { libp2p::websocket::framed::WsConfig::new(inner) }); - let transport = transport.with_timeout(CONNECT_TIMEOUT); + let transport = transport + .map((|inner, _| Compat01As03Sink::new(inner)) as fn(_, _) -> _) + .with_timeout(CONNECT_TIMEOUT); TelemetryWorker { nodes: endpoints.into_iter().map(|(addr, verbosity)| { @@ -109,19 +120,19 @@ impl TelemetryWorker { } /// Polls the worker for events that happened. - pub fn poll(&mut self) -> Async { + pub fn poll(&mut self, cx: &mut Context) -> Poll { for (node, _) in &mut self.nodes { loop { - match node.poll() { - Async::Ready(node::NodeEvent::Connected) => - return Async::Ready(TelemetryWorkerEvent::Connected), - Async::Ready(node::NodeEvent::Disconnected(_)) => continue, - Async::NotReady => break, + match node::Node::poll(Pin::new(node), cx) { + Poll::Ready(node::NodeEvent::Connected) => + return Poll::Ready(TelemetryWorkerEvent::Connected), + Poll::Ready(node::NodeEvent::Disconnected(_)) => continue, + Poll::Pending => break, } } } - Async::NotReady + Poll::Pending } /// Equivalent to `slog::Drain::log`, but takes `self` by `&mut` instead, which is more convenient. @@ -132,7 +143,7 @@ impl TelemetryWorker { let msg_verbosity = match record.tag().parse::() { Ok(v) => v, Err(err) => { - warn!(target: "telemetry", "Failed to parse telemetry tag {:?}: {:?}", + warn!(target: "telemetry", "Failed to parse telemetry tag {:?}: {:?}", record.tag(), err); return Err(()) } @@ -176,27 +187,29 @@ impl TelemetryWorker { /// For some context, we put this object around the `wasm_ext::ExtTransport` in order to make sure /// that each telemetry message maps to one single call to `write` in the WASM FFI. struct StreamSink(T); -impl Sink for StreamSink { +impl futures01::Sink for StreamSink { type SinkItem = BytesMut; type SinkError = io::Error; - fn start_send(&mut self, item: Self::SinkItem) -> Result, io::Error> { + fn start_send(&mut self, item: Self::SinkItem) + -> Result, io::Error> { match self.0.write(&item[..]) { - Ok(n) if n == item.len() => Ok(AsyncSink::Ready), + Ok(n) if n == item.len() => Ok(futures01::AsyncSink::Ready), Ok(_) => { error!(target: "telemetry", "Detected some internal buffering happening in the telemetry"); Err(io::Error::new(io::ErrorKind::Other, "Internal buffering detected")) }, - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(AsyncSink::NotReady(item)), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => + Ok(futures01::AsyncSink::NotReady(item)), Err(err) => Err(err), } } - fn poll_complete(&mut self) -> Poll<(), io::Error> { + fn poll_complete(&mut self) -> futures01::Poll<(), io::Error> { match self.0.flush() { - Ok(()) => Ok(Async::Ready(())), - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady), + Ok(()) => Ok(futures01::Async::Ready(())), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(futures01::Async::NotReady), Err(err) => Err(err), } } diff --git a/substrate/core/telemetry/src/worker/node.rs b/substrate/core/telemetry/src/worker/node.rs index a4d8f8d84e..fc09e90c7d 100644 --- a/substrate/core/telemetry/src/worker/node.rs +++ b/substrate/core/telemetry/src/worker/node.rs @@ -17,13 +17,13 @@ //! Contains the `Node` struct, which handles communications with a single telemetry endpoint. use bytes::BytesMut; -use futures::prelude::*; +use futures::{prelude::*, compat::{Future01CompatExt as _, Compat01As03}}; +use futures_timer::Delay; use libp2p::Multiaddr; use libp2p::core::transport::Transport; use log::{trace, debug, warn, error}; use rand::Rng as _; -use std::{collections::VecDeque, fmt, mem, time::Duration, time::Instant}; -use tokio_timer::Delay; +use std::{collections::VecDeque, fmt, mem, pin::Pin, task::Context, task::Poll, time::Duration}; /// Maximum number of pending telemetry messages. const MAX_PENDING: usize = 10; @@ -42,7 +42,7 @@ enum NodeSocket { /// We're connected to the node. This is the normal state. Connected(NodeSocketConnected), /// We are currently dialing the node. - Dialing(TTrans::Dial), + Dialing(Compat01As03), /// A new connection should be started as soon as possible. ReconnectNow, /// Waiting before attempting to dial again. @@ -86,8 +86,8 @@ impl Node { } impl Node -where TTrans: Clone, TTrans::Output: Sink, - TSinkErr: fmt::Debug { +where TTrans: Clone + Unpin, TTrans::Dial: Unpin, + TTrans::Output: Sink + Unpin, TSinkErr: fmt::Debug { /// Sends a WebSocket frame to the node. Returns an error if we are not connected to the node. /// /// After calling this method, you should call `poll` in order for it to be properly processed. @@ -108,29 +108,30 @@ where TTrans: Clone, TTrans::Output: Sink Async> { + pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned); self.socket = loop { match socket { - NodeSocket::Connected(mut conn) => match conn.poll(&self.addr) { - Ok(Async::Ready(v)) => void::unreachable(v), - Ok(Async::NotReady) => break NodeSocket::Connected(conn), - Err(err) => { - debug!(target: "telemetry", "Disconnected from {}: {:?}", self.addr, err); - let timeout = gen_rand_reconnect_delay(); - self.socket = NodeSocket::WaitingReconnect(timeout); - return Async::Ready(NodeEvent::Disconnected(err)) + NodeSocket::Connected(mut conn) => + match NodeSocketConnected::poll(Pin::new(&mut conn), cx, &self.addr) { + Poll::Ready(Ok(v)) => match v {} + Poll::Pending => break NodeSocket::Connected(conn), + Poll::Ready(Err(err)) => { + debug!(target: "telemetry", "Disconnected from {}: {:?}", self.addr, err); + let timeout = gen_rand_reconnect_delay(); + self.socket = NodeSocket::WaitingReconnect(timeout); + return Poll::Ready(NodeEvent::Disconnected(err)) + } } - } - NodeSocket::Dialing(mut s) => match s.poll() { - Ok(Async::Ready(sink)) => { + NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) { + Poll::Ready(Ok(sink)) => { debug!(target: "telemetry", "Connected to {}", self.addr); let conn = NodeSocketConnected { sink, pending: VecDeque::new(), need_flush: false }; self.socket = NodeSocket::Connected(conn); - return Async::Ready(NodeEvent::Connected) + return Poll::Ready(NodeEvent::Connected) }, - Ok(Async::NotReady) => break NodeSocket::Dialing(s), - Err(err) => { + Poll::Pending => break NodeSocket::Dialing(s), + Poll::Ready(Err(err)) => { debug!(target: "telemetry", "Error while dialing {}: {:?}", self.addr, err); let timeout = gen_rand_reconnect_delay(); socket = NodeSocket::WaitingReconnect(timeout); @@ -139,7 +140,7 @@ where TTrans: Clone, TTrans::Output: Sink match self.transport.clone().dial(self.addr.clone()) { Ok(d) => { debug!(target: "telemetry", "Started dialing {}", self.addr); - socket = NodeSocket::Dialing(d); + socket = NodeSocket::Dialing(d.compat()); } Err(err) => { debug!(target: "telemetry", "Error while dialing {}: {:?}", self.addr, err); @@ -147,11 +148,12 @@ where TTrans: Clone, TTrans::Output: Sink if let Ok(Async::Ready(_)) = s.poll() { - socket = NodeSocket::ReconnectNow; - } else { - break NodeSocket::WaitingReconnect(s) - } + NodeSocket::WaitingReconnect(mut s) => + if let Poll::Ready(_) = Future::poll(Pin::new(&mut s), cx) { + socket = NodeSocket::ReconnectNow; + } else { + break NodeSocket::WaitingReconnect(s) + } NodeSocket::Poisoned => { error!(target: "telemetry", "Poisoned connection with {}", self.addr); break NodeSocket::Poisoned @@ -159,7 +161,7 @@ where TTrans: Clone, TTrans::Output: Sink Delay { let random_delay = rand::thread_rng().gen_range(5, 10); - Delay::new(Instant::now() + Duration::from_secs(random_delay)) + Delay::new(Duration::from_secs(random_delay)) } impl NodeSocketConnected -where TTrans::Output: Sink { +where TTrans::Output: Sink + Unpin { /// Processes the queue of messages for the connected socket. /// /// The address is passed for logging purposes only. - fn poll(&mut self, my_addr: &Multiaddr) -> Poll { + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context, + my_addr: &Multiaddr + ) -> Poll> { loop { if let Some(item) = self.pending.pop_front() { - let item_len = item.len(); - if let AsyncSink::NotReady(item) = self.sink.start_send(item)? { + if let Poll::Pending = Sink::poll_ready(Pin::new(&mut self.sink), cx) { self.pending.push_front(item); - break - } else { - trace!(target: "telemetry", "Successfully sent {:?} bytes message to {}", - item_len, my_addr); - self.need_flush = true; + return Poll::Pending } - } else if self.need_flush && self.sink.poll_complete()?.is_ready() { - self.need_flush = false; + let item_len = item.len(); + if let Err(err) = Sink::start_send(Pin::new(&mut self.sink), item) { + return Poll::Ready(Err(err)) + } + trace!( + target: "telemetry", "Successfully sent {:?} bytes message to {}", + item_len, my_addr + ); + self.need_flush = true; + } else if self.need_flush { + match Sink::poll_flush(Pin::new(&mut self.sink), cx) { + Poll::Pending => {} + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Ready(Ok(())) => self.need_flush = false, + } } else { break } } - Ok(Async::NotReady) + Poll::Pending } }