Fix transaction pruning in tx-pool (#6276)

The `tree_route` generated by the import notification is only from the
old best block to the new best parent. This means, it does not contain
the new best block in `enacted()`. We need to prune the transactions of
the new best block "manually" to fix this bug.

Besides that, this pr also changed the `id` parameter of the `NewBlock`
chain event to `hash`. The hash of a block is unique in contrast to the
block number. (Block id can either be number or hash)
This commit is contained in:
Bastian Köcher
2020-06-08 12:38:19 +02:00
committed by GitHub
parent 84cdb02963
commit 663cd09be9
11 changed files with 134 additions and 81 deletions
@@ -18,7 +18,7 @@
use crate::*;
use sp_transaction_pool::TransactionStatus;
use futures::executor::block_on;
use futures::executor::{block_on, block_on_stream};
use txpool::{self, Pool};
use sp_runtime::{
generic::BlockId,
@@ -26,11 +26,15 @@ use sp_runtime::{
};
use substrate_test_runtime_client::{
runtime::{Block, Hash, Index, Header, Extrinsic, Transfer}, AccountKeyring::*,
ClientBlockImportExt,
};
use substrate_test_runtime_transaction_pool::{TestApi, uxt};
use futures::{prelude::*, task::Poll};
use codec::Encode;
use std::collections::BTreeSet;
use sc_client_api::client::BlockchainEvents;
use sc_block_builder::BlockBuilderProvider;
use sp_consensus::BlockOrigin;
fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::with_alice_nonce(209).into())
@@ -50,16 +54,6 @@ fn maintained_pool() -> (
(pool, thread_pool, notifier)
}
fn header(number: u64) -> Header {
Header {
number,
digest: Default::default(),
extrinsics_root: Default::default(),
parent_hash: Default::default(),
state_root: Default::default(),
}
}
const SOURCE: TransactionSource = TransactionSource::External;
#[test]
@@ -153,7 +147,7 @@ fn only_prune_on_new_best() {
assert_eq!(pool.status().ready, 1);
let event = ChainEvent::NewBlock {
id: BlockId::Number(1),
hash: header.hash(),
is_new_best: false,
header: header.clone(),
tree_route: None,
@@ -163,7 +157,7 @@ fn only_prune_on_new_best() {
let header = pool.api.push_block(2, vec![uxt]);
let event = ChainEvent::NewBlock {
id: BlockId::Number(2),
hash: header.hash(),
is_new_best: true,
header: header.clone(),
tree_route: None,
@@ -209,12 +203,12 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() {
assert_eq!(pool.validated_pool().status().future, 2);
}
fn block_event(id: u64) -> ChainEvent<Block> {
fn block_event(header: Header) -> ChainEvent<Block> {
ChainEvent::NewBlock {
id: BlockId::number(id),
hash: header.hash(),
is_new_best: true,
tree_route: None,
header: header(id),
header,
}
}
@@ -223,10 +217,10 @@ fn block_event_with_retracted(
retracted_start: Hash,
api: &TestApi,
) -> ChainEvent<Block> {
let tree_route = api.tree_route(retracted_start, header.hash()).expect("Tree route exists");
let tree_route = api.tree_route(retracted_start, header.parent_hash).expect("Tree route exists");
ChainEvent::NewBlock {
id: BlockId::hash(header.hash()),
hash: header.hash(),
is_new_best: true,
tree_route: Some(Arc::new(tree_route)),
header,
@@ -242,9 +236,9 @@ fn should_prune_old_during_maintenance() {
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
pool.api.push_block(1, vec![xt.clone()]);
let header = pool.api.push_block(1, vec![xt.clone()]);
block_on(pool.maintain(block_event(1)));
block_on(pool.maintain(block_event(header)));
assert_eq!(pool.status().ready, 0);
}
@@ -259,9 +253,9 @@ fn should_revalidate_during_maintenance() {
assert_eq!(pool.status().ready, 2);
assert_eq!(pool.api.validation_requests().len(), 2);
pool.api.push_block(1, vec![xt1.clone()]);
let header = pool.api.push_block(1, vec![xt1.clone()]);
block_on(pool.maintain(block_event(1)));
block_on(pool.maintain(block_event(header)));
assert_eq!(pool.status().ready, 1);
block_on(notifier.next());
@@ -317,17 +311,17 @@ fn should_revalidate_transaction_multiple_times() {
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
pool.api.push_block(1, vec![xt.clone()]);
let header = pool.api.push_block(1, vec![xt.clone()]);
block_on(pool.maintain(block_event(1)));
block_on(pool.maintain(block_event(header)));
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
pool.api.push_block(2, vec![]);
let header = pool.api.push_block(2, vec![]);
pool.api.add_invalid(&xt);
block_on(pool.maintain(block_event(2)));
block_on(pool.maintain(block_event(header)));
block_on(notifier.next());
assert_eq!(pool.status().ready, 0);
@@ -345,15 +339,15 @@ fn should_revalidate_across_many_blocks() {
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt2.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 2);
pool.api.push_block(1, vec![]);
block_on(pool.maintain(block_event(1)));
let header = pool.api.push_block(1, vec![]);
block_on(pool.maintain(block_event(header)));
block_on(notifier.next());
block_on(pool.submit_one(&BlockId::number(2), SOURCE, xt3.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 3);
pool.api.push_block(2, vec![xt1.clone()]);
block_on(pool.maintain(block_event(2)));
let header = pool.api.push_block(2, vec![xt1.clone()]);
block_on(pool.maintain(block_event(header)));
block_on(notifier.next());
assert_eq!(pool.status().ready, 2);
@@ -398,7 +392,8 @@ fn should_push_watchers_during_maintaince() {
pool.api.add_invalid(&tx4);
// clear timer events if any
block_on(pool.maintain(block_event(0)));
let header = pool.api.push_block(1, vec![]);
block_on(pool.maintain(block_event(header)));
block_on(notifier.next());
// then
@@ -415,8 +410,9 @@ fn should_push_watchers_during_maintaince() {
);
// when
let header_hash = pool.api.push_block(1, vec![tx0, tx1, tx2]).hash();
block_on(pool.maintain(block_event(1)));
let header = pool.api.push_block(2, vec![tx0, tx1, tx2]);
let header_hash = header.hash();
block_on(pool.maintain(block_event(header)));
let event = ChainEvent::Finalized { hash: header_hash.clone() };
block_on(pool.maintain(event));
@@ -472,7 +468,7 @@ fn finalization() {
let header = pool.api.chain().read().block_by_number.get(&2).unwrap()[0].header().clone();
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
hash: header.hash(),
is_new_best: true,
header: header.clone(),
tree_route: None,
@@ -524,7 +520,7 @@ fn fork_aware_finalization() {
assert_eq!(pool.status().ready, 1);
let event = ChainEvent::NewBlock {
id: BlockId::Number(2),
hash: header.hash(),
is_new_best: true,
header: header.clone(),
tree_route: None,
@@ -544,7 +540,7 @@ fn fork_aware_finalization() {
).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
hash: header.hash(),
is_new_best: true,
header: header.clone(),
tree_route: None,
@@ -563,7 +559,7 @@ fn fork_aware_finalization() {
let header = pool.api.push_block_with_parent(c2, vec![from_bob.clone()]);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
hash: header.hash(),
is_new_best: true,
header: header.clone(),
tree_route: None,
@@ -601,7 +597,7 @@ fn fork_aware_finalization() {
canon_watchers.push((w, header.hash()));
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
hash: header.hash(),
is_new_best: true,
header: header.clone(),
tree_route: None,
@@ -620,7 +616,7 @@ fn fork_aware_finalization() {
let header = pool.api.push_block(5, vec![from_dave, from_bob]);
e1 = header.hash();
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
hash: header.hash(),
is_new_best: true,
header: header.clone(),
tree_route: None,
@@ -702,7 +698,7 @@ fn resubmit_tx_of_fork_that_is_not_part_of_retracted() {
assert_eq!(pool.status().ready, 1);
let event = ChainEvent::NewBlock {
id: BlockId::Number(2),
hash: header.hash(),
is_new_best: true,
header: header.clone(),
tree_route: None,
@@ -720,7 +716,7 @@ fn resubmit_tx_of_fork_that_is_not_part_of_retracted() {
let header = pool.api.push_block(2, vec![tx1.clone()]);
assert_eq!(pool.status().ready, 1);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
hash: header.hash(),
is_new_best: false,
header: header.clone(),
tree_route: None,
@@ -773,7 +769,7 @@ fn resubmit_from_retracted_fork() {
assert_eq!(pool.status().ready, 1);
let event = ChainEvent::NewBlock {
id: BlockId::Number(2),
hash: header.hash(),
is_new_best: true,
header: header.clone(),
tree_route: None,
@@ -789,7 +785,7 @@ fn resubmit_from_retracted_fork() {
).expect("1. Imported");
let header = pool.api.push_block(3, vec![tx1.clone()]);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
hash: header.hash(),
is_new_best: true,
header: header.clone(),
tree_route: None,
@@ -805,7 +801,7 @@ fn resubmit_from_retracted_fork() {
).expect("1. Imported");
let header = pool.api.push_block(4, vec![tx2.clone()]);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
hash: header.hash(),
is_new_best: true,
header: header.clone(),
tree_route: None,
@@ -822,7 +818,7 @@ fn resubmit_from_retracted_fork() {
).expect("1. Imported");
let header = pool.api.push_block(2, vec![tx3.clone()]);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
hash: header.hash(),
is_new_best: false,
header: header.clone(),
tree_route: None,
@@ -839,7 +835,7 @@ fn resubmit_from_retracted_fork() {
).expect("1. Imported");
let header = pool.api.push_block_with_parent(d1.clone(), vec![tx4.clone()]);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
hash: header.hash(),
is_new_best: false,
header: header.clone(),
tree_route: None,
@@ -886,12 +882,12 @@ fn ready_set_should_not_resolve_before_block_update() {
#[test]
fn ready_set_should_resolve_after_block_update() {
let (pool, _guard, _notifier) = maintained_pool();
pool.api.push_block(1, vec![]);
let header = pool.api.push_block(1, vec![]);
let xt1 = uxt(Alice, 209);
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported");
block_on(pool.maintain(block_event(1)));
block_on(pool.maintain(block_event(header)));
assert!(pool.ready_at(1).now_or_never().is_some());
}
@@ -899,7 +895,7 @@ fn ready_set_should_resolve_after_block_update() {
#[test]
fn ready_set_should_eventually_resolve_when_block_update_arrives() {
let (pool, _guard, _notifier) = maintained_pool();
pool.api.push_block(1, vec![]);
let header = pool.api.push_block(1, vec![]);
let xt1 = uxt(Alice, 209);
@@ -913,7 +909,7 @@ fn ready_set_should_eventually_resolve_when_block_update_arrives() {
panic!("Ready set should not be ready before block update!");
}
block_on(pool.maintain(block_event(1)));
block_on(pool.maintain(block_event(header)));
match ready_set_future.poll_unpin(&mut context) {
Poll::Pending => {
@@ -949,7 +945,11 @@ fn should_not_accept_old_signatures() {
"c427eb672e8c441c86d31f1a81b22b43102058e9ce237cabe9897ea5099ffd426cd1c6a1f4f2869c3df57901d36bedcb295657adb3a4355add86ed234eb83108"
).expect("hex invalid")[..]).expect("signature construction failed");
let xt = Extrinsic::Transfer { transfer, signature: old_singature, exhaust_resources_when_not_first: false };
let xt = Extrinsic::Transfer {
transfer,
signature: old_singature,
exhaust_resources_when_not_first: false,
};
assert_matches::assert_matches!(
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())),
@@ -959,3 +959,31 @@ fn should_not_accept_old_signatures() {
"Should be invalid transaction with bad proof",
);
}
#[test]
fn import_notification_to_pool_maintain_works() {
let mut client = Arc::new(substrate_test_runtime_client::new());
let pool = Arc::new(
BasicPool::new_test(Arc::new(FullChainApi::new(client.clone()))).0
);
// Prepare the extrisic, push it to the pool and check that it was added.
let xt = uxt(Alice, 0);
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
let mut import_stream = block_on_stream(client.import_notification_stream());
// Build the block with the transaction included
let mut block_builder = client.new_block(Default::default()).unwrap();
block_builder.push(xt).unwrap();
let block = block_builder.build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
// Get the notification of the block import and maintain the pool with it,
// Now, the pool should not contain any transactions.
let evt = import_stream.next().expect("Importing a block leads to an event");
block_on(pool.maintain(evt.into()));
assert_eq!(pool.status().ready, 0);
}