Switch the telemetry to new futures (#3100)

* Switch the telemetry to new futures

* Line widths

* Apply suggestions from code review

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Fix Cargo.lock
This commit is contained in:
Pierre Krieger
2019-07-11 18:11:55 +02:00
committed by Gavin Wood
parent bf2551a854
commit b3dc472a9b
6 changed files with 124 additions and 88 deletions
+12 -2
View File
@@ -1014,6 +1014,15 @@ dependencies = [
"futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "futures-core-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "futures-timer"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "futures-util-preview" name = "futures-util-preview"
version = "0.3.0-alpha.17" version = "0.3.0-alpha.17"
@@ -4808,6 +4817,8 @@ version = "2.0.0"
dependencies = [ dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libp2p 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -4816,9 +4827,7 @@ dependencies = [
"slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slog-scope 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "slog-scope 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@@ -6018,6 +6027,7 @@ dependencies = [
"checksum futures-io-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "082e402605fcb8b1ae1e5ba7d7fdfd3e31ef510e2a8367dd92927bb41ae41b3a" "checksum futures-io-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "082e402605fcb8b1ae1e5ba7d7fdfd3e31ef510e2a8367dd92927bb41ae41b3a"
"checksum futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "bf25f91c8a9a1f64c451e91b43ba269ed359b9f52d35ed4b3ce3f9c842435867" "checksum futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "bf25f91c8a9a1f64c451e91b43ba269ed359b9f52d35ed4b3ce3f9c842435867"
"checksum futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "4309a25a1069a1f3c10647b227b9afe6722b67a030d3f00a9cbdc171fc038de4" "checksum futures-sink-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "4309a25a1069a1f3c10647b227b9afe6722b67a030d3f00a9cbdc171fc038de4"
"checksum futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb4a32e84935678650944c6ebd0d912db46405d37bf94f1a058435c5080abcb1"
"checksum futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "af8198c48b222f02326940ce2b3aa9e6e91a32886eeaad7ca3b8e4c70daa3f4e" "checksum futures-util-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)" = "af8198c48b222f02326940ce2b3aa9e6e91a32886eeaad7ca3b8e4c70daa3f4e"
"checksum gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)" = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" "checksum gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)" = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2"
"checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" "checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec"
+3
View File
@@ -36,6 +36,7 @@ use parking_lot::Mutex;
use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT}; use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT};
use exit_future::Signal; use exit_future::Signal;
use futures::prelude::*; use futures::prelude::*;
use futures03::stream::{StreamExt as _, TryStreamExt as _};
use keystore::Store as Keystore; use keystore::Store as Keystore;
use network::NetworkState; use network::NetworkState;
use log::{info, warn, debug, error}; use log::{info, warn, debug, error};
@@ -446,6 +447,8 @@ impl<Components: components::Components> Service<Components> {
wasm_external_transport: config.telemetry_external_transport.take(), wasm_external_transport: config.telemetry_external_transport.take(),
}); });
let future = telemetry.clone() let future = telemetry.clone()
.map(|ev| Ok::<_, ()>(ev))
.compat()
.for_each(move |event| { .for_each(move |event| {
// Safe-guard in case we add more events in the future. // Safe-guard in case we add more events in the future.
let tel::TelemetryEvent::Connected = event; let tel::TelemetryEvent::Connected = event;
+3 -5
View File
@@ -8,7 +8,9 @@ edition = "2018"
[dependencies] [dependencies]
bytes = "0.4" bytes = "0.4"
parking_lot = "0.8.0" parking_lot = "0.8.0"
futures = "0.1" futures01 = { package = "futures", version = "0.1" }
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
futures-timer = "0.2.1"
libp2p = { version = "0.10.0", default-features = false, features = ["libp2p-websocket"] } libp2p = { version = "0.10.0", default-features = false, features = ["libp2p-websocket"] }
log = "0.4" log = "0.4"
rand = "0.6" rand = "0.6"
@@ -17,8 +19,4 @@ slog = { version = "^2", features = ["nested-values"] }
slog-json = { version = "^2", features = ["nested-values"] } slog-json = { version = "^2", features = ["nested-values"] }
slog-scope = "^4" slog-scope = "^4"
tokio-io = "0.1" tokio-io = "0.1"
tokio-timer = "0.2"
void = "1.0" void = "1.0"
[dev-dependencies]
tokio = "0.1"
+12 -14
View File
@@ -48,7 +48,7 @@
//! //!
//! // The `telemetry` object implements `Stream` and must be processed. //! // The `telemetry` object implements `Stream` and must be processed.
//! std::thread::spawn(move || { //! std::thread::spawn(move || {
//! tokio::run(telemetry.for_each(|_| Ok(()))); //! futures::executor::block_on(telemetry.for_each(|_| future::ready(())));
//! }); //! });
//! //!
//! // Sends a message on the telemetry. //! // Sends a message on the telemetry.
@@ -58,13 +58,12 @@
//! ``` //! ```
//! //!
use futures::{prelude::*, task::AtomicTask}; use futures::{prelude::*, task::AtomicWaker};
use libp2p::{Multiaddr, wasm_ext}; use libp2p::{Multiaddr, wasm_ext};
use log::warn; use log::warn;
use parking_lot::Mutex; use parking_lot::Mutex;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use std::sync::{Arc, Weak}; use std::{pin::Pin, sync::{Arc, Weak}, task::{Context, Poll}, time::{Duration, Instant}};
use std::time::{Duration, Instant};
pub use slog_scope::with_logger; pub use slog_scope::with_logger;
pub use slog; pub use slog;
@@ -131,8 +130,8 @@ pub struct Telemetry {
struct TelemetryInner { struct TelemetryInner {
/// Worker for the telemetry. /// Worker for the telemetry.
worker: Mutex<worker::TelemetryWorker>, worker: Mutex<worker::TelemetryWorker>,
/// Task to wake up when we add a log entry to the worker. /// Waker to wake up when we add a log entry to the worker.
polling_task: AtomicTask, polling_waker: AtomicWaker,
} }
/// Implements `slog::Drain`. /// Implements `slog::Drain`.
@@ -156,7 +155,7 @@ pub fn init_telemetry(config: TelemetryConfig) -> Telemetry {
let inner = Arc::new(TelemetryInner { let inner = Arc::new(TelemetryInner {
worker: Mutex::new(worker::TelemetryWorker::new(endpoints, config.wasm_external_transport)), worker: Mutex::new(worker::TelemetryWorker::new(endpoints, config.wasm_external_transport)),
polling_task: AtomicTask::new(), polling_waker: AtomicWaker::new(),
}); });
let guard = { let guard = {
@@ -181,13 +180,12 @@ pub enum TelemetryEvent {
impl Stream for Telemetry { impl Stream for Telemetry {
type Item = TelemetryEvent; type Item = TelemetryEvent;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let before = Instant::now(); let before = Instant::now();
let mut has_connected = false; let mut has_connected = false;
while let Async::Ready(event) = self.inner.worker.lock().poll() { while let Poll::Ready(event) = self.inner.worker.lock().poll(cx) {
// Right now we only have one possible event. This line is here in order to not // Right now we only have one possible event. This line is here in order to not
// forget to handle any possible new event type. // forget to handle any possible new event type.
let worker::TelemetryWorkerEvent::Connected = event; let worker::TelemetryWorkerEvent::Connected = event;
@@ -199,10 +197,10 @@ impl Stream for Telemetry {
} }
if has_connected { if has_connected {
Ok(Async::Ready(Some(TelemetryEvent::Connected))) Poll::Ready(Some(TelemetryEvent::Connected))
} else { } else {
self.inner.polling_task.register(); self.inner.polling_waker.register(cx.waker());
Ok(Async::NotReady) Poll::Pending
} }
} }
} }
@@ -215,7 +213,7 @@ impl slog::Drain for TelemetryDrain {
if let Some(inner) = self.inner.0.upgrade() { if let Some(inner) = self.inner.0.upgrade() {
let before = Instant::now(); let before = Instant::now();
let result = inner.worker.lock().log(record, values); let result = inner.worker.lock().log(record, values);
inner.polling_task.notify(); inner.polling_waker.wake();
if before.elapsed() > Duration::from_millis(50) { if before.elapsed() > Duration::from_millis(50) {
warn!(target: "telemetry", "Writing a telemetry log took more than 50ms"); warn!(target: "telemetry", "Writing a telemetry log took more than 50ms");
} }
+38 -25
View File
@@ -27,12 +27,11 @@
//! //!
use bytes::BytesMut; use bytes::BytesMut;
use futures::prelude::*; use futures::compat::Compat01As03Sink;
use libp2p::{core::transport::OptionalTransport, core::ConnectedPoint, Multiaddr, Transport, wasm_ext}; use libp2p::{core::transport::OptionalTransport, core::ConnectedPoint, Multiaddr, Transport, wasm_ext};
use log::{trace, warn, error}; use log::{trace, warn, error};
use slog::Drain; use slog::Drain;
use std::{io, time}; use std::{io, pin::Pin, task::Context, task::Poll, time};
use tokio_io::AsyncWrite;
mod node; mod node;
@@ -58,19 +57,29 @@ pub struct TelemetryWorker {
/// The pile of libp2p transports. /// The pile of libp2p transports.
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
type WsTrans = libp2p::core::transport::timeout::TransportTimeout< type WsTrans = libp2p::core::transport::timeout::TransportTimeout<
libp2p::core::transport::OrTransport< libp2p::core::transport::map::Map<
libp2p::core::transport::map::Map< libp2p::core::transport::OrTransport<
OptionalTransport<wasm_ext::ExtTransport>, libp2p::core::transport::map::Map<
fn(wasm_ext::Connection, ConnectedPoint) -> StreamSink<wasm_ext::Connection> OptionalTransport<wasm_ext::ExtTransport>,
fn(wasm_ext::Connection, ConnectedPoint) -> StreamSink<wasm_ext::Connection>
>,
libp2p::websocket::framed::WsConfig<libp2p::dns::DnsConfig<libp2p::tcp::TcpConfig>>
>, >,
libp2p::websocket::framed::WsConfig<libp2p::dns::DnsConfig<libp2p::tcp::TcpConfig>> fn(libp2p::core::either::EitherOutput<StreamSink<wasm_ext::Connection>,
libp2p::websocket::framed::BytesConnection<libp2p::tcp::TcpTransStream>>, ConnectedPoint)
-> Compat01As03Sink<libp2p::core::either::EitherOutput<StreamSink<wasm_ext::Connection>,
libp2p::websocket::framed::BytesConnection<libp2p::tcp::TcpTransStream>>, BytesMut>
> >
>; >;
#[cfg(target_os = "unknown")] #[cfg(target_os = "unknown")]
type WsTrans = libp2p::core::transport::timeout::TransportTimeout< type WsTrans = libp2p::core::transport::timeout::TransportTimeout<
libp2p::core::transport::map::Map< libp2p::core::transport::map::Map<
OptionalTransport<wasm_ext::ExtTransport>, libp2p::core::transport::map::Map<
fn(wasm_ext::Connection, ConnectedPoint) -> StreamSink<wasm_ext::Connection> OptionalTransport<wasm_ext::ExtTransport>,
fn(wasm_ext::Connection, ConnectedPoint) -> StreamSink<wasm_ext::Connection>
>,
fn(StreamSink<wasm_ext::Connection>, ConnectedPoint)
-> Compat01As03Sink<StreamSink<wasm_ext::Connection>, BytesMut>
> >
>; >;
@@ -98,7 +107,9 @@ impl TelemetryWorker {
libp2p::websocket::framed::WsConfig::new(inner) libp2p::websocket::framed::WsConfig::new(inner)
}); });
let transport = transport.with_timeout(CONNECT_TIMEOUT); let transport = transport
.map((|inner, _| Compat01As03Sink::new(inner)) as fn(_, _) -> _)
.with_timeout(CONNECT_TIMEOUT);
TelemetryWorker { TelemetryWorker {
nodes: endpoints.into_iter().map(|(addr, verbosity)| { nodes: endpoints.into_iter().map(|(addr, verbosity)| {
@@ -109,19 +120,19 @@ impl TelemetryWorker {
} }
/// Polls the worker for events that happened. /// Polls the worker for events that happened.
pub fn poll(&mut self) -> Async<TelemetryWorkerEvent> { pub fn poll(&mut self, cx: &mut Context) -> Poll<TelemetryWorkerEvent> {
for (node, _) in &mut self.nodes { for (node, _) in &mut self.nodes {
loop { loop {
match node.poll() { match node::Node::poll(Pin::new(node), cx) {
Async::Ready(node::NodeEvent::Connected) => Poll::Ready(node::NodeEvent::Connected) =>
return Async::Ready(TelemetryWorkerEvent::Connected), return Poll::Ready(TelemetryWorkerEvent::Connected),
Async::Ready(node::NodeEvent::Disconnected(_)) => continue, Poll::Ready(node::NodeEvent::Disconnected(_)) => continue,
Async::NotReady => break, Poll::Pending => break,
} }
} }
} }
Async::NotReady Poll::Pending
} }
/// Equivalent to `slog::Drain::log`, but takes `self` by `&mut` instead, which is more convenient. /// Equivalent to `slog::Drain::log`, but takes `self` by `&mut` instead, which is more convenient.
@@ -176,27 +187,29 @@ impl TelemetryWorker {
/// For some context, we put this object around the `wasm_ext::ExtTransport` in order to make sure /// For some context, we put this object around the `wasm_ext::ExtTransport` in order to make sure
/// that each telemetry message maps to one single call to `write` in the WASM FFI. /// that each telemetry message maps to one single call to `write` in the WASM FFI.
struct StreamSink<T>(T); struct StreamSink<T>(T);
impl<T: AsyncWrite> Sink for StreamSink<T> { impl<T: tokio_io::AsyncWrite> futures01::Sink for StreamSink<T> {
type SinkItem = BytesMut; type SinkItem = BytesMut;
type SinkError = io::Error; type SinkError = io::Error;
fn start_send(&mut self, item: Self::SinkItem) -> Result<AsyncSink<Self::SinkItem>, io::Error> { fn start_send(&mut self, item: Self::SinkItem)
-> Result<futures01::AsyncSink<Self::SinkItem>, io::Error> {
match self.0.write(&item[..]) { match self.0.write(&item[..]) {
Ok(n) if n == item.len() => Ok(AsyncSink::Ready), Ok(n) if n == item.len() => Ok(futures01::AsyncSink::Ready),
Ok(_) => { Ok(_) => {
error!(target: "telemetry", error!(target: "telemetry",
"Detected some internal buffering happening in the telemetry"); "Detected some internal buffering happening in the telemetry");
Err(io::Error::new(io::ErrorKind::Other, "Internal buffering detected")) Err(io::Error::new(io::ErrorKind::Other, "Internal buffering detected"))
}, },
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(AsyncSink::NotReady(item)), Err(ref err) if err.kind() == io::ErrorKind::WouldBlock =>
Ok(futures01::AsyncSink::NotReady(item)),
Err(err) => Err(err), Err(err) => Err(err),
} }
} }
fn poll_complete(&mut self) -> Poll<(), io::Error> { fn poll_complete(&mut self) -> futures01::Poll<(), io::Error> {
match self.0.flush() { match self.0.flush() {
Ok(()) => Ok(Async::Ready(())), Ok(()) => Ok(futures01::Async::Ready(())),
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady), Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(futures01::Async::NotReady),
Err(err) => Err(err), Err(err) => Err(err),
} }
} }
+55 -41
View File
@@ -17,13 +17,13 @@
//! Contains the `Node` struct, which handles communications with a single telemetry endpoint. //! Contains the `Node` struct, which handles communications with a single telemetry endpoint.
use bytes::BytesMut; use bytes::BytesMut;
use futures::prelude::*; use futures::{prelude::*, compat::{Future01CompatExt as _, Compat01As03}};
use futures_timer::Delay;
use libp2p::Multiaddr; use libp2p::Multiaddr;
use libp2p::core::transport::Transport; use libp2p::core::transport::Transport;
use log::{trace, debug, warn, error}; use log::{trace, debug, warn, error};
use rand::Rng as _; use rand::Rng as _;
use std::{collections::VecDeque, fmt, mem, time::Duration, time::Instant}; use std::{collections::VecDeque, fmt, mem, pin::Pin, task::Context, task::Poll, time::Duration};
use tokio_timer::Delay;
/// Maximum number of pending telemetry messages. /// Maximum number of pending telemetry messages.
const MAX_PENDING: usize = 10; const MAX_PENDING: usize = 10;
@@ -42,7 +42,7 @@ enum NodeSocket<TTrans: Transport> {
/// We're connected to the node. This is the normal state. /// We're connected to the node. This is the normal state.
Connected(NodeSocketConnected<TTrans>), Connected(NodeSocketConnected<TTrans>),
/// We are currently dialing the node. /// We are currently dialing the node.
Dialing(TTrans::Dial), Dialing(Compat01As03<TTrans::Dial>),
/// A new connection should be started as soon as possible. /// A new connection should be started as soon as possible.
ReconnectNow, ReconnectNow,
/// Waiting before attempting to dial again. /// Waiting before attempting to dial again.
@@ -86,8 +86,8 @@ impl<TTrans: Transport> Node<TTrans> {
} }
impl<TTrans: Transport, TSinkErr> Node<TTrans> impl<TTrans: Transport, TSinkErr> Node<TTrans>
where TTrans: Clone, TTrans::Output: Sink<SinkItem = BytesMut, SinkError = TSinkErr>, where TTrans: Clone + Unpin, TTrans::Dial: Unpin,
TSinkErr: fmt::Debug { TTrans::Output: Sink<BytesMut, Error = TSinkErr> + Unpin, TSinkErr: fmt::Debug {
/// Sends a WebSocket frame to the node. Returns an error if we are not connected to the node. /// Sends a WebSocket frame to the node. Returns an error if we are not connected to the node.
/// ///
/// After calling this method, you should call `poll` in order for it to be properly processed. /// After calling this method, you should call `poll` in order for it to be properly processed.
@@ -108,29 +108,30 @@ where TTrans: Clone, TTrans::Output: Sink<SinkItem = BytesMut, SinkError = TSink
} }
/// Polls the node for updates. Must be performed regularly. /// Polls the node for updates. Must be performed regularly.
pub fn poll(&mut self) -> Async<NodeEvent<TSinkErr>> { pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<NodeEvent<TSinkErr>> {
let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned); let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned);
self.socket = loop { self.socket = loop {
match socket { match socket {
NodeSocket::Connected(mut conn) => match conn.poll(&self.addr) { NodeSocket::Connected(mut conn) =>
Ok(Async::Ready(v)) => void::unreachable(v), match NodeSocketConnected::poll(Pin::new(&mut conn), cx, &self.addr) {
Ok(Async::NotReady) => break NodeSocket::Connected(conn), Poll::Ready(Ok(v)) => match v {}
Err(err) => { Poll::Pending => break NodeSocket::Connected(conn),
debug!(target: "telemetry", "Disconnected from {}: {:?}", self.addr, err); Poll::Ready(Err(err)) => {
let timeout = gen_rand_reconnect_delay(); debug!(target: "telemetry", "Disconnected from {}: {:?}", self.addr, err);
self.socket = NodeSocket::WaitingReconnect(timeout); let timeout = gen_rand_reconnect_delay();
return Async::Ready(NodeEvent::Disconnected(err)) self.socket = NodeSocket::WaitingReconnect(timeout);
return Poll::Ready(NodeEvent::Disconnected(err))
}
} }
} NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) {
NodeSocket::Dialing(mut s) => match s.poll() { Poll::Ready(Ok(sink)) => {
Ok(Async::Ready(sink)) => {
debug!(target: "telemetry", "Connected to {}", self.addr); debug!(target: "telemetry", "Connected to {}", self.addr);
let conn = NodeSocketConnected { sink, pending: VecDeque::new(), need_flush: false }; let conn = NodeSocketConnected { sink, pending: VecDeque::new(), need_flush: false };
self.socket = NodeSocket::Connected(conn); self.socket = NodeSocket::Connected(conn);
return Async::Ready(NodeEvent::Connected) return Poll::Ready(NodeEvent::Connected)
}, },
Ok(Async::NotReady) => break NodeSocket::Dialing(s), Poll::Pending => break NodeSocket::Dialing(s),
Err(err) => { Poll::Ready(Err(err)) => {
debug!(target: "telemetry", "Error while dialing {}: {:?}", self.addr, err); debug!(target: "telemetry", "Error while dialing {}: {:?}", self.addr, err);
let timeout = gen_rand_reconnect_delay(); let timeout = gen_rand_reconnect_delay();
socket = NodeSocket::WaitingReconnect(timeout); socket = NodeSocket::WaitingReconnect(timeout);
@@ -139,7 +140,7 @@ where TTrans: Clone, TTrans::Output: Sink<SinkItem = BytesMut, SinkError = TSink
NodeSocket::ReconnectNow => match self.transport.clone().dial(self.addr.clone()) { NodeSocket::ReconnectNow => match self.transport.clone().dial(self.addr.clone()) {
Ok(d) => { Ok(d) => {
debug!(target: "telemetry", "Started dialing {}", self.addr); debug!(target: "telemetry", "Started dialing {}", self.addr);
socket = NodeSocket::Dialing(d); socket = NodeSocket::Dialing(d.compat());
} }
Err(err) => { Err(err) => {
debug!(target: "telemetry", "Error while dialing {}: {:?}", self.addr, err); debug!(target: "telemetry", "Error while dialing {}: {:?}", self.addr, err);
@@ -147,11 +148,12 @@ where TTrans: Clone, TTrans::Output: Sink<SinkItem = BytesMut, SinkError = TSink
socket = NodeSocket::WaitingReconnect(timeout); socket = NodeSocket::WaitingReconnect(timeout);
} }
} }
NodeSocket::WaitingReconnect(mut s) => if let Ok(Async::Ready(_)) = s.poll() { NodeSocket::WaitingReconnect(mut s) =>
socket = NodeSocket::ReconnectNow; if let Poll::Ready(_) = Future::poll(Pin::new(&mut s), cx) {
} else { socket = NodeSocket::ReconnectNow;
break NodeSocket::WaitingReconnect(s) } else {
} break NodeSocket::WaitingReconnect(s)
}
NodeSocket::Poisoned => { NodeSocket::Poisoned => {
error!(target: "telemetry", "Poisoned connection with {}", self.addr); error!(target: "telemetry", "Poisoned connection with {}", self.addr);
break NodeSocket::Poisoned break NodeSocket::Poisoned
@@ -159,7 +161,7 @@ where TTrans: Clone, TTrans::Output: Sink<SinkItem = BytesMut, SinkError = TSink
} }
}; };
Async::NotReady Poll::Pending
} }
} }
@@ -169,36 +171,48 @@ where TTrans: Clone, TTrans::Output: Sink<SinkItem = BytesMut, SinkError = TSink
/// re-connection time. /// re-connection time.
fn gen_rand_reconnect_delay() -> Delay { fn gen_rand_reconnect_delay() -> Delay {
let random_delay = rand::thread_rng().gen_range(5, 10); let random_delay = rand::thread_rng().gen_range(5, 10);
Delay::new(Instant::now() + Duration::from_secs(random_delay)) Delay::new(Duration::from_secs(random_delay))
} }
impl<TTrans: Transport, TSinkErr> NodeSocketConnected<TTrans> impl<TTrans: Transport, TSinkErr> NodeSocketConnected<TTrans>
where TTrans::Output: Sink<SinkItem = BytesMut, SinkError = TSinkErr> { where TTrans::Output: Sink<BytesMut, Error = TSinkErr> + Unpin {
/// Processes the queue of messages for the connected socket. /// Processes the queue of messages for the connected socket.
/// ///
/// The address is passed for logging purposes only. /// The address is passed for logging purposes only.
fn poll(&mut self, my_addr: &Multiaddr) -> Poll<void::Void, TSinkErr> { fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context,
my_addr: &Multiaddr
) -> Poll<Result<futures::never::Never, TSinkErr>> {
loop { loop {
if let Some(item) = self.pending.pop_front() { if let Some(item) = self.pending.pop_front() {
let item_len = item.len(); if let Poll::Pending = Sink::poll_ready(Pin::new(&mut self.sink), cx) {
if let AsyncSink::NotReady(item) = self.sink.start_send(item)? {
self.pending.push_front(item); self.pending.push_front(item);
break return Poll::Pending
} else {
trace!(target: "telemetry", "Successfully sent {:?} bytes message to {}",
item_len, my_addr);
self.need_flush = true;
} }
} else if self.need_flush && self.sink.poll_complete()?.is_ready() { let item_len = item.len();
self.need_flush = false; if let Err(err) = Sink::start_send(Pin::new(&mut self.sink), item) {
return Poll::Ready(Err(err))
}
trace!(
target: "telemetry", "Successfully sent {:?} bytes message to {}",
item_len, my_addr
);
self.need_flush = true;
} else if self.need_flush {
match Sink::poll_flush(Pin::new(&mut self.sink), cx) {
Poll::Pending => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Ready(Ok(())) => self.need_flush = false,
}
} else { } else {
break break
} }
} }
Ok(Async::NotReady) Poll::Pending
} }
} }