From b5cd36289edd12a5a3227ffa56070705b1d16175 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Wed, 7 Aug 2019 20:49:21 +0200 Subject: [PATCH] Handle telemetry socket errors (#3321) * Handle telemetry socket errors * Line width --- substrate/core/telemetry/src/worker/node.rs | 28 +++++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/substrate/core/telemetry/src/worker/node.rs b/substrate/core/telemetry/src/worker/node.rs index d6193d4cc6..11b1f2a81e 100644 --- a/substrate/core/telemetry/src/worker/node.rs +++ b/substrate/core/telemetry/src/worker/node.rs @@ -87,7 +87,11 @@ impl Node { impl Node where TTrans: Clone + Unpin, TTrans::Dial: Unpin, - TTrans::Output: Sink + Stream + Unpin, TSinkErr: fmt::Debug { + TTrans::Output: Sink + + Stream> + + Unpin, + TSinkErr: fmt::Debug +{ /// Sends a WebSocket frame to the node. Returns an error if we are not connected to the node. /// /// After calling this method, you should call `poll` in order for it to be properly processed. @@ -175,7 +179,10 @@ fn gen_rand_reconnect_delay() -> Delay { } impl NodeSocketConnected -where TTrans::Output: Sink + Stream + Unpin { +where TTrans::Output: Sink + + Stream> + + Unpin +{ /// Processes the queue of messages for the connected socket. /// /// The address is passed for logging purposes only. @@ -208,13 +215,18 @@ where TTrans::Output: Sink + Stream + Unpin { Poll::Ready(Ok(())) => self.need_flush = false, } - } else if let Poll::Ready(_) = Stream::poll_next(Pin::new(&mut self.sink), cx) { - // We poll the telemetry `Stream` because the underlying implementation relies on - // this in order to answer PINGs. - // We don't do anything with incoming messages, however. - } else { - break + match Stream::poll_next(Pin::new(&mut self.sink), cx) { + Poll::Ready(Some(Ok(_))) => { + // We poll the telemetry `Stream` because the underlying implementation relies on + // this in order to answer PINGs. + // We don't do anything with incoming messages, however. + }, + Poll::Ready(Some(Err(err))) => { + return Poll::Ready(Err(err)) + }, + Poll::Pending | Poll::Ready(None) => break, + } } }