Remove wait on future in network bridge (#1765)

* remove wait on future in network bridge

* nit

Co-Authored-By: gterzian <2792687+gterzian@users.noreply.github.com>

* nit

Co-Authored-By: gterzian <2792687+gterzian@users.noreply.github.com>

* nit

* propagate error

* nit once more

* nit

Co-Authored-By: gterzian <2792687+gterzian@users.noreply.github.com>
This commit is contained in:
Gregory Terzian
2019-02-12 23:18:53 +08:00
committed by Bastian Köcher
parent 9e999cdd81
commit 6122f7d6b6
+30 -5
View File
@@ -84,7 +84,7 @@ extern crate env_logger;
extern crate parity_codec_derive;
use futures::prelude::*;
use futures::sync::{self, mpsc};
use futures::sync::{self, mpsc, oneshot};
use client::{
BlockchainEvents, CallExecutor, Client, backend::Backend,
error::Error as ClientError,
@@ -103,7 +103,6 @@ use grandpa::Error as GrandpaError;
use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet};
use network::Service as NetworkService;
use network::consensus_gossip::ConsensusMessage;
use std::sync::Arc;
use std::time::Duration;
@@ -216,6 +215,32 @@ impl From<ClientError> for Error {
}
}
/// A stream used by NetworkBridge in its implementation of Network.
pub struct NetworkStream {
inner: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
outer: oneshot::Receiver<mpsc::UnboundedReceiver<Vec<u8>>>
}
impl Stream for NetworkStream {
type Item = Vec<u8>;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(ref mut inner) = self.inner {
return inner.poll();
}
match self.outer.poll() {
Ok(futures::Async::Ready(mut inner)) => {
let poll_result = inner.poll();
self.inner = Some(inner);
poll_result
},
Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady),
Err(_) => Err(())
}
}
}
/// A handle to the network. This is generally implemented by providing some
/// handle to a gossip service or similar.
///
@@ -277,14 +302,14 @@ fn commit_topic<B: BlockT>(set_id: u64) -> B::Hash {
}
impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B> for NetworkBridge<B, S> {
type In = mpsc::UnboundedReceiver<ConsensusMessage>;
type In = NetworkStream;
fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
let (tx, rx) = sync::oneshot::channel();
self.service.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(message_topic::<B>(round, set_id));
let _ = tx.send(inner_rx);
});
rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully")
NetworkStream { outer: rx, inner: None }
}
fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) {
@@ -308,7 +333,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B
let inner_rx = gossip.messages_for(commit_topic::<B>(set_id));
let _ = tx.send(inner_rx);
});
rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully")
NetworkStream { outer: rx, inner: None }
}
fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>) {