Add trivial improvements to transaction pool (#8572)

* Add trival improvements to transaction pool

* .

* Add trival improvements to transaction pool

* Update client/transaction-pool/graph/src/future.rs

* Update client/transaction-pool/graph/src/base_pool.rs

* Fix transaction_debug test

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Liu-Cheng Xu
2021-04-09 19:37:40 +08:00
committed by GitHub
parent 9fa684f2a4
commit 7e59d172b8
9 changed files with 83 additions and 102 deletions
@@ -155,13 +155,13 @@ impl<Hash: Clone, Extrinsic: Clone> Transaction<Hash, Extrinsic> {
/// every reason to be commented. That's why we `Transaction` is not `Clone`,
/// but there's explicit `duplicate` method.
pub fn duplicate(&self) -> Self {
Transaction {
Self {
data: self.data.clone(),
bytes: self.bytes.clone(),
bytes: self.bytes,
hash: self.hash.clone(),
priority: self.priority.clone(),
priority: self.priority,
source: self.source,
valid_till: self.valid_till.clone(),
valid_till: self.valid_till,
requires: self.requires.clone(),
provides: self.provides.clone(),
propagate: self.propagate,
@@ -174,16 +174,9 @@ impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic> where
Extrinsic: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fn print_tags(fmt: &mut fmt::Formatter, tags: &[Tag]) -> fmt::Result {
let mut it = tags.iter();
if let Some(t) = it.next() {
write!(fmt, "{}", HexDisplay::from(t))?;
}
for t in it {
write!(fmt, ",{}", HexDisplay::from(t))?;
}
Ok(())
}
let join_tags = |tags: &[Tag]| {
tags.iter().map(|tag| HexDisplay::from(tag).to_string()).collect::<Vec<_>>().join(", ")
};
write!(fmt, "Transaction {{ ")?;
write!(fmt, "hash: {:?}, ", &self.hash)?;
@@ -192,11 +185,8 @@ impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic> where
write!(fmt, "bytes: {:?}, ", &self.bytes)?;
write!(fmt, "propagate: {:?}, ", &self.propagate)?;
write!(fmt, "source: {:?}, ", &self.source)?;
write!(fmt, "requires: [")?;
print_tags(fmt, &self.requires)?;
write!(fmt, "], provides: [")?;
print_tags(fmt, &self.provides)?;
write!(fmt, "], ")?;
write!(fmt, "requires: [{}], ", join_tags(&self.requires))?;
write!(fmt, "provides: [{}], ", join_tags(&self.provides))?;
write!(fmt, "data: {:?}", &self.data)?;
write!(fmt, "}}")?;
Ok(())
@@ -239,7 +229,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> Default for Bas
impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash, Ex> {
/// Create new pool given reject_future_transactions flag.
pub fn new(reject_future_transactions: bool) -> Self {
BasePool {
Self {
reject_future_transactions,
future: Default::default(),
ready: Default::default(),
@@ -320,13 +310,8 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
let mut first = true;
let mut to_import = vec![tx];
loop {
// take first transaction from the list
let tx = match to_import.pop() {
Some(tx) => tx,
None => break,
};
// take first transaction from the list
while let Some(tx) = to_import.pop() {
// find transactions in Future that it unlocks
to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));
@@ -1087,7 +1072,7 @@ mod tests {
}),
"Transaction { \
hash: 4, priority: 1000, valid_till: 64, bytes: 1, propagate: true, \
source: TransactionSource::External, requires: [03,02], provides: [04], data: [4]}".to_owned()
source: TransactionSource::External, requires: [03, 02], provides: [04], data: [4]}".to_owned()
);
}
@@ -47,24 +47,22 @@ impl<Hash: fmt::Debug, Ex: fmt::Debug> fmt::Debug for WaitingTransaction<Hash, E
write!(fmt, "WaitingTransaction {{ ")?;
write!(fmt, "imported_at: {:?}, ", self.imported_at)?;
write!(fmt, "transaction: {:?}, ", self.transaction)?;
write!(fmt, "missing_tags: {{")?;
let mut it = self.missing_tags.iter().map(|tag| HexDisplay::from(tag));
if let Some(tag) = it.next() {
write!(fmt, "{}", tag)?;
}
for tag in it {
write!(fmt, ", {}", tag)?;
}
write!(fmt, " }}}}")
write!(
fmt,
"missing_tags: {{{}}}",
self.missing_tags.iter()
.map(|tag| HexDisplay::from(tag).to_string()).collect::<Vec<_>>().join(", "),
)?;
write!(fmt, "}}")
}
}
impl<Hash, Ex> Clone for WaitingTransaction<Hash, Ex> {
fn clone(&self) -> Self {
WaitingTransaction {
Self {
transaction: self.transaction.clone(),
missing_tags: self.missing_tags.clone(),
imported_at: self.imported_at.clone(),
imported_at: self.imported_at,
}
}
}
@@ -90,7 +88,7 @@ impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
.cloned()
.collect();
WaitingTransaction {
Self {
transaction: Arc::new(transaction),
missing_tags,
imported_at: Instant::now(),
@@ -123,7 +121,7 @@ pub struct FutureTransactions<Hash: hash::Hash + Eq, Ex> {
impl<Hash: hash::Hash + Eq, Ex> Default for FutureTransactions<Hash, Ex> {
fn default() -> Self {
FutureTransactions {
Self {
wanted_tags: Default::default(),
waiting: Default::default(),
}
@@ -20,12 +20,14 @@
use std::{
collections::HashMap, hash, fmt::Debug,
};
use linked_hash_map::LinkedHashMap;
use serde::Serialize;
use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash};
use log::{debug, trace, warn};
use sp_runtime::traits;
use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash};
/// Extrinsic pool default listener.
pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
watchers: HashMap<H, watcher::Sender<H, ExtrinsicHash<C>>>,
@@ -37,7 +39,7 @@ const MAX_FINALITY_WATCHERS: usize = 512;
impl<H: hash::Hash + Eq + Debug, C: ChainApi> Default for Listener<H, C> {
fn default() -> Self {
Listener {
Self {
watchers: Default::default(),
finality_watchers: Default::default(),
}
@@ -115,7 +117,7 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
if let Some((hash, txs)) = self.finality_watchers.pop_front() {
for tx in txs {
self.fire(&tx, |s| s.finality_timeout(hash.clone()));
self.fire(&tx, |s| s.finality_timeout(hash));
}
}
}
@@ -21,8 +21,6 @@ use std::{
sync::Arc,
};
use crate::{base_pool as base, watcher::Watcher};
use futures::Future;
use sp_runtime::{
generic::BlockId,
@@ -35,6 +33,7 @@ use sp_transaction_pool::error;
use wasm_timer::Instant;
use futures::channel::mpsc::Receiver;
use crate::{base_pool as base, watcher::Watcher};
use crate::validated_pool::ValidatedPool;
pub use crate::validated_pool::{IsValidator, ValidatedTransaction};
@@ -111,7 +110,7 @@ pub struct Options {
impl Default for Options {
fn default() -> Self {
Options {
Self {
ready: base::Limit {
count: 8192,
total_bytes: 20 * 1024 * 1024,
@@ -151,7 +150,7 @@ where
impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
Pool {
Self {
validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)),
}
}
@@ -193,7 +192,7 @@ impl<B: ChainApi> Pool<B> {
res.expect("One extrinsic passed; one result returned; qed")
}
/// Import a single extrinsic and starts to watch their progress in the pool.
/// Import a single extrinsic and starts to watch its progress in the pool.
pub async fn submit_and_watch(
&self,
at: &BlockId<B::Block>,
@@ -242,8 +241,8 @@ impl<B: ChainApi> Pool<B> {
// Prune all transactions that provide given tags
let prune_status = self.validated_pool.prune_tags(in_pool_tags)?;
let pruned_transactions = hashes.into_iter().cloned()
.chain(prune_status.pruned.iter().map(|tx| tx.hash.clone()));
let pruned_transactions = hashes.iter().cloned()
.chain(prune_status.pruned.iter().map(|tx| tx.hash));
self.validated_pool.fire_pruned(at, pruned_transactions)
}
@@ -337,7 +336,7 @@ impl<B: ChainApi> Pool<B> {
// note that `known_imported_hashes` will be rejected here due to temporary ban.
let pruned_hashes = prune_status.pruned
.iter()
.map(|tx| tx.hash.clone()).collect::<Vec<_>>();
.map(|tx| tx.hash).collect::<Vec<_>>();
let pruned_transactions = prune_status.pruned
.into_iter()
.map(|tx| (tx.source, tx.data.clone()));
@@ -402,7 +401,7 @@ impl<B: ChainApi> Pool<B> {
let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
return (hash.clone(), ValidatedTransaction::Invalid(hash, err.into()))
return (hash, ValidatedTransaction::Invalid(hash, err))
}
let validation_result = self.validated_pool.api().validate_transaction(
@@ -413,17 +412,17 @@ impl<B: ChainApi> Pool<B> {
let status = match validation_result {
Ok(status) => status,
Err(e) => return (hash.clone(), ValidatedTransaction::Invalid(hash, e)),
Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
};
let validity = match status {
Ok(validity) => {
if validity.provides.is_empty() {
ValidatedTransaction::Invalid(hash.clone(), error::Error::NoTagsProvided.into())
ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
} else {
ValidatedTransaction::valid_at(
block_number.saturated_into::<u64>(),
hash.clone(),
hash,
source,
xt,
bytes,
@@ -432,9 +431,9 @@ impl<B: ChainApi> Pool<B> {
}
},
Err(TransactionValidityError::Invalid(e)) =>
ValidatedTransaction::Invalid(hash.clone(), error::Error::InvalidTransaction(e).into()),
ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into()),
Err(TransactionValidityError::Unknown(e)) =>
ValidatedTransaction::Unknown(hash.clone(), error::Error::UnknownTransaction(e).into()),
ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
};
(hash, validity)
@@ -50,7 +50,7 @@ pub struct TransactionRef<Hash, Ex> {
impl<Hash, Ex> Clone for TransactionRef<Hash, Ex> {
fn clone(&self) -> Self {
TransactionRef {
Self {
transaction: self.transaction.clone(),
insertion_id: self.insertion_id,
}
@@ -93,7 +93,7 @@ pub struct ReadyTx<Hash, Ex> {
impl<Hash: Clone, Ex> Clone for ReadyTx<Hash, Ex> {
fn clone(&self) -> Self {
ReadyTx {
Self {
transaction: self.transaction.clone(),
unlocks: self.unlocks.clone(),
requires_offset: self.requires_offset,
@@ -128,7 +128,7 @@ impl<Hash, Ex> tracked_map::Size for ReadyTx<Hash, Ex> {
impl<Hash: hash::Hash + Eq, Ex> Default for ReadyTransactions<Hash, Ex> {
fn default() -> Self {
ReadyTransactions {
Self {
insertion_id: Default::default(),
provided_tags: Default::default(),
ready: Default::default(),
@@ -259,7 +259,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
/// (i.e. the entire subgraph that this transaction is a start of will be removed).
/// All removed transactions are returned.
pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let to_remove = hashes.iter().cloned().collect::<Vec<_>>();
let to_remove = hashes.to_vec();
self.remove_subtree_with_tag_filter(to_remove, None)
}
@@ -48,7 +48,7 @@ pub struct PoolRotator<Hash> {
impl<Hash: hash::Hash + Eq> Default for PoolRotator<Hash> {
fn default() -> Self {
PoolRotator {
Self {
ban_time: Duration::from_secs(60 * 30),
banned_until: Default::default(),
}
@@ -78,7 +78,6 @@ impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
}
}
/// Bans extrinsic if it's stale.
///
/// Returns `true` if extrinsic is stale and got banned.
@@ -22,7 +22,7 @@ use std::{
};
use parking_lot::{RwLock, RwLockWriteGuard, RwLockReadGuard};
/// Something that can report it's size.
/// Something that can report its size.
pub trait Size {
fn size(&self) -> usize;
}
@@ -64,14 +64,14 @@ impl<K, V> TrackedMap<K, V> {
}
/// Lock map for read.
pub fn read<'a>(&'a self) -> TrackedMapReadAccess<'a, K, V> {
pub fn read(&self) -> TrackedMapReadAccess<K, V> {
TrackedMapReadAccess {
inner_guard: self.index.read(),
}
}
/// Lock map for write.
pub fn write<'a>(&'a self) -> TrackedMapWriteAccess<'a, K, V> {
pub fn write(&self) -> TrackedMapWriteAccess<K, V> {
TrackedMapWriteAccess {
inner_guard: self.index.write(),
bytes: &self.bytes,
@@ -90,7 +90,7 @@ where
K: Eq + std::hash::Hash
{
/// Lock map for read.
pub fn read<'a>(&'a self) -> TrackedMapReadAccess<'a, K, V> {
pub fn read(&self) -> TrackedMapReadAccess<K, V> {
TrackedMapReadAccess {
inner_guard: self.0.read(),
}
@@ -136,10 +136,10 @@ where
let new_bytes = val.size();
self.bytes.fetch_add(new_bytes as isize, AtomicOrdering::Relaxed);
self.length.fetch_add(1, AtomicOrdering::Relaxed);
self.inner_guard.insert(key, val).and_then(|old_val| {
self.inner_guard.insert(key, val).map(|old_val| {
self.bytes.fetch_sub(old_val.size() as isize, AtomicOrdering::Relaxed);
self.length.fetch_sub(1, AtomicOrdering::Relaxed);
Some(old_val)
old_val
})
}
@@ -186,4 +186,4 @@ mod tests {
assert_eq!(map.bytes(), 1);
assert_eq!(map.len(), 1);
}
}
}
@@ -22,12 +22,7 @@ use std::{
sync::Arc,
};
use crate::base_pool as base;
use crate::listener::Listener;
use crate::rotator::PoolRotator;
use crate::watcher::Watcher;
use serde::Serialize;
use parking_lot::{Mutex, RwLock};
use sp_runtime::{
generic::BlockId,
@@ -39,7 +34,10 @@ use wasm_timer::Instant;
use futures::channel::mpsc::{channel, Sender};
use retain_mut::RetainMut;
use crate::base_pool::PruneStatus;
use crate::base_pool::{self as base, PruneStatus};
use crate::listener::Listener;
use crate::rotator::PoolRotator;
use crate::watcher::Watcher;
use crate::pool::{
EventStream, Options, ChainApi, BlockHash, ExtrinsicHash, ExtrinsicFor, TransactionFor,
};
@@ -95,13 +93,13 @@ pub struct IsValidator(Box<dyn Fn() -> bool + Send + Sync>);
impl From<bool> for IsValidator {
fn from(is_validator: bool) -> Self {
IsValidator(Box::new(move || is_validator))
Self(Box::new(move || is_validator))
}
}
impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
IsValidator(is_validator)
Self(is_validator)
}
}
@@ -134,7 +132,7 @@ impl<B: ChainApi> ValidatedPool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
ValidatedPool {
Self {
is_validator,
options,
listener: Default::default(),
@@ -168,7 +166,7 @@ impl<B: ChainApi> ValidatedPool<B> {
if !ignore_banned && self.is_banned(tx_hash) {
Err(error::Error::TemporarilyBanned.into())
} else if self.pool.read().is_imported(tx_hash) {
Err(error::Error::AlreadyImported(Box::new(tx_hash.clone())).into())
Err(error::Error::AlreadyImported(Box::new(*tx_hash)).into())
} else {
Ok(())
}
@@ -209,7 +207,7 @@ impl<B: ChainApi> ValidatedPool<B> {
if let base::Imported::Ready { ref hash, .. } = imported {
self.import_notification_sinks.lock()
.retain_mut(|sink| {
match sink.try_send(hash.clone()) {
match sink.try_send(*hash) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
@@ -225,15 +223,15 @@ impl<B: ChainApi> ValidatedPool<B> {
let mut listener = self.listener.write();
fire_events(&mut *listener, &imported);
Ok(imported.hash().clone())
Ok(*imported.hash())
},
ValidatedTransaction::Invalid(hash, err) => {
self.rotator.ban(&Instant::now(), std::iter::once(hash));
Err(err.into())
Err(err)
},
ValidatedTransaction::Unknown(hash, err) => {
self.listener.write().invalid(&hash, false);
Err(err.into())
Err(err)
},
}
}
@@ -258,9 +256,9 @@ impl<B: ChainApi> ValidatedPool<B> {
let removed = {
let mut pool = self.pool.write();
let removed = pool.enforce_limits(ready_limit, future_limit)
.into_iter().map(|x| x.hash.clone()).collect::<HashSet<_>>();
.into_iter().map(|x| x.hash).collect::<HashSet<_>>();
// ban all removed transactions
self.rotator.ban(&Instant::now(), removed.iter().map(|x| x.clone()));
self.rotator.ban(&Instant::now(), removed.iter().copied());
removed
};
if !removed.is_empty() {
@@ -295,9 +293,9 @@ impl<B: ChainApi> ValidatedPool<B> {
},
ValidatedTransaction::Invalid(hash, err) => {
self.rotator.ban(&Instant::now(), std::iter::once(hash));
Err(err.into())
Err(err)
},
ValidatedTransaction::Unknown(_, err) => Err(err.into()),
ValidatedTransaction::Unknown(_, err) => Err(err),
}
}
@@ -327,9 +325,9 @@ impl<B: ChainApi> ValidatedPool<B> {
// note we are not considering tx with hash invalid here - we just want
// to remove it along with dependent transactions and `remove_subtree()`
// does exactly what we need
let removed = pool.remove_subtree(&[hash.clone()]);
let removed = pool.remove_subtree(&[hash]);
for removed_tx in removed {
let removed_hash = removed_tx.hash.clone();
let removed_hash = removed_tx.hash;
let updated_transaction = updated_transactions.remove(&removed_hash);
let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
updated_tx
@@ -343,7 +341,7 @@ impl<B: ChainApi> ValidatedPool<B> {
ValidatedTransaction::Valid(transaction)
};
initial_statuses.insert(removed_hash.clone(), Status::Ready);
initial_statuses.insert(removed_hash, Status::Ready);
txs_to_resubmit.push((removed_hash, tx_to_resubmit));
}
// make sure to remove the hash even if it's not present in the pool any more.
@@ -370,7 +368,7 @@ impl<B: ChainApi> ValidatedPool<B> {
final_statuses.insert(hash, Status::Failed);
}
for tx in removed {
final_statuses.insert(tx.hash.clone(), Status::Dropped);
final_statuses.insert(tx.hash, Status::Dropped);
}
},
base::Imported::Future { .. } => {
@@ -400,7 +398,7 @@ impl<B: ChainApi> ValidatedPool<B> {
// queue, updating final statuses as required
if reject_future_transactions {
for future_tx in pool.clear_future() {
final_statuses.insert(future_tx.hash.clone(), Status::Dropped);
final_statuses.insert(future_tx.hash, Status::Dropped);
}
}
@@ -428,7 +426,7 @@ impl<B: ChainApi> ValidatedPool<B> {
self.pool.read().by_hashes(&hashes)
.into_iter()
.map(|existing_in_pool| existing_in_pool
.map(|transaction| transaction.provides.iter().cloned().collect()))
.map(|transaction| transaction.provides.to_vec()))
.collect()
}
@@ -477,7 +475,7 @@ impl<B: ChainApi> ValidatedPool<B> {
.into_iter()
.enumerate()
.filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) {
Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx].clone()),
Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx]),
_ => None,
});
// Fire `pruned` notifications for collected hashes and make sure to include
@@ -498,7 +496,7 @@ impl<B: ChainApi> ValidatedPool<B> {
hashes: impl Iterator<Item=ExtrinsicHash<B>>,
) -> Result<(), B::Error> {
let header_hash = self.api.block_id_to_hash(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?;
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)))?;
let mut listener = self.listener.write();
let mut set = HashSet::with_capacity(hashes.size_hint().0);
for h in hashes {
@@ -519,13 +517,13 @@ impl<B: ChainApi> ValidatedPool<B> {
/// See `prune_tags` if you want this.
pub fn clear_stale(&self, at: &BlockId<B::Block>) -> Result<(), B::Error> {
let block_number = self.api.block_id_to_number(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)))?
.saturated_into::<u64>();
let now = Instant::now();
let to_remove = {
self.ready()
.filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx))
.map(|tx| tx.hash.clone())
.map(|tx| tx.hash)
.collect::<Vec<_>>()
};
let futures_to_remove: Vec<ExtrinsicHash<B>> = {
@@ -533,7 +531,7 @@ impl<B: ChainApi> ValidatedPool<B> {
let mut hashes = Vec::new();
for tx in p.futures() {
if self.rotator.ban_if_stale(&now, block_number, &tx) {
hashes.push(tx.hash.clone());
hashes.push(tx.hash);
}
}
hashes