gossip: futures 03 Receiver (#3832)

* gossip: futures 03 receiver

* fix gossip test

* use tokio 01

* add comment

* Update core/finality-grandpa/src/communication/mod.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* fix format

* rename

* remove tokio 01 runtime

* minor fix

* make stable happy
This commit is contained in:
Weiliang Li
2019-10-23 15:23:47 +09:00
committed by Bastian Köcher
parent 5123a84f13
commit 5a07887535
2 changed files with 35 additions and 25 deletions
@@ -31,6 +31,7 @@ use std::sync::Arc;
use futures::prelude::*;
use futures::sync::{oneshot, mpsc};
use futures03::stream::{StreamExt, TryStreamExt};
use grandpa::Message::{Prevote, Precommit, PrimaryPropose};
use grandpa::{voter, voter_set::VoterSet};
use log::{debug, trace};
@@ -100,7 +101,7 @@ mod benefit {
/// Intended to be a lightweight handle such as an `Arc`.
pub trait Network<Block: BlockT>: Clone + Send + 'static {
/// A stream of input messages for a topic.
type In: Stream<Item=network_gossip::TopicNotification,Error=()>;
type In: Stream<Item = network_gossip::TopicNotification, Error = ()>;
/// Get a stream of messages for a specific gossip topic.
fn messages_for(&self, topic: Block::Hash) -> Self::In;
@@ -145,7 +146,9 @@ impl<B, S, H> Network<B> for Arc<NetworkService<B, S, H>> where
S: network::specialization::NetworkSpecialization<B>,
H: network::ExHashT,
{
type In = NetworkStream;
type In = NetworkStream<
Box<dyn Stream<Item = network_gossip::TopicNotification, Error = ()> + Send + 'static>,
>;
fn messages_for(&self, topic: B::Hash) -> Self::In {
// Given that one can only communicate with the Substrate network via the `NetworkService` via message-passing,
@@ -159,7 +162,11 @@ impl<B, S, H> Network<B> for Arc<NetworkService<B, S, H>> where
// waiting for the oneshot to resolve and from there on acting like a normal message channel.
let (tx, rx) = oneshot::channel();
self.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, topic);
let inner_rx: Box<dyn Stream<Item = _, Error = ()> + Send> = Box::new(gossip
.messages_for(GRANDPA_ENGINE_ID, topic)
.map(|x| Ok(x))
.compat()
);
let _ = tx.send(inner_rx);
});
NetworkStream::PollingOneshot(rx)
@@ -220,13 +227,16 @@ impl<B, S, H> Network<B> for Arc<NetworkService<B, S, H>> where
///
/// `NetworkStream` combines the two steps into one, requiring a consumer to only poll `NetworkStream` to retrieve
/// messages directly.
pub enum NetworkStream {
PollingOneshot(oneshot::Receiver<mpsc::UnboundedReceiver<network_gossip::TopicNotification>>),
PollingTopicNotifications(mpsc::UnboundedReceiver<network_gossip::TopicNotification>),
pub enum NetworkStream<R> {
PollingOneshot(oneshot::Receiver<R>),
PollingTopicNotifications(R),
}
impl Stream for NetworkStream {
type Item = network_gossip::TopicNotification;
impl<R> Stream for NetworkStream<R>
where
R: Stream<Item = network_gossip::TopicNotification, Error = ()>,
{
type Item = R::Item;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@@ -266,11 +276,11 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
service: N,
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
on_exit: impl Future<Item = (), Error = ()> + Clone + Send + 'static,
catch_up_enabled: bool,
) -> (
Self,
impl futures::Future<Item = (), Error = ()> + Send + 'static,
impl Future<Item = (), Error = ()> + Send + 'static,
) {
let (validator, report_stream) = GossipValidator::new(
@@ -48,7 +48,7 @@ use std::sync::Arc;
use std::iter;
use std::time;
use log::{trace, debug};
use futures::sync::mpsc;
use futures03::channel::mpsc;
use lru_cache::LruCache;
use libp2p::PeerId;
use sr_primitives::traits::{Block as BlockT, Hash, HashFor};
@@ -608,7 +608,7 @@ impl<B: BlockT> Validator<B> for DiscardAll {
#[cfg(test)]
mod tests {
use sr_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
use futures::Stream;
use futures03::executor::block_on_stream;
use super::*;
@@ -670,7 +670,7 @@ mod tests {
let m2 = vec![4, 5, 6];
push_msg!(consensus, prev_hash, m1_hash, m1);
push_msg!(consensus, best_hash, m2_hash, m2.clone());
push_msg!(consensus, best_hash, m2_hash, m2);
consensus.known_messages.insert(m1_hash, ());
consensus.known_messages.insert(m2_hash, ());
@@ -692,8 +692,6 @@ mod tests {
#[test]
fn message_stream_include_those_sent_before_asking_for_stream() {
use futures::Stream;
let mut consensus = ConsensusGossip::<Block>::new();
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
@@ -701,9 +699,9 @@ mod tests {
let topic = HashFor::<Block>::hash(&[1,2,3]);
consensus.register_message(topic, message.clone());
let stream = consensus.messages_for([0, 0, 0, 0], topic);
let mut stream = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic));
assert_eq!(stream.wait().next(), Some(Ok(TopicNotification { message: message.data, sender: None })));
assert_eq!(stream.next(), Some(TopicNotification { message: message.data, sender: None }));
}
#[test]
@@ -725,16 +723,17 @@ mod tests {
let mut consensus = ConsensusGossip::<Block>::new();
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
let topic = HashFor::<Block>::hash(&[1,2,3]);
let data = vec![4, 5, 6];
let message = ConsensusMessage { data: data.clone(), engine_id: [0, 0, 0, 0] };
let topic = HashFor::<Block>::hash(&[1, 2, 3]);
consensus.register_message(topic, message.clone());
let stream1 = consensus.messages_for([0, 0, 0, 0], topic);
let stream2 = consensus.messages_for([0, 0, 0, 0], topic);
let mut stream1 = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic));
let mut stream2 = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic));
assert_eq!(stream1.wait().next(), Some(Ok(TopicNotification { message: message.data.clone(), sender: None })));
assert_eq!(stream2.wait().next(), Some(Ok(TopicNotification { message: message.data, sender: None })));
assert_eq!(stream1.next(), Some(TopicNotification { message: data.clone(), sender: None }));
assert_eq!(stream2.next(), Some(TopicNotification { message: data, sender: None }));
}
#[test]
@@ -749,9 +748,10 @@ mod tests {
consensus.register_message(topic, msg_a);
consensus.register_message(topic, msg_b);
let mut stream = consensus.messages_for([0, 0, 0, 0], topic).wait();
let mut stream = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic));
assert_eq!(stream.next(), Some(TopicNotification { message: vec![1, 2, 3], sender: None }));
assert_eq!(stream.next(), Some(Ok(TopicNotification { message: vec![1, 2, 3], sender: None })));
let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic));
assert_eq!(stream.next(), None);
}