Fix a race conditon in the pool when transactions are imported during pruning. (#2136)

* Store recently pruned tags to avoid re-importing transactions.

* Update core/transaction-pool/graph/src/base_pool.rs

* Update core/transaction-pool/graph/src/base_pool.rs

* Update core/transaction-pool/graph/src/base_pool.rs

* Update base_pool.rs
This commit is contained in:
Tomasz Drwięga
2019-03-28 18:57:26 +01:00
committed by Gav Wood
parent 7046e13de2
commit e22b491783
6 changed files with 113 additions and 17 deletions
@@ -19,6 +19,7 @@
//! For a more full-featured pool, have a look at the `pool` module.
use std::{
collections::HashSet,
fmt,
hash,
sync::Arc,
@@ -134,6 +135,9 @@ impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic> where
}
}
/// Store last pruned tags for given number of invocations.
const RECENTLY_PRUNED_TAGS: usize = 2;
/// Transaction pool.
///
/// Builds a dependency graph for all transactions in the pool and returns
@@ -148,6 +152,12 @@ impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic> where
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
future: FutureTransactions<Hash, Ex>,
ready: ReadyTransactions<Hash, Ex>,
/// Store recently pruned tags (for last two invocations).
///
/// This is used to make sure we don't accidentally put
/// transactions to future in case they were just stuck in verification.
recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
recently_pruned_index: usize,
}
impl<Hash: hash::Hash + Eq, Ex> Default for BasePool<Hash, Ex> {
@@ -155,6 +165,8 @@ impl<Hash: hash::Hash + Eq, Ex> Default for BasePool<Hash, Ex> {
BasePool {
future: Default::default(),
ready: Default::default(),
recently_pruned: Default::default(),
recently_pruned_index: 0,
}
}
}
@@ -175,7 +187,11 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash
bail!(error::ErrorKind::AlreadyImported(Box::new(tx.hash.clone())))
}
let tx = WaitingTransaction::new(tx, self.ready.provided_tags());
let tx = WaitingTransaction::new(
tx,
self.ready.provided_tags(),
&self.recently_pruned,
);
trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash, tx);
debug!(target: "txpool", "[{:?}] Importing to {}", tx.transaction.hash, if tx.is_ready() { "ready" } else { "future" });
@@ -354,12 +370,17 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash
pub fn prune_tags(&mut self, tags: impl IntoIterator<Item=Tag>) -> PruneStatus<Hash, Ex> {
let mut to_import = vec![];
let mut pruned = vec![];
let recently_pruned = &mut self.recently_pruned[self.recently_pruned_index];
self.recently_pruned_index = (self.recently_pruned_index + 1) % RECENTLY_PRUNED_TAGS;
recently_pruned.clear();
for tag in tags {
// make sure to promote any future transactions that could be unlocked
to_import.append(&mut self.future.satisfy_tags(::std::iter::once(&tag)));
// and actually prune transactions in ready queue
pruned.append(&mut self.ready.prune_tags(tag));
pruned.append(&mut self.ready.prune_tags(tag.clone()));
// store the tags for next submission
recently_pruned.insert(tag);
}
let mut promoted = vec![];
@@ -663,7 +684,6 @@ mod tests {
assert_eq!(pool.future.len(), 0);
}
#[test]
fn should_handle_a_cycle_with_low_priority() {
// given
@@ -798,7 +818,6 @@ mod tests {
assert_eq!(pool.future.len(), 0);
}
#[test]
fn should_prune_ready_transactions() {
// given
@@ -887,5 +906,4 @@ mod tests {
r#"Transaction { hash: 4, priority: 1000, valid_till: 64, bytes: 1, requires: [03,02], provides: [04], data: [4]}"#.to_owned()
);
}
}
@@ -71,10 +71,19 @@ impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
///
/// Computes the set of missing tags based on the requirements and tags that
/// are provided by all transactions in the ready queue.
pub fn new(transaction: Transaction<Hash, Ex>, provided: &HashMap<Tag, Hash>) -> Self {
pub fn new(
transaction: Transaction<Hash, Ex>,
provided: &HashMap<Tag, Hash>,
recently_pruned: &[HashSet<Tag>],
) -> Self {
let missing_tags = transaction.requires
.iter()
.filter(|tag| !provided.contains_key(&**tag))
.filter(|tag| {
// is true if the tag is already satisfied either via transaction in the pool
// or one that was recently included.
let is_provided = provided.contains_key(&**tag) || recently_pruned.iter().any(|x| x.contains(&**tag));
!is_provided
})
.cloned()
.collect();
@@ -460,7 +460,9 @@ mod tests {
use crate::watcher;
#[derive(Debug, Default)]
struct TestApi;
struct TestApi {
delay: Mutex<Option<std::sync::mpsc::Receiver<()>>>,
}
impl ChainApi for TestApi {
type Block = Block;
@@ -469,9 +471,20 @@ mod tests {
/// Verify extrinsic at given block.
fn validate_transaction(&self, at: &BlockId<Self::Block>, uxt: ExtrinsicFor<Self>) -> Result<TransactionValidity, Self::Error> {
let block_number = self.block_id_to_number(at)?.unwrap();
let nonce = uxt.transfer().nonce;
// This is used to control the test flow.
if nonce > 0 {
let opt = self.delay.lock().take();
if let Some(delay) = opt {
if delay.recv().is_err() {
println!("Error waiting for delay!");
}
}
}
if nonce < block_number {
Ok(TransactionValidity::Invalid(0))
} else {
@@ -878,5 +891,59 @@ mod tests {
assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready)));
assert_eq!(stream.next(), Some(Ok(watcher::Status::Dropped)));
}
#[test]
fn should_handle_pruning_in_the_middle_of_import() {
let _ = env_logger::try_init();
// given
let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let mut api = TestApi::default();
api.delay = Mutex::new(rx.into());
let pool = Arc::new(Pool::new(Default::default(), api));
// when
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
});
// This transaction should go to future, since we use `nonce: 1`
let pool2 = pool.clone();
std::thread::spawn(move || {
pool2.submit_one(&BlockId::Number(0), xt).unwrap();
ready.send(()).unwrap();
});
// But now before the previous one is imported we import
// the one that it depends on.
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 4,
nonce: 0,
});
// The tag the above transaction provides (TestApi is using just nonce as u8)
let provides = vec![0_u8];
pool.submit_one(&BlockId::Number(0), xt).unwrap();
assert_eq!(pool.status().ready, 1);
// Now block import happens before the second transaction is able to finish verification.
pool.prune_tags(&BlockId::Number(1), vec![provides], vec![]).unwrap();
assert_eq!(pool.status().ready, 0);
// so when we release the verification of the previous one it will have
// something in `requires`, but should go to ready directly, since the previous transaction was imported
// correctly.
tx.send(()).unwrap();
// then
is_ready.recv().unwrap(); // wait for finish
assert_eq!(pool.status().ready, 1);
assert_eq!(pool.status().future, 0);
}
}
}
@@ -517,18 +517,18 @@ mod tests {
tx3.provides = vec![vec![4]];
// when
let x = WaitingTransaction::new(tx2, &ready.provided_tags());
let x = WaitingTransaction::new(tx2, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
let x = WaitingTransaction::new(tx3, &ready.provided_tags());
let x = WaitingTransaction::new(tx3, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
assert_eq!(ready.get().count(), 2);
// too low priority
let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags());
let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags(), &[]);
ready.import(x).unwrap_err();
tx1.priority = 10;
let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags());
let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags(), &[]);
ready.import(x).unwrap();
// then
@@ -562,15 +562,15 @@ mod tests {
};
// when
let x = WaitingTransaction::new(tx1, &ready.provided_tags());
let x = WaitingTransaction::new(tx1, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
let x = WaitingTransaction::new(tx2, &ready.provided_tags());
let x = WaitingTransaction::new(tx2, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
let x = WaitingTransaction::new(tx3, &ready.provided_tags());
let x = WaitingTransaction::new(tx3, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
let x = WaitingTransaction::new(tx4, &ready.provided_tags());
let x = WaitingTransaction::new(tx4, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
let x = WaitingTransaction::new(tx5, &ready.provided_tags());
let x = WaitingTransaction::new(tx5, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
// then