client/finality-grandpa/src/observer: Poll NetworkBridge (#4766)

Patch d9837d7dd reintegrated the periodic neighbor packet worker, by
moving its logic into a `Future` implementation on `NetworkBridge` which
needs to be polled by its upper layer.

Polling by the upper layer was implemented within the `Future`
implementation of the `VoterWork` struct but not within the `Future`
implementation of the `ObserverWork` struct. This patch adds polling of
the `NetworkBridge` to the latter.
This commit is contained in:
Max Inden
2020-01-29 18:42:21 +01:00
committed by GitHub
parent 907fd8c2fa
commit 504b4e89e8
@@ -336,10 +336,8 @@ where
{ {
type Output = Result<(), Error>; type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self); match Future::poll(Pin::new(&mut self.observer), cx) {
match Future::poll(Pin::new(&mut this.observer), cx) {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Ok(())) => { Poll::Ready(Ok(())) => {
// observer commit stream doesn't conclude naturally; this could reasonably be an error. // observer commit stream doesn't conclude naturally; this could reasonably be an error.
@@ -351,12 +349,12 @@ where
} }
Poll::Ready(Err(CommandOrError::VoterCommand(command))) => { Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
// some command issued internally // some command issued internally
this.handle_voter_command(command)?; self.handle_voter_command(command)?;
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
} }
} }
match Stream::poll_next(Pin::new(&mut this.voter_commands_rx), cx) { match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(None) => { Poll::Ready(None) => {
// the `voter_commands_rx` stream should never conclude since it's never closed. // the `voter_commands_rx` stream should never conclude since it's never closed.
@@ -364,11 +362,11 @@ where
} }
Poll::Ready(Some(command)) => { Poll::Ready(Some(command)) => {
// some command issued externally // some command issued externally
this.handle_voter_command(command)?; self.handle_voter_command(command)?;
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
} }
} }
Poll::Pending Future::poll(Pin::new(&mut self.network), cx)
} }
} }