Pool: parallel ready and runtime changes (#922)

* Revert "Revert runtime changes."

This reverts commit 01a7d1aa83c2918dd63b7dc54eb688d544cfc649.

* Parallel queue reads.

* Avoid recursion in best iterator.
This commit is contained in:
Tomasz Drwięga
2018-10-18 10:37:29 +02:00
committed by Gav Wood
parent 62c71a31ec
commit 36625faa9f
13 changed files with 112 additions and 97 deletions
+1 -1
View File
@@ -574,7 +574,7 @@ impl<B, E, Block> Client<B, E, Block> where
for tx in extrinsics {
let tx = api::TaggedTransactionQueue::validate_transaction(self, &id, &tx)?;
match tx {
TransactionValidity::Valid(_, _, mut provides, ..) => {
TransactionValidity::Valid { mut provides, .. } => {
tags.append(&mut provides);
},
// silently ignore invalid extrinsics,
+1 -1
View File
@@ -516,8 +516,8 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
for (who, ref mut peer) in peers.iter_mut() {
let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
.iter()
.cloned()
.filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))
.cloned()
.unzip();
if !to_send.is_empty() {
+1 -1
View File
@@ -129,7 +129,7 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
params: Params<B, S, H>,
protocol_id: ProtocolId,
import_queue: I,
) -> Result<Arc<Service<B, S, H>>, Error> {
) -> Result<Arc<Service<B, S, H>>, Error> {
let chain = params.chain.clone();
let import_queue = Arc::new(import_queue);
let handler = Arc::new(Protocol::new(
+1 -1
View File
@@ -127,7 +127,7 @@ impl<B, E, P> AuthorApi<ExHash<P>, BlockHash<P>, ExtrinsicFor<P>, Vec<ExtrinsicF
}
fn pending_extrinsics(&self) -> Result<Vec<ExtrinsicFor<P>>> {
Ok(self.pool.all(usize::max_value()))
Ok(self.pool.ready().map(|tx| tx.data.clone()).collect())
}
fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber<Status<ExHash<P>, BlockHash<P>>>, xt: Bytes) {
+2 -2
View File
@@ -393,13 +393,13 @@ impl<C: Components> TransactionPoolAdapter<C> {
impl<C: Components> network::TransactionPool<ComponentExHash<C>, ComponentBlock<C>> for TransactionPoolAdapter<C> {
fn transactions(&self) -> Vec<(ComponentExHash<C>, ComponentExtrinsic<C>)> {
self.pool.ready(|pending| pending
self.pool.ready()
.map(|t| {
let hash = t.hash.clone();
let ex: ComponentExtrinsic<C> = t.data.clone();
(hash, ex)
})
.collect())
.collect()
}
fn import(&self, transaction: &ComponentExtrinsic<C>) -> Option<ComponentExHash<C>> {
+1 -1
View File
@@ -247,7 +247,7 @@ where
let best_block = BlockId::number(first_service.client().info().unwrap().chain.best_number);
first_service.transaction_pool().submit_one(&best_block, extrinsic_factory(&first_service)).unwrap();
network.run_until_all_full(|_index, service|
service.transaction_pool().all(usize::max_value()).len() == 1
service.transaction_pool().ready().count() == 1
);
}
@@ -33,11 +33,11 @@ pub type TransactionTag = Vec<u8>;
#[cfg_attr(feature = "std", derive(Debug))]
pub enum TransactionValidity {
Invalid,
Valid(
/* priority: */TransactionPriority,
/* requires: */Vec<TransactionTag>,
/* provides: */Vec<TransactionTag>,
/* longevity: */TransactionLongevity
),
Valid {
priority: TransactionPriority,
requires: Vec<TransactionTag>,
provides: Vec<TransactionTag>,
longevity: TransactionLongevity
},
Unknown,
}
+4 -4
View File
@@ -131,12 +131,12 @@ pub fn validate_transaction(utx: Extrinsic) -> TransactionValidity {
p
};
TransactionValidity::Valid(
/* priority: */tx.amount,
TransactionValidity::Valid {
priority: tx.amount,
requires,
provides,
/* longevity: */64
)
longevity: 64
}
}
@@ -216,7 +216,7 @@ impl<Hash: hash::Hash + Member, Ex: ::std::fmt::Debug> BasePool<Hash, Ex> {
}
/// Returns an iterator over ready transactions in the pool.
pub fn ready<'a, 'b: 'a>(&'b self) -> impl Iterator<Item=Arc<Transaction<Hash, Ex>>> + 'a {
pub fn ready(&self) -> impl Iterator<Item=Arc<Transaction<Hash, Ex>>> {
self.ready.get()
}
@@ -105,7 +105,7 @@ impl<B: ChainApi> Pool<B> {
}
match self.api.validate_transaction(at, &xt)? {
TransactionValidity::Valid(priority, requires, provides, longevity) => {
TransactionValidity::Valid { priority, requires, provides, longevity } => {
Ok(base::Transaction {
data: xt,
hash,
@@ -197,11 +197,12 @@ impl<B: ChainApi> Pool<B> {
.ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?
.as_();
let now = time::Instant::now();
let to_remove = self.ready(|pending| pending
.filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx))
.map(|tx| tx.hash.clone())
.collect::<Vec<_>>()
);
let to_remove = {
self.ready()
.filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx))
.map(|tx| tx.hash.clone())
.collect::<Vec<_>>()
};
let futures_to_remove: Vec<ExHash<B>> = {
let p = self.pool.read();
let mut hashes = Vec::new();
@@ -266,20 +267,9 @@ impl<B: ChainApi> Pool<B> {
invalid
}
/// Get ready transactions ordered by priority
pub fn ready<F, X>(&self, f: F) -> X where
F: FnOnce(&mut Iterator<Item=TransactionFor<B>>) -> X,
{
let pool = self.pool.read();
let mut ready = pool.ready();
f(&mut ready)
}
/// Returns all transactions in the pool.
///
/// Be careful with large limit values, as querying the entire pool might be time consuming.
pub fn all(&self, limit: usize) -> Vec<ExtrinsicFor<B>> {
self.ready(|it| it.take(limit).map(|ex| ex.data.clone()).collect())
/// Get an iterator for ready transactions ordered by priority
pub fn ready(&self) -> impl Iterator<Item=TransactionFor<B>> {
self.pool.read().ready()
}
/// Returns pool status.
@@ -21,6 +21,7 @@ use std::{
sync::Arc,
};
use parking_lot::RwLock;
use sr_primitives::traits::Member;
use sr_primitives::transaction_validity::{
TransactionTag as Tag,
@@ -79,6 +80,16 @@ struct ReadyTx<Hash, Ex> {
pub requires_offset: usize,
}
impl<Hash: Clone, Ex> Clone for ReadyTx<Hash, Ex> {
fn clone(&self) -> Self {
ReadyTx {
transaction: self.transaction.clone(),
unlocks: self.unlocks.clone(),
requires_offset: self.requires_offset,
}
}
}
const HASH_READY: &str = r#"
Every time transaction is imported its hash is placed in `ready` map and tags in `provided_tags`;
Every time transaction is removed from the queue we remove the hash from `ready` map and from `provided_tags`;
@@ -93,8 +104,7 @@ pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {
/// tags that are provided by Ready transactions
provided_tags: HashMap<Tag, Hash>,
/// Transactions that are ready (i.e. don't have any requirements external to the pool)
ready: HashMap<Hash, ReadyTx<Hash, Ex>>,
// ^^ TODO [ToDr] Consider wrapping this into `Arc<RwLock<>>` and allow multiple concurrent iterators
ready: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>,
/// Best transactions that are ready to be included to the block without any other previous transaction.
best: BTreeSet<TransactionRef<Hash, Ex>>,
}
@@ -127,9 +137,9 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
/// - transactions that are valid for a shorter time go first
/// 4. Lastly we sort by the time in the queue
/// - transactions that are longer in the queue go first
pub fn get<'a>(&'a self) -> impl Iterator<Item=Arc<Transaction<Hash, Ex>>> + 'a {
pub fn get(&self) -> impl Iterator<Item=Arc<Transaction<Hash, Ex>>> {
BestIterator {
all: &self.ready,
all: self.ready.clone(),
best: self.best.clone(),
awaiting: Default::default(),
}
@@ -144,7 +154,7 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
tx: WaitingTransaction<Hash, Ex>,
) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {
assert!(tx.is_ready(), "Only ready transactions can be imported.");
assert!(!self.ready.contains_key(&tx.transaction.hash), "Transaction is already imported.");
assert!(!self.ready.read().contains_key(&tx.transaction.hash), "Transaction is already imported.");
self.insertion_id += 1;
let insertion_id = self.insertion_id;
@@ -154,11 +164,12 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
let replaced = self.replace_previous(&tx)?;
let mut goes_to_best = true;
let mut ready = self.ready.write();
// Add links to transactions that unlock the current one
for tag in &tx.requires {
// Check if the transaction that satisfies the tag is still in the queue.
if let Some(other) = self.provided_tags.get(tag) {
let mut tx = self.ready.get_mut(other).expect(HASH_READY);
let mut tx = ready.get_mut(other).expect(HASH_READY);
tx.unlocks.push(hash.clone());
// this transaction depends on some other, so it doesn't go to best directly.
goes_to_best = false;
@@ -181,7 +192,7 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
}
// insert to Ready
self.ready.insert(hash, ReadyTx {
ready.insert(hash, ReadyTx {
transaction,
unlocks: vec![],
requires_offset: 0,
@@ -192,7 +203,7 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
/// Returns true if given hash is part of the queue.
pub fn contains(&self, hash: &Hash) -> bool {
self.ready.contains_key(hash)
self.ready.read().contains_key(hash)
}
/// Removes invalid transactions from the ready pool.
@@ -204,13 +215,14 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
let mut removed = vec![];
let mut to_remove = hashes.iter().cloned().collect::<Vec<_>>();
let mut ready = self.ready.write();
loop {
let hash = match to_remove.pop() {
Some(hash) => hash,
None => return removed,
};
if let Some(mut tx) = self.ready.remove(&hash) {
if let Some(mut tx) = ready.remove(&hash) {
// remove entries from provided_tags
for tag in &tx.transaction.transaction.provides {
self.provided_tags.remove(tag);
@@ -218,7 +230,7 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
// remove from unlocks
for tag in &tx.transaction.transaction.requires {
if let Some(hash) = self.provided_tags.get(tag) {
if let Some(tx) = self.ready.get_mut(hash) {
if let Some(tx) = ready.get_mut(hash) {
remove_item(&mut tx.unlocks, &hash);
}
}
@@ -253,7 +265,7 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
};
let res = self.provided_tags.remove(&tag)
.and_then(|hash| self.ready.remove(&hash));
.and_then(|hash| self.ready.write().remove(&hash));
if let Some(tx) = res {
let unlocks = tx.unlocks;
@@ -262,9 +274,10 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
// prune previous transactions as well
{
let hash = &tx.hash;
let mut ready = self.ready.write();
let mut find_previous = |tag| -> Option<Vec<Tag>> {
let prev_hash = self.provided_tags.get(tag)?;
let tx2 = self.ready.get_mut(&prev_hash)?;
let tx2 = ready.get_mut(&prev_hash)?;
remove_item(&mut tx2.unlocks, hash);
// We eagerly prune previous transactions as well.
// But it might not always be good.
@@ -292,7 +305,7 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
// add the transactions that just got unlocked to `best`
for hash in unlocks {
if let Some(tx) = self.ready.get_mut(&hash) {
if let Some(tx) = self.ready.write().get_mut(&hash) {
tx.requires_offset += 1;
// this transaction is ready
if tx.requires_offset == tx.transaction.transaction.requires.len() {
@@ -328,10 +341,13 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
}
// now check if collective priority is lower than the replacement transaction.
let old_priority = replace_hashes
.iter()
.filter_map(|hash| self.ready.get(hash))
.fold(0u64, |total, tx| total.saturating_add(tx.transaction.transaction.priority));
let old_priority = {
let ready = self.ready.read();
replace_hashes
.iter()
.filter_map(|hash| ready.get(hash))
.fold(0u64, |total, tx| total.saturating_add(tx.transaction.transaction.priority))
};
// bail - the transaction has too low priority to replace the old ones
if old_priority >= tx.priority {
@@ -349,7 +365,7 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
None => return Ok(removed),
};
let tx = self.ready.remove(&hash).expect(HASH_READY);
let tx = self.ready.write().remove(&hash).expect(HASH_READY);
// check if this transaction provides stuff that is not provided by the new one.
let (mut unlocks, tx) = (tx.unlocks, tx.transaction.transaction);
{
@@ -371,18 +387,18 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
/// Returns number of transactions in this queue.
pub fn len(&self) -> usize {
self.ready.len()
self.ready.read().len()
}
}
pub struct BestIterator<'a, Hash: 'a, Ex: 'a> {
all: &'a HashMap<Hash, ReadyTx<Hash, Ex>>,
pub struct BestIterator<Hash, Ex> {
all: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>,
awaiting: HashMap<Hash, (usize, TransactionRef<Hash, Ex>)>,
best: BTreeSet<TransactionRef<Hash, Ex>>,
}
impl<'a, Hash: 'a + hash::Hash + Member, Ex: 'a> BestIterator<'a, Hash, Ex> {
impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
/// Depending on number of satisfied requirements insert given ref
/// either to awaiting set or to best set.
fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef<Hash, Ex>) {
@@ -397,32 +413,41 @@ impl<'a, Hash: 'a + hash::Hash + Member, Ex: 'a> BestIterator<'a, Hash, Ex> {
}
}
impl<'a, Hash: 'a + hash::Hash + Member, Ex: 'a> Iterator for BestIterator<'a, Hash, Ex> {
impl<Hash: hash::Hash + Member, Ex> Iterator for BestIterator<Hash, Ex> {
type Item = Arc<Transaction<Hash, Ex>>;
fn next(&mut self) -> Option<Self::Item> {
let best = self.best.iter().next_back()?.clone();
let best = self.best.take(&best)?;
loop {
let best = self.best.iter().next_back()?.clone();
let best = self.best.take(&best)?;
let ready = match self.all.get(&best.transaction.hash) {
Some(ready) => ready,
// The transaction is not in all, maybe it was removed in the meantime?
None => return self.next(),
};
let next = self.all.read().get(&best.transaction.hash).cloned();
let ready = match next {
Some(ready) => ready,
// The transaction is not in all, maybe it was removed in the meantime?
None => continue,
};
// Insert transactions that just got unlocked.
for hash in &ready.unlocks {
// first check local awaiting transactions
if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
satisfied += 1;
self.best_or_awaiting(satisfied, tx_ref);
// then get from the pool
} else if let Some(next) = self.all.get(hash) {
self.best_or_awaiting(next.requires_offset + 1, next.transaction.clone());
// Insert transactions that just got unlocked.
for hash in &ready.unlocks {
// first check local awaiting transactions
let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
satisfied += 1;
Some((satisfied, tx_ref))
// then get from the pool
} else if let Some(next) = self.all.read().get(hash) {
Some((next.requires_offset + 1, next.transaction.clone()))
} else {
None
};
if let Some((satisfied, tx_ref)) = res {
self.best_or_awaiting(satisfied, tx_ref)
}
}
}
Some(best.transaction.clone())
return Some(best.transaction.clone())
}
}
}
+12 -12
View File
@@ -49,12 +49,12 @@ impl txpool::ChainApi for TestApi {
};
let provides = vec![vec![uxt.transfer.nonce as u8]];
Ok(TransactionValidity::Valid(
/* priority: */1,
Ok(TransactionValidity::Valid {
priority: 1,
requires,
provides,
/* longevity: */64
))
longevity: 64
})
}
fn block_id_to_number(&self, at: &BlockId<Self::Block>) -> error::Result<Option<txpool::NumberFor<Self>>> {
@@ -109,7 +109,7 @@ fn submission_should_work() {
assert_eq!(209, index(&BlockId::number(0)));
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
let pending: Vec<_> = pool.ready().map(|a| a.data.transfer.nonce).collect();
assert_eq!(pending, vec![209]);
}
@@ -119,7 +119,7 @@ fn multiple_submission_should_work() {
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
let pending: Vec<_> = pool.ready().map(|a| a.data.transfer.nonce).collect();
assert_eq!(pending, vec![209, 210]);
}
@@ -128,7 +128,7 @@ fn early_nonce_should_be_culled() {
let pool = pool();
pool.submit_one(&BlockId::number(0), uxt(Alice, 208)).unwrap();
let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
let pending: Vec<_> = pool.ready().map(|a| a.data.transfer.nonce).collect();
assert_eq!(pending, Vec::<Index>::new());
}
@@ -137,11 +137,11 @@ fn late_nonce_should_be_queued() {
let pool = pool();
pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
let pending: Vec<_> = pool.ready().map(|a| a.data.transfer.nonce).collect();
assert_eq!(pending, Vec::<Index>::new());
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
let pending: Vec<_> = pool.ready().map(|a| a.data.transfer.nonce).collect();
assert_eq!(pending, vec![209, 210]);
}
@@ -151,12 +151,12 @@ fn prune_tags_should_work() {
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
let pending: Vec<_> = pool.ready().map(|a| a.data.transfer.nonce).collect();
assert_eq!(pending, vec![209, 210]);
pool.prune_tags(&BlockId::number(1), vec![vec![209]]).unwrap();
let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
let pending: Vec<_> = pool.ready().map(|a| a.data.transfer.nonce).collect();
assert_eq!(pending, vec![210]);
}
@@ -169,7 +169,7 @@ fn should_ban_invalid_transactions() {
pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap_err();
// when
let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
let pending: Vec<_> = pool.ready().map(|a| a.data.transfer.nonce).collect();
assert_eq!(pending, Vec::<Index>::new());
// then
+6 -6
View File
@@ -260,12 +260,12 @@ impl<
expected_index = expected_index + One::one();
}
TransactionValidity::Valid(
/*priority: */encoded_len as TransactionPriority,
/*requires: */deps,
/*provides: */vec![(sender, *index).encode()],
/*longevity: */TransactionLongevity::max_value(),
)
TransactionValidity::Valid {
priority: encoded_len as TransactionPriority,
requires: deps,
provides: vec![(sender, *index).encode()],
longevity: TransactionLongevity::max_value(),
}
} else {
return TransactionValidity::Invalid
}