Transition the transaction-pool to new futures (#3394)

* Transition the transaction-pool to new futures

* Fix tests

* Fix tests again?!
This commit is contained in:
Pierre Krieger
2019-08-15 12:01:47 +02:00
committed by Gavin Wood
parent 9d6118279e
commit 11da397db8
8 changed files with 32 additions and 33 deletions
+1 -2
View File
@@ -5060,7 +5060,7 @@ dependencies = [
"assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-scale-codec 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -5075,7 +5075,6 @@ name = "substrate-transaction-pool"
version = "2.0.0" version = "2.0.0"
dependencies = [ dependencies = [
"derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-scale-codec 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
+3 -2
View File
@@ -25,8 +25,9 @@ mod tests;
use std::{sync::Arc, convert::TryInto}; use std::{sync::Arc, convert::TryInto};
use client::{self, Client}; use client::{self, Client};
use crate::rpc::futures::{Sink, Stream, Future}; use crate::rpc::futures::{Sink, Future};
use crate::subscriptions::Subscriptions; use crate::subscriptions::Subscriptions;
use futures03::{StreamExt as _, compat::Compat};
use jsonrpc_derive::rpc; use jsonrpc_derive::rpc;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use log::warn; use log::warn;
@@ -249,7 +250,7 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> whe
self.subscriptions.add(subscriber, move |sink| { self.subscriptions.add(subscriber, move |sink| {
sink sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) .sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(watcher.into_stream().map(Ok)) .send_all(Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))))
.map(|_| ()) .map(|_| ())
}) })
} }
+1
View File
@@ -23,6 +23,7 @@ use transaction_pool::{
txpool::Pool, txpool::Pool,
ChainApi, ChainApi,
}; };
use futures::Stream;
use primitives::{ use primitives::{
H256, blake2_256, hexdisplay::HexDisplay, traits::BareCryptoStore, testing::KeyStore, H256, blake2_256, hexdisplay::HexDisplay, traits::BareCryptoStore, testing::KeyStore,
ed25519, crypto::key_types, ed25519, crypto::key_types,
+1
View File
@@ -298,6 +298,7 @@ impl<Components: components::Components> Service<Components> {
let network = Arc::downgrade(&network); let network = Arc::downgrade(&network);
let transaction_pool_ = transaction_pool.clone(); let transaction_pool_ = transaction_pool.clone();
let events = transaction_pool.import_notification_stream() let events = transaction_pool.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |_| { .for_each(move |_| {
if let Some(network) = network.upgrade() { if let Some(network) = network.upgrade() {
network.trigger_repropagate(); network.trigger_repropagate();
@@ -6,7 +6,6 @@ edition = "2018"
[dependencies] [dependencies]
derive_more = "0.14.0" derive_more = "0.14.0"
futures = "0.1"
log = "0.4" log = "0.4"
codec = { package = "parity-scale-codec", version = "1.0.0" } codec = { package = "parity-scale-codec", version = "1.0.0" }
parking_lot = "0.9.0" parking_lot = "0.9.0"
@@ -6,7 +6,7 @@ edition = "2018"
[dependencies] [dependencies]
derive_more = "0.14.0" derive_more = "0.14.0"
futures = "0.1" futures-preview = "=0.3.0-alpha.17"
log = "0.4" log = "0.4"
parking_lot = "0.9.0" parking_lot = "0.9.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
@@ -29,7 +29,7 @@ use crate::watcher::Watcher;
use serde::Serialize; use serde::Serialize;
use log::debug; use log::debug;
use futures::sync::mpsc; use futures::channel::mpsc;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use sr_primitives::{ use sr_primitives::{
generic::BlockId, generic::BlockId,
@@ -453,7 +453,6 @@ fn fire_events<H, H2, Ex>(
mod tests { mod tests {
use super::*; use super::*;
use sr_primitives::transaction_validity::ValidTransaction; use sr_primitives::transaction_validity::ValidTransaction;
use futures::Stream;
use codec::Encode; use codec::Encode;
use test_runtime::{Block, Extrinsic, Transfer, H256, AccountId}; use test_runtime::{Block, Extrinsic, Transfer, H256, AccountId};
use assert_matches::assert_matches; use assert_matches::assert_matches;
@@ -605,9 +604,9 @@ mod tests {
}; };
// then // then
let mut it = stream.wait(); let mut it = futures::executor::block_on_stream(stream);
assert_eq!(it.next(), Some(Ok(()))); assert_eq!(it.next(), Some(()));
assert_eq!(it.next(), Some(Ok(()))); assert_eq!(it.next(), Some(()));
assert_eq!(it.next(), None); assert_eq!(it.next(), None);
} }
@@ -747,9 +746,9 @@ mod tests {
assert_eq!(pool.status().future, 0); assert_eq!(pool.status().future, 0);
// then // then
let mut stream = watcher.into_stream().wait(); let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); assert_eq!(stream.next(), Some(watcher::Status::Ready));
assert_eq!(stream.next(), Some(Ok(watcher::Status::Finalized(H256::from_low_u64_be(2).into())))); assert_eq!(stream.next(), Some(watcher::Status::Finalized(H256::from_low_u64_be(2).into())));
assert_eq!(stream.next(), None); assert_eq!(stream.next(), None);
} }
@@ -772,9 +771,9 @@ mod tests {
assert_eq!(pool.status().future, 0); assert_eq!(pool.status().future, 0);
// then // then
let mut stream = watcher.into_stream().wait(); let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); assert_eq!(stream.next(), Some(watcher::Status::Ready));
assert_eq!(stream.next(), Some(Ok(watcher::Status::Finalized(H256::from_low_u64_be(2).into())))); assert_eq!(stream.next(), Some(watcher::Status::Finalized(H256::from_low_u64_be(2).into())));
assert_eq!(stream.next(), None); assert_eq!(stream.next(), None);
} }
@@ -801,9 +800,9 @@ mod tests {
assert_eq!(pool.status().ready, 2); assert_eq!(pool.status().ready, 2);
// then // then
let mut stream = watcher.into_stream().wait(); let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(Ok(watcher::Status::Future))); assert_eq!(stream.next(), Some(watcher::Status::Future));
assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); assert_eq!(stream.next(), Some(watcher::Status::Ready));
} }
#[test] #[test]
@@ -824,9 +823,9 @@ mod tests {
// then // then
let mut stream = watcher.into_stream().wait(); let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); assert_eq!(stream.next(), Some(watcher::Status::Ready));
assert_eq!(stream.next(), Some(Ok(watcher::Status::Invalid))); assert_eq!(stream.next(), Some(watcher::Status::Invalid));
assert_eq!(stream.next(), None); assert_eq!(stream.next(), None);
} }
@@ -851,9 +850,9 @@ mod tests {
// then // then
let mut stream = watcher.into_stream().wait(); let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); assert_eq!(stream.next(), Some(watcher::Status::Ready));
assert_eq!(stream.next(), Some(Ok(watcher::Status::Broadcast(peers)))); assert_eq!(stream.next(), Some(watcher::Status::Broadcast(peers)));
} }
#[test] #[test]
@@ -888,9 +887,9 @@ mod tests {
assert_eq!(pool.status().ready, 1); assert_eq!(pool.status().ready, 1);
// then // then
let mut stream = watcher.into_stream().wait(); let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); assert_eq!(stream.next(), Some(watcher::Status::Ready));
assert_eq!(stream.next(), Some(Ok(watcher::Status::Dropped))); assert_eq!(stream.next(), Some(watcher::Status::Dropped));
} }
#[test] #[test]
@@ -18,7 +18,7 @@
use futures::{ use futures::{
Stream, Stream,
sync::mpsc, channel::mpsc,
}; };
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
@@ -60,9 +60,8 @@ impl<H, H2> Watcher<H, H2> {
/// Pipe the notifications to given sink. /// Pipe the notifications to given sink.
/// ///
/// Make sure to drive the future to completion. /// Make sure to drive the future to completion.
pub fn into_stream(self) -> impl Stream<Item=Status<H, H2>, Error=()> { pub fn into_stream(self) -> impl Stream<Item=Status<H, H2>> {
// we can safely ignore the error here, `UnboundedReceiver` never fails. self.receiver
self.receiver.map_err(|_| ())
} }
} }