From e22b4917831be69fa6c82ae19d4d3b8cb2a169e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 28 Mar 2019 18:57:26 +0100 Subject: [PATCH] Fix a race conditon in the pool when transactions are imported during pruning. (#2136) * Store recently pruned tags to avoid re-importing transactions. * Update core/transaction-pool/graph/src/base_pool.rs * Update core/transaction-pool/graph/src/base_pool.rs * Update core/transaction-pool/graph/src/base_pool.rs * Update base_pool.rs --- substrate/Cargo.lock | 1 + .../core/transaction-pool/graph/Cargo.toml | 1 + .../transaction-pool/graph/src/base_pool.rs | 28 ++++++-- .../core/transaction-pool/graph/src/future.rs | 13 +++- .../core/transaction-pool/graph/src/pool.rs | 69 ++++++++++++++++++- .../core/transaction-pool/graph/src/ready.rs | 18 ++--- 6 files changed, 113 insertions(+), 17 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 07d0e3c4d9..6549381f60 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4263,6 +4263,7 @@ name = "substrate-transaction-graph" version = "0.1.0" dependencies = [ "assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/transaction-pool/graph/Cargo.toml b/substrate/core/transaction-pool/graph/Cargo.toml index 6244c14e5c..58aeec885d 100644 --- a/substrate/core/transaction-pool/graph/Cargo.toml +++ b/substrate/core/transaction-pool/graph/Cargo.toml @@ -16,5 +16,6 @@ sr-primitives = { path = "../../sr-primitives" } [dev-dependencies] assert_matches = "1.1" +env_logger = "0.6" parity-codec = "3.2" test_runtime = { package = "substrate-test-runtime", path = "../../test-runtime" } diff --git a/substrate/core/transaction-pool/graph/src/base_pool.rs b/substrate/core/transaction-pool/graph/src/base_pool.rs index ecdea3d485..ad434e57d4 100644 --- a/substrate/core/transaction-pool/graph/src/base_pool.rs +++ b/substrate/core/transaction-pool/graph/src/base_pool.rs @@ -19,6 +19,7 @@ //! For a more full-featured pool, have a look at the `pool` module. use std::{ + collections::HashSet, fmt, hash, sync::Arc, @@ -134,6 +135,9 @@ impl fmt::Debug for Transaction where } } +/// Store last pruned tags for given number of invocations. +const RECENTLY_PRUNED_TAGS: usize = 2; + /// Transaction pool. /// /// Builds a dependency graph for all transactions in the pool and returns @@ -148,6 +152,12 @@ impl fmt::Debug for Transaction where pub struct BasePool { future: FutureTransactions, ready: ReadyTransactions, + /// Store recently pruned tags (for last two invocations). + /// + /// This is used to make sure we don't accidentally put + /// transactions to future in case they were just stuck in verification. + recently_pruned: [HashSet; RECENTLY_PRUNED_TAGS], + recently_pruned_index: usize, } impl Default for BasePool { @@ -155,6 +165,8 @@ impl Default for BasePool { BasePool { future: Default::default(), ready: Default::default(), + recently_pruned: Default::default(), + recently_pruned_index: 0, } } } @@ -175,7 +187,11 @@ impl BasePool BasePool) -> PruneStatus { let mut to_import = vec![]; let mut pruned = vec![]; + let recently_pruned = &mut self.recently_pruned[self.recently_pruned_index]; + self.recently_pruned_index = (self.recently_pruned_index + 1) % RECENTLY_PRUNED_TAGS; + recently_pruned.clear(); for tag in tags { // make sure to promote any future transactions that could be unlocked to_import.append(&mut self.future.satisfy_tags(::std::iter::once(&tag))); // and actually prune transactions in ready queue - pruned.append(&mut self.ready.prune_tags(tag)); + pruned.append(&mut self.ready.prune_tags(tag.clone())); + // store the tags for next submission + recently_pruned.insert(tag); } let mut promoted = vec![]; @@ -663,7 +684,6 @@ mod tests { assert_eq!(pool.future.len(), 0); } - #[test] fn should_handle_a_cycle_with_low_priority() { // given @@ -798,7 +818,6 @@ mod tests { assert_eq!(pool.future.len(), 0); } - #[test] fn should_prune_ready_transactions() { // given @@ -887,5 +906,4 @@ mod tests { r#"Transaction { hash: 4, priority: 1000, valid_till: 64, bytes: 1, requires: [03,02], provides: [04], data: [4]}"#.to_owned() ); } - } diff --git a/substrate/core/transaction-pool/graph/src/future.rs b/substrate/core/transaction-pool/graph/src/future.rs index 10293a3de7..6ca5019e47 100644 --- a/substrate/core/transaction-pool/graph/src/future.rs +++ b/substrate/core/transaction-pool/graph/src/future.rs @@ -71,10 +71,19 @@ impl WaitingTransaction { /// /// Computes the set of missing tags based on the requirements and tags that /// are provided by all transactions in the ready queue. - pub fn new(transaction: Transaction, provided: &HashMap) -> Self { + pub fn new( + transaction: Transaction, + provided: &HashMap, + recently_pruned: &[HashSet], + ) -> Self { let missing_tags = transaction.requires .iter() - .filter(|tag| !provided.contains_key(&**tag)) + .filter(|tag| { + // is true if the tag is already satisfied either via transaction in the pool + // or one that was recently included. + let is_provided = provided.contains_key(&**tag) || recently_pruned.iter().any(|x| x.contains(&**tag)); + !is_provided + }) .cloned() .collect(); diff --git a/substrate/core/transaction-pool/graph/src/pool.rs b/substrate/core/transaction-pool/graph/src/pool.rs index e24e6f5ae2..9c3478d3d7 100644 --- a/substrate/core/transaction-pool/graph/src/pool.rs +++ b/substrate/core/transaction-pool/graph/src/pool.rs @@ -460,7 +460,9 @@ mod tests { use crate::watcher; #[derive(Debug, Default)] - struct TestApi; + struct TestApi { + delay: Mutex>>, + } impl ChainApi for TestApi { type Block = Block; @@ -469,9 +471,20 @@ mod tests { /// Verify extrinsic at given block. fn validate_transaction(&self, at: &BlockId, uxt: ExtrinsicFor) -> Result { + let block_number = self.block_id_to_number(at)?.unwrap(); let nonce = uxt.transfer().nonce; + // This is used to control the test flow. + if nonce > 0 { + let opt = self.delay.lock().take(); + if let Some(delay) = opt { + if delay.recv().is_err() { + println!("Error waiting for delay!"); + } + } + } + if nonce < block_number { Ok(TransactionValidity::Invalid(0)) } else { @@ -878,5 +891,59 @@ mod tests { assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready))); assert_eq!(stream.next(), Some(Ok(watcher::Status::Dropped))); } + + #[test] + fn should_handle_pruning_in_the_middle_of_import() { + let _ = env_logger::try_init(); + // given + let (ready, is_ready) = std::sync::mpsc::sync_channel(0); + let (tx, rx) = std::sync::mpsc::sync_channel(1); + let mut api = TestApi::default(); + api.delay = Mutex::new(rx.into()); + let pool = Arc::new(Pool::new(Default::default(), api)); + + // when + let xt = uxt(Transfer { + from: AccountId::from_h256(H256::from_low_u64_be(1)), + to: AccountId::from_h256(H256::from_low_u64_be(2)), + amount: 5, + nonce: 1, + }); + + // This transaction should go to future, since we use `nonce: 1` + let pool2 = pool.clone(); + std::thread::spawn(move || { + pool2.submit_one(&BlockId::Number(0), xt).unwrap(); + ready.send(()).unwrap(); + }); + + // But now before the previous one is imported we import + // the one that it depends on. + let xt = uxt(Transfer { + from: AccountId::from_h256(H256::from_low_u64_be(1)), + to: AccountId::from_h256(H256::from_low_u64_be(2)), + amount: 4, + nonce: 0, + }); + // The tag the above transaction provides (TestApi is using just nonce as u8) + let provides = vec![0_u8]; + pool.submit_one(&BlockId::Number(0), xt).unwrap(); + assert_eq!(pool.status().ready, 1); + + // Now block import happens before the second transaction is able to finish verification. + pool.prune_tags(&BlockId::Number(1), vec![provides], vec![]).unwrap(); + assert_eq!(pool.status().ready, 0); + + + // so when we release the verification of the previous one it will have + // something in `requires`, but should go to ready directly, since the previous transaction was imported + // correctly. + tx.send(()).unwrap(); + + // then + is_ready.recv().unwrap(); // wait for finish + assert_eq!(pool.status().ready, 1); + assert_eq!(pool.status().future, 0); + } } } diff --git a/substrate/core/transaction-pool/graph/src/ready.rs b/substrate/core/transaction-pool/graph/src/ready.rs index 5a9ea11e50..befb1b60cc 100644 --- a/substrate/core/transaction-pool/graph/src/ready.rs +++ b/substrate/core/transaction-pool/graph/src/ready.rs @@ -517,18 +517,18 @@ mod tests { tx3.provides = vec![vec![4]]; // when - let x = WaitingTransaction::new(tx2, &ready.provided_tags()); + let x = WaitingTransaction::new(tx2, &ready.provided_tags(), &[]); ready.import(x).unwrap(); - let x = WaitingTransaction::new(tx3, &ready.provided_tags()); + let x = WaitingTransaction::new(tx3, &ready.provided_tags(), &[]); ready.import(x).unwrap(); assert_eq!(ready.get().count(), 2); // too low priority - let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags()); + let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags(), &[]); ready.import(x).unwrap_err(); tx1.priority = 10; - let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags()); + let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags(), &[]); ready.import(x).unwrap(); // then @@ -562,15 +562,15 @@ mod tests { }; // when - let x = WaitingTransaction::new(tx1, &ready.provided_tags()); + let x = WaitingTransaction::new(tx1, &ready.provided_tags(), &[]); ready.import(x).unwrap(); - let x = WaitingTransaction::new(tx2, &ready.provided_tags()); + let x = WaitingTransaction::new(tx2, &ready.provided_tags(), &[]); ready.import(x).unwrap(); - let x = WaitingTransaction::new(tx3, &ready.provided_tags()); + let x = WaitingTransaction::new(tx3, &ready.provided_tags(), &[]); ready.import(x).unwrap(); - let x = WaitingTransaction::new(tx4, &ready.provided_tags()); + let x = WaitingTransaction::new(tx4, &ready.provided_tags(), &[]); ready.import(x).unwrap(); - let x = WaitingTransaction::new(tx5, &ready.provided_tags()); + let x = WaitingTransaction::new(tx5, &ready.provided_tags(), &[]); ready.import(x).unwrap(); // then