Quickly skip invalid transactions during block authorship. (#9789)

* Support skipping invalid transactions in the iterator.

* Expose concrete iterator.

* cargo +nightly fmt --all

* More consistent placement.

* Update Cargo.lock

* Pass transaction to 'report_invalid'
This commit is contained in:
Tomasz Drwięga
2021-10-01 16:25:13 +02:00
committed by GitHub
parent 29ff036463
commit 085935dd0a
8 changed files with 159 additions and 71 deletions
+21 -7
View File
@@ -30,8 +30,8 @@ use std::{borrow::Cow, collections::HashMap, pin::Pin, sync::Arc};
use node_primitives::Block;
use node_testing::bench::{BenchDb, BlockType, DatabaseType, KeyTypes, Profile};
use sc_transaction_pool_api::{
ImportNotificationStream, PoolFuture, PoolStatus, TransactionFor, TransactionSource,
TransactionStatusStreamFor, TxHash,
ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor,
TransactionSource, TransactionStatusStreamFor, TxHash,
};
use sp_consensus::{Environment, Proposer};
use sp_inherents::InherentDataProvider;
@@ -216,6 +216,19 @@ impl sc_transaction_pool_api::InPoolTransaction for PoolTransaction {
#[derive(Clone, Debug)]
pub struct Transactions(Vec<Arc<PoolTransaction>>);
pub struct TransactionsIterator(std::vec::IntoIter<Arc<PoolTransaction>>);
impl Iterator for TransactionsIterator {
type Item = Arc<PoolTransaction>;
fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}
impl ReadyTransactions for TransactionsIterator {
fn report_invalid(&mut self, _tx: &Self::Item) {}
}
impl sc_transaction_pool_api::TransactionPool for Transactions {
type Block = Block;
@@ -257,16 +270,17 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
_at: NumberFor<Self::Block>,
) -> Pin<
Box<
dyn Future<Output = Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>>
+ Send,
dyn Future<
Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
> + Send,
>,
> {
let iter: Box<dyn Iterator<Item = Arc<PoolTransaction>> + Send> =
Box::new(self.0.clone().into_iter());
let iter: Box<dyn ReadyTransactions<Item = Arc<PoolTransaction>> + Send> =
Box::new(TransactionsIterator(self.0.clone().into_iter()));
Box::pin(futures::future::ready(iter))
}
fn ready(&self) -> Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send> {
fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
unimplemented!()
}
@@ -344,7 +344,7 @@ where
let mut t2 =
futures_timer::Delay::new(deadline.saturating_duration_since((self.now)()) / 8).fuse();
let pending_iterator = select! {
let mut pending_iterator = select! {
res = t1 => res,
_ = t2 => {
log::warn!(
@@ -363,7 +363,7 @@ where
let mut transaction_pushed = false;
let mut hit_block_size_limit = false;
for pending_tx in pending_iterator {
while let Some(pending_tx) = pending_iterator.next() {
if (self.now)() > deadline {
debug!(
"Consensus deadline reached when pushing block transactions, \
@@ -378,6 +378,7 @@ where
let block_size =
block_builder.estimate_block_size(self.include_proof_in_block_size_estimation);
if block_size + pending_tx_data.encoded_size() > block_size_limit {
pending_iterator.report_invalid(&pending_tx);
if skipped < MAX_SKIPPED_TRANSACTIONS {
skipped += 1;
debug!(
@@ -400,6 +401,7 @@ where
debug!("[{:?}] Pushed to the block.", pending_tx_hash);
},
Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => {
pending_iterator.report_invalid(&pending_tx);
if skipped < MAX_SKIPPED_TRANSACTIONS {
skipped += 1;
debug!(
@@ -412,6 +414,7 @@ where
}
},
Err(e) if skipped > 0 => {
pending_iterator.report_invalid(&pending_tx);
trace!(
"[{:?}] Ignoring invalid transaction when skipping: {}",
pending_tx_hash,
@@ -419,6 +422,7 @@ where
);
},
Err(e) => {
pending_iterator.report_invalid(&pending_tx);
debug!("[{:?}] Invalid transaction: {}", pending_tx_hash, e);
unqueue_invalid.push(pending_tx_hash);
},
@@ -223,13 +223,14 @@ pub trait TransactionPool: Send + Sync {
at: NumberFor<Self::Block>,
) -> Pin<
Box<
dyn Future<Output = Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>>
+ Send,
dyn Future<
Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
> + Send,
>,
>;
/// Get an iterator for ready transactions ordered by priority.
fn ready(&self) -> Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>;
fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
// *** Block production
/// Remove transactions identified by given hashes (and dependent transactions) from the pool.
@@ -254,6 +255,27 @@ pub trait TransactionPool: Send + Sync {
fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
}
/// An iterator of ready transactions.
///
/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting
/// last-returned element as invalid.
///
/// The implementation is then allowed, for performance reasons, to change the elements
/// returned next, by e.g. skipping elements that are known to depend on the reported
/// transaction, which yields them invalid as well.
pub trait ReadyTransactions: Iterator {
/// Report given transaction as invalid.
///
/// This might affect subsequent elements returned by the iterator, so dependent transactions
/// are skipped for performance reasons.
fn report_invalid(&mut self, _tx: &Self::Item);
}
/// A no-op implementation for an empty iterator.
impl<T> ReadyTransactions for std::iter::Empty<T> {
fn report_invalid(&mut self, _tx: &T) {}
}
/// Events that the transaction pool listens for.
pub enum ChainEvent<B: BlockT> {
/// New best block have been added to the chain
@@ -1,39 +0,0 @@
[package]
name = "sc-transaction-graph"
version = "4.0.0-dev"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
homepage = "https://substrate.dev"
repository = "https://github.com/paritytech/substrate/"
description = "Generic Transaction Pool"
readme = "README.md"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
derive_more = "0.99.2"
thiserror = "1.0.21"
futures = "0.3.9"
log = "0.4.8"
parking_lot = "0.11.1"
serde = { version = "1.0.101", features = ["derive"] }
sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" }
sc-utils = { version = "4.0.0-dev", path = "../../utils" }
sp-core = { version = "4.0.0-dev", path = "../../../primitives/core" }
sp-runtime = { version = "4.0.0-dev", path = "../../../primitives/runtime" }
sp-transaction-pool = { version = "4.0.0-dev", path = "../../../primitives/transaction-pool" }
parity-util-mem = { version = "0.10.0", default-features = false, features = ["primitive-types"] }
linked-hash-map = "0.5.4"
retain_mut = "0.1.3"
[dev-dependencies]
assert_matches = "1.3.0"
codec = { package = "parity-scale-codec", version = "2.0.0" }
substrate-test-runtime = { version = "2.0.0", path = "../../../test-utils/runtime" }
criterion = "0.3"
[[bench]]
name = "basics"
harness = false
@@ -36,7 +36,7 @@ use sp_runtime::{
use super::{
future::{FutureTransactions, WaitingTransaction},
ready::ReadyTransactions,
ready::{BestIterator, ReadyTransactions},
};
/// Successful import result.
@@ -355,7 +355,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
}
/// Returns an iterator over ready transactions in the pool.
pub fn ready(&self) -> impl Iterator<Item = Arc<Transaction<Hash, Ex>>> {
pub fn ready(&self) -> BestIterator<Hash, Ex> {
self.ready.get()
}
@@ -23,7 +23,7 @@ use std::{
sync::Arc,
};
use log::trace;
use log::{debug, trace};
use sc_transaction_pool_api::error;
use serde::Serialize;
use sp_runtime::{traits::Member, transaction_validity::TransactionTag as Tag};
@@ -156,11 +156,16 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
/// - transactions that are valid for a shorter time go first
/// 4. Lastly we sort by the time in the queue
/// - transactions that are longer in the queue go first
pub fn get(&self) -> impl Iterator<Item = Arc<Transaction<Hash, Ex>>> {
///
/// The iterator is providing a way to report transactions that the receiver considers invalid.
/// In such case the entire subgraph of transactions that depend on the reported one will be
/// skipped.
pub fn get(&self) -> BestIterator<Hash, Ex> {
BestIterator {
all: self.ready.clone(),
best: self.best.clone(),
awaiting: Default::default(),
invalid: Default::default(),
}
}
@@ -482,6 +487,7 @@ pub struct BestIterator<Hash, Ex> {
all: ReadOnlyTrackedMap<Hash, ReadyTx<Hash, Ex>>,
awaiting: HashMap<Hash, (usize, TransactionRef<Hash, Ex>)>,
best: BTreeSet<TransactionRef<Hash, Ex>>,
invalid: HashSet<Hash>,
}
impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
@@ -498,6 +504,34 @@ impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
}
}
impl<Hash: hash::Hash + Member, Ex> sc_transaction_pool_api::ReadyTransactions
for BestIterator<Hash, Ex>
{
fn report_invalid(&mut self, tx: &Self::Item) {
BestIterator::report_invalid(self, tx)
}
}
impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
/// Report given transaction as invalid.
///
/// As a consequence, all values that depend on the invalid one will be skipped.
/// When given transaction is not in the pool it has no effect.
/// When invoked on a fully drained iterator it has no effect either.
pub fn report_invalid(&mut self, tx: &Arc<Transaction<Hash, Ex>>) {
if let Some(to_report) = self.all.read().get(&tx.hash) {
debug!(
target: "txpool",
"[{:?}] Reported as invalid. Will skip sub-chains while iterating.",
to_report.transaction.transaction.hash
);
for hash in &to_report.unlocks {
self.invalid.insert(hash.clone());
}
}
}
}
impl<Hash: hash::Hash + Member, Ex> Iterator for BestIterator<Hash, Ex> {
type Item = Arc<Transaction<Hash, Ex>>;
@@ -505,8 +539,19 @@ impl<Hash: hash::Hash + Member, Ex> Iterator for BestIterator<Hash, Ex> {
loop {
let best = self.best.iter().next_back()?.clone();
let best = self.best.take(&best)?;
let hash = &best.transaction.hash;
let next = self.all.read().get(&best.transaction.hash).cloned();
// Check if the transaction was marked invalid.
if self.invalid.contains(hash) {
debug!(
target: "txpool",
"[{:?}] Skipping invalid child transaction while iterating.",
hash
);
continue
}
let next = self.all.read().get(hash).cloned();
let ready = match next {
Some(ready) => ready,
// The transaction is not in all, maybe it was removed in the meantime?
@@ -635,10 +680,13 @@ mod tests {
assert_eq!(ready.get().count(), 3);
}
#[test]
fn should_return_best_transactions_in_correct_order() {
// given
let mut ready = ReadyTransactions::default();
/// Populate the pool, with a graph that looks like so:
///
/// tx1 -> tx2 \
/// -> -> tx3
/// -> tx4 -> tx5 -> tx6
/// -> tx7
fn populate_pool(ready: &mut ReadyTransactions<u64, Vec<u8>>) {
let mut tx1 = tx(1);
tx1.requires.clear();
let mut tx2 = tx(2);
@@ -649,11 +697,17 @@ mod tests {
tx3.provides = vec![];
let mut tx4 = tx(4);
tx4.requires = vec![tx1.provides[0].clone()];
tx4.provides = vec![];
let tx5 = Transaction {
data: vec![5],
tx4.provides = vec![vec![107]];
let mut tx5 = tx(5);
tx5.requires = vec![tx4.provides[0].clone()];
tx5.provides = vec![vec![108]];
let mut tx6 = tx(6);
tx6.requires = vec![tx5.provides[0].clone()];
tx6.provides = vec![];
let tx7 = Transaction {
data: vec![7],
bytes: 1,
hash: 5,
hash: 7,
priority: 1,
valid_till: u64::MAX, // use the max here for testing.
requires: vec![tx1.provides[0].clone()],
@@ -663,20 +717,30 @@ mod tests {
};
// when
for tx in vec![tx1, tx2, tx3, tx4, tx5] {
import(&mut ready, tx).unwrap();
for tx in vec![tx1, tx2, tx3, tx7, tx4, tx5, tx6] {
import(ready, tx).unwrap();
}
// then
assert_eq!(ready.best.len(), 1);
}
#[test]
fn should_return_best_transactions_in_correct_order() {
// given
let mut ready = ReadyTransactions::default();
populate_pool(&mut ready);
// when
let mut it = ready.get().map(|tx| tx.data[0]);
// then
assert_eq!(it.next(), Some(1));
assert_eq!(it.next(), Some(2));
assert_eq!(it.next(), Some(3));
assert_eq!(it.next(), Some(4));
assert_eq!(it.next(), Some(5));
assert_eq!(it.next(), Some(6));
assert_eq!(it.next(), Some(7));
assert_eq!(it.next(), None);
}
@@ -725,4 +789,26 @@ mod tests {
TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 2 }
);
}
#[test]
fn should_skip_invalid_transactions_while_iterating() {
// given
let mut ready = ReadyTransactions::default();
populate_pool(&mut ready);
// when
let mut it = ready.get();
let data = |tx: &Arc<Transaction<u64, Vec<u8>>>| tx.data[0];
// then
assert_eq!(it.next().as_ref().map(data), Some(1));
assert_eq!(it.next().as_ref().map(data), Some(2));
assert_eq!(it.next().as_ref().map(data), Some(3));
let tx4 = it.next();
assert_eq!(tx4.as_ref().map(data), Some(4));
// report 4 as invalid, which should skip 5 & 6.
it.report_invalid(&tx4.unwrap());
assert_eq!(it.next().as_ref().map(data), Some(7));
assert_eq!(it.next().as_ref().map(data), None);
}
}
@@ -25,7 +25,7 @@ use std::{
use futures::channel::mpsc::{channel, Sender};
use parking_lot::{Mutex, RwLock};
use retain_mut::RetainMut;
use sc_transaction_pool_api::{error, PoolStatus};
use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions};
use serde::Serialize;
use sp_runtime::{
generic::BlockId,
@@ -630,7 +630,7 @@ impl<B: ChainApi> ValidatedPool<B> {
}
/// Get an iterator for ready transactions ordered by priority
pub fn ready(&self) -> impl Iterator<Item = TransactionFor<B>> + Send {
pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
self.pool.read().ready()
}
+3 -2
View File
@@ -56,7 +56,8 @@ use std::{
use graph::{ExtrinsicHash, IsValidator};
use sc_transaction_pool_api::{
ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus,
TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
ReadyTransactions, TransactionFor, TransactionPool, TransactionSource,
TransactionStatusStreamFor, TxHash,
};
use sp_core::traits::SpawnEssentialNamed;
use sp_runtime::{
@@ -69,7 +70,7 @@ use crate::metrics::MetricsLink as PrometheusMetrics;
use prometheus_endpoint::Registry as PrometheusRegistry;
type BoxedReadyIterator<Hash, Data> =
Box<dyn Iterator<Item = Arc<graph::base_pool::Transaction<Hash, Data>>> + Send>;
Box<dyn ReadyTransactions<Item = Arc<graph::base_pool::Transaction<Hash, Data>>> + Send>;
type ReadyIteratorFor<PoolApi> =
BoxedReadyIterator<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;