Pass transaction source to validate_transaction (#5366)

* WiP

* Support source in the runtime API.

* Finish implementation in txpool.

* Fix warning.

* Fix tests.

* Apply suggestions from code review

Co-Authored-By: Kian Paimani <5588131+kianenigma@users.noreply.github.com>
Co-Authored-By: Nikolay Volf <nikvolf@gmail.com>

* Extra changes.

* Fix test and benches.

* fix test

* Fix test & benches again.

* Fix tests.

* Update bumpalo

* Fix doc test.

* Fix doctest.

* Fix doctest.

Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com>
Co-authored-by: Nikolay Volf <nikvolf@gmail.com>
This commit is contained in:
Tomasz Drwięga
2020-03-25 14:09:23 +01:00
committed by GitHub
parent 601ac11e52
commit 04ccb179e9
37 changed files with 414 additions and 163 deletions
@@ -20,7 +20,7 @@ use futures::executor::block_on;
use txpool::{self, Pool};
use sp_runtime::{
generic::BlockId,
transaction_validity::{ValidTransaction, InvalidTransaction},
transaction_validity::{ValidTransaction, InvalidTransaction, TransactionSource},
};
use substrate_test_runtime_client::{
runtime::{Block, Hash, Index, Header, Extrinsic, Transfer},
@@ -53,10 +53,12 @@ fn header(number: u64) -> Header {
}
}
const SOURCE: TransactionSource = TransactionSource::External;
#[test]
fn submission_should_work() {
let pool = pool();
block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 209))).unwrap();
let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect();
assert_eq!(pending, vec![209]);
@@ -65,8 +67,8 @@ fn submission_should_work() {
#[test]
fn multiple_submission_should_work() {
let pool = pool();
block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap();
block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 209))).unwrap();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 210))).unwrap();
let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect();
assert_eq!(pending, vec![209, 210]);
@@ -75,7 +77,7 @@ fn multiple_submission_should_work() {
#[test]
fn early_nonce_should_be_culled() {
let pool = pool();
block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 208))).unwrap();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 208))).unwrap();
let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect();
assert_eq!(pending, Vec::<Index>::new());
@@ -85,11 +87,11 @@ fn early_nonce_should_be_culled() {
fn late_nonce_should_be_queued() {
let pool = pool();
block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 210))).unwrap();
let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect();
assert_eq!(pending, Vec::<Index>::new());
block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 209))).unwrap();
let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect();
assert_eq!(pending, vec![209, 210]);
}
@@ -97,8 +99,8 @@ fn late_nonce_should_be_queued() {
#[test]
fn prune_tags_should_work() {
let pool = pool();
let hash209 = block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap();
block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap();
let hash209 = block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 209))).unwrap();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 210))).unwrap();
let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect();
assert_eq!(pending, vec![209, 210]);
@@ -119,16 +121,16 @@ fn prune_tags_should_work() {
fn should_ban_invalid_transactions() {
let pool = pool();
let uxt = uxt(Alice, 209);
let hash = block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap();
let hash = block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt.clone())).unwrap();
pool.validated_pool().remove_invalid(&[hash]);
block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt.clone())).unwrap_err();
// when
let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect();
assert_eq!(pending, Vec::<Index>::new());
// then
block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt.clone())).unwrap_err();
}
#[test]
@@ -139,7 +141,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() {
}));
let pool = Pool::new(Default::default(), api.clone());
let xt = uxt(Alice, 209);
block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.validated_pool().status().ready, 1);
// remove the transaction that just got imported.
@@ -152,7 +154,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() {
// so now let's insert another transaction that also provides the 155
api.increment_nonce(Alice.into());
let xt = uxt(Alice, 211);
block_on(pool.submit_one(&BlockId::number(2), xt.clone())).expect("2. Imported");
block_on(pool.submit_one(&BlockId::number(2), SOURCE, xt.clone())).expect("2. Imported");
assert_eq!(pool.validated_pool().status().ready, 1);
assert_eq!(pool.validated_pool().status().future, 1);
let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect();
@@ -190,7 +192,7 @@ fn should_prune_old_during_maintenance() {
let (pool, _guard) = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
pool.api.push_block(1, vec![xt.clone()]);
@@ -205,8 +207,8 @@ fn should_revalidate_during_maintenance() {
let xt2 = uxt(Alice, 210);
let (pool, _guard) = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), xt1.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), xt2.clone())).expect("2. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt1.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt2.clone())).expect("2. Imported");
assert_eq!(pool.status().ready, 2);
assert_eq!(pool.api.validation_requests().len(), 2);
@@ -230,7 +232,7 @@ fn should_resubmit_from_retracted_during_maintenance() {
let (pool, _guard) = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
pool.api.push_block(1, vec![]);
@@ -249,7 +251,7 @@ fn should_not_retain_invalid_hashes_from_retracted() {
let (pool, _guard) = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
pool.api.push_block(1, vec![]);
@@ -275,7 +277,7 @@ fn should_revalidate_transaction_multiple_times() {
let (pool, _guard) = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
pool.api.push_block(1, vec![xt.clone()]);
@@ -284,7 +286,7 @@ fn should_revalidate_transaction_multiple_times() {
block_on(pool.maintain(block_event(1)));
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));
block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
pool.api.push_block(2, vec![]);
@@ -305,8 +307,8 @@ fn should_revalidate_across_many_blocks() {
let (pool, _guard) = maintained_pool();
block_on(pool.submit_one(&BlockId::number(1), xt1.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(1), xt2.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt2.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 2);
pool.api.push_block(1, vec![]);
@@ -314,7 +316,7 @@ fn should_revalidate_across_many_blocks() {
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));
block_on(pool.submit_one(&BlockId::number(2), xt3.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(2), SOURCE, xt3.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 3);
pool.api.push_block(2, vec![xt1.clone()]);
@@ -337,15 +339,25 @@ fn should_push_watchers_during_maintaince() {
let (pool, _guard) = maintained_pool();
let tx0 = alice_uxt(0);
let watcher0 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx0.clone())).unwrap();
let watcher0 = block_on(
pool.submit_and_watch(&BlockId::Number(0), SOURCE, tx0.clone())
).unwrap();
let tx1 = alice_uxt(1);
let watcher1 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx1.clone())).unwrap();
let watcher1 = block_on(
pool.submit_and_watch(&BlockId::Number(0), SOURCE, tx1.clone())
).unwrap();
let tx2 = alice_uxt(2);
let watcher2 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx2.clone())).unwrap();
let watcher2 = block_on(
pool.submit_and_watch(&BlockId::Number(0), SOURCE, tx2.clone())
).unwrap();
let tx3 = alice_uxt(3);
let watcher3 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx3.clone())).unwrap();
let watcher3 = block_on(
pool.submit_and_watch(&BlockId::Number(0), SOURCE, tx3.clone())
).unwrap();
let tx4 = alice_uxt(4);
let watcher4 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx4.clone())).unwrap();
let watcher4 = block_on(
pool.submit_and_watch(&BlockId::Number(0), SOURCE, tx4.clone())
).unwrap();
assert_eq!(pool.status().ready, 5);
// when
@@ -398,10 +410,10 @@ fn should_push_watchers_during_maintaince() {
#[test]
fn can_track_heap_size() {
let (pool, _guard) = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 211))).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 212))).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 209))).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 210))).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 211))).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 212))).expect("1. Imported");
assert!(parity_util_mem::malloc_size(&pool) > 3000);
}
@@ -412,7 +424,9 @@ fn finalization() {
let api = TestApi::with_alice_nonce(209);
api.push_block(1, vec![]);
let (pool, _background) = BasicPool::new(Default::default(), api.into());
let watcher = block_on(pool.submit_and_watch(&BlockId::number(1), xt.clone())).expect("1. Imported");
let watcher = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, xt.clone())
).expect("1. Imported");
pool.api.push_block(2, vec![xt.clone()]);
let header = pool.api.chain().read().header_by_number.get(&2).cloned().unwrap();
@@ -462,7 +476,9 @@ fn fork_aware_finalization() {
// block B1
{
let watcher = block_on(pool.submit_and_watch(&BlockId::number(1), from_alice.clone())).expect("1. Imported");
let watcher = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, from_alice.clone())
).expect("1. Imported");
let header = pool.api.push_block(2, vec![from_alice.clone()]);
canon_watchers.push((watcher, header.hash()));
assert_eq!(pool.status().ready, 1);
@@ -483,8 +499,9 @@ fn fork_aware_finalization() {
// block C2
{
let header = pool.api.push_fork_block_with_parent(b1, vec![from_dave.clone()]);
from_dave_watcher = block_on(pool.submit_and_watch(&BlockId::number(1), from_dave.clone()))
.expect("1. Imported");
from_dave_watcher = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, from_dave.clone())
).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
@@ -499,7 +516,9 @@ fn fork_aware_finalization() {
// block D2
{
from_bob_watcher = block_on(pool.submit_and_watch(&BlockId::number(1), from_bob.clone())).expect("1. Imported");
from_bob_watcher = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, from_bob.clone())
).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
let header = pool.api.push_fork_block_with_parent(c2, vec![from_bob.clone()]);
@@ -516,7 +535,9 @@ fn fork_aware_finalization() {
// block C1
{
let watcher = block_on(pool.submit_and_watch(&BlockId::number(1), from_charlie.clone())).expect("1.Imported");
let watcher = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, from_charlie.clone())
).expect("1.Imported");
assert_eq!(pool.status().ready, 1);
let header = pool.api.push_block(3, vec![from_charlie.clone()]);
@@ -536,7 +557,9 @@ fn fork_aware_finalization() {
// block D1
{
let xt = uxt(Eve, 0);
let w = block_on(pool.submit_and_watch(&BlockId::number(1), xt.clone())).expect("1. Imported");
let w = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, xt.clone())
).expect("1. Imported");
assert_eq!(pool.status().ready, 3);
let header = pool.api.push_block(4, vec![xt.clone()]);
canon_watchers.push((w, header.hash()));
@@ -608,7 +631,7 @@ fn fork_aware_finalization() {
fn ready_set_should_not_resolve_before_block_update() {
let (pool, _guard) = maintained_pool();
let xt1 = uxt(Alice, 209);
block_on(pool.submit_one(&BlockId::number(1), xt1.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported");
assert!(pool.ready_at(1).now_or_never().is_none());
}
@@ -620,7 +643,7 @@ fn ready_set_should_resolve_after_block_update() {
let xt1 = uxt(Alice, 209);
block_on(pool.submit_one(&BlockId::number(1), xt1.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported");
block_on(pool.maintain(block_event(1)));
assert!(pool.ready_at(1).now_or_never().is_some());
@@ -633,7 +656,7 @@ fn ready_set_should_eventually_resolve_when_block_update_arrives() {
let xt1 = uxt(Alice, 209);
block_on(pool.submit_one(&BlockId::number(1), xt1.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported");
let noop_waker = futures::task::noop_waker();
let mut context = futures::task::Context::from_waker(&noop_waker);
@@ -681,7 +704,7 @@ fn should_not_accept_old_signatures() {
let xt = Extrinsic::Transfer { transfer, signature: old_singature, exhaust_resources_when_not_first: false };
assert_matches::assert_matches!(
block_on(pool.submit_one(&BlockId::number(0), xt.clone())),
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())),
Err(error::Error::Pool(
sp_transaction_pool::error::Error::InvalidTransaction(InvalidTransaction::BadProof)
)),