diff --git a/substrate/core/telemetry/src/worker.rs b/substrate/core/telemetry/src/worker.rs index e132709378..24a1de8ec4 100644 --- a/substrate/core/telemetry/src/worker.rs +++ b/substrate/core/telemetry/src/worker.rs @@ -187,6 +187,25 @@ impl TelemetryWorker { /// For some context, we put this object around the `wasm_ext::ExtTransport` in order to make sure /// that each telemetry message maps to one single call to `write` in the WASM FFI. struct StreamSink(T); + +impl futures01::Stream for StreamSink { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self) -> futures01::Poll, Self::Error> { + let mut buf = [0; 128]; + Ok(self.0.poll_read(&mut buf)? + .map(|n| + if n == 0 { + None + } else { + let buf: BytesMut = buf[..n].into(); + Some(buf) + } + )) + } +} + impl futures01::Sink for StreamSink { type SinkItem = BytesMut; type SinkError = io::Error; diff --git a/substrate/core/telemetry/src/worker/node.rs b/substrate/core/telemetry/src/worker/node.rs index 64d150a3be..d6193d4cc6 100644 --- a/substrate/core/telemetry/src/worker/node.rs +++ b/substrate/core/telemetry/src/worker/node.rs @@ -87,7 +87,7 @@ impl Node { impl Node where TTrans: Clone + Unpin, TTrans::Dial: Unpin, - TTrans::Output: Sink + Unpin, TSinkErr: fmt::Debug { + TTrans::Output: Sink + Stream + Unpin, TSinkErr: fmt::Debug { /// Sends a WebSocket frame to the node. Returns an error if we are not connected to the node. /// /// After calling this method, you should call `poll` in order for it to be properly processed. @@ -175,7 +175,7 @@ fn gen_rand_reconnect_delay() -> Delay { } impl NodeSocketConnected -where TTrans::Output: Sink + Unpin { +where TTrans::Output: Sink + Stream + Unpin { /// Processes the queue of messages for the connected socket. /// /// The address is passed for logging purposes only. @@ -200,6 +200,7 @@ where TTrans::Output: Sink + Unpin { item_len, my_addr ); self.need_flush = true; + } else if self.need_flush { match Sink::poll_flush(Pin::new(&mut self.sink), cx) { Poll::Pending => return Poll::Pending, @@ -207,6 +208,11 @@ where TTrans::Output: Sink + Unpin { Poll::Ready(Ok(())) => self.need_flush = false, } + } else if let Poll::Ready(_) = Stream::poll_next(Pin::new(&mut self.sink), cx) { + // We poll the telemetry `Stream` because the underlying implementation relies on + // this in order to answer PINGs. + // We don't do anything with incoming messages, however. + } else { break }