mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 21:11:07 +00:00
Transaction pool: Ensure that we prune transactions properly (#8963)
* Transaction pool: Ensure that we prune transactions properly There was a bug in the transaction pool that we didn't pruned transactions properly because we called `prune_known`, instead of `prune`. This bug was introduced by: https://github.com/paritytech/substrate/pull/4629 This is required to have stale extrinsics being removed properly, so that they don't fill up the tx pool. * Fix compilation * Fix benches * ...
This commit is contained in:
@@ -23,7 +23,7 @@ use sc_transaction_graph::*;
|
||||
use codec::Encode;
|
||||
use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId};
|
||||
use sp_runtime::{
|
||||
generic::BlockId,
|
||||
generic::BlockId, traits::Block as BlockT,
|
||||
transaction_validity::{
|
||||
ValidTransaction, InvalidTransaction, TransactionValidity, TransactionTag as Tag,
|
||||
TransactionSource,
|
||||
@@ -114,6 +114,13 @@ impl ChainApi for TestApi {
|
||||
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
|
||||
ready(Ok(None))
|
||||
}
|
||||
|
||||
fn block_header(
|
||||
&self,
|
||||
_: &BlockId<Self::Block>,
|
||||
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn uxt(transfer: Transfer) -> Extrinsic {
|
||||
|
||||
@@ -23,7 +23,7 @@ use std::{
|
||||
|
||||
use linked_hash_map::LinkedHashMap;
|
||||
use serde::Serialize;
|
||||
use log::{debug, trace, warn};
|
||||
use log::{debug, trace};
|
||||
use sp_runtime::traits;
|
||||
|
||||
use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash};
|
||||
@@ -99,12 +99,8 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
|
||||
}
|
||||
|
||||
/// Transaction was removed as invalid.
|
||||
pub fn invalid(&mut self, tx: &H, warn: bool) {
|
||||
if warn {
|
||||
warn!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
|
||||
} else {
|
||||
debug!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
|
||||
}
|
||||
pub fn invalid(&mut self, tx: &H) {
|
||||
debug!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
|
||||
self.fire(tx, |watcher| watcher.invalid());
|
||||
}
|
||||
|
||||
|
||||
@@ -95,6 +95,12 @@ pub trait ChainApi: Send + Sync {
|
||||
|
||||
/// Returns a block body given the block id.
|
||||
fn block_body(&self, at: &BlockId<Self::Block>) -> Self::BodyFuture;
|
||||
|
||||
/// Returns a block header given the block id.
|
||||
fn block_header(
|
||||
&self,
|
||||
at: &BlockId<Self::Block>,
|
||||
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
|
||||
}
|
||||
|
||||
/// Pool configuration options.
|
||||
@@ -237,7 +243,7 @@ impl<B: ChainApi> Pool<B> {
|
||||
) -> Result<(), B::Error> {
|
||||
// Get details of all extrinsics that are already in the pool
|
||||
let in_pool_tags = self.validated_pool.extrinsics_tags(hashes)
|
||||
.into_iter().filter_map(|x| x).flat_map(|x| x);
|
||||
.into_iter().filter_map(|x| x).flatten();
|
||||
|
||||
// Prune all transactions that provide given tags
|
||||
let prune_status = self.validated_pool.prune_tags(in_pool_tags)?;
|
||||
@@ -579,6 +585,13 @@ mod tests {
|
||||
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
|
||||
futures::future::ready(Ok(None))
|
||||
}
|
||||
|
||||
fn block_header(
|
||||
&self,
|
||||
_: &BlockId<Self::Block>,
|
||||
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn uxt(transfer: Transfer) -> Extrinsic {
|
||||
|
||||
@@ -230,7 +230,7 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
Err(err)
|
||||
},
|
||||
ValidatedTransaction::Unknown(hash, err) => {
|
||||
self.listener.write().invalid(&hash, false);
|
||||
self.listener.write().invalid(&hash);
|
||||
Err(err)
|
||||
},
|
||||
}
|
||||
@@ -415,7 +415,7 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
Status::Future => listener.future(&hash),
|
||||
Status::Ready => listener.ready(&hash, None),
|
||||
Status::Dropped => listener.dropped(&hash, None),
|
||||
Status::Failed => listener.invalid(&hash, initial_status.is_some()),
|
||||
Status::Failed => listener.invalid(&hash),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -423,10 +423,12 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
|
||||
/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
|
||||
pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
|
||||
self.pool.read().by_hashes(&hashes)
|
||||
self.pool.read()
|
||||
.by_hashes(&hashes)
|
||||
.into_iter()
|
||||
.map(|existing_in_pool| existing_in_pool
|
||||
.map(|transaction| transaction.provides.to_vec()))
|
||||
.map(|existing_in_pool|
|
||||
existing_in_pool.map(|transaction| transaction.provides.to_vec())
|
||||
)
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -599,7 +601,7 @@ impl<B: ChainApi> ValidatedPool<B> {
|
||||
|
||||
let mut listener = self.listener.write();
|
||||
for tx in &invalid {
|
||||
listener.invalid(&tx.hash, true);
|
||||
listener.invalid(&tx.hash);
|
||||
}
|
||||
|
||||
invalid
|
||||
@@ -645,15 +647,9 @@ fn fire_events<H, B, Ex>(
|
||||
match *imported {
|
||||
base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
|
||||
listener.ready(hash, None);
|
||||
for f in failed {
|
||||
listener.invalid(f, true);
|
||||
}
|
||||
for r in removed {
|
||||
listener.dropped(&r.hash, Some(hash));
|
||||
}
|
||||
for p in promoted {
|
||||
listener.ready(p, None);
|
||||
}
|
||||
failed.into_iter().for_each(|f| listener.invalid(f));
|
||||
removed.into_iter().for_each(|r| listener.dropped(&r.hash, Some(hash)));
|
||||
promoted.into_iter().for_each(|p| listener.ready(p, None));
|
||||
},
|
||||
base::Imported::Future { ref hash } => {
|
||||
listener.future(hash)
|
||||
|
||||
@@ -81,7 +81,7 @@ impl<Client, Block> FullChainApi<Client, Block> {
|
||||
impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Block>
|
||||
where
|
||||
Block: BlockT,
|
||||
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
|
||||
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block> + HeaderBackend<Block>,
|
||||
Client: Send + Sync + 'static,
|
||||
Client::Api: TaggedTransactionQueue<Block>,
|
||||
{
|
||||
@@ -150,6 +150,13 @@ where
|
||||
(<traits::HashFor::<Block> as traits::Hash>::hash(x), x.len())
|
||||
})
|
||||
}
|
||||
|
||||
fn block_header(
|
||||
&self,
|
||||
at: &BlockId<Self::Block>,
|
||||
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
|
||||
self.client.header(*at).map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to validate a transaction using a full chain API.
|
||||
@@ -162,7 +169,7 @@ fn validate_transaction_blocking<Client, Block>(
|
||||
) -> error::Result<TransactionValidity>
|
||||
where
|
||||
Block: BlockT,
|
||||
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
|
||||
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block> + HeaderBackend<Block>,
|
||||
Client: Send + Sync + 'static,
|
||||
Client::Api: TaggedTransactionQueue<Block>,
|
||||
{
|
||||
@@ -193,7 +200,7 @@ where
|
||||
impl<Client, Block> FullChainApi<Client, Block>
|
||||
where
|
||||
Block: BlockT,
|
||||
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
|
||||
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block> + HeaderBackend<Block>,
|
||||
Client: Send + Sync + 'static,
|
||||
Client::Api: TaggedTransactionQueue<Block>,
|
||||
{
|
||||
@@ -333,4 +340,11 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for
|
||||
Ok(Some(transactions))
|
||||
}.boxed()
|
||||
}
|
||||
|
||||
fn block_header(
|
||||
&self,
|
||||
at: &BlockId<Self::Block>,
|
||||
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
|
||||
self.client.header(*at).map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ use parking_lot::Mutex;
|
||||
|
||||
use sp_runtime::{
|
||||
generic::BlockId,
|
||||
traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero},
|
||||
traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero, Header as HeaderT},
|
||||
};
|
||||
use sp_core::traits::SpawnNamed;
|
||||
use sp_transaction_pool::{
|
||||
@@ -379,6 +379,7 @@ where
|
||||
Block: BlockT,
|
||||
Client: sp_api::ProvideRuntimeApi<Block>
|
||||
+ sc_client_api::BlockBackend<Block>
|
||||
+ sc_client_api::blockchain::HeaderBackend<Block>
|
||||
+ sp_runtime::traits::BlockIdTo<Block>
|
||||
+ sc_client_api::ExecutorProvider<Block>
|
||||
+ sc_client_api::UsageProvider<Block>
|
||||
@@ -419,6 +420,7 @@ where
|
||||
Block: BlockT,
|
||||
Client: sp_api::ProvideRuntimeApi<Block>
|
||||
+ sc_client_api::BlockBackend<Block>
|
||||
+ sc_client_api::blockchain::HeaderBackend<Block>
|
||||
+ sp_runtime::traits::BlockIdTo<Block>,
|
||||
Client: Send + Sync + 'static,
|
||||
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
|
||||
@@ -555,19 +557,32 @@ async fn prune_known_txs_for_block<Block: BlockT, Api: ChainApi<Block = Block>>(
|
||||
api: &Api,
|
||||
pool: &sc_transaction_graph::Pool<Api>,
|
||||
) -> Vec<ExtrinsicHash<Api>> {
|
||||
let hashes = api.block_body(&block_id).await
|
||||
let extrinsics = api.block_body(&block_id).await
|
||||
.unwrap_or_else(|e| {
|
||||
log::warn!("Prune known transactions: error request {:?}!", e);
|
||||
None
|
||||
})
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.unwrap_or_default();
|
||||
|
||||
let hashes = extrinsics.iter()
|
||||
.map(|tx| pool.hash_of(&tx))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
log::trace!(target: "txpool", "Pruning transactions: {:?}", hashes);
|
||||
|
||||
if let Err(e) = pool.prune_known(&block_id, &hashes) {
|
||||
let header = match api.block_header(&block_id) {
|
||||
Ok(Some(h)) => h,
|
||||
Ok(None) => {
|
||||
log::debug!(target: "txpool", "Could not find header for {:?}.", block_id);
|
||||
return hashes
|
||||
},
|
||||
Err(e) => {
|
||||
log::debug!(target: "txpool", "Error retrieving header for {:?}: {:?}", block_id, e);
|
||||
return hashes
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = pool.prune(&block_id, &BlockId::hash(*header.parent_hash()), &extrinsics).await {
|
||||
log::error!("Cannot prune known in the pool {:?}!", e);
|
||||
}
|
||||
|
||||
|
||||
@@ -306,31 +306,6 @@ fn should_not_retain_invalid_hashes_from_retracted() {
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_revalidate_transaction_multiple_times() {
|
||||
let xt = uxt(Alice, 209);
|
||||
|
||||
let (pool, _guard, mut notifier) = maintained_pool();
|
||||
|
||||
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
|
||||
let header = pool.api.push_block(1, vec![xt.clone()], true);
|
||||
|
||||
block_on(pool.maintain(block_event(header)));
|
||||
|
||||
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
|
||||
let header = pool.api.push_block(2, vec![], true);
|
||||
pool.api.add_invalid(&xt);
|
||||
|
||||
block_on(pool.maintain(block_event(header)));
|
||||
block_on(notifier.next());
|
||||
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_revalidate_across_many_blocks() {
|
||||
let xt1 = uxt(Alice, 209);
|
||||
@@ -1002,21 +977,13 @@ fn pruning_a_transaction_should_remove_it_from_best_transaction() {
|
||||
let xt1 = Extrinsic::IncludeData(Vec::new());
|
||||
|
||||
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt1.clone())).expect("1. Imported");
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
let header = pool.api.push_block(1, vec![xt1.clone()], true);
|
||||
|
||||
// This will prune `xt1`.
|
||||
block_on(pool.maintain(block_event(header)));
|
||||
|
||||
// Submit the tx again.
|
||||
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("2. Imported");
|
||||
|
||||
let mut iterator = block_on(pool.ready_at(1));
|
||||
|
||||
assert_eq!(iterator.next().unwrap().data, xt1.clone());
|
||||
|
||||
// If the tx was not removed from the best txs, the tx would be
|
||||
// returned a second time by the iterator.
|
||||
assert!(iterator.next().is_none());
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1038,3 +1005,79 @@ fn only_revalidate_on_best_block() {
|
||||
|
||||
assert_eq!(pool.status().ready, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stale_transactions_are_pruned() {
|
||||
sp_tracing::try_init_simple();
|
||||
|
||||
// Our initial transactions
|
||||
let xts = vec![
|
||||
Transfer {
|
||||
from: Alice.into(),
|
||||
to: Bob.into(),
|
||||
nonce: 1,
|
||||
amount: 1,
|
||||
},
|
||||
Transfer {
|
||||
from: Alice.into(),
|
||||
to: Bob.into(),
|
||||
nonce: 2,
|
||||
amount: 1,
|
||||
},
|
||||
Transfer {
|
||||
from: Alice.into(),
|
||||
to: Bob.into(),
|
||||
nonce: 3,
|
||||
amount: 1,
|
||||
},
|
||||
];
|
||||
|
||||
let (pool, _guard, _notifier) = maintained_pool();
|
||||
|
||||
xts.into_iter().for_each(|xt| {
|
||||
block_on(
|
||||
pool.submit_one(&BlockId::number(0), SOURCE, xt.into_signed_tx()),
|
||||
).expect("1. Imported");
|
||||
});
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
assert_eq!(pool.status().future, 3);
|
||||
|
||||
// Almost the same as our initial transactions, but with some different `amount`s to make them
|
||||
// generate a different hash
|
||||
let xts = vec![
|
||||
Transfer {
|
||||
from: Alice.into(),
|
||||
to: Bob.into(),
|
||||
nonce: 1,
|
||||
amount: 2,
|
||||
}.into_signed_tx(),
|
||||
Transfer {
|
||||
from: Alice.into(),
|
||||
to: Bob.into(),
|
||||
nonce: 2,
|
||||
amount: 2,
|
||||
}.into_signed_tx(),
|
||||
Transfer {
|
||||
from: Alice.into(),
|
||||
to: Bob.into(),
|
||||
nonce: 3,
|
||||
amount: 2,
|
||||
}.into_signed_tx(),
|
||||
];
|
||||
|
||||
// Import block
|
||||
let header = pool.api.push_block(1, xts, true);
|
||||
block_on(pool.maintain(block_event(header)));
|
||||
// The imported transactions have a different hash and should not evict our initial
|
||||
// transactions.
|
||||
assert_eq!(pool.status().future, 3);
|
||||
|
||||
// Import enough blocks to make our transactions stale
|
||||
for n in 1..66 {
|
||||
let header = pool.api.push_block(n, vec![], true);
|
||||
block_on(pool.maintain(block_event(header)));
|
||||
}
|
||||
|
||||
assert_eq!(pool.status().future, 0);
|
||||
assert_eq!(pool.status().ready, 0);
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ use codec::Encode;
|
||||
use parking_lot::RwLock;
|
||||
use sp_runtime::{
|
||||
generic::{self, BlockId},
|
||||
traits::{BlakeTwo256, Hash as HashT, Block as _, Header as _},
|
||||
traits::{BlakeTwo256, Hash as HashT, Block as BlockT, Header as _},
|
||||
transaction_validity::{
|
||||
TransactionValidity, ValidTransaction, TransactionValidityError, InvalidTransaction,
|
||||
TransactionSource,
|
||||
@@ -346,6 +346,24 @@ impl sc_transaction_graph::ChainApi for TestApi {
|
||||
.map(|b| b.extrinsics().to_vec()),
|
||||
}))
|
||||
}
|
||||
|
||||
fn block_header(
|
||||
&self,
|
||||
at: &BlockId<Self::Block>,
|
||||
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
|
||||
Ok(match at {
|
||||
BlockId::Number(num) => self.chain
|
||||
.read()
|
||||
.block_by_number
|
||||
.get(num)
|
||||
.map(|b| b[0].0.header().clone()),
|
||||
BlockId::Hash(hash) => self.chain
|
||||
.read()
|
||||
.block_by_hash
|
||||
.get(hash)
|
||||
.map(|b| b.header().clone()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl sp_blockchain::HeaderMetadata<Block> for TestApi {
|
||||
|
||||
Reference in New Issue
Block a user