diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index 0cdf11cc88..51dc086689 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -199,36 +199,17 @@ fn on_block_imported( Executor: client::CallExecutor, PoolApi: txpool::ChainApi, { - use runtime_primitives::transaction_validity::TransactionValidity; - // Avoid calling into runtime if there is nothing to prune from the pool anyway. if transaction_pool.status().is_empty() { return Ok(()) } - let block = client.block(id)?; - let tags = match block { - None => return Ok(()), - Some(block) => { - let parent_id = BlockId::hash(*block.block.header().parent_hash()); - let mut tags = vec![]; - for tx in block.block.extrinsics() { - let tx = client.runtime_api().validate_transaction(&parent_id, tx.clone())?; - match tx { - TransactionValidity::Valid { mut provides, .. } => { - tags.append(&mut provides); - }, - // silently ignore invalid extrinsics, - // cause they might just be inherent - _ => {} - } + if let Some(block) = client.block(id)? { + let parent_id = BlockId::hash(*block.block.header().parent_hash()); + let extrinsics = block.block.extrinsics(); + transaction_pool.prune(id, &parent_id, extrinsics).map_err(|e| format!("{:?}", e))?; + } - } - tags - } - }; - - transaction_pool.prune_tags(id, tags).map_err(|e| format!("{:?}", e))?; Ok(()) } diff --git a/substrate/core/transaction-pool/graph/src/base_pool.rs b/substrate/core/transaction-pool/graph/src/base_pool.rs index 2e96cc75c2..9ec334fb8d 100644 --- a/substrate/core/transaction-pool/graph/src/base_pool.rs +++ b/substrate/core/transaction-pool/graph/src/base_pool.rs @@ -226,6 +226,21 @@ impl BasePool Vec>>> { + let ready = self.ready.by_hash(hashes); + let future = self.future.by_hash(hashes); + + ready + .into_iter() + .zip(future) + .map(|(a, b)| a.or(b)) + .collect() + } + /// Removes all transactions represented by the hashes and all other transactions /// that depend on them. /// @@ -236,7 +251,7 @@ impl BasePool Vec>> { let mut removed = self.ready.remove_invalid(hashes); - removed.extend(self.future.remove(hashes).into_iter().map(Arc::new)); + removed.extend(self.future.remove(hashes)); removed } diff --git a/substrate/core/transaction-pool/graph/src/future.rs b/substrate/core/transaction-pool/graph/src/future.rs index da6e8e55f2..0bab6bc2e0 100644 --- a/substrate/core/transaction-pool/graph/src/future.rs +++ b/substrate/core/transaction-pool/graph/src/future.rs @@ -17,6 +17,7 @@ use std::{ collections::{HashMap, HashSet}, hash, + sync::Arc, }; use sr_primitives::transaction_validity::{ @@ -29,7 +30,7 @@ use base_pool::Transaction; #[derive(Debug)] pub struct WaitingTransaction { /// Transaction details. - pub transaction: Transaction, + pub transaction: Arc>, /// Tags that are required and have not been satisfied yet by other transactions in the pool. pub missing_tags: HashSet, } @@ -47,7 +48,7 @@ impl WaitingTransaction { .collect(); WaitingTransaction { - transaction, + transaction: Arc::new(transaction), missing_tags, } } @@ -117,6 +118,11 @@ impl FutureTransactions { self.waiting.contains_key(hash) } + /// Returns a list of known transactions + pub fn by_hash(&self, hashes: &[Hash]) -> Vec>>> { + hashes.iter().map(|h| self.waiting.get(h).map(|x| x.transaction.clone())).collect() + } + /// Satisfies provided tags in transactions that are waiting for them. /// /// Returns (and removes) transactions that became ready after their last tag got @@ -148,7 +154,7 @@ impl FutureTransactions { /// Removes transactions for given list of hashes. /// /// Returns a list of actually removed transactions. - pub fn remove(&mut self, hashes: &[Hash]) -> Vec> { + pub fn remove(&mut self, hashes: &[Hash]) -> Vec>> { let mut removed = vec![]; for hash in hashes { if let Some(waiting_tx) = self.waiting.remove(hash) { @@ -171,7 +177,7 @@ impl FutureTransactions { /// Returns iterator over all future transactions pub fn all(&self) -> impl Iterator> { - self.waiting.values().map(|waiting| &waiting.transaction) + self.waiting.values().map(|waiting| &*waiting.transaction) } /// Returns number of transactions in the Future queue. diff --git a/substrate/core/transaction-pool/graph/src/pool.rs b/substrate/core/transaction-pool/graph/src/pool.rs index f69aa1d24d..26dfaa974d 100644 --- a/substrate/core/transaction-pool/graph/src/pool.rs +++ b/substrate/core/transaction-pool/graph/src/pool.rs @@ -152,9 +152,77 @@ impl Pool { Ok(watcher) } + /// Prunes ready transactions. + /// + /// Used to clear the pool from transactions that were part of recently imported block. + /// To perform pruning we need the tags that each extrinsic provides and to avoid calling + /// into runtime too often we first lookup all extrinsics that are in the pool and get + /// their provided tags from there. Otherwise we query the runtime at the `parent` block. + pub fn prune(&self, at: &BlockId, parent: &BlockId, extrinsics: &[ExtrinsicFor]) -> Result<(), B::Error> { + let mut tags = Vec::with_capacity(extrinsics.len()); + // Get details of all extrinsics that are already in the pool + let hashes = extrinsics.iter().map(|extrinsic| self.api.hash(extrinsic)).collect::>(); + let in_pool = self.pool.read().by_hash(&hashes); + { + // Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option)`) + let all = extrinsics.iter().zip(in_pool.iter()); + + for (extrinsic, existing_in_pool) in all { + match *existing_in_pool { + // reuse the tags for extrinsis that were found in the pool + Some(ref transaction) => { + tags.extend(transaction.provides.iter().cloned()); + }, + // if it's not found in the pool query the runtime at parent block + // to get validity info and tags that the extrinsic provides. + None => { + let validity = self.api.validate_transaction(parent, extrinsic.clone()); + match validity { + Ok(TransactionValidity::Valid { mut provides, .. }) => { + tags.append(&mut provides); + }, + // silently ignore invalid extrinsics, + // cause they might just be inherent + _ => {} + } + }, + } + } + } + + self.prune_tags(at, tags, in_pool.into_iter().filter_map(|x| x).map(|x| x.hash.clone()))?; + + Ok(()) + } + /// Prunes ready transactions that provide given list of tags. - pub fn prune_tags(&self, at: &BlockId, tags: impl IntoIterator) -> Result<(), B::Error> { + /// + /// Given tags are assumed to be always provided now, so all transactions + /// in the Future Queue that require that particular tag (and have other + /// requirements satisfied) are promoted to Ready Queue. + /// + /// Moreover for each provided tag we remove transactions in the pool that: + /// 1. Provide that tag directly + /// 2. Are a dependency of pruned transaction. + /// + /// By removing predecessor transactions as well we might actually end up + /// pruning too much, so all removed transactions are reverified against + /// the runtime (`validate_transaction`) to make sure they are invalid. + /// + /// However we avoid revalidating transactions that are contained within + /// the second parameter of `known_imported_hashes`. These transactions + /// (if pruned) are not revalidated and become temporarily banned to + /// prevent importing them in the (near) future. + pub fn prune_tags( + &self, + at: &BlockId, + tags: impl IntoIterator, + known_imported_hashes: impl IntoIterator> + Clone, + ) -> Result<(), B::Error> { + // Perform tag-based pruning in the base pool let status = self.pool.write().prune_tags(tags); + // Notify event listeners of all transactions + // that were promoted to `Ready` or were dropped. { let mut listener = self.listener.write(); for promoted in &status.promoted { @@ -164,10 +232,17 @@ impl Pool { listener.dropped(f, None); } } + // make sure that we don't revalidate extrinsics that were part of the recently + // imported block. This is especially important for UTXO-like chains cause the + // inputs are pruned so such transaction would go to future again. + self.rotator.ban(&std::time::Instant::now(), known_imported_hashes.clone().into_iter()); + // try to re-submit pruned transactions since some of them might be still valid. + // note that `known_imported_hashes` will be rejected here due to temporary ban. let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::>(); let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()))?; - // Fire mined event for transactions that became invalid. + + // Collect the hashes of transactions that now became invalid (meaning that they are succesfuly pruned). let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) { Err(Ok(err)) => match err.kind() { error::ErrorKind::InvalidTransaction => Some(hashes[idx].clone()), @@ -175,15 +250,19 @@ impl Pool { }, _ => None, }); + // Fire `pruned` notifications for collected hashes and make sure to include + // `known_imported_hashes` since they were just imported as part of the block. + let hashes = hashes.chain(known_imported_hashes.into_iter()); { let header_hash = self.api.block_id_to_hash(at)? .ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?; let mut listener = self.listener.write(); for h in hashes { - listener.pruned(header_hash, &h) + listener.pruned(header_hash, &h); } } - // clear old transactions + // perform regular cleanup of old transactions in the pool + // and update temporary bans. self.clear_stale(at)?; Ok(()) } @@ -256,7 +335,7 @@ impl Pool { pub fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { // temporarily ban invalid transactions debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes); - self.rotator.ban(&time::Instant::now(), hashes); + self.rotator.ban(&time::Instant::now(), hashes.iter().cloned()); let invalid = self.pool.write().remove_invalid(hashes); @@ -401,7 +480,7 @@ mod tests { }); // when - pool.rotator.ban(&time::Instant::now(), &[pool.hash_of(&uxt)]); + pool.rotator.ban(&time::Instant::now(), vec![pool.hash_of(&uxt)]); let res = pool.submit_one(&BlockId::Number(0), uxt); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); @@ -486,6 +565,24 @@ mod tests { assert!(pool.rotator.is_banned(&hash3)); } + #[test] + fn should_ban_mined_transactions() { + // given + let pool = pool(); + let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 0, + })).unwrap(); + + // when + pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()]).unwrap(); + + // then + assert!(pool.rotator.is_banned(&hash1)); + } + mod listener { use super::*; @@ -503,7 +600,32 @@ mod tests { assert_eq!(pool.status().future, 0); // when - pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]]).unwrap(); + pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![]).unwrap(); + assert_eq!(pool.status().ready, 0); + assert_eq!(pool.status().future, 0); + + // then + let mut stream = watcher.into_stream().wait(); + assert_eq!(stream.next(), Some(Ok(::watcher::Status::Ready))); + assert_eq!(stream.next(), Some(Ok(::watcher::Status::Finalised(2.into())))); + assert_eq!(stream.next(), None); + } + + #[test] + fn should_trigger_ready_and_finalised_when_pruning_via_hash() { + // given + let pool = pool(); + let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 0, + })).unwrap(); + assert_eq!(pool.status().ready, 1); + assert_eq!(pool.status().future, 0); + + // when + pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64]).unwrap(); assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); diff --git a/substrate/core/transaction-pool/graph/src/ready.rs b/substrate/core/transaction-pool/graph/src/ready.rs index 1a531b3f49..ca2aa06d50 100644 --- a/substrate/core/transaction-pool/graph/src/ready.rs +++ b/substrate/core/transaction-pool/graph/src/ready.rs @@ -33,7 +33,7 @@ use future::WaitingTransaction; use base_pool::Transaction; #[derive(Debug)] -pub struct TransactionRef { +struct TransactionRef { pub transaction: Arc>, pub insertion_id: u64, } @@ -160,14 +160,14 @@ impl ReadyTransactions { self.insertion_id += 1; let insertion_id = self.insertion_id; let hash = tx.transaction.hash.clone(); - let tx = tx.transaction; + let transaction = tx.transaction; - let replaced = self.replace_previous(&tx)?; + let replaced = self.replace_previous(&transaction)?; let mut goes_to_best = true; let mut ready = self.ready.write(); // Add links to transactions that unlock the current one - for tag in &tx.requires { + for tag in &transaction.requires { // Check if the transaction that satisfies the tag is still in the queue. if let Some(other) = self.provided_tags.get(tag) { let mut tx = ready.get_mut(other).expect(HASH_READY); @@ -178,13 +178,13 @@ impl ReadyTransactions { } // update provided_tags - for tag in tx.provides.clone() { - self.provided_tags.insert(tag, hash.clone()); + for tag in &transaction.provides { + self.provided_tags.insert(tag.clone(), hash.clone()); } let transaction = TransactionRef { insertion_id, - transaction: Arc::new(tx), + transaction }; // insert to best if it doesn't require any other transaction to be included before it @@ -207,6 +207,14 @@ impl ReadyTransactions { self.ready.read().contains_key(hash) } + /// Retrieve transaction by hash + pub fn by_hash(&self, hashes: &[Hash]) -> Vec>>> { + let ready = self.ready.read(); + hashes.iter().map(|hash| { + ready.get(hash).map(|x| x.transaction.transaction.clone()) + }).collect() + } + /// Removes invalid transactions from the ready pool. /// /// NOTE removing a transaction will also cause a removal of all transactions that depend on that one diff --git a/substrate/core/transaction-pool/graph/src/rotator.rs b/substrate/core/transaction-pool/graph/src/rotator.rs index e5ba7ccde6..2d7532c7a9 100644 --- a/substrate/core/transaction-pool/graph/src/rotator.rs +++ b/substrate/core/transaction-pool/graph/src/rotator.rs @@ -22,6 +22,7 @@ use std::{ collections::HashMap, hash, + iter, time::{Duration, Instant}, }; use parking_lot::RwLock; @@ -58,11 +59,11 @@ impl PoolRotator { } /// Bans given set of hashes. - pub fn ban(&self, now: &Instant, hashes: &[Hash]) { + pub fn ban(&self, now: &Instant, hashes: impl IntoIterator) { let mut banned = self.banned_until.write(); for hash in hashes { - banned.insert(hash.clone(), *now + self.ban_time); + banned.insert(hash, *now + self.ban_time); } if banned.len() > 2 * EXPECTED_SIZE { @@ -83,7 +84,7 @@ impl PoolRotator { return false; } - self.ban(now, &[xt.hash.clone()]); + self.ban(now, iter::once(xt.hash.clone())); true } diff --git a/substrate/core/transaction-pool/src/tests.rs b/substrate/core/transaction-pool/src/tests.rs index 5e00b63ad0..fc429577d8 100644 --- a/substrate/core/transaction-pool/src/tests.rs +++ b/substrate/core/transaction-pool/src/tests.rs @@ -151,7 +151,7 @@ fn prune_tags_should_work() { let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); - pool.prune_tags(&BlockId::number(1), vec![vec![209]]).unwrap(); + pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![]).unwrap(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![210]);