Optimize transaction pool pruning (#1524)

* Reuse tags of known transactions, avoid reimporting extrinsics from imported block.

* Fix tests for graph.

* Add more detailed docs.

* Avoid cloning and computing hashes twice.
This commit is contained in:
Tomasz Drwięga
2019-01-24 16:52:17 +01:00
committed by Gav Wood
parent 997c8b4020
commit 1b0d90cdf6
7 changed files with 180 additions and 47 deletions
+5 -24
View File
@@ -199,36 +199,17 @@ fn on_block_imported<Api, Backend, Block, Executor, PoolApi>(
Executor: client::CallExecutor<Block, Blake2Hasher>, Executor: client::CallExecutor<Block, Blake2Hasher>,
PoolApi: txpool::ChainApi<Hash = Block::Hash, Block = Block>, PoolApi: txpool::ChainApi<Hash = Block::Hash, Block = Block>,
{ {
use runtime_primitives::transaction_validity::TransactionValidity;
// Avoid calling into runtime if there is nothing to prune from the pool anyway. // Avoid calling into runtime if there is nothing to prune from the pool anyway.
if transaction_pool.status().is_empty() { if transaction_pool.status().is_empty() {
return Ok(()) return Ok(())
} }
let block = client.block(id)?; if let Some(block) = client.block(id)? {
let tags = match block { let parent_id = BlockId::hash(*block.block.header().parent_hash());
None => return Ok(()), let extrinsics = block.block.extrinsics();
Some(block) => { transaction_pool.prune(id, &parent_id, extrinsics).map_err(|e| format!("{:?}", e))?;
let parent_id = BlockId::hash(*block.block.header().parent_hash()); }
let mut tags = vec![];
for tx in block.block.extrinsics() {
let tx = client.runtime_api().validate_transaction(&parent_id, tx.clone())?;
match tx {
TransactionValidity::Valid { mut provides, .. } => {
tags.append(&mut provides);
},
// silently ignore invalid extrinsics,
// cause they might just be inherent
_ => {}
}
}
tags
}
};
transaction_pool.prune_tags(id, tags).map_err(|e| format!("{:?}", e))?;
Ok(()) Ok(())
} }
@@ -226,6 +226,21 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash
self.future.all() self.future.all()
} }
/// Returns pool transactions given list of hashes.
///
/// Includes both ready and future pool. For every hash in the `hashes`
/// iterator an `Option` is produced (so the resulting `Vec` always have the same length).
pub fn by_hash(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
let ready = self.ready.by_hash(hashes);
let future = self.future.by_hash(hashes);
ready
.into_iter()
.zip(future)
.map(|(a, b)| a.or(b))
.collect()
}
/// Removes all transactions represented by the hashes and all other transactions /// Removes all transactions represented by the hashes and all other transactions
/// that depend on them. /// that depend on them.
/// ///
@@ -236,7 +251,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash
/// and you don't want them to be stored in the pool use `prune_tags` method. /// and you don't want them to be stored in the pool use `prune_tags` method.
pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> { pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = self.ready.remove_invalid(hashes); let mut removed = self.ready.remove_invalid(hashes);
removed.extend(self.future.remove(hashes).into_iter().map(Arc::new)); removed.extend(self.future.remove(hashes));
removed removed
} }
@@ -17,6 +17,7 @@
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
hash, hash,
sync::Arc,
}; };
use sr_primitives::transaction_validity::{ use sr_primitives::transaction_validity::{
@@ -29,7 +30,7 @@ use base_pool::Transaction;
#[derive(Debug)] #[derive(Debug)]
pub struct WaitingTransaction<Hash, Ex> { pub struct WaitingTransaction<Hash, Ex> {
/// Transaction details. /// Transaction details.
pub transaction: Transaction<Hash, Ex>, pub transaction: Arc<Transaction<Hash, Ex>>,
/// Tags that are required and have not been satisfied yet by other transactions in the pool. /// Tags that are required and have not been satisfied yet by other transactions in the pool.
pub missing_tags: HashSet<Tag>, pub missing_tags: HashSet<Tag>,
} }
@@ -47,7 +48,7 @@ impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
.collect(); .collect();
WaitingTransaction { WaitingTransaction {
transaction, transaction: Arc::new(transaction),
missing_tags, missing_tags,
} }
} }
@@ -117,6 +118,11 @@ impl<Hash: hash::Hash + Eq + Clone, Ex> FutureTransactions<Hash, Ex> {
self.waiting.contains_key(hash) self.waiting.contains_key(hash)
} }
/// Returns a list of known transactions
pub fn by_hash(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
hashes.iter().map(|h| self.waiting.get(h).map(|x| x.transaction.clone())).collect()
}
/// Satisfies provided tags in transactions that are waiting for them. /// Satisfies provided tags in transactions that are waiting for them.
/// ///
/// Returns (and removes) transactions that became ready after their last tag got /// Returns (and removes) transactions that became ready after their last tag got
@@ -148,7 +154,7 @@ impl<Hash: hash::Hash + Eq + Clone, Ex> FutureTransactions<Hash, Ex> {
/// Removes transactions for given list of hashes. /// Removes transactions for given list of hashes.
/// ///
/// Returns a list of actually removed transactions. /// Returns a list of actually removed transactions.
pub fn remove(&mut self, hashes: &[Hash]) -> Vec<Transaction<Hash, Ex>> { pub fn remove(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![]; let mut removed = vec![];
for hash in hashes { for hash in hashes {
if let Some(waiting_tx) = self.waiting.remove(hash) { if let Some(waiting_tx) = self.waiting.remove(hash) {
@@ -171,7 +177,7 @@ impl<Hash: hash::Hash + Eq + Clone, Ex> FutureTransactions<Hash, Ex> {
/// Returns iterator over all future transactions /// Returns iterator over all future transactions
pub fn all(&self) -> impl Iterator<Item=&Transaction<Hash, Ex>> { pub fn all(&self) -> impl Iterator<Item=&Transaction<Hash, Ex>> {
self.waiting.values().map(|waiting| &waiting.transaction) self.waiting.values().map(|waiting| &*waiting.transaction)
} }
/// Returns number of transactions in the Future queue. /// Returns number of transactions in the Future queue.
@@ -152,9 +152,77 @@ impl<B: ChainApi> Pool<B> {
Ok(watcher) Ok(watcher)
} }
/// Prunes ready transactions.
///
/// Used to clear the pool from transactions that were part of recently imported block.
/// To perform pruning we need the tags that each extrinsic provides and to avoid calling
/// into runtime too often we first lookup all extrinsics that are in the pool and get
/// their provided tags from there. Otherwise we query the runtime at the `parent` block.
pub fn prune(&self, at: &BlockId<B::Block>, parent: &BlockId<B::Block>, extrinsics: &[ExtrinsicFor<B>]) -> Result<(), B::Error> {
let mut tags = Vec::with_capacity(extrinsics.len());
// Get details of all extrinsics that are already in the pool
let hashes = extrinsics.iter().map(|extrinsic| self.api.hash(extrinsic)).collect::<Vec<_>>();
let in_pool = self.pool.read().by_hash(&hashes);
{
// Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option<TransactionDetails>)`)
let all = extrinsics.iter().zip(in_pool.iter());
for (extrinsic, existing_in_pool) in all {
match *existing_in_pool {
// reuse the tags for extrinsis that were found in the pool
Some(ref transaction) => {
tags.extend(transaction.provides.iter().cloned());
},
// if it's not found in the pool query the runtime at parent block
// to get validity info and tags that the extrinsic provides.
None => {
let validity = self.api.validate_transaction(parent, extrinsic.clone());
match validity {
Ok(TransactionValidity::Valid { mut provides, .. }) => {
tags.append(&mut provides);
},
// silently ignore invalid extrinsics,
// cause they might just be inherent
_ => {}
}
},
}
}
}
self.prune_tags(at, tags, in_pool.into_iter().filter_map(|x| x).map(|x| x.hash.clone()))?;
Ok(())
}
/// Prunes ready transactions that provide given list of tags. /// Prunes ready transactions that provide given list of tags.
pub fn prune_tags(&self, at: &BlockId<B::Block>, tags: impl IntoIterator<Item=Tag>) -> Result<(), B::Error> { ///
/// Given tags are assumed to be always provided now, so all transactions
/// in the Future Queue that require that particular tag (and have other
/// requirements satisfied) are promoted to Ready Queue.
///
/// Moreover for each provided tag we remove transactions in the pool that:
/// 1. Provide that tag directly
/// 2. Are a dependency of pruned transaction.
///
/// By removing predecessor transactions as well we might actually end up
/// pruning too much, so all removed transactions are reverified against
/// the runtime (`validate_transaction`) to make sure they are invalid.
///
/// However we avoid revalidating transactions that are contained within
/// the second parameter of `known_imported_hashes`. These transactions
/// (if pruned) are not revalidated and become temporarily banned to
/// prevent importing them in the (near) future.
pub fn prune_tags(
&self,
at: &BlockId<B::Block>,
tags: impl IntoIterator<Item=Tag>,
known_imported_hashes: impl IntoIterator<Item=ExHash<B>> + Clone,
) -> Result<(), B::Error> {
// Perform tag-based pruning in the base pool
let status = self.pool.write().prune_tags(tags); let status = self.pool.write().prune_tags(tags);
// Notify event listeners of all transactions
// that were promoted to `Ready` or were dropped.
{ {
let mut listener = self.listener.write(); let mut listener = self.listener.write();
for promoted in &status.promoted { for promoted in &status.promoted {
@@ -164,10 +232,17 @@ impl<B: ChainApi> Pool<B> {
listener.dropped(f, None); listener.dropped(f, None);
} }
} }
// make sure that we don't revalidate extrinsics that were part of the recently
// imported block. This is especially important for UTXO-like chains cause the
// inputs are pruned so such transaction would go to future again.
self.rotator.ban(&std::time::Instant::now(), known_imported_hashes.clone().into_iter());
// try to re-submit pruned transactions since some of them might be still valid. // try to re-submit pruned transactions since some of them might be still valid.
// note that `known_imported_hashes` will be rejected here due to temporary ban.
let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::<Vec<_>>(); let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::<Vec<_>>();
let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()))?; let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()))?;
// Fire mined event for transactions that became invalid.
// Collect the hashes of transactions that now became invalid (meaning that they are succesfuly pruned).
let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) { let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) {
Err(Ok(err)) => match err.kind() { Err(Ok(err)) => match err.kind() {
error::ErrorKind::InvalidTransaction => Some(hashes[idx].clone()), error::ErrorKind::InvalidTransaction => Some(hashes[idx].clone()),
@@ -175,15 +250,19 @@ impl<B: ChainApi> Pool<B> {
}, },
_ => None, _ => None,
}); });
// Fire `pruned` notifications for collected hashes and make sure to include
// `known_imported_hashes` since they were just imported as part of the block.
let hashes = hashes.chain(known_imported_hashes.into_iter());
{ {
let header_hash = self.api.block_id_to_hash(at)? let header_hash = self.api.block_id_to_hash(at)?
.ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?; .ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?;
let mut listener = self.listener.write(); let mut listener = self.listener.write();
for h in hashes { for h in hashes {
listener.pruned(header_hash, &h) listener.pruned(header_hash, &h);
} }
} }
// clear old transactions // perform regular cleanup of old transactions in the pool
// and update temporary bans.
self.clear_stale(at)?; self.clear_stale(at)?;
Ok(()) Ok(())
} }
@@ -256,7 +335,7 @@ impl<B: ChainApi> Pool<B> {
pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> { pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> {
// temporarily ban invalid transactions // temporarily ban invalid transactions
debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes); debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes);
self.rotator.ban(&time::Instant::now(), hashes); self.rotator.ban(&time::Instant::now(), hashes.iter().cloned());
let invalid = self.pool.write().remove_invalid(hashes); let invalid = self.pool.write().remove_invalid(hashes);
@@ -401,7 +480,7 @@ mod tests {
}); });
// when // when
pool.rotator.ban(&time::Instant::now(), &[pool.hash_of(&uxt)]); pool.rotator.ban(&time::Instant::now(), vec![pool.hash_of(&uxt)]);
let res = pool.submit_one(&BlockId::Number(0), uxt); let res = pool.submit_one(&BlockId::Number(0), uxt);
assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 0); assert_eq!(pool.status().future, 0);
@@ -486,6 +565,24 @@ mod tests {
assert!(pool.rotator.is_banned(&hash3)); assert!(pool.rotator.is_banned(&hash3));
} }
#[test]
fn should_ban_mined_transactions() {
// given
let pool = pool();
let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 0,
})).unwrap();
// when
pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()]).unwrap();
// then
assert!(pool.rotator.is_banned(&hash1));
}
mod listener { mod listener {
use super::*; use super::*;
@@ -503,7 +600,32 @@ mod tests {
assert_eq!(pool.status().future, 0); assert_eq!(pool.status().future, 0);
// when // when
pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]]).unwrap(); pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![]).unwrap();
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 0);
// then
let mut stream = watcher.into_stream().wait();
assert_eq!(stream.next(), Some(Ok(::watcher::Status::Ready)));
assert_eq!(stream.next(), Some(Ok(::watcher::Status::Finalised(2.into()))));
assert_eq!(stream.next(), None);
}
#[test]
fn should_trigger_ready_and_finalised_when_pruning_via_hash() {
// given
let pool = pool();
let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 0,
})).unwrap();
assert_eq!(pool.status().ready, 1);
assert_eq!(pool.status().future, 0);
// when
pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64]).unwrap();
assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 0); assert_eq!(pool.status().future, 0);
@@ -33,7 +33,7 @@ use future::WaitingTransaction;
use base_pool::Transaction; use base_pool::Transaction;
#[derive(Debug)] #[derive(Debug)]
pub struct TransactionRef<Hash, Ex> { struct TransactionRef<Hash, Ex> {
pub transaction: Arc<Transaction<Hash, Ex>>, pub transaction: Arc<Transaction<Hash, Ex>>,
pub insertion_id: u64, pub insertion_id: u64,
} }
@@ -160,14 +160,14 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
self.insertion_id += 1; self.insertion_id += 1;
let insertion_id = self.insertion_id; let insertion_id = self.insertion_id;
let hash = tx.transaction.hash.clone(); let hash = tx.transaction.hash.clone();
let tx = tx.transaction; let transaction = tx.transaction;
let replaced = self.replace_previous(&tx)?; let replaced = self.replace_previous(&transaction)?;
let mut goes_to_best = true; let mut goes_to_best = true;
let mut ready = self.ready.write(); let mut ready = self.ready.write();
// Add links to transactions that unlock the current one // Add links to transactions that unlock the current one
for tag in &tx.requires { for tag in &transaction.requires {
// Check if the transaction that satisfies the tag is still in the queue. // Check if the transaction that satisfies the tag is still in the queue.
if let Some(other) = self.provided_tags.get(tag) { if let Some(other) = self.provided_tags.get(tag) {
let mut tx = ready.get_mut(other).expect(HASH_READY); let mut tx = ready.get_mut(other).expect(HASH_READY);
@@ -178,13 +178,13 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
} }
// update provided_tags // update provided_tags
for tag in tx.provides.clone() { for tag in &transaction.provides {
self.provided_tags.insert(tag, hash.clone()); self.provided_tags.insert(tag.clone(), hash.clone());
} }
let transaction = TransactionRef { let transaction = TransactionRef {
insertion_id, insertion_id,
transaction: Arc::new(tx), transaction
}; };
// insert to best if it doesn't require any other transaction to be included before it // insert to best if it doesn't require any other transaction to be included before it
@@ -207,6 +207,14 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
self.ready.read().contains_key(hash) self.ready.read().contains_key(hash)
} }
/// Retrieve transaction by hash
pub fn by_hash(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
let ready = self.ready.read();
hashes.iter().map(|hash| {
ready.get(hash).map(|x| x.transaction.transaction.clone())
}).collect()
}
/// Removes invalid transactions from the ready pool. /// Removes invalid transactions from the ready pool.
/// ///
/// NOTE removing a transaction will also cause a removal of all transactions that depend on that one /// NOTE removing a transaction will also cause a removal of all transactions that depend on that one
@@ -22,6 +22,7 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
hash, hash,
iter,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use parking_lot::RwLock; use parking_lot::RwLock;
@@ -58,11 +59,11 @@ impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
} }
/// Bans given set of hashes. /// Bans given set of hashes.
pub fn ban(&self, now: &Instant, hashes: &[Hash]) { pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item=Hash>) {
let mut banned = self.banned_until.write(); let mut banned = self.banned_until.write();
for hash in hashes { for hash in hashes {
banned.insert(hash.clone(), *now + self.ban_time); banned.insert(hash, *now + self.ban_time);
} }
if banned.len() > 2 * EXPECTED_SIZE { if banned.len() > 2 * EXPECTED_SIZE {
@@ -83,7 +84,7 @@ impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
return false; return false;
} }
self.ban(now, &[xt.hash.clone()]); self.ban(now, iter::once(xt.hash.clone()));
true true
} }
+1 -1
View File
@@ -151,7 +151,7 @@ fn prune_tags_should_work() {
let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect();
assert_eq!(pending, vec![209, 210]); assert_eq!(pending, vec![209, 210]);
pool.prune_tags(&BlockId::number(1), vec![vec![209]]).unwrap(); pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![]).unwrap();
let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect();
assert_eq!(pending, vec![210]); assert_eq!(pending, vec![210]);