diff --git a/substrate/client/transaction-pool/graph/src/base_pool.rs b/substrate/client/transaction-pool/graph/src/base_pool.rs index 8878e9e6dd..820c1bea73 100644 --- a/substrate/client/transaction-pool/graph/src/base_pool.rs +++ b/substrate/client/transaction-pool/graph/src/base_pool.rs @@ -274,7 +274,12 @@ impl BasePool Clone for TransactionRef { impl Ord for TransactionRef { fn cmp(&self, other: &Self) -> cmp::Ordering { self.transaction.priority.cmp(&other.transaction.priority) - .then(other.transaction.valid_till.cmp(&self.transaction.valid_till)) - .then(other.insertion_id.cmp(&self.insertion_id)) + .then_with(|| other.transaction.valid_till.cmp(&self.transaction.valid_till)) + .then_with(|| other.insertion_id.cmp(&self.insertion_id)) } } @@ -161,7 +161,10 @@ impl ReadyTransactions { &mut self, tx: WaitingTransaction, ) -> error::Result>>> { - assert!(tx.is_ready(), "Only ready transactions can be imported."); + assert!( + tx.is_ready(), + "Only ready transactions can be imported. Missing: {:?}", tx.missing_tags + ); assert!(!self.ready.read().contains_key(&tx.transaction.hash), "Transaction is already imported."); self.insertion_id += 1; @@ -169,10 +172,11 @@ impl ReadyTransactions { let hash = tx.transaction.hash.clone(); let transaction = tx.transaction; - let replaced = self.replace_previous(&transaction)?; + let (replaced, unlocks) = self.replace_previous(&transaction)?; let mut goes_to_best = true; let mut ready = self.ready.write(); + let mut requires_offset = 0; // Add links to transactions that unlock the current one for tag in &transaction.requires { // Check if the transaction that satisfies the tag is still in the queue. @@ -181,10 +185,14 @@ impl ReadyTransactions { tx.unlocks.push(hash.clone()); // this transaction depends on some other, so it doesn't go to best directly. goes_to_best = false; + } else { + requires_offset += 1; } } // update provided_tags + // call to replace_previous guarantees that we will be overwriting + // only entries that have been removed. for tag in &transaction.provides { self.provided_tags.insert(tag.clone(), hash.clone()); } @@ -202,8 +210,8 @@ impl ReadyTransactions { // insert to Ready ready.insert(hash, ReadyTx { transaction, - unlocks: vec![], - requires_offset: 0, + unlocks, + requires_offset, }); Ok(replaced) @@ -236,9 +244,21 @@ impl ReadyTransactions { /// (i.e. the entire subgraph that this transaction is a start of will be removed). /// All removed transactions are returned. pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec>> { - let mut removed = vec![]; - let mut to_remove = hashes.iter().cloned().collect::>(); + let to_remove = hashes.iter().cloned().collect::>(); + self.remove_subtree_with_tag_filter(to_remove, None) + } + /// Removes a subtrees of transactions trees starting from roots given in `to_remove`. + /// + /// We proceed with a particular branch only if there is at least one provided tag + /// that is not part of `provides_tag_filter`. I.e. the filter contains tags + /// that will stay in the pool, so that we can early exit and avoid descending. + fn remove_subtree_with_tag_filter( + &mut self, + mut to_remove: Vec, + provides_tag_filter: Option>, + ) -> Vec>> { + let mut removed = vec![]; let mut ready = self.ready.write(); loop { let hash = match to_remove.pop() { @@ -247,10 +267,21 @@ impl ReadyTransactions { }; if let Some(mut tx) = ready.remove(&hash) { + let invalidated = tx.transaction.transaction.provides + .iter() + .filter(|tag| provides_tag_filter + .as_ref() + .map(|filter| !filter.contains(&**tag)) + .unwrap_or(true) + ); + + let mut removed_some_tags = false; // remove entries from provided_tags - for tag in &tx.transaction.transaction.provides { + for tag in invalidated { + removed_some_tags = true; self.provided_tags.remove(tag); } + // remove from unlocks for tag in &tx.transaction.transaction.requires { if let Some(hash) = self.provided_tags.get(tag) { @@ -263,8 +294,10 @@ impl ReadyTransactions { // remove from best self.best.remove(&tx.transaction); - // remove all transactions that the current one unlocks - to_remove.append(&mut tx.unlocks); + if removed_some_tags { + // remove all transactions that the current one unlocks + to_remove.append(&mut tx.unlocks); + } // add to removed trace!(target: "txpool", "[{:?}] Removed as part of the subtree.", hash); @@ -363,9 +396,15 @@ impl ReadyTransactions { /// we are about to replace is lower than the priority of the replacement transaction. /// We remove/replace old transactions in case they have lower priority. /// - /// In case replacement is successful returns a list of removed transactions. - fn replace_previous(&mut self, tx: &Transaction) -> error::Result>>> { - let mut to_remove = { + /// In case replacement is successful returns a list of removed transactions + /// and a list of hashes that are still in pool and gets unlocked by the new transaction. + fn replace_previous( + &mut self, + tx: &Transaction, + ) -> error::Result< + (Vec>>, Vec) + > { + let (to_remove, unlocks) = { // check if we are replacing a transaction let replace_hashes = tx.provides .iter() @@ -374,7 +413,7 @@ impl ReadyTransactions { // early exit if we are not replacing anything. if replace_hashes.is_empty() { - return Ok(vec![]); + return Ok((vec![], vec![])); } // now check if collective priority is lower than the replacement transaction. @@ -383,7 +422,9 @@ impl ReadyTransactions { replace_hashes .iter() .filter_map(|hash| ready.get(hash)) - .fold(0u64, |total, tx| total.saturating_add(tx.transaction.transaction.priority)) + .fold(0u64, |total, tx| + total.saturating_add(tx.transaction.transaction.priority) + ) }; // bail - the transaction has too low priority to replace the old ones @@ -391,35 +432,31 @@ impl ReadyTransactions { return Err(error::Error::TooLowPriority { old: old_priority, new: tx.priority }) } - replace_hashes.into_iter().cloned().collect::>() + // construct a list of unlocked transactions + let unlocks = { + let ready = self.ready.read(); + replace_hashes + .iter() + .filter_map(|hash| ready.get(hash)) + .fold(vec![], |mut list, tx| { + list.extend(tx.unlocks.iter().cloned()); + list + }) + }; + + ( + replace_hashes.into_iter().cloned().collect::>(), + unlocks + ) }; let new_provides = tx.provides.iter().cloned().collect::>(); - let mut removed = vec![]; - loop { - let hash = match to_remove.pop() { - Some(hash) => hash, - None => return Ok(removed), - }; + let removed = self.remove_subtree_with_tag_filter(to_remove, Some(new_provides)); - let tx = self.ready.write().remove(&hash).expect(HASH_READY); - // check if this transaction provides stuff that is not provided by the new one. - let (mut unlocks, tx) = (tx.unlocks, tx.transaction.transaction); - { - let invalidated = tx.provides - .iter() - .filter(|tag| !new_provides.contains(&**tag)); - - for tag in invalidated { - // remove the tag since it's no longer provided by any transaction - self.provided_tags.remove(tag); - // add more transactions to remove - to_remove.append(&mut unlocks); - } - } - - removed.push(tx); - } + Ok(( + removed, + unlocks + )) } /// Returns number of transactions in this queue. @@ -444,7 +481,7 @@ impl BestIterator { /// Depending on number of satisfied requirements insert given ref /// either to awaiting set or to best set. fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef) { - if satisfied == tx_ref.transaction.requires.len() { + if satisfied >= tx_ref.transaction.requires.len() { // If we have satisfied all deps insert to best self.best.insert(tx_ref); @@ -517,6 +554,14 @@ mod tests { } } + fn import( + ready: &mut ReadyTransactions, + tx: Transaction + ) -> error::Result>>> { + let x = WaitingTransaction::new(tx, ready.provided_tags(), &[]); + ready.import(x) + } + #[test] fn should_replace_transaction_that_provides_the_same_tag() { // given @@ -531,24 +576,56 @@ mod tests { tx3.provides = vec![vec![4]]; // when - let x = WaitingTransaction::new(tx2, &ready.provided_tags(), &[]); - ready.import(x).unwrap(); - let x = WaitingTransaction::new(tx3, &ready.provided_tags(), &[]); - ready.import(x).unwrap(); + import(&mut ready, tx2).unwrap(); + import(&mut ready, tx3).unwrap(); assert_eq!(ready.get().count(), 2); // too low priority - let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags(), &[]); - ready.import(x).unwrap_err(); + import(&mut ready, tx1.clone()).unwrap_err(); tx1.priority = 10; - let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags(), &[]); - ready.import(x).unwrap(); + import(&mut ready, tx1).unwrap(); // then assert_eq!(ready.get().count(), 1); } + #[test] + fn should_replace_multiple_transactions_correctly() { + // given + let mut ready = ReadyTransactions::default(); + let mut tx0 = tx(0); + tx0.requires = vec![]; + tx0.provides = vec![vec![0]]; + let mut tx1 = tx(1); + tx1.requires = vec![]; + tx1.provides = vec![vec![1]]; + let mut tx2 = tx(2); + tx2.requires = vec![vec![0], vec![1]]; + tx2.provides = vec![vec![2], vec![3]]; + let mut tx3 = tx(3); + tx3.requires = vec![vec![2]]; + tx3.provides = vec![vec![4]]; + let mut tx4 = tx(4); + tx4.requires = vec![vec![3]]; + tx4.provides = vec![vec![5]]; + // replacement + let mut tx2_2 = tx(5); + tx2_2.requires = vec![vec![0], vec![1]]; + tx2_2.provides = vec![vec![2]]; + tx2_2.priority = 10; + + for tx in vec![tx0, tx1, tx2, tx3, tx4] { + import(&mut ready, tx).unwrap(); + } + assert_eq!(ready.get().count(), 5); + + // when + import(&mut ready, tx2_2).unwrap(); + + // then + assert_eq!(ready.get().count(), 3); + } #[test] fn should_return_best_transactions_in_correct_order() { @@ -577,16 +654,9 @@ mod tests { }; // when - let x = WaitingTransaction::new(tx1, &ready.provided_tags(), &[]); - ready.import(x).unwrap(); - let x = WaitingTransaction::new(tx2, &ready.provided_tags(), &[]); - ready.import(x).unwrap(); - let x = WaitingTransaction::new(tx3, &ready.provided_tags(), &[]); - ready.import(x).unwrap(); - let x = WaitingTransaction::new(tx4, &ready.provided_tags(), &[]); - ready.import(x).unwrap(); - let x = WaitingTransaction::new(tx5, &ready.provided_tags(), &[]); - ready.import(x).unwrap(); + for tx in vec![tx1, tx2, tx3, tx4, tx5] { + import(&mut ready, tx).unwrap(); + } // then assert_eq!(ready.best.len(), 1);