From 11da397db8b778b307b41ff8b2b025b605191a4f Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 15 Aug 2019 12:01:47 +0200 Subject: [PATCH] Transition the transaction-pool to new futures (#3394) * Transition the transaction-pool to new futures * Fix tests * Fix tests again?! --- substrate/Cargo.lock | 3 +- substrate/core/rpc/src/author/mod.rs | 5 ++- substrate/core/rpc/src/author/tests.rs | 1 + substrate/core/service/src/lib.rs | 1 + substrate/core/transaction-pool/Cargo.toml | 1 - .../core/transaction-pool/graph/Cargo.toml | 2 +- .../core/transaction-pool/graph/src/pool.rs | 45 +++++++++---------- .../transaction-pool/graph/src/watcher.rs | 7 ++- 8 files changed, 32 insertions(+), 33 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 5e12dcd427..032f36da3c 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -5060,7 +5060,7 @@ dependencies = [ "assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5075,7 +5075,6 @@ name = "substrate-transaction-pool" version = "2.0.0" dependencies = [ "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/rpc/src/author/mod.rs b/substrate/core/rpc/src/author/mod.rs index d797e87da5..6e2d7aa92a 100644 --- a/substrate/core/rpc/src/author/mod.rs +++ b/substrate/core/rpc/src/author/mod.rs @@ -25,8 +25,9 @@ mod tests; use std::{sync::Arc, convert::TryInto}; use client::{self, Client}; -use crate::rpc::futures::{Sink, Stream, Future}; +use crate::rpc::futures::{Sink, Future}; use crate::subscriptions::Subscriptions; +use futures03::{StreamExt as _, compat::Compat}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; use log::warn; @@ -249,7 +250,7 @@ impl AuthorApi, BlockHash

> for Author whe self.subscriptions.add(subscriber, move |sink| { sink .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all(watcher.into_stream().map(Ok)) + .send_all(Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v))))) .map(|_| ()) }) } diff --git a/substrate/core/rpc/src/author/tests.rs b/substrate/core/rpc/src/author/tests.rs index 0fdff9989b..e7cf84c927 100644 --- a/substrate/core/rpc/src/author/tests.rs +++ b/substrate/core/rpc/src/author/tests.rs @@ -23,6 +23,7 @@ use transaction_pool::{ txpool::Pool, ChainApi, }; +use futures::Stream; use primitives::{ H256, blake2_256, hexdisplay::HexDisplay, traits::BareCryptoStore, testing::KeyStore, ed25519, crypto::key_types, diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index dc34a48853..98f4daf97e 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -298,6 +298,7 @@ impl Service { let network = Arc::downgrade(&network); let transaction_pool_ = transaction_pool.clone(); let events = transaction_pool.import_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() .for_each(move |_| { if let Some(network) = network.upgrade() { network.trigger_repropagate(); diff --git a/substrate/core/transaction-pool/Cargo.toml b/substrate/core/transaction-pool/Cargo.toml index 747c39a165..5e9973b6dc 100644 --- a/substrate/core/transaction-pool/Cargo.toml +++ b/substrate/core/transaction-pool/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" [dependencies] derive_more = "0.14.0" -futures = "0.1" log = "0.4" codec = { package = "parity-scale-codec", version = "1.0.0" } parking_lot = "0.9.0" diff --git a/substrate/core/transaction-pool/graph/Cargo.toml b/substrate/core/transaction-pool/graph/Cargo.toml index 4dc7ce69cf..a19f3aaccc 100644 --- a/substrate/core/transaction-pool/graph/Cargo.toml +++ b/substrate/core/transaction-pool/graph/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] derive_more = "0.14.0" -futures = "0.1" +futures-preview = "=0.3.0-alpha.17" log = "0.4" parking_lot = "0.9.0" serde = { version = "1.0", features = ["derive"] } diff --git a/substrate/core/transaction-pool/graph/src/pool.rs b/substrate/core/transaction-pool/graph/src/pool.rs index 6eec0d222f..06159257ae 100644 --- a/substrate/core/transaction-pool/graph/src/pool.rs +++ b/substrate/core/transaction-pool/graph/src/pool.rs @@ -29,7 +29,7 @@ use crate::watcher::Watcher; use serde::Serialize; use log::debug; -use futures::sync::mpsc; +use futures::channel::mpsc; use parking_lot::{Mutex, RwLock}; use sr_primitives::{ generic::BlockId, @@ -453,7 +453,6 @@ fn fire_events( mod tests { use super::*; use sr_primitives::transaction_validity::ValidTransaction; - use futures::Stream; use codec::Encode; use test_runtime::{Block, Extrinsic, Transfer, H256, AccountId}; use assert_matches::assert_matches; @@ -605,9 +604,9 @@ mod tests { }; // then - let mut it = stream.wait(); - assert_eq!(it.next(), Some(Ok(()))); - assert_eq!(it.next(), Some(Ok(()))); + let mut it = futures::executor::block_on_stream(stream); + assert_eq!(it.next(), Some(())); + assert_eq!(it.next(), Some(())); assert_eq!(it.next(), None); } @@ -747,9 +746,9 @@ mod tests { assert_eq!(pool.status().future, 0); // then - let mut stream = watcher.into_stream().wait(); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Finalized(H256::from_low_u64_be(2).into())))); + let mut stream = futures::executor::block_on_stream(watcher.into_stream()); + assert_eq!(stream.next(), Some(watcher::Status::Ready)); + assert_eq!(stream.next(), Some(watcher::Status::Finalized(H256::from_low_u64_be(2).into()))); assert_eq!(stream.next(), None); } @@ -772,9 +771,9 @@ mod tests { assert_eq!(pool.status().future, 0); // then - let mut stream = watcher.into_stream().wait(); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Finalized(H256::from_low_u64_be(2).into())))); + let mut stream = futures::executor::block_on_stream(watcher.into_stream()); + assert_eq!(stream.next(), Some(watcher::Status::Ready)); + assert_eq!(stream.next(), Some(watcher::Status::Finalized(H256::from_low_u64_be(2).into()))); assert_eq!(stream.next(), None); } @@ -801,9 +800,9 @@ mod tests { assert_eq!(pool.status().ready, 2); // then - let mut stream = watcher.into_stream().wait(); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Future))); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); + let mut stream = futures::executor::block_on_stream(watcher.into_stream()); + assert_eq!(stream.next(), Some(watcher::Status::Future)); + assert_eq!(stream.next(), Some(watcher::Status::Ready)); } #[test] @@ -824,9 +823,9 @@ mod tests { // then - let mut stream = watcher.into_stream().wait(); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Invalid))); + let mut stream = futures::executor::block_on_stream(watcher.into_stream()); + assert_eq!(stream.next(), Some(watcher::Status::Ready)); + assert_eq!(stream.next(), Some(watcher::Status::Invalid)); assert_eq!(stream.next(), None); } @@ -851,9 +850,9 @@ mod tests { // then - let mut stream = watcher.into_stream().wait(); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Broadcast(peers)))); + let mut stream = futures::executor::block_on_stream(watcher.into_stream()); + assert_eq!(stream.next(), Some(watcher::Status::Ready)); + assert_eq!(stream.next(), Some(watcher::Status::Broadcast(peers))); } #[test] @@ -888,9 +887,9 @@ mod tests { assert_eq!(pool.status().ready, 1); // then - let mut stream = watcher.into_stream().wait(); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); - assert_eq!(stream.next(), Some(Ok(watcher::Status::Dropped))); + let mut stream = futures::executor::block_on_stream(watcher.into_stream()); + assert_eq!(stream.next(), Some(watcher::Status::Ready)); + assert_eq!(stream.next(), Some(watcher::Status::Dropped)); } #[test] diff --git a/substrate/core/transaction-pool/graph/src/watcher.rs b/substrate/core/transaction-pool/graph/src/watcher.rs index 44ab8431e8..11d6b9f407 100644 --- a/substrate/core/transaction-pool/graph/src/watcher.rs +++ b/substrate/core/transaction-pool/graph/src/watcher.rs @@ -18,7 +18,7 @@ use futures::{ Stream, - sync::mpsc, + channel::mpsc, }; use serde::{Serialize, Deserialize}; @@ -60,9 +60,8 @@ impl Watcher { /// Pipe the notifications to given sink. /// /// Make sure to drive the future to completion. - pub fn into_stream(self) -> impl Stream, Error=()> { - // we can safely ignore the error here, `UnboundedReceiver` never fails. - self.receiver.map_err(|_| ()) + pub fn into_stream(self) -> impl Stream> { + self.receiver } }