Revalidation queue for transaction pool (#4781)

* Revalidation queeue.

* add docs and license

* move test

* refactor worker to async/await

* address review

* fix warnings

* update Cargo.lock

* move background task to service

* use tomusdrw loop

* naming

* return From::from

* add doc comment

* add more doc comments

* fix merge bug

* add doc comment for test function

* Update client/transaction-pool/src/testing/pool.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* more review fixes

* refactor to allow service keep background tasks from isntantiated subsystems

* use const delay

* fix fallout

* remove fallout

* remove already moved test

* fix doc test

* add valid_at helper

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
Nikolay Volf
2020-02-17 16:48:24 +03:00
committed by GitHub
parent 590142b928
commit 86ab0cb4d9
18 changed files with 554 additions and 195 deletions
@@ -36,7 +36,8 @@ use sp_runtime::{
use sp_transaction_pool::error;
use wasm_timer::Instant;
use crate::validated_pool::{ValidatedPool, ValidatedTransaction};
use crate::validated_pool::ValidatedPool;
pub use crate::validated_pool::ValidatedTransaction;
/// Modification notification event stream type;
pub type EventStream<H> = mpsc::UnboundedReceiver<H>;
@@ -182,39 +183,19 @@ impl<B: ChainApi> Pool<B> {
self.validated_pool.submit_and_watch(tx)
}
/// Revalidate all ready transactions.
///
/// Returns future that performs validation of all ready transactions and
/// then resubmits all transactions back to the pool.
pub async fn revalidate_ready(
/// Resubmit some transaction that were validated elsewhere.
pub fn resubmit(
&self,
at: &BlockId<B::Block>,
max: Option<usize>,
) -> Result<(), B::Error> {
log::debug!(target: "txpool",
"Fetching ready transactions (up to: {})",
max.map(|x| format!("{}", x)).unwrap_or_else(|| "all".into())
);
let validated_pool = self.validated_pool.clone();
let ready = self.validated_pool.ready()
.map(|tx| tx.data.clone())
.take(max.unwrap_or_else(usize::max_value));
let now = Instant::now();
let revalidated_transactions = self.verify(at, ready, false).await?;
log::debug!(target: "txpool",
"Re-verified transactions, took {} ms. Resubmitting.",
now.elapsed().as_millis()
);
revalidated_transactions: HashMap<ExHash<B>, ValidatedTransactionFor<B>>,
) {
let now = Instant::now();
self.validated_pool.resubmit(revalidated_transactions);
log::debug!(target: "txpool",
"Resubmitted. Took {} ms. Status: {:?}",
now.elapsed().as_millis(),
validated_pool.status()
self.validated_pool.status()
);
Ok(())
}
/// Prunes known ready transactions.
@@ -402,21 +383,15 @@ impl<B: ChainApi> Pool<B> {
if validity.provides.is_empty() {
ValidatedTransaction::Invalid(hash.clone(), error::Error::NoTagsProvided.into())
} else {
ValidatedTransaction::Valid(base::Transaction {
data: xt,
ValidatedTransaction::valid_at(
block_number.saturated_into::<u64>(),
hash.clone(),
xt,
bytes,
hash: hash.clone(),
priority: validity.priority,
requires: validity.requires,
provides: validity.provides,
propagate: validity.propagate,
valid_till: block_number
.saturated_into::<u64>()
.saturating_add(validity.longevity),
})
validity,
)
}
},
Err(TransactionValidityError::Invalid(e)) =>
ValidatedTransaction::Invalid(hash.clone(), error::Error::InvalidTransaction(e).into()),
Err(TransactionValidityError::Unknown(e)) =>
@@ -988,80 +963,4 @@ mod tests {
assert_eq!(pool.validated_pool().status().future, 0);
}
}
#[test]
fn should_revalidate_ready_transactions() {
fn transfer(nonce: u64) -> Extrinsic {
uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce,
})
}
// given
let pool = pool();
let tx0 = transfer(0);
let hash0 = pool.validated_pool.api().hash_and_length(&tx0).0;
let watcher0 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx0)).unwrap();
let tx1 = transfer(1);
let hash1 = pool.validated_pool.api().hash_and_length(&tx1).0;
let watcher1 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx1)).unwrap();
let tx2 = transfer(2);
let hash2 = pool.validated_pool.api().hash_and_length(&tx2).0;
let watcher2 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx2)).unwrap();
let tx3 = transfer(3);
let hash3 = pool.validated_pool.api().hash_and_length(&tx3).0;
let watcher3 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx3)).unwrap();
let tx4 = transfer(4);
let hash4 = pool.validated_pool.api().hash_and_length(&tx4).0;
let watcher4 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx4)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 5);
// when
pool.validated_pool.api().invalidate.lock().insert(hash3);
pool.validated_pool.api().clear_requirements.lock().insert(hash1);
pool.validated_pool.api().add_requirements.lock().insert(hash0);
block_on(pool.revalidate_ready(&BlockId::Number(0), None)).unwrap();
// then
// hash0 now has unsatisfied requirements => it is moved to the future queue
// hash1 is now independent of hash0 => it is in ready queue
// hash2 still depends on hash1 => it is in ready queue
// hash3 is now invalid => it is removed from the pool
// hash4 now depends on invalidated hash3 => it is moved to the future queue
//
// events for hash3 are: Ready, Invalid
// events for hash4 are: Ready, Invalid
assert_eq!(pool.validated_pool().status().ready, 2);
assert_eq!(
futures::executor::block_on_stream(watcher3.into_stream()).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::Invalid],
);
// when
pool.validated_pool.remove_invalid(&[hash0, hash1, hash2, hash4]);
// then
// events for hash0 are: Ready, Future, Invalid
// events for hash1 are: Ready, Invalid
// events for hash2 are: Ready, Invalid
assert_eq!(
futures::executor::block_on_stream(watcher0.into_stream()).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::Future, TransactionStatus::Invalid],
);
assert_eq!(
futures::executor::block_on_stream(watcher1.into_stream()).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::Invalid],
);
assert_eq!(
futures::executor::block_on_stream(watcher2.into_stream()).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::Invalid],
);
assert_eq!(
futures::executor::block_on_stream(watcher4.into_stream()).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::Future, TransactionStatus::Invalid],
);
}
}