Recover transaction pool on light client (#3833)

* recover tx pool on light client

* revert local tests fix

* removed import renamings

* futures03::Future -> std::future::Future

* Update core/transaction-pool/graph/src/error.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* replace remove_from_ready with remove_invalid

* avoid excess hashing

* debug -> warn

* TransactionPool + BasicTransactionPool

* pause future tx reject when resubmitting

* bump impl_version to make CI happy

* and revert back local test fixes

* alter doc to restart CI

* Transaction::clone() -> Transaction::duplicate()

* transactions -> updated_tranasctions

* remove explicit consensus-common ref

* ::std:: -> std::

* manual set/unset flag -> calling clusore with given flag value

* removed comments

* removed force argument

* BestIterator -> Box<Iterator>

* separate crate for TxPool + Maintainer trait

* long line fix

* pos-merge fix

* fix benches compilation

* Rename txpoolapi to txpool_api

* Clean up.

* Finalize merge.

* post-merge fix

* Move transaction pool api to primitives directly.

* Consistent naming for txpool-runtime-api

* Warn about missing docs.

* Move  abstraction for offchain calls to tx-pool-api.

* Merge RPC instantiation.

* Update cargo.lock

* Post merge fixes.

* Avoid depending on client.

* Fix build
This commit is contained in:
Svyatoslav Nikolsky
2019-11-28 03:00:54 +03:00
committed by Gavin Wood
parent 3e26fceda4
commit a782021ee8
64 changed files with 2370 additions and 667 deletions
@@ -34,8 +34,8 @@ use sr_primitives::transaction_validity::{
TransactionLongevity as Longevity,
TransactionPriority as Priority,
};
use txpool_api::{error, PoolStatus, InPoolTransaction};
use crate::error;
use crate::future::{FutureTransactions, WaitingTransaction};
use crate::ready::ReadyTransactions;
@@ -104,13 +104,65 @@ pub struct Transaction<Hash, Extrinsic> {
pub propagate: bool,
}
impl<Hash, Extrinsic> Transaction<Hash, Extrinsic> {
/// Returns `true` if the transaction should be propagated to other peers.
pub fn is_propagateable(&self) -> bool {
impl<Hash, Extrinsic> AsRef<Extrinsic> for Transaction<Hash, Extrinsic> {
fn as_ref(&self) -> &Extrinsic {
&self.data
}
}
impl<Hash, Extrinsic> InPoolTransaction for Transaction<Hash, Extrinsic> {
type Transaction = Extrinsic;
type Hash = Hash;
fn data(&self) -> &Extrinsic {
&self.data
}
fn hash(&self) -> &Hash {
&self.hash
}
fn priority(&self) -> &Priority {
&self.priority
}
fn longevity(&self) ->&Longevity {
&self.valid_till
}
fn requires(&self) -> &[Tag] {
&self.requires
}
fn provides(&self) -> &[Tag] {
&self.provides
}
fn is_propagateable(&self) -> bool {
self.propagate
}
}
impl<Hash: Clone, Extrinsic: Clone> Transaction<Hash, Extrinsic> {
/// Explicit transaction clone.
///
/// Transaction should be cloned only if absolutely necessary && we want
/// every reason to be commented. That's why we `Transaction` is not `Clone`,
/// but there's explicit `duplicate` method.
pub fn duplicate(&self) -> Self {
Transaction {
data: self.data.clone(),
bytes: self.bytes.clone(),
hash: self.hash.clone(),
priority: self.priority.clone(),
valid_till: self.valid_till.clone(),
requires: self.requires.clone(),
provides: self.provides.clone(),
propagate: self.propagate,
}
}
}
impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic> where
Hash: fmt::Debug,
Extrinsic: fmt::Debug,
@@ -159,6 +211,7 @@ const RECENTLY_PRUNED_TAGS: usize = 2;
/// required tags.
#[derive(Debug)]
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
reject_future_transactions: bool,
future: FutureTransactions<Hash, Ex>,
ready: ReadyTransactions<Hash, Ex>,
/// Store recently pruned tags (for last two invocations).
@@ -169,18 +222,37 @@ pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
recently_pruned_index: usize,
}
impl<Hash: hash::Hash + Eq, Ex> Default for BasePool<Hash, Ex> {
impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> Default for BasePool<Hash, Ex> {
fn default() -> Self {
Self::new(false)
}
}
impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash, Ex> {
/// Create new pool given reject_future_transactions flag.
pub fn new(reject_future_transactions: bool) -> Self {
BasePool {
reject_future_transactions,
future: Default::default(),
ready: Default::default(),
recently_pruned: Default::default(),
recently_pruned_index: 0,
}
}
}
impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash, Ex> {
/// Temporary enables future transactions, runs closure and then restores
/// `reject_future_transactions` flag back to previous value.
///
/// The closure accepts the mutable reference to the pool and original value
/// of the `reject_future_transactions` flag.
pub(crate) fn with_futures_enabled<T>(&mut self, closure: impl FnOnce(&mut Self, bool) -> T) -> T {
let previous = self.reject_future_transactions;
self.reject_future_transactions = false;
let return_value = closure(self, previous);
self.reject_future_transactions = previous;
return_value
}
/// Imports transaction to the pool.
///
/// The pool consists of two parts: Future and Ready.
@@ -206,6 +278,10 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash
// If all tags are not satisfied import to future.
if !tx.is_ready() {
if self.reject_future_transactions {
return Err(error::Error::RejectedFutureTransaction);
}
let hash = tx.transaction.hash.clone();
self.future.import(tx);
return Ok(Imported::Future { hash });
@@ -370,6 +446,11 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash
removed
}
/// Removes and returns all transactions from the future queue.
pub fn clear_future(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
self.future.clear()
}
/// Prunes transactions that provide given list of tags.
///
/// This will cause all transactions that provide these tags to be removed from the pool,
@@ -385,7 +466,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash
for tag in tags {
// make sure to promote any future transactions that could be unlocked
to_import.append(&mut self.future.satisfy_tags(::std::iter::once(&tag)));
to_import.append(&mut self.future.satisfy_tags(std::iter::once(&tag)));
// and actually prune transactions in ready queue
pruned.append(&mut self.ready.prune_tags(tag.clone()));
// store the tags for next submission
@@ -413,8 +494,8 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash
}
/// Get pool status.
pub fn status(&self) -> Status {
Status {
pub fn status(&self) -> PoolStatus {
PoolStatus {
ready: self.ready.len(),
ready_bytes: self.ready.bytes(),
future: self.future.len(),
@@ -423,26 +504,6 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash
}
}
/// Pool status
#[derive(Debug)]
pub struct Status {
/// Number of transactions in the ready queue.
pub ready: usize,
/// Sum of bytes of ready transaction encodings.
pub ready_bytes: usize,
/// Number of transactions in the future queue.
pub future: usize,
/// Sum of bytes of ready transaction encodings.
pub future_bytes: usize,
}
impl Status {
/// Returns true if the are no transactions in the pool.
pub fn is_empty(&self) -> bool {
self.ready == 0 && self.future == 0
}
}
/// Queue limits
#[derive(Debug, Clone)]
pub struct Limit {
@@ -972,4 +1033,85 @@ requires: [03,02], provides: [04], data: [4]}".to_owned()
propagate: false,
}.is_propagateable(), false);
}
#[test]
fn should_reject_future_transactions() {
// given
let mut pool = pool();
// when
pool.reject_future_transactions = true;
// then
let err = pool.import(Transaction {
data: vec![5u8],
bytes: 1,
hash: 5,
priority: 5u64,
valid_till: 64u64,
requires: vec![vec![0]],
provides: vec![],
propagate: true,
});
if let Err(error::Error::RejectedFutureTransaction) = err {
} else {
assert!(false, "Invalid error kind: {:?}", err);
}
}
#[test]
fn should_clear_future_queue() {
// given
let mut pool = pool();
// when
pool.import(Transaction {
data: vec![5u8],
bytes: 1,
hash: 5,
priority: 5u64,
valid_till: 64u64,
requires: vec![vec![0]],
provides: vec![],
propagate: true,
}).unwrap();
// then
assert_eq!(pool.future.len(), 1);
// and then when
assert_eq!(pool.clear_future().len(), 1);
// then
assert_eq!(pool.future.len(), 0);
}
#[test]
fn should_accept_future_transactions_when_explcitly_asked_to() {
// given
let mut pool = pool();
pool.reject_future_transactions = true;
// when
let flag_value = pool.with_futures_enabled(|pool, flag| {
pool.import(Transaction {
data: vec![5u8],
bytes: 1,
hash: 5,
priority: 5u64,
valid_till: 64u64,
requires: vec![vec![0]],
provides: vec![],
propagate: true,
}).unwrap();
flag
});
// then
assert_eq!(flag_value, true);
assert_eq!(pool.reject_future_transactions, true);
assert_eq!(pool.future.len(), 1);
}
}
@@ -227,6 +227,12 @@ impl<Hash: hash::Hash + Eq + Clone, Ex> FutureTransactions<Hash, Ex> {
self.waiting.values().map(|waiting| &*waiting.transaction)
}
/// Removes and returns all future transactions.
pub fn clear(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
self.wanted_tags.clear();
self.waiting.drain().map(|(_, tx)| tx.transaction).collect()
}
/// Returns number of transactions in the Future queue.
pub fn len(&self) -> usize {
self.waiting.len()
@@ -32,11 +32,9 @@ mod rotator;
mod validated_pool;
pub mod base_pool;
pub mod error;
pub mod watcher;
pub use self::error::IntoPoolError;
pub use self::base_pool::{Transaction, Status};
pub use self::base_pool::Transaction;
pub use self::pool::{
Pool,
Options, ChainApi, EventStream, ExtrinsicFor,
@@ -92,7 +92,7 @@ impl<H: hash::Hash + traits::Member + Serialize, H2: Clone + fmt::Debug> Listene
/// Transaction was removed as invalid.
pub fn invalid(&mut self, tx: &H) {
warn!(target: "transaction-pool", "Extrinsic invalid: {:?}", tx);
warn!(target: "txpool", "Extrinsic invalid: {:?}", tx);
self.fire(tx, |watcher| watcher.invalid());
}
@@ -21,7 +21,6 @@ use std::{
};
use crate::base_pool as base;
use crate::error;
use crate::watcher::Watcher;
use serde::Serialize;
@@ -35,6 +34,8 @@ use sr_primitives::{
traits::{self, SaturatedConversion},
transaction_validity::{TransactionValidity, TransactionTag as Tag, TransactionValidityError},
};
use txpool_api::{error, PoolStatus};
use crate::validated_pool::{ValidatedPool, ValidatedTransaction};
/// Modification notification event stream type;
@@ -92,6 +93,8 @@ pub struct Options {
pub ready: base::Limit,
/// Future queue limits.
pub future: base::Limit,
/// Reject future transactions.
pub reject_future_transactions: bool,
}
impl Default for Options {
@@ -105,6 +108,7 @@ impl Default for Options {
count: 128,
total_bytes: 1 * 1024 * 1024,
},
reject_future_transactions: false,
}
}
}
@@ -131,7 +135,9 @@ impl<B: ChainApi> Pool<B> {
let validated_pool = self.validated_pool.clone();
self.verify(at, xts, force)
.map(move |validated_transactions| validated_transactions
.map(|validated_transactions| validated_pool.submit(validated_transactions)))
.map(|validated_transactions| validated_pool.submit(validated_transactions
.into_iter()
.map(|(_, tx)| tx))))
}
/// Imports one unverified extrinsic to the pool
@@ -161,10 +167,40 @@ impl<B: ChainApi> Pool<B> {
let validated_pool = self.validated_pool.clone();
Either::Right(
self.verify_one(at, block_number, xt, false)
.map(move |validated_transactions| validated_pool.submit_and_watch(validated_transactions))
.map(move |validated_transactions| validated_pool.submit_and_watch(validated_transactions.1))
)
}
/// Revalidate all ready transactions.
///
/// Returns future that performs validation of all ready transactions and
/// then resubmits all transactions back to the pool.
pub fn revalidate_ready(&self, at: &BlockId<B::Block>) -> impl Future<Output=Result<(), B::Error>> {
let validated_pool = self.validated_pool.clone();
let ready = self.validated_pool.ready().map(|tx| tx.data.clone());
self.verify(at, ready, false)
.map(move |revalidated_transactions| revalidated_transactions.map(
move |revalidated_transactions| validated_pool.resubmit(revalidated_transactions)
))
}
/// Prunes known ready transactions.
///
/// Used to clear the pool from transactions that were part of recently imported block.
/// The main difference from the `prune` is that we do not revalidate any transactions
/// and ignore unknown passed hashes.
pub fn prune_known(&self, at: &BlockId<B::Block>, hashes: &[ExHash<B>]) -> Result<(), B::Error> {
// Get details of all extrinsics that are already in the pool
let in_pool_tags = self.validated_pool.extrinsics_tags(hashes)
.into_iter().filter_map(|x| x).flat_map(|x| x);
// Prune all transactions that provide given tags
let prune_status = self.validated_pool.prune_tags(in_pool_tags)?;
let pruned_transactions = hashes.into_iter().cloned()
.chain(prune_status.pruned.iter().map(|tx| tx.hash.clone()));
self.validated_pool.fire_pruned(at, pruned_transactions)
}
/// Prunes ready transactions.
///
/// Used to clear the pool from transactions that were part of recently imported block.
@@ -184,7 +220,8 @@ impl<B: ChainApi> Pool<B> {
extrinsics.len()
);
// Get details of all extrinsics that are already in the pool
let (in_pool_hashes, in_pool_tags) = self.validated_pool.extrinsics_tags(extrinsics);
let in_pool_hashes = extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
// Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option<Vec<Tag>>)`)
let all = extrinsics.iter().zip(in_pool_tags.into_iter());
@@ -274,7 +311,7 @@ impl<B: ChainApi> Pool<B> {
&at,
known_imported_hashes,
pruned_hashes,
reverified_transactions,
reverified_transactions.into_iter().map(|(_, xt)| xt).collect(),
))
)))
}
@@ -303,7 +340,7 @@ impl<B: ChainApi> Pool<B> {
}
/// Returns pool status.
pub fn status(&self) -> base::Status {
pub fn status(&self) -> PoolStatus {
self.validated_pool.status()
}
@@ -325,7 +362,7 @@ impl<B: ChainApi> Pool<B> {
at: &BlockId<B::Block>,
xts: impl IntoIterator<Item=ExtrinsicFor<B>>,
force: bool,
) -> impl Future<Output=Result<Vec<ValidatedTransactionFor<B>>, B::Error>> {
) -> impl Future<Output=Result<HashMap<ExHash<B>, ValidatedTransactionFor<B>>, B::Error>> {
// we need a block number to compute tx validity
let block_number = match self.resolve_block_number(at) {
Ok(block_number) => block_number,
@@ -338,7 +375,7 @@ impl<B: ChainApi> Pool<B> {
);
// make single validation future that waits all until all extrinsics are validated
Either::Right(join_all(validation_futures).then(|x| ready(Ok(x))))
Either::Right(join_all(validation_futures).then(|x| ready(Ok(x.into_iter().collect()))))
}
/// Returns future that validates single transaction at given block.
@@ -348,14 +385,17 @@ impl<B: ChainApi> Pool<B> {
block_number: NumberFor<B>,
xt: ExtrinsicFor<B>,
force: bool,
) -> impl Future<Output=ValidatedTransactionFor<B>> {
) -> impl Future<Output=(ExHash<B>, ValidatedTransactionFor<B>)> {
let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
if !force && self.validated_pool.is_banned(&hash) {
return Either::Left(ready(ValidatedTransaction::Invalid(hash, error::Error::TemporarilyBanned.into())))
return Either::Left(ready((
hash.clone(),
ValidatedTransaction::Invalid(hash, error::Error::TemporarilyBanned.into()),
)))
}
Either::Right(self.validated_pool.api().validate_transaction(block_id, xt.clone())
.then(move |validation_result| ready(match validation_result {
.then(move |validation_result| ready((hash.clone(), match validation_result {
Ok(validity) => match validity {
Ok(validity) => if validity.provides.is_empty() {
ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
@@ -379,7 +419,7 @@ impl<B: ChainApi> Pool<B> {
ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
},
Err(e) => ValidatedTransaction::Invalid(hash, e),
})))
}))))
}
}
@@ -391,50 +431,30 @@ impl<B: ChainApi> Clone for Pool<B> {
}
}
impl<A: ChainApi> sr_primitives::offchain::TransactionPool<A::Block> for Pool<A> {
fn submit_at(
&self,
at: &BlockId<A::Block>,
extrinsic: <A::Block as sr_primitives::traits::Block>::Extrinsic,
) -> Result<(), ()> {
log::debug!(
target: "txpool",
"(offchain call) Submitting a transaction to the pool: {:?}",
extrinsic
);
let result = futures::executor::block_on(self.submit_one(&at, extrinsic));
result.map(|_| ())
.map_err(|e| log::warn!(
target: "txpool",
"(offchain call) Error submitting a transaction to the pool: {:?}",
e
))
}
}
#[cfg(test)]
mod tests {
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
time::Instant,
};
use parking_lot::Mutex;
use futures::executor::block_on;
use super::*;
use txpool_api::TransactionStatus;
use sr_primitives::transaction_validity::{ValidTransaction, InvalidTransaction};
use codec::Encode;
use test_runtime::{Block, Extrinsic, Transfer, H256, AccountId};
use assert_matches::assert_matches;
use crate::base_pool::Limit;
use crate::watcher;
const INVALID_NONCE: u64 = 254;
#[derive(Clone, Debug, Default)]
struct TestApi {
delay: Arc<Mutex<Option<std::sync::mpsc::Receiver<()>>>>,
invalidate: Arc<Mutex<HashSet<u64>>>,
clear_requirements: Arc<Mutex<HashSet<u64>>>,
add_requirements: Arc<Mutex<HashSet<u64>>>,
}
impl ChainApi for TestApi {
@@ -449,6 +469,7 @@ mod tests {
at: &BlockId<Self::Block>,
uxt: ExtrinsicFor<Self>,
) -> Self::ValidationFuture {
let hash = self.hash_and_length(&uxt).0;
let block_number = self.block_id_to_number(at).unwrap().unwrap();
let nonce = uxt.transfer().nonce;
@@ -462,16 +483,30 @@ mod tests {
}
}
if self.invalidate.lock().contains(&hash) {
return futures::future::ready(Ok(InvalidTransaction::Custom(0).into()));
}
futures::future::ready(if nonce < block_number {
Ok(InvalidTransaction::Stale.into())
} else {
Ok(Ok(ValidTransaction {
let mut transaction = ValidTransaction {
priority: 4,
requires: if nonce > block_number { vec![vec![nonce as u8 - 1]] } else { vec![] },
provides: if nonce == INVALID_NONCE { vec![] } else { vec![vec![nonce as u8]] },
longevity: 3,
propagate: true,
}))
};
if self.clear_requirements.lock().contains(&hash) {
transaction.requires.clear();
}
if self.add_requirements.lock().contains(&hash) {
transaction.requires.push(vec![128]);
}
Ok(Ok(transaction))
})
}
@@ -651,6 +686,7 @@ mod tests {
let pool = Pool::new(Options {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
@@ -685,6 +721,7 @@ mod tests {
let pool = Pool::new(Options {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
// when
@@ -742,8 +779,8 @@ mod tests {
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(watcher::Status::Ready));
assert_eq!(stream.next(), Some(watcher::Status::Finalized(H256::from_low_u64_be(2).into())));
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Finalized(H256::from_low_u64_be(2).into())));
assert_eq!(stream.next(), None);
}
@@ -767,8 +804,8 @@ mod tests {
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(watcher::Status::Ready));
assert_eq!(stream.next(), Some(watcher::Status::Finalized(H256::from_low_u64_be(2).into())));
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Finalized(H256::from_low_u64_be(2).into())));
assert_eq!(stream.next(), None);
}
@@ -796,8 +833,8 @@ mod tests {
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(watcher::Status::Future));
assert_eq!(stream.next(), Some(watcher::Status::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Future));
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
}
#[test]
@@ -819,8 +856,8 @@ mod tests {
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(watcher::Status::Ready));
assert_eq!(stream.next(), Some(watcher::Status::Invalid));
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
assert_eq!(stream.next(), None);
}
@@ -846,8 +883,8 @@ mod tests {
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(watcher::Status::Ready));
assert_eq!(stream.next(), Some(watcher::Status::Broadcast(peers)));
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
}
#[test]
@@ -860,6 +897,7 @@ mod tests {
let pool = Pool::new(Options {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default());
let xt = uxt(Transfer {
@@ -883,8 +921,8 @@ mod tests {
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(watcher::Status::Ready));
assert_eq!(stream.next(), Some(watcher::Status::Dropped));
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
}
#[test]
@@ -941,5 +979,81 @@ mod tests {
assert_eq!(pool.status().future, 0);
}
}
#[test]
fn should_revalidate_ready_transactions() {
fn transfer(nonce: u64) -> Extrinsic {
uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce,
})
}
// given
let pool = pool();
let tx0 = transfer(0);
let hash0 = pool.validated_pool.api().hash_and_length(&tx0).0;
let watcher0 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx0)).unwrap();
let tx1 = transfer(1);
let hash1 = pool.validated_pool.api().hash_and_length(&tx1).0;
let watcher1 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx1)).unwrap();
let tx2 = transfer(2);
let hash2 = pool.validated_pool.api().hash_and_length(&tx2).0;
let watcher2 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx2)).unwrap();
let tx3 = transfer(3);
let hash3 = pool.validated_pool.api().hash_and_length(&tx3).0;
let watcher3 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx3)).unwrap();
let tx4 = transfer(4);
let hash4 = pool.validated_pool.api().hash_and_length(&tx4).0;
let watcher4 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx4)).unwrap();
assert_eq!(pool.status().ready, 5);
// when
pool.validated_pool.api().invalidate.lock().insert(hash3);
pool.validated_pool.api().clear_requirements.lock().insert(hash1);
pool.validated_pool.api().add_requirements.lock().insert(hash0);
block_on(pool.revalidate_ready(&BlockId::Number(0))).unwrap();
// then
// hash0 now has unsatisfied requirements => it is moved to the future queue
// hash1 is now independent of hash0 => it is in ready queue
// hash2 still depends on hash1 => it is in ready queue
// hash3 is now invalid => it is removed from the pool
// hash4 now depends on invalidated hash3 => it is moved to the future queue
//
// events for hash3 are: Ready, Invalid
// events for hash4 are: Ready, Invalid
assert_eq!(pool.status().ready, 2);
assert_eq!(
futures::executor::block_on_stream(watcher3.into_stream()).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::Invalid],
);
// when
pool.validated_pool.remove_invalid(&[hash0, hash1, hash2, hash4]);
// then
// events for hash0 are: Ready, Future, Invalid
// events for hash1 are: Ready, Invalid
// events for hash2 are: Ready, Invalid
assert_eq!(
futures::executor::block_on_stream(watcher0.into_stream()).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::Future, TransactionStatus::Invalid],
);
assert_eq!(
futures::executor::block_on_stream(watcher1.into_stream()).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::Invalid],
);
assert_eq!(
futures::executor::block_on_stream(watcher2.into_stream()).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::Invalid],
);
assert_eq!(
futures::executor::block_on_stream(watcher4.into_stream()).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::Future, TransactionStatus::Invalid],
);
}
}
@@ -28,8 +28,8 @@ use sr_primitives::traits::Member;
use sr_primitives::transaction_validity::{
TransactionTag as Tag,
};
use txpool_api::error;
use crate::error;
use crate::future::WaitingTransaction;
use crate::base_pool::Transaction;
@@ -433,6 +433,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
}
}
/// Iterator of ready transactions ordered by priority.
pub struct BestIterator<Hash, Ex> {
all: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>,
awaiting: HashMap<Hash, (usize, TransactionRef<Hash, Ex>)>,
@@ -18,16 +18,16 @@ use std::{
collections::{HashSet, HashMap},
fmt,
hash,
sync::Arc,
time,
};
use crate::base_pool as base;
use crate::error;
use crate::listener::Listener;
use crate::rotator::PoolRotator;
use crate::watcher::Watcher;
use serde::Serialize;
use log::debug;
use log::{debug, warn};
use futures::channel::mpsc;
use parking_lot::{Mutex, RwLock};
@@ -36,6 +36,7 @@ use sr_primitives::{
traits::{self, SaturatedConversion},
transaction_validity::TransactionTag as Tag,
};
use txpool_api::{error, PoolStatus};
use crate::base_pool::PruneStatus;
use crate::pool::{EventStream, Options, ChainApi, BlockHash, ExHash, ExtrinsicFor, TransactionFor};
@@ -76,11 +77,12 @@ pub(crate) struct ValidatedPool<B: ChainApi> {
impl<B: ChainApi> ValidatedPool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, api: B) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
ValidatedPool {
api,
options,
listener: Default::default(),
pool: Default::default(),
pool: RwLock::new(base_pool),
import_notification_sinks: Default::default(),
rotator: Default::default(),
}
@@ -189,18 +191,134 @@ impl<B: ChainApi> ValidatedPool<B> {
}
}
/// Resubmits revalidated transactions back to the pool.
///
/// Removes and then submits passed transactions and all dependent transactions.
/// Transactions that are missing from the pool are not submitted.
pub fn resubmit(&self, mut updated_transactions: HashMap<ExHash<B>, ValidatedTransactionFor<B>>) {
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status { Future, Ready, Failed, Dropped };
let (mut initial_statuses, final_statuses) = {
let mut pool = self.pool.write();
// remove all passed transactions from the ready/future queues
// (this may remove additional transactions as well)
//
// for every transaction that has an entry in the `updated_transactions`,
// we store updated validation result in txs_to_resubmit
// for every transaction that has no entry in the `updated_transactions`,
// we store last validation result (i.e. the pool entry) in txs_to_resubmit
let mut initial_statuses = HashMap::new();
let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
while !updated_transactions.is_empty() {
let hash = updated_transactions.keys().next().cloned().expect("transactions is not empty; qed");
// note we are not considering tx with hash invalid here - we just want
// to remove it along with dependent transactions and `remove_invalid()`
// does exactly what we need
let removed = pool.remove_invalid(&[hash.clone()]);
for removed_tx in removed {
let removed_hash = removed_tx.hash.clone();
let updated_transaction = updated_transactions.remove(&removed_hash);
let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
updated_tx
} else {
// in most cases we'll end up in successful `try_unwrap`, but if not
// we still need to reinsert transaction back to the pool => duplicate call
let transaction = match Arc::try_unwrap(removed_tx) {
Ok(transaction) => transaction,
Err(transaction) => transaction.duplicate(),
};
ValidatedTransaction::Valid(transaction)
};
initial_statuses.insert(removed_hash.clone(), Status::Ready);
txs_to_resubmit.push((removed_hash, tx_to_resubmit));
}
}
// if we're rejecting future transactions, then insertion order matters here:
// if tx1 depends on tx2, then if tx1 is inserted before tx2, then it goes
// to the future queue and gets rejected immediately
// => let's temporary stop rejection and clear future queue before return
pool.with_futures_enabled(|pool, reject_future_transactions| {
// now resubmit all removed transactions back to the pool
let mut final_statuses = HashMap::new();
for (hash, tx_to_resubmit) in txs_to_resubmit {
match tx_to_resubmit {
ValidatedTransaction::Valid(tx) => match pool.import(tx) {
Ok(imported) => match imported {
base::Imported::Ready { promoted, failed, removed, .. } => {
final_statuses.insert(hash, Status::Ready);
for hash in promoted {
final_statuses.insert(hash, Status::Ready);
}
for hash in failed {
final_statuses.insert(hash, Status::Failed);
}
for tx in removed {
final_statuses.insert(tx.hash.clone(), Status::Dropped);
}
},
base::Imported::Future { .. } => {
final_statuses.insert(hash, Status::Future);
},
},
Err(err) => {
// we do not want to fail if single transaction import has failed
// nor we do want to propagate this error, because it could tx unknown to caller
// => let's just notify listeners (and issue debug message)
warn!(
target: "txpool",
"[{:?}] Removing invalid transaction from update: {}",
hash,
err,
);
final_statuses.insert(hash, Status::Failed);
},
},
ValidatedTransaction::Invalid(_, _) | ValidatedTransaction::Unknown(_, _) => {
final_statuses.insert(hash, Status::Failed);
},
}
}
// if the pool is configured to reject future transactions, let's clear the future
// queue, updating final statuses as required
if reject_future_transactions {
for future_tx in pool.clear_future() {
final_statuses.insert(future_tx.hash.clone(), Status::Dropped);
}
}
(initial_statuses, final_statuses)
})
};
// and now let's notify listeners about status changes
let mut listener = self.listener.write();
for (hash, final_status) in final_statuses {
let initial_status = initial_statuses.remove(&hash);
if initial_status.is_none() || Some(final_status) != initial_status {
match final_status {
Status::Future => listener.future(&hash),
Status::Ready => listener.ready(&hash, None),
Status::Failed => listener.invalid(&hash),
Status::Dropped => listener.dropped(&hash, None),
}
}
}
}
/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
pub fn extrinsics_tags(&self, extrinsics: &[ExtrinsicFor<B>]) -> (Vec<ExHash<B>>, Vec<Option<Vec<Tag>>>) {
let hashes = extrinsics.iter().map(|extrinsic| self.api.hash_and_length(extrinsic).0).collect::<Vec<_>>();
let in_pool = self.pool.read().by_hash(&hashes);
(
hashes,
in_pool.into_iter()
.map(|existing_in_pool| existing_in_pool
.map(|transaction| transaction.provides.iter().cloned()
.collect()))
.collect(),
)
pub fn extrinsics_tags(&self, hashes: &[ExHash<B>]) -> Vec<Option<Vec<Tag>>> {
self.pool.read().by_hash(&hashes)
.into_iter()
.map(|existing_in_pool| existing_in_pool
.map(|transaction| transaction.provides.iter().cloned()
.collect()))
.collect()
}
/// Prunes ready transactions that provide given list of tags.
@@ -249,20 +367,29 @@ impl<B: ChainApi> ValidatedPool<B> {
// Fire `pruned` notifications for collected hashes and make sure to include
// `known_imported_hashes` since they were just imported as part of the block.
let hashes = hashes.chain(known_imported_hashes.into_iter());
{
let header_hash = self.api.block_id_to_hash(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?;
let mut listener = self.listener.write();
for h in hashes {
listener.pruned(header_hash, &h);
}
}
self.fire_pruned(at, hashes)?;
// perform regular cleanup of old transactions in the pool
// and update temporary bans.
self.clear_stale(at)?;
Ok(())
}
/// Fire notifications for pruned transactions.
pub fn fire_pruned(
&self,
at: &BlockId<B::Block>,
hashes: impl Iterator<Item=ExHash<B>>,
) -> Result<(), B::Error> {
let header_hash = self.api.block_id_to_hash(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?;
let mut listener = self.listener.write();
for h in hashes {
listener.pruned(header_hash, &h);
}
Ok(())
}
/// Removes stale transactions from the pool.
///
/// Stale transactions are transaction beyond their longevity period.
@@ -270,8 +397,8 @@ impl<B: ChainApi> ValidatedPool<B> {
/// See `prune_tags` if you want this.
pub fn clear_stale(&self, at: &BlockId<B::Block>) -> Result<(), B::Error> {
let block_number = self.api.block_id_to_number(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?
.saturated_into::<u64>();
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?
.saturated_into::<u64>();
let now = time::Instant::now();
let to_remove = {
self.ready()
@@ -346,7 +473,7 @@ impl<B: ChainApi> ValidatedPool<B> {
}
/// Returns pool status.
pub fn status(&self) -> base::Status {
pub fn status(&self) -> PoolStatus {
self.pool.read().status()
}
}
@@ -20,34 +20,14 @@ use futures::{
Stream,
channel::mpsc,
};
use serde::{Serialize, Deserialize};
/// Possible extrinsic status events
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Status<H, H2> {
/// Extrinsic is part of the future queue.
Future,
/// Extrinsic is part of the ready queue.
Ready,
/// Extrinsic has been finalized in block with given hash.
Finalized(H2),
/// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid.
Usurped(H),
/// The extrinsic has been broadcast to the given peers.
Broadcast(Vec<String>),
/// Extrinsic has been dropped from the pool because of the limit.
Dropped,
/// Extrinsic was detected as invalid.
Invalid,
}
use txpool_api::TransactionStatus;
/// Extrinsic watcher.
///
/// Represents a stream of status updates for particular extrinsic.
#[derive(Debug)]
pub struct Watcher<H, H2> {
receiver: mpsc::UnboundedReceiver<Status<H, H2>>,
receiver: mpsc::UnboundedReceiver<TransactionStatus<H, H2>>,
hash: H,
}
@@ -60,7 +40,7 @@ impl<H, H2> Watcher<H, H2> {
/// Pipe the notifications to given sink.
///
/// Make sure to drive the future to completion.
pub fn into_stream(self) -> impl Stream<Item=Status<H, H2>> {
pub fn into_stream(self) -> impl Stream<Item=TransactionStatus<H, H2>> {
self.receiver
}
}
@@ -68,7 +48,7 @@ impl<H, H2> Watcher<H, H2> {
/// Sender part of the watcher. Exposed only for testing purposes.
#[derive(Debug)]
pub struct Sender<H, H2> {
receivers: Vec<mpsc::UnboundedSender<Status<H, H2>>>,
receivers: Vec<mpsc::UnboundedSender<TransactionStatus<H, H2>>>,
finalized: bool,
}
@@ -94,49 +74,48 @@ impl<H: Clone, H2: Clone> Sender<H, H2> {
/// Transaction became ready.
pub fn ready(&mut self) {
self.send(Status::Ready)
self.send(TransactionStatus::Ready)
}
/// Transaction was moved to future.
pub fn future(&mut self) {
self.send(Status::Future)
self.send(TransactionStatus::Future)
}
/// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid.
pub fn usurped(&mut self, hash: H) {
self.send(Status::Usurped(hash))
self.send(TransactionStatus::Usurped(hash))
}
/// Extrinsic has been finalized in block with given hash.
pub fn finalized(&mut self, hash: H2) {
self.send(Status::Finalized(hash));
self.send(TransactionStatus::Finalized(hash));
self.finalized = true;
}
/// Extrinsic has been marked as invalid by the block builder.
pub fn invalid(&mut self) {
self.send(Status::Invalid);
self.send(TransactionStatus::Invalid);
// we mark as finalized as there are no more notifications
self.finalized = true;
}
/// Transaction has been dropped from the pool because of the limit.
pub fn dropped(&mut self) {
self.send(Status::Dropped);
self.send(TransactionStatus::Dropped);
}
/// The extrinsic has been broadcast to the given peers.
pub fn broadcast(&mut self, peers: Vec<String>) {
self.send(Status::Broadcast(peers))
self.send(TransactionStatus::Broadcast(peers))
}
/// Returns true if the are no more listeners for this extrinsic or it was finalized.
pub fn is_done(&self) -> bool {
self.finalized || self.receivers.is_empty()
}
fn send(&mut self, status: Status<H, H2>) {
fn send(&mut self, status: TransactionStatus<H, H2>) {
self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok())
}
}