Switch the client to new futures (#3103)

* Switch the client to new futures

* No need for compat in the client

* Fix client tests

* Address review
This commit is contained in:
Pierre Krieger
2019-07-11 16:58:30 +02:00
committed by Bastian Köcher
parent f5e921281e
commit bf2551a854
28 changed files with 249 additions and 112 deletions
@@ -21,6 +21,7 @@ use network::test::{Block, DummySpecialization, Hash, TestNetFactory, Peer, Peer
use network::test::{PassThroughVerifier};
use network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder};
use parking_lot::Mutex;
use futures03::{StreamExt as _, TryStreamExt as _};
use tokio::runtime::current_thread;
use keyring::ed25519::{Keyring as AuthorityKeyring};
use client::{
@@ -385,6 +386,7 @@ fn run_to_completion_with<F>(
wait_for.push(
Box::new(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(move |n| {
let mut highest_finalized = highest_finalized.write();
if *n.header.number() > *highest_finalized {
@@ -495,6 +497,7 @@ fn finalize_3_voters_1_full_observer() {
};
finality_notifications.push(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &20))
.for_each(move |_| Ok(()))
);
@@ -585,6 +588,7 @@ fn transition_3_voters_twice_1_full_observer() {
// wait for blocks to be finalized before generating new ones
let block_production = client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &30))
.for_each(move |n| {
match n.header.number() {
@@ -652,6 +656,7 @@ fn transition_3_voters_twice_1_full_observer() {
finality_notifications.push(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &30))
.for_each(move |_| Ok(()))
.map(move |()| {
@@ -1275,6 +1280,7 @@ fn finalize_3_voters_1_light_observer() {
let link = net.lock().peer(3).data.lock().take().expect("link initialized on startup; qed");
let finality_notifications = net.lock().peer(3).client().finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &20))
.collect();
@@ -1436,6 +1442,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
finality_notifications.push(
client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &50))
.for_each(move |_| Ok(()))
);
@@ -1471,6 +1478,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
let set_state = link.persistent_data.set_state.clone();
let wait = client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.take_while(|n| Ok(n.header.number() < &50))
.collect()
.map(|_| set_state);
@@ -23,9 +23,10 @@
use super::{BlockStatus, CommunicationIn, Error, SignedMessage};
use log::{debug, warn};
use client::ImportNotifications;
use client::{BlockImportNotification, ImportNotifications};
use futures::prelude::*;
use futures::stream::Fuse;
use futures03::{StreamExt as _, TryStreamExt as _};
use grandpa::voter;
use parking_lot::Mutex;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
@@ -64,7 +65,7 @@ pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
/// Buffering imported messages until blocks with given hashes are imported.
pub(crate) struct UntilImported<Block: BlockT, Status, I, M: BlockUntilImported<Block>> {
import_notifications: Fuse<ImportNotifications<Block>>,
import_notifications: Fuse<Box<dyn Stream<Item = BlockImportNotification<Block>, Error = ()> + Send>>,
status_check: Status,
inner: Fuse<I>,
ready: VecDeque<M::Blocked>,
@@ -91,7 +92,10 @@ impl<Block: BlockT, Status, I: Stream, M> UntilImported<Block, Status, I, M>
let check_pending = Interval::new(now + CHECK_PENDING_INTERVAL, CHECK_PENDING_INTERVAL);
UntilImported {
import_notifications: import_notifications.fuse(),
import_notifications: {
let stream = import_notifications.map::<_, fn(_) -> _>(|v| Ok::<_, ()>(v)).compat();
Box::new(stream) as Box<dyn Stream<Item = _, Error = _> + Send>
}.fuse(),
status_check,
inner: stream.fuse(),
ready: VecDeque::new(),
@@ -194,7 +198,6 @@ impl<Block: BlockT, Status, I, M> Stream for UntilImported<Block, Status, I, M>
if self.import_notifications.is_done() && self.inner.is_done() {
Ok(Async::Ready(None))
} else {
Ok(Async::NotReady)
}
}
@@ -435,7 +438,7 @@ mod tests {
use consensus_common::BlockOrigin;
use client::BlockImportNotification;
use futures::future::Either;
use futures::sync::mpsc;
use futures03::channel::mpsc;
use grandpa::Precommit;
#[derive(Clone)]
@@ -523,7 +526,7 @@ mod tests {
// enact all dependencies before importing the message
enact_dependencies(&chain_state);
let (global_tx, global_rx) = mpsc::unbounded();
let (global_tx, global_rx) = futures::sync::mpsc::unbounded();
let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
@@ -548,7 +551,7 @@ mod tests {
let (chain_state, import_notifications) = TestChainState::new();
let block_status = chain_state.block_status();
let (global_tx, global_rx) = mpsc::unbounded();
let (global_tx, global_rx) = futures::sync::mpsc::unbounded();
let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,