Account for transaction priority when enforcing limits (#10388)

* Account for transaction priority when enforcing limits

* Improve `enforce_limits` comment

* Explanation comment on why comparison impl is not used for limit enforcement
This commit is contained in:
Nazar Mokrynskyi
2021-12-03 01:17:23 +02:00
committed by GitHub
parent e9a7dc3ca6
commit 798e01bf9b
2 changed files with 97 additions and 17 deletions
@@ -20,7 +20,7 @@
//!
//! For a more full-featured pool, have a look at the `pool` module.
use std::{collections::HashSet, fmt, hash, sync::Arc};
use std::{cmp::Ordering, collections::HashSet, fmt, hash, sync::Arc};
use log::{debug, trace, warn};
use sc_transaction_pool_api::{error, InPoolTransaction, PoolStatus};
@@ -36,7 +36,7 @@ use sp_runtime::{
use super::{
future::{FutureTransactions, WaitingTransaction},
ready::{BestIterator, ReadyTransactions},
ready::{BestIterator, ReadyTransactions, TransactionRef},
};
/// Successful import result.
@@ -384,8 +384,8 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
///
/// Removes and returns worst transactions from the queues and all transactions that depend on
/// them. Technically the worst transaction should be evaluated by computing the entire pending
/// set. We use a simplified approach to remove the transaction that occupies the pool for the
/// longest time.
/// set. We use a simplified approach to remove transactions with the lowest priority first or
/// those that occupy the pool for the longest time in case priority is the same.
pub fn enforce_limits(
&mut self,
ready: &Limit,
@@ -395,18 +395,30 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
// find the worst transaction
let minimal = self.ready.fold(|minimal, current| {
let worst = self.ready.fold::<TransactionRef<Hash, Ex>, _>(|worst, current| {
let transaction = &current.transaction;
match minimal {
None => Some(transaction.clone()),
Some(ref tx) if tx.insertion_id > transaction.insertion_id =>
Some(transaction.clone()),
other => other,
}
worst
.map(|worst| {
// Here we don't use `TransactionRef`'s ordering implementation because
// while it prefers priority like need here, it also prefers older
// transactions for inclusion purposes and limit enforcement needs to prefer
// newer transactions instead and drop the older ones.
match worst.transaction.priority.cmp(&transaction.transaction.priority) {
Ordering::Less => worst,
Ordering::Equal =>
if worst.insertion_id > transaction.insertion_id {
transaction.clone()
} else {
worst
},
Ordering::Greater => transaction.clone(),
}
})
.or_else(|| Some(transaction.clone()))
});
if let Some(minimal) = minimal {
removed.append(&mut self.remove_subtree(&[minimal.transaction.hash.clone()]))
if let Some(worst) = worst {
removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
} else {
break
}
@@ -414,14 +426,14 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
while future.is_exceeded(self.future.len(), self.future.bytes()) {
// find the worst transaction
let minimal = self.future.fold(|minimal, current| match minimal {
let worst = self.future.fold(|worst, current| match worst {
None => Some(current.clone()),
Some(ref tx) if tx.imported_at > current.imported_at => Some(current.clone()),
other => other,
});
if let Some(minimal) = minimal {
removed.append(&mut self.remove_subtree(&[minimal.transaction.hash.clone()]))
if let Some(worst) = worst {
removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
} else {
break
}
@@ -539,6 +539,13 @@ mod tests {
longevity: 9001,
propagate: false,
}),
Extrinsic::Store(_) => Ok(ValidTransaction {
priority: 9001,
requires: vec![],
provides: vec![vec![43]],
longevity: 9001,
propagate: false,
}),
_ => unimplemented!(),
};
@@ -1044,7 +1051,7 @@ mod tests {
}
#[test]
fn should_trigger_dropped() {
fn should_trigger_dropped_older() {
// given
let limit = Limit { count: 1, total_bytes: 1000 };
let options =
@@ -1077,6 +1084,67 @@ mod tests {
assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
}
#[test]
fn should_trigger_dropped_lower_priority() {
{
// given
let limit = Limit { count: 1, total_bytes: 1000 };
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let pool = Pool::new(options, true.into(), TestApi::default().into());
let xt = Extrinsic::IncludeData(Vec::new());
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
// then
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(2)),
to: AccountId::from_h256(H256::from_low_u64_be(1)),
amount: 4,
nonce: 1,
});
let result = block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt));
assert!(matches!(
result,
Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped)
));
}
{
// given
let limit = Limit { count: 2, total_bytes: 1000 };
let options =
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
let pool = Pool::new(options, true.into(), TestApi::default().into());
let xt = Extrinsic::IncludeData(Vec::new());
block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let watcher =
block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);
// when
let xt = Extrinsic::Store(Vec::new());
block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
}
}
#[test]
fn should_handle_pruning_in_the_middle_of_import() {
// given