From d7ffef43cee79fbe554af9af85da9a0636b0bc3e Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Sun, 29 Mar 2020 01:40:00 -0700 Subject: [PATCH] don't use delays in tests (#5404) --- substrate/Cargo.lock | 12 ++- substrate/client/transaction-pool/Cargo.toml | 2 +- substrate/client/transaction-pool/src/lib.rs | 23 +++++- .../transaction-pool/src/revalidation.rs | 47 ++++++++---- .../transaction-pool/src/testing/pool.rs | 74 ++++++++++--------- 5 files changed, 106 insertions(+), 52 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 22636830a6..b0d6c55686 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2345,6 +2345,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "141340095b15ed7491bd3d4ced9d20cebfb826174b6bb03386381f62b01e3d77" +[[package]] +name = "intervalier" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "750dc2c10615a0aa0d38a5adf9d4e62651c178109f40253cb6235b3f638af6a9" +dependencies = [ + "futures 0.3.4", + "futures-timer 2.0.2", +] + [[package]] name = "iovec" version = "0.1.4" @@ -6690,8 +6700,8 @@ dependencies = [ "derive_more", "futures 0.3.4", "futures-diagnose", - "futures-timer 2.0.2", "hex", + "intervalier", "log 0.4.8", "parity-scale-codec", "parity-util-mem", diff --git a/substrate/client/transaction-pool/Cargo.toml b/substrate/client/transaction-pool/Cargo.toml index e01409ecb0..3528463c7b 100644 --- a/substrate/client/transaction-pool/Cargo.toml +++ b/substrate/client/transaction-pool/Cargo.toml @@ -23,7 +23,7 @@ sc-transaction-graph = { version = "2.0.0-alpha.5", path = "./graph" } sp-transaction-pool = { version = "2.0.0-alpha.5", path = "../../primitives/transaction-pool" } sc-client-api = { version = "2.0.0-alpha.5", path = "../api" } sp-blockchain = { version = "2.0.0-alpha.5", path = "../../primitives/blockchain" } -futures-timer = "2.0" +intervalier = "0.3" parity-util-mem = { version = "0.6.0", default-features = false, features = ["primitive-types"] } [dev-dependencies] diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs index 4c54cf28e0..c50d9dbbb4 100644 --- a/substrate/client/transaction-pool/src/lib.rs +++ b/substrate/client/transaction-pool/src/lib.rs @@ -31,7 +31,7 @@ pub use sc_transaction_graph as txpool; pub use crate::api::{FullChainApi, LightChainApi}; use std::{collections::HashMap, sync::Arc, pin::Pin}; -use futures::{Future, FutureExt, future::ready, channel::oneshot}; +use futures::{prelude::*, future::ready, channel::oneshot}; use parking_lot::Mutex; use sp_runtime::{ @@ -151,6 +151,27 @@ impl BasicPool Self::with_revalidation_type(options, pool_api, RevalidationType::Full) } + /// Create new basic transaction pool with provided api, for tests. + #[cfg(test)] + pub fn new_test( + pool_api: Arc, + ) -> (Self, Pin + Send>>, intervalier::BackSignalControl) { + let pool = Arc::new(sc_transaction_graph::Pool::new(Default::default(), pool_api.clone())); + let (revalidation_queue, background_task, notifier) = + revalidation::RevalidationQueue::new_test(pool_api.clone(), pool.clone()); + ( + BasicPool { + api: pool_api, + pool, + revalidation_queue: Arc::new(revalidation_queue), + revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)), + ready_poll: Default::default(), + }, + background_task, + notifier, + ) + } + /// Create new basic transaction pool with provided api and custom /// revalidation type. pub fn with_revalidation_type( diff --git a/substrate/client/transaction-pool/src/revalidation.rs b/substrate/client/transaction-pool/src/revalidation.rs index ee0aa1ab2d..5a3b2521c3 100644 --- a/substrate/client/transaction-pool/src/revalidation.rs +++ b/substrate/client/transaction-pool/src/revalidation.rs @@ -23,9 +23,8 @@ use sp_runtime::traits::{Zero, SaturatedConversion}; use sp_runtime::generic::BlockId; use sp_runtime::transaction_validity::TransactionValidityError; -use futures::{prelude::*, channel::mpsc, stream::unfold}; +use futures::{prelude::*, channel::mpsc}; use std::time::Duration; -use futures_timer::Delay; #[cfg(not(test))] const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(200); @@ -53,12 +52,6 @@ struct RevalidationWorker { impl Unpin for RevalidationWorker {} -fn interval(duration: Duration) -> impl Stream + Unpin { - unfold((), move |_| { - Delay::new(duration).map(|_| Some(((), ()))) - }).map(drop) -} - /// Revalidate batch of transaction. /// /// Each transaction is validated against chain, and invalid are @@ -207,8 +200,13 @@ impl RevalidationWorker { /// It does two things: periodically tries to process some transactions /// from the queue and also accepts messages to enqueue some more /// transactions from the pool. - pub async fn run(mut self, from_queue: mpsc::UnboundedReceiver>) { - let interval = interval(BACKGROUND_REVALIDATION_INTERVAL).fuse(); + pub async fn run( + mut self, + from_queue: mpsc::UnboundedReceiver>, + interval: R, + ) where R: Send, R::Guard: Send + { + let interval = interval.into_stream().fuse(); let from_queue = from_queue.fuse(); futures::pin_mut!(interval, from_queue); let this = &mut self; @@ -270,9 +268,12 @@ where } } - /// New revalidation queue with background worker. - pub fn new_background(api: Arc, pool: Arc>) -> - (Self, Pin + Send>>) + pub fn new_with_interval( + api: Arc, + pool: Arc>, + interval: R, + ) -> (Self, Pin + Send>>) + where R: Send + 'static, R::Guard: Send { let (to_worker, from_queue) = mpsc::unbounded(); @@ -285,7 +286,25 @@ where background: Some(to_worker), }; - (queue, worker.run(from_queue).boxed()) + (queue, worker.run(from_queue, interval).boxed()) + } + + /// New revalidation queue with background worker. + pub fn new_background(api: Arc, pool: Arc>) -> + (Self, Pin + Send>>) + { + Self::new_with_interval(api, pool, intervalier::Interval::new(BACKGROUND_REVALIDATION_INTERVAL)) + } + + /// New revalidation queue with background worker and test signal. + #[cfg(test)] + pub fn new_test(api: Arc, pool: Arc>) -> + (Self, Pin + Send>>, intervalier::BackSignalControl) + { + let (interval, notifier) = intervalier::BackSignalInterval::new(BACKGROUND_REVALIDATION_INTERVAL); + let (queue, background) = Self::new_with_interval(api, pool, interval); + + (queue, background, notifier) } /// Queue some transaction for later revalidation. diff --git a/substrate/client/transaction-pool/src/testing/pool.rs b/substrate/client/transaction-pool/src/testing/pool.rs index 7dbb8e6158..766257ff5e 100644 --- a/substrate/client/transaction-pool/src/testing/pool.rs +++ b/substrate/client/transaction-pool/src/testing/pool.rs @@ -27,20 +27,25 @@ use substrate_test_runtime_client::{ AccountKeyring::*, }; use substrate_test_runtime_transaction_pool::{TestApi, uxt}; -use crate::revalidation::BACKGROUND_REVALIDATION_INTERVAL; -use futures::task::Poll; +use futures::{prelude::*, task::Poll}; use codec::Encode; fn pool() -> Pool { Pool::new(Default::default(), TestApi::with_alice_nonce(209).into()) } -fn maintained_pool() -> (BasicPool, futures::executor::ThreadPool) { - let (pool, background_task) = BasicPool::new(Default::default(), std::sync::Arc::new(TestApi::with_alice_nonce(209))); +fn maintained_pool() -> ( + BasicPool, + futures::executor::ThreadPool, + intervalier::BackSignalControl, +) { + let (pool, background_task, notifier) = BasicPool::new_test( + std::sync::Arc::new(TestApi::with_alice_nonce(209)) + ); let thread_pool = futures::executor::ThreadPool::new().unwrap(); - thread_pool.spawn_ok(background_task.expect("basic pool have background task")); - (pool, thread_pool) + thread_pool.spawn_ok(background_task); + (pool, thread_pool, notifier) } fn header(number: u64) -> Header { @@ -190,7 +195,7 @@ fn block_event_with_retracted(id: u64, retracted: Vec) -> ChainEvent>(), @@ -409,7 +413,7 @@ fn should_push_watchers_during_maintaince() { #[test] fn can_track_heap_size() { - let (pool, _guard) = maintained_pool(); + let (pool, _guard, _notifier) = maintained_pool(); block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 209))).expect("1. Imported"); block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 210))).expect("1. Imported"); block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 211))).expect("1. Imported"); @@ -629,7 +633,7 @@ fn fork_aware_finalization() { #[test] fn ready_set_should_not_resolve_before_block_update() { - let (pool, _guard) = maintained_pool(); + let (pool, _guard, _notifier) = maintained_pool(); let xt1 = uxt(Alice, 209); block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported"); @@ -638,7 +642,7 @@ fn ready_set_should_not_resolve_before_block_update() { #[test] fn ready_set_should_resolve_after_block_update() { - let (pool, _guard) = maintained_pool(); + let (pool, _guard, _notifier) = maintained_pool(); pool.api.push_block(1, vec![]); let xt1 = uxt(Alice, 209); @@ -651,7 +655,7 @@ fn ready_set_should_resolve_after_block_update() { #[test] fn ready_set_should_eventually_resolve_when_block_update_arrives() { - let (pool, _guard) = maintained_pool(); + let (pool, _guard, _notifier) = maintained_pool(); pool.api.push_block(1, vec![]); let xt1 = uxt(Alice, 209);