Make transaction pool prune transactions only of canonical blocks (#6123)

* Make tx pool aware of retracted fork blocks

* Make it compile

* Update client/transaction-pool/src/lib.rs

Co-authored-by: Nikolay Volf <nikvolf@gmail.com>

* Fix doc test

* Simplify the implementation

* Send tree route as arc to prevent heavy clones

* Switch to use `ExtrinsicHash` to make it more clear

* Fix benchmark

Co-authored-by: Nikolay Volf <nikvolf@gmail.com>
This commit is contained in:
Bastian Köcher
2020-06-05 23:12:00 +02:00
committed by GitHub
parent 8a8b4f99c3
commit d2846e2b9a
31 changed files with 808 additions and 359 deletions
@@ -51,7 +51,6 @@ fn to_tag(nonce: u64, from: AccountId) -> Tag {
impl ChainApi for TestApi {
type Block = Block;
type Hash = H256;
type Error = sp_transaction_pool::error::Error;
type ValidationFuture = Ready<sp_transaction_pool::error::Result<TransactionValidity>>;
type BodyFuture = Ready<sp_transaction_pool::error::Result<Option<Vec<Extrinsic>>>>;
@@ -107,7 +106,7 @@ impl ChainApi for TestApi {
})
}
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (Self::Hash, usize) {
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (H256, usize) {
let encoded = uxt.encode();
(blake2_256(&encoded).into(), encoded.len())
}
@@ -38,8 +38,6 @@ pub mod watcher;
pub use self::base_pool::Transaction;
pub use self::pool::{
Pool,
Options, ChainApi, EventStream, ExtrinsicFor,
BlockHash, ExHash, NumberFor, TransactionFor,
ValidatedTransaction,
Pool, Options, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash,
BlockHash, NumberFor, TransactionFor, ValidatedTransaction,
};
@@ -22,14 +22,14 @@ use std::{
};
use linked_hash_map::LinkedHashMap;
use serde::Serialize;
use crate::{watcher, ChainApi, BlockHash};
use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash};
use log::{debug, trace, warn};
use sp_runtime::traits;
/// Extrinsic pool default listener.
pub struct Listener<H: hash::Hash + Eq + Debug, C: ChainApi> {
watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>,
finality_watchers: LinkedHashMap<BlockHash<C>, Vec<H>>,
pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
watchers: HashMap<H, watcher::Sender<H, ExtrinsicHash<C>>>,
finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>,
}
/// Maximum number of blocks awaiting finality at any time.
@@ -45,7 +45,7 @@ impl<H: hash::Hash + Eq + Debug, C: ChainApi> Default for Listener<H, C> {
}
impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H, BlockHash<C>>) {
fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H, ExtrinsicHash<C>>) {
let clean = if let Some(h) = self.watchers.get_mut(hash) {
fun(h);
h.is_done()
@@ -61,7 +61,7 @@ impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
/// Creates a new watcher for given verified extrinsic.
///
/// The watcher can be used to subscribe to life-cycle events of that extrinsic.
pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, BlockHash<C>> {
pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, ExtrinsicHash<C>> {
let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default);
sender.new_watcher(hash)
}
@@ -17,19 +17,16 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::{
hash,
collections::HashMap,
sync::Arc,
};
use crate::base_pool as base;
use crate::watcher::Watcher;
use serde::Serialize;
use crate::{base_pool as base, watcher::Watcher};
use futures::{Future, FutureExt};
use sp_runtime::{
generic::BlockId,
traits::{self, SaturatedConversion},
traits::{self, SaturatedConversion, Block as BlockT},
transaction_validity::{
TransactionValidity, TransactionTag as Tag, TransactionValidityError, TransactionSource,
},
@@ -44,19 +41,19 @@ pub use crate::validated_pool::ValidatedTransaction;
/// Modification notification event stream type;
pub type EventStream<H> = TracingUnboundedReceiver<H>;
/// Extrinsic hash type for a pool.
pub type ExHash<A> = <A as ChainApi>::Hash;
/// Block hash type for a pool.
pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
/// Extrinsic hash type for a pool.
pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
/// Extrinsic type for a pool.
pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
/// Block number type for the ChainApi
pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
/// A type of transaction stored in the pool
pub type TransactionFor<A> = Arc<base::Transaction<ExHash<A>, ExtrinsicFor<A>>>;
pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
/// A type of validated transaction stored in the pool.
pub type ValidatedTransactionFor<A> = ValidatedTransaction<
ExHash<A>,
ExtrinsicHash<A>,
ExtrinsicFor<A>,
<A as ChainApi>::Error,
>;
@@ -64,15 +61,15 @@ pub type ValidatedTransactionFor<A> = ValidatedTransaction<
/// Concrete extrinsic validation and query logic.
pub trait ChainApi: Send + Sync {
/// Block type.
type Block: traits::Block;
/// Transaction Hash type
type Hash: hash::Hash + Eq + traits::Member + Serialize;
type Block: BlockT;
/// Error type.
type Error: From<error::Error> + error::IntoPoolError;
/// Validate transaction future.
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send + Unpin;
/// Body future (since block body might be remote)
type BodyFuture: Future<Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>> + Unpin + Send + 'static;
type BodyFuture: Future<
Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>
> + Unpin + Send + 'static;
/// Verify extrinsic at given block.
fn validate_transaction(
@@ -83,13 +80,19 @@ pub trait ChainApi: Send + Sync {
) -> Self::ValidationFuture;
/// Returns a block number given the block id.
fn block_id_to_number(&self, at: &BlockId<Self::Block>) -> Result<Option<NumberFor<Self>>, Self::Error>;
fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<NumberFor<Self>>, Self::Error>;
/// Returns a block hash given the block id.
fn block_id_to_hash(&self, at: &BlockId<Self::Block>) -> Result<Option<BlockHash<Self>>, Self::Error>;
fn block_id_to_hash(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
/// Returns hash and encoding length of the extrinsic.
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (Self::Hash, usize);
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
/// Returns a block body given the block id.
fn block_body(&self, at: &BlockId<Self::Block>) -> Self::BodyFuture;
@@ -130,7 +133,6 @@ pub struct Pool<B: ChainApi> {
#[cfg(not(target_os = "unknown"))]
impl<B: ChainApi> parity_util_mem::MallocSizeOf for Pool<B>
where
B::Hash: parity_util_mem::MallocSizeOf,
ExtrinsicFor<B>: parity_util_mem::MallocSizeOf,
{
fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
@@ -153,7 +155,7 @@ impl<B: ChainApi> Pool<B> {
source: TransactionSource,
xts: T,
force: bool,
) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where
) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> where
T: IntoIterator<Item=ExtrinsicFor<B>>,
{
let validated_pool = self.validated_pool.clone();
@@ -172,7 +174,7 @@ impl<B: ChainApi> Pool<B> {
at: &BlockId<B::Block>,
source: TransactionSource,
xt: ExtrinsicFor<B>,
) -> Result<ExHash<B>, B::Error> {
) -> Result<ExtrinsicHash<B>, B::Error> {
self.submit_at(at, source, std::iter::once(xt), false)
.map(|import_result| import_result.and_then(|mut import_result| import_result
.pop()
@@ -187,7 +189,7 @@ impl<B: ChainApi> Pool<B> {
at: &BlockId<B::Block>,
source: TransactionSource,
xt: ExtrinsicFor<B>,
) -> Result<Watcher<ExHash<B>, BlockHash<B>>, B::Error> {
) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
let block_number = self.resolve_block_number(at)?;
let (_, tx) = self.verify_one(
at, block_number, source, xt, false
@@ -198,7 +200,7 @@ impl<B: ChainApi> Pool<B> {
/// Resubmit some transaction that were validated elsewhere.
pub fn resubmit(
&self,
revalidated_transactions: HashMap<ExHash<B>, ValidatedTransactionFor<B>>,
revalidated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
) {
let now = Instant::now();
@@ -215,7 +217,11 @@ impl<B: ChainApi> Pool<B> {
/// Used to clear the pool from transactions that were part of recently imported block.
/// The main difference from the `prune` is that we do not revalidate any transactions
/// and ignore unknown passed hashes.
pub fn prune_known(&self, at: &BlockId<B::Block>, hashes: &[ExHash<B>]) -> Result<(), B::Error> {
pub fn prune_known(
&self,
at: &BlockId<B::Block>,
hashes: &[ExtrinsicHash<B>],
) -> Result<(), B::Error> {
// Get details of all extrinsics that are already in the pool
let in_pool_tags = self.validated_pool.extrinsics_tags(hashes)
.into_iter().filter_map(|x| x).flat_map(|x| x);
@@ -299,7 +305,7 @@ impl<B: ChainApi> Pool<B> {
&self,
at: &BlockId<B::Block>,
tags: impl IntoIterator<Item=Tag>,
known_imported_hashes: impl IntoIterator<Item=ExHash<B>> + Clone,
known_imported_hashes: impl IntoIterator<Item=ExtrinsicHash<B>> + Clone,
) -> Result<(), B::Error> {
log::debug!(target: "txpool", "Pruning at {:?}", at);
// Prune all transactions that provide given tags
@@ -336,7 +342,7 @@ impl<B: ChainApi> Pool<B> {
}
/// Returns transaction hash
pub fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExHash<B> {
pub fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExtrinsicHash<B> {
self.validated_pool.api().hash_and_length(xt).0
}
@@ -353,7 +359,7 @@ impl<B: ChainApi> Pool<B> {
at: &BlockId<B::Block>,
xts: impl IntoIterator<Item=(TransactionSource, ExtrinsicFor<B>)>,
force: bool,
) -> Result<HashMap<ExHash<B>, ValidatedTransactionFor<B>>, B::Error> {
) -> Result<HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, B::Error> {
// we need a block number to compute tx validity
let block_number = self.resolve_block_number(at)?;
let mut result = HashMap::new();
@@ -379,7 +385,7 @@ impl<B: ChainApi> Pool<B> {
source: TransactionSource,
xt: ExtrinsicFor<B>,
force: bool,
) -> (ExHash<B>, ValidatedTransactionFor<B>) {
) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
if !force && self.validated_pool.is_banned(&hash) {
return (
@@ -444,9 +450,12 @@ mod tests {
use futures::executor::block_on;
use super::*;
use sp_transaction_pool::TransactionStatus;
use sp_runtime::transaction_validity::{ValidTransaction, InvalidTransaction, TransactionSource};
use sp_runtime::{
traits::Hash,
transaction_validity::{ValidTransaction, InvalidTransaction, TransactionSource},
};
use codec::Encode;
use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId};
use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId, Hashing};
use assert_matches::assert_matches;
use wasm_timer::Instant;
use crate::base_pool::Limit;
@@ -457,14 +466,13 @@ mod tests {
#[derive(Clone, Debug, Default)]
struct TestApi {
delay: Arc<Mutex<Option<std::sync::mpsc::Receiver<()>>>>,
invalidate: Arc<Mutex<HashSet<u64>>>,
clear_requirements: Arc<Mutex<HashSet<u64>>>,
add_requirements: Arc<Mutex<HashSet<u64>>>,
invalidate: Arc<Mutex<HashSet<H256>>>,
clear_requirements: Arc<Mutex<HashSet<H256>>>,
add_requirements: Arc<Mutex<HashSet<H256>>>,
}
impl ChainApi for TestApi {
type Block = Block;
type Hash = u64;
type Error = error::Error;
type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
type BodyFuture = futures::future::Ready<error::Result<Option<Vec<Extrinsic>>>>;
@@ -518,7 +526,10 @@ mod tests {
}
/// Returns a block number given the block id.
fn block_id_to_number(&self, at: &BlockId<Self::Block>) -> Result<Option<NumberFor<Self>>, Self::Error> {
fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<NumberFor<Self>>, Self::Error> {
Ok(match at {
BlockId::Number(num) => Some(*num),
BlockId::Hash(_) => None,
@@ -526,7 +537,10 @@ mod tests {
}
/// Returns a block hash given the block id.
fn block_id_to_hash(&self, at: &BlockId<Self::Block>) -> Result<Option<BlockHash<Self>>, Self::Error> {
fn block_id_to_hash(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error> {
Ok(match at {
BlockId::Number(num) => Some(H256::from_low_u64_be(*num)).into(),
BlockId::Hash(_) => None,
@@ -534,12 +548,10 @@ mod tests {
}
/// Hash the extrinsic.
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (Self::Hash, usize) {
let len = uxt.encode().len();
(
(H256::from(uxt.transfer().from.clone()).to_low_u64_be() << 5) + uxt.transfer().nonce,
len
)
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (BlockHash<Self>, usize) {
let encoded = uxt.encode();
let len = encoded.len();
(Hashing::hash(&encoded), len)
}
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
@@ -599,19 +611,19 @@ mod tests {
#[test]
fn should_notify_about_pool_events() {
let stream = {
let (stream, hash0, hash1) = {
// given
let pool = pool();
let stream = pool.validated_pool().import_notification_stream();
// when
let _hash = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
let hash0 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
}))).unwrap();
let _hash = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
@@ -627,13 +639,14 @@ mod tests {
assert_eq!(pool.validated_pool().status().ready, 2);
assert_eq!(pool.validated_pool().status().future, 1);
stream
(stream, hash0, hash1)
};
// then
let mut it = futures::executor::block_on_stream(stream);
assert_eq!(it.next(), Some(32));
assert_eq!(it.next(), Some(33));
assert_eq!(it.next(), Some(hash0));
assert_eq!(it.next(), Some(hash1));
assert_eq!(it.next(), None);
}
@@ -795,7 +808,10 @@ mod tests {
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())));
assert_eq!(
stream.next(),
Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())),
);
}
#[test]
@@ -812,14 +828,19 @@ mod tests {
assert_eq!(pool.validated_pool().status().future, 0);
// when
block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64])).unwrap();
block_on(
pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![watcher.hash().clone()]),
).unwrap();
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())));
assert_eq!(
stream.next(),
Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())),
);
}
#[test]
@@ -22,7 +22,7 @@ use std::{
sync::Arc,
};
use crate::{base_pool as base, BlockHash};
use crate::base_pool as base;
use crate::listener::Listener;
use crate::rotator::PoolRotator;
use crate::watcher::Watcher;
@@ -39,7 +39,9 @@ use wasm_timer::Instant;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use crate::base_pool::PruneStatus;
use crate::pool::{EventStream, Options, ChainApi, ExHash, ExtrinsicFor, TransactionFor};
use crate::pool::{
EventStream, Options, ChainApi, BlockHash, ExtrinsicHash, ExtrinsicFor, TransactionFor,
};
/// Pre-validated transaction. Validated pool only accepts transactions wrapped in this enum.
#[derive(Debug)]
@@ -82,7 +84,7 @@ impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
/// A type of validated transaction stored in the pool.
pub type ValidatedTransactionFor<B> = ValidatedTransaction<
ExHash<B>,
ExtrinsicHash<B>,
ExtrinsicFor<B>,
<B as ChainApi>::Error,
>;
@@ -91,19 +93,18 @@ pub type ValidatedTransactionFor<B> = ValidatedTransaction<
pub struct ValidatedPool<B: ChainApi> {
api: Arc<B>,
options: Options,
listener: RwLock<Listener<ExHash<B>, B>>,
listener: RwLock<Listener<ExtrinsicHash<B>, B>>,
pool: RwLock<base::BasePool<
ExHash<B>,
ExtrinsicHash<B>,
ExtrinsicFor<B>,
>>,
import_notification_sinks: Mutex<Vec<TracingUnboundedSender<ExHash<B>>>>,
rotator: PoolRotator<ExHash<B>>,
import_notification_sinks: Mutex<Vec<TracingUnboundedSender<ExtrinsicHash<B>>>>,
rotator: PoolRotator<ExtrinsicHash<B>>,
}
#[cfg(not(target_os = "unknown"))]
impl<B: ChainApi> parity_util_mem::MallocSizeOf for ValidatedPool<B>
where
B::Hash: parity_util_mem::MallocSizeOf,
ExtrinsicFor<B>: parity_util_mem::MallocSizeOf,
{
fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
@@ -127,17 +128,17 @@ impl<B: ChainApi> ValidatedPool<B> {
}
/// Bans given set of hashes.
pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item=ExHash<B>>) {
pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item=ExtrinsicHash<B>>) {
self.rotator.ban(now, hashes)
}
/// Returns true if transaction with given hash is currently banned from the pool.
pub fn is_banned(&self, hash: &ExHash<B>) -> bool {
pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
self.rotator.is_banned(hash)
}
/// Imports a bunch of pre-validated transactions to the pool.
pub fn submit<T>(&self, txs: T) -> Vec<Result<ExHash<B>, B::Error>> where
pub fn submit<T>(&self, txs: T) -> Vec<Result<ExtrinsicHash<B>, B::Error>> where
T: IntoIterator<Item=ValidatedTransactionFor<B>>
{
let results = txs.into_iter()
@@ -158,7 +159,7 @@ impl<B: ChainApi> ValidatedPool<B> {
}
/// Submit single pre-validated transaction to the pool.
fn submit_one(&self, tx: ValidatedTransactionFor<B>) -> Result<ExHash<B>, B::Error> {
fn submit_one(&self, tx: ValidatedTransactionFor<B>) -> Result<ExtrinsicHash<B>, B::Error> {
match tx {
ValidatedTransaction::Valid(tx) => {
let imported = self.pool.write().import(tx)?;
@@ -183,7 +184,7 @@ impl<B: ChainApi> ValidatedPool<B> {
}
}
fn enforce_limits(&self) -> HashSet<ExHash<B>> {
fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
let status = self.pool.read().status();
let ready_limit = &self.options.ready;
let future_limit = &self.options.future;
@@ -228,7 +229,7 @@ impl<B: ChainApi> ValidatedPool<B> {
pub fn submit_and_watch(
&self,
tx: ValidatedTransactionFor<B>,
) -> Result<Watcher<ExHash<B>, BlockHash<B>>, B::Error> {
) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
match tx {
ValidatedTransaction::Valid(tx) => {
let hash = self.api.hash_and_length(&tx.data).0;
@@ -250,7 +251,7 @@ impl<B: ChainApi> ValidatedPool<B> {
///
/// Removes and then submits passed transactions and all dependent transactions.
/// Transactions that are missing from the pool are not submitted.
pub fn resubmit(&self, mut updated_transactions: HashMap<ExHash<B>, ValidatedTransactionFor<B>>) {
pub fn resubmit(&self, mut updated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>) {
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status { Future, Ready, Failed, Dropped };
@@ -369,7 +370,7 @@ impl<B: ChainApi> ValidatedPool<B> {
}
/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
pub fn extrinsics_tags(&self, hashes: &[ExHash<B>]) -> Vec<Option<Vec<Tag>>> {
pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
self.pool.read().by_hashes(&hashes)
.into_iter()
.map(|existing_in_pool| existing_in_pool
@@ -378,7 +379,7 @@ impl<B: ChainApi> ValidatedPool<B> {
}
/// Get ready transaction by hash
pub fn ready_by_hash(&self, hash: &ExHash<B>) -> Option<TransactionFor<B>> {
pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
self.pool.read().ready_by_hash(hash)
}
@@ -386,7 +387,7 @@ impl<B: ChainApi> ValidatedPool<B> {
pub fn prune_tags(
&self,
tags: impl IntoIterator<Item=Tag>,
) -> Result<PruneStatus<ExHash<B>, ExtrinsicFor<B>>, B::Error> {
) -> Result<PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>>, B::Error> {
// Perform tag-based pruning in the base pool
let status = self.pool.write().prune_tags(tags);
// Notify event listeners of all transactions
@@ -408,8 +409,8 @@ impl<B: ChainApi> ValidatedPool<B> {
pub fn resubmit_pruned(
&self,
at: &BlockId<B::Block>,
known_imported_hashes: impl IntoIterator<Item=ExHash<B>> + Clone,
pruned_hashes: Vec<ExHash<B>>,
known_imported_hashes: impl IntoIterator<Item=ExtrinsicHash<B>> + Clone,
pruned_hashes: Vec<ExtrinsicHash<B>>,
pruned_xts: Vec<ValidatedTransactionFor<B>>,
) -> Result<(), B::Error> {
debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());
@@ -440,7 +441,7 @@ impl<B: ChainApi> ValidatedPool<B> {
pub fn fire_pruned(
&self,
at: &BlockId<B::Block>,
hashes: impl Iterator<Item=ExHash<B>>,
hashes: impl Iterator<Item=ExtrinsicHash<B>>,
) -> Result<(), B::Error> {
let header_hash = self.api.block_id_to_hash(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?;
@@ -473,7 +474,7 @@ impl<B: ChainApi> ValidatedPool<B> {
.map(|tx| tx.hash.clone())
.collect::<Vec<_>>()
};
let futures_to_remove: Vec<ExHash<B>> = {
let futures_to_remove: Vec<ExtrinsicHash<B>> = {
let p = self.pool.read();
let mut hashes = Vec::new();
for tx in p.futures() {
@@ -494,7 +495,7 @@ impl<B: ChainApi> ValidatedPool<B> {
/// Get rotator reference.
#[cfg(test)]
pub fn rotator(&self) -> &PoolRotator<ExHash<B>> {
pub fn rotator(&self) -> &PoolRotator<ExtrinsicHash<B>> {
&self.rotator
}
@@ -507,14 +508,14 @@ impl<B: ChainApi> ValidatedPool<B> {
///
/// Consumers of this stream should use the `ready` method to actually get the
/// pending transactions in the right order.
pub fn import_notification_stream(&self) -> EventStream<ExHash<B>> {
pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
let (sink, stream) = tracing_unbounded("mpsc_import_notifications");
self.import_notification_sinks.lock().push(sink);
stream
}
/// Invoked when extrinsics are broadcasted.
pub fn on_broadcasted(&self, propagated: HashMap<ExHash<B>, Vec<String>>) {
pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
let mut listener = self.listener.write();
for (hash, peers) in propagated.into_iter() {
listener.broadcasted(&hash, peers);
@@ -527,7 +528,7 @@ impl<B: ChainApi> ValidatedPool<B> {
/// to prevent them from entering the pool right away.
/// Note this is not the case for the dependent transactions - those may
/// still be valid so we want to be able to re-import them.
pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> {
pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
// early exit in case there is no invalid transactions.
if hashes.is_empty() {
return vec![];
+39 -27
View File
@@ -25,9 +25,7 @@ use futures::{
};
use sc_client_api::{
blockchain::HeaderBackend,
light::{Fetcher, RemoteCallRequest, RemoteBodyRequest},
BlockBackend,
blockchain::HeaderBackend, light::{Fetcher, RemoteCallRequest, RemoteBodyRequest}, BlockBackend,
};
use sp_runtime::{
generic::BlockId, traits::{self, Block as BlockT, BlockIdTo, Header as HeaderT, Hash as HashT},
@@ -45,10 +43,7 @@ pub struct FullChainApi<Client, Block> {
_marker: PhantomData<Block>,
}
impl<Client, Block> FullChainApi<Client, Block> where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockIdTo<Block>,
{
impl<Client, Block> FullChainApi<Client, Block> {
/// Create new transaction pool logic.
pub fn new(client: Arc<Client>) -> Self {
FullChainApi {
@@ -58,12 +53,13 @@ impl<Client, Block> FullChainApi<Client, Block> where
.name_prefix("txpool-verifier")
.create()
.expect("Failed to spawn verifier threads, that are critical for node operation."),
_marker: Default::default()
_marker: Default::default(),
}
}
}
impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Block> where
impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Block>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: Send + Sync + 'static,
@@ -71,9 +67,10 @@ impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Bloc
sp_api::ApiErrorFor<Client, Block>: Send,
{
type Block = Block;
type Hash = Block::Hash;
type Error = error::Error;
type ValidationFuture = Pin<Box<dyn Future<Output = error::Result<TransactionValidity>> + Send>>;
type ValidationFuture = Pin<
Box<dyn Future<Output = error::Result<TransactionValidity>> + Send>
>;
type BodyFuture = Ready<error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>>;
fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture {
@@ -136,7 +133,10 @@ impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Bloc
self.client.to_hash(at).map_err(|e| Error::BlockIdConversion(format!("{:?}", e)))
}
fn hash_and_length(&self, ex: &sc_transaction_graph::ExtrinsicFor<Self>) -> (Self::Hash, usize) {
fn hash_and_length(
&self,
ex: &sc_transaction_graph::ExtrinsicFor<Self>,
) -> (sc_transaction_graph::ExtrinsicHash<Self>, usize) {
ex.using_encoded(|x| {
(<traits::HashFor::<Block> as traits::Hash>::hash(x), x.len())
})
@@ -150,11 +150,7 @@ pub struct LightChainApi<Client, F, Block> {
_phantom: PhantomData<Block>,
}
impl<Client, F, Block> LightChainApi<Client, F, Block> where
Block: BlockT,
Client: HeaderBackend<Block>,
F: Fetcher<Block>,
{
impl<Client, F, Block> LightChainApi<Client, F, Block> {
/// Create new transaction pool logic.
pub fn new(client: Arc<Client>, fetcher: Arc<F>) -> Self {
LightChainApi {
@@ -165,16 +161,23 @@ impl<Client, F, Block> LightChainApi<Client, F, Block> where
}
}
impl<Client, F, Block> sc_transaction_graph::ChainApi for LightChainApi<Client, F, Block> where
Block: BlockT,
Client: HeaderBackend<Block> + 'static,
F: Fetcher<Block> + 'static,
impl<Client, F, Block> sc_transaction_graph::ChainApi for
LightChainApi<Client, F, Block> where
Block: BlockT,
Client: HeaderBackend<Block> + 'static,
F: Fetcher<Block> + 'static,
{
type Block = Block;
type Hash = Block::Hash;
type Error = error::Error;
type ValidationFuture = Box<dyn Future<Output = error::Result<TransactionValidity>> + Send + Unpin>;
type BodyFuture = Pin<Box<dyn Future<Output = error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>> + Send>>;
type ValidationFuture = Box<
dyn Future<Output = error::Result<TransactionValidity>> + Send + Unpin
>;
type BodyFuture = Pin<
Box<
dyn Future<Output = error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>>
+ Send
>
>;
fn validate_transaction(
&self,
@@ -211,15 +214,24 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for LightChainApi<Client,
Box::new(remote_validation_request)
}
fn block_id_to_number(&self, at: &BlockId<Self::Block>) -> error::Result<Option<sc_transaction_graph::NumberFor<Self>>> {
fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
) -> error::Result<Option<sc_transaction_graph::NumberFor<Self>>> {
Ok(self.client.block_number_from_id(at)?)
}
fn block_id_to_hash(&self, at: &BlockId<Self::Block>) -> error::Result<Option<sc_transaction_graph::BlockHash<Self>>> {
fn block_id_to_hash(
&self,
at: &BlockId<Self::Block>,
) -> error::Result<Option<sc_transaction_graph::BlockHash<Self>>> {
Ok(self.client.block_hash_from_id(at)?)
}
fn hash_and_length(&self, ex: &sc_transaction_graph::ExtrinsicFor<Self>) -> (Self::Hash, usize) {
fn hash_and_length(
&self,
ex: &sc_transaction_graph::ExtrinsicFor<Self>,
) -> (sc_transaction_graph::ExtrinsicHash<Self>, usize) {
ex.using_encoded(|x| {
(<<Block::Header as HeaderT>::Hashing as HashT>::hash(x), x.len())
})
+91 -42
View File
@@ -35,7 +35,7 @@ pub use sc_transaction_graph as txpool;
pub use crate::api::{FullChainApi, LightChainApi};
use std::{collections::HashMap, sync::Arc, pin::Pin};
use futures::{prelude::*, future::ready, channel::oneshot};
use futures::{prelude::*, future::{ready, self}, channel::oneshot};
use parking_lot::Mutex;
use sp_runtime::{
@@ -47,14 +47,19 @@ use sp_transaction_pool::{
TransactionStatusStreamFor, MaintainedTransactionPool, PoolFuture, ChainEvent,
TransactionSource,
};
use sc_transaction_graph::ChainApi;
use wasm_timer::Instant;
use prometheus_endpoint::Registry as PrometheusRegistry;
use crate::metrics::MetricsLink as PrometheusMetrics;
type BoxedReadyIterator<Hash, Data> = Box<dyn Iterator<Item=Arc<sc_transaction_graph::base_pool::Transaction<Hash, Data>>> + Send>;
type BoxedReadyIterator<Hash, Data> = Box<
dyn Iterator<Item=Arc<sc_transaction_graph::base_pool::Transaction<Hash, Data>>> + Send
>;
type ReadyIteratorFor<PoolApi> = BoxedReadyIterator<sc_transaction_graph::ExHash<PoolApi>, sc_transaction_graph::ExtrinsicFor<PoolApi>>;
type ReadyIteratorFor<PoolApi> = BoxedReadyIterator<
sc_transaction_graph::ExtrinsicHash<PoolApi>, sc_transaction_graph::ExtrinsicFor<PoolApi>
>;
type PolledIterator<PoolApi> = Pin<Box<dyn Future<Output=ReadyIteratorFor<PoolApi>> + Send>>;
@@ -62,7 +67,7 @@ type PolledIterator<PoolApi> = Pin<Box<dyn Future<Output=ReadyIteratorFor<PoolAp
pub struct BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
PoolApi: ChainApi<Block=Block>,
{
pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
api: Arc<PoolApi>,
@@ -116,8 +121,7 @@ impl<T, Block: BlockT> ReadyPoll<T, Block> {
#[cfg(not(target_os = "unknown"))]
impl<PoolApi, Block> parity_util_mem::MallocSizeOf for BasicPool<PoolApi, Block>
where
PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
PoolApi::Hash: parity_util_mem::MallocSizeOf,
PoolApi: ChainApi<Block=Block>,
Block: BlockT,
{
fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
@@ -146,7 +150,7 @@ pub enum RevalidationType {
impl<PoolApi, Block> BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash> + 'static,
PoolApi: ChainApi<Block=Block> + 'static,
{
/// Create new basic transaction pool with provided api.
///
@@ -226,11 +230,13 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: 'static + sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
PoolApi: 'static + ChainApi<Block=Block>,
{
type Block = PoolApi::Block;
type Hash = sc_transaction_graph::ExHash<PoolApi>;
type InPoolTransaction = sc_transaction_graph::base_pool::Transaction<TxHash<Self>, TransactionFor<Self>>;
type Hash = sc_transaction_graph::ExtrinsicHash<PoolApi>;
type InPoolTransaction = sc_transaction_graph::base_pool::Transaction<
TxHash<Self>, TransactionFor<Self>
>;
type Error = PoolApi::Error;
fn submit_at(
@@ -429,22 +435,51 @@ impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> {
}
}
/// Prune the known txs for the given block.
async fn prune_known_txs_for_block<Block: BlockT, Api: ChainApi<Block = Block>>(
block_id: BlockId<Block>,
api: &Api,
pool: &sc_transaction_graph::Pool<Api>,
) {
// We don't query block if we won't prune anything
if pool.validated_pool().status().is_empty() {
return;
}
let hashes = api.block_body(&block_id).await
.unwrap_or_else(|e| {
log::warn!("Prune known transactions: error request {:?}!", e);
None
})
.unwrap_or_default()
.into_iter()
.map(|tx| pool.hash_of(&tx))
.collect::<Vec<_>>();
if let Err(e) = pool.prune_known(&block_id, &hashes) {
log::error!("Cannot prune known in the pool {:?}!", e);
}
}
impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: 'static + sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
PoolApi: 'static + ChainApi<Block=Block>,
{
fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output=()> + Send>> {
match event {
ChainEvent::NewBlock { id, retracted, .. } => {
let id = id.clone();
ChainEvent::NewBlock { id, tree_route, is_new_best, .. } => {
let pool = self.pool.clone();
let api = self.api.clone();
let block_number = match api.block_id_to_number(&id) {
Ok(Some(number)) => number,
_ => {
log::trace!(target: "txpool", "Skipping chain event - no number for that block {:?}", id);
log::trace!(
target: "txpool",
"Skipping chain event - no number for that block {:?}",
id,
);
return Box::pin(ready(()));
}
};
@@ -455,40 +490,40 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
Some(20.into()),
);
let revalidation_strategy = self.revalidation_strategy.clone();
let retracted = retracted.clone();
let revalidation_queue = self.revalidation_queue.clone();
let ready_poll = self.ready_poll.clone();
async move {
// We don't query block if we won't prune anything
if !pool.validated_pool().status().is_empty() {
let hashes = api.block_body(&id).await
.unwrap_or_else(|e| {
log::warn!("Prune known transactions: error request {:?}!", e);
None
})
.unwrap_or_default()
.into_iter()
.map(|tx| pool.hash_of(&tx))
.collect::<Vec<_>>();
if let Err(e) = pool.prune_known(&id, &hashes) {
log::error!("Cannot prune known in the pool {:?}!", e);
}
// If there is a tree route, we use this to prune known tx based on the enacted
// blocks and otherwise we only prune known txs if the block is
// the new best block.
if let Some(ref tree_route) = tree_route {
future::join_all(
tree_route
.enacted()
.iter().map(|h|
prune_known_txs_for_block(
BlockId::Hash(h.hash.clone()),
&*api,
&*pool,
),
),
).await;
} else if is_new_best {
prune_known_txs_for_block(id.clone(), &*api, &*pool).await;
}
let extra_pool = pool.clone();
// After #5200 lands, this arguably might be moved to the handler of "all blocks notification".
ready_poll.lock().trigger(block_number, move || Box::new(extra_pool.validated_pool().ready()));
if next_action.resubmit {
if let (true, Some(tree_route)) = (next_action.resubmit, tree_route) {
let mut resubmit_transactions = Vec::new();
for retracted_hash in retracted {
// notify txs awaiting finality that it has been retracted
pool.validated_pool().on_block_retracted(retracted_hash.clone());
for retracted in tree_route.retracted() {
let hash = retracted.hash.clone();
let block_transactions = api.block_body(&BlockId::hash(retracted_hash.clone())).await
// notify txs awaiting finality that it has been retracted
pool.validated_pool().on_block_retracted(hash.clone());
let block_transactions = api.block_body(&BlockId::hash(hash))
.await
.unwrap_or_else(|e| {
log::warn!("Failed to fetch block body {:?}!", e);
None
@@ -499,23 +534,37 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
resubmit_transactions.extend(block_transactions);
}
if let Err(e) = pool.submit_at(
&id,
// These transactions are coming from retracted blocks, we should
// simply consider them external.
TransactionSource::External,
resubmit_transactions,
true
true,
).await {
log::debug!(
target: "txpool",
"[{:?}] Error re-submitting transactions: {:?}", id, e
"[{:?}] Error re-submitting transactions: {:?}",
id,
e,
)
}
}
let extra_pool = pool.clone();
// After #5200 lands, this arguably might be moved to the
// handler of "all blocks notification".
ready_poll.lock().trigger(
block_number,
move || Box::new(extra_pool.validated_pool().ready()),
);
if next_action.revalidate {
let hashes = pool.validated_pool().ready().map(|tx| tx.hash.clone()).collect();
let hashes = pool.validated_pool()
.ready()
.map(|tx| tx.hash.clone())
.collect();
revalidation_queue.revalidate_later(block_number, hashes).await;
}
@@ -20,7 +20,7 @@
use std::{sync::Arc, pin::Pin, collections::{HashMap, HashSet, BTreeMap}};
use sc_transaction_graph::{ChainApi, Pool, ExHash, NumberFor, ValidatedTransaction};
use sc_transaction_graph::{ChainApi, Pool, ExtrinsicHash, NumberFor, ValidatedTransaction};
use sp_runtime::traits::{Zero, SaturatedConversion};
use sp_runtime::generic::BlockId;
use sp_runtime::transaction_validity::TransactionValidityError;
@@ -39,7 +39,7 @@ const BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;
/// Payload from queue to worker.
struct WorkerPayload<Api: ChainApi> {
at: NumberFor<Api>,
transactions: Vec<ExHash<Api>>,
transactions: Vec<ExtrinsicHash<Api>>,
}
/// Async revalidation worker.
@@ -49,8 +49,8 @@ struct RevalidationWorker<Api: ChainApi> {
api: Arc<Api>,
pool: Arc<Pool<Api>>,
best_block: NumberFor<Api>,
block_ordered: BTreeMap<NumberFor<Api>, HashSet<ExHash<Api>>>,
members: HashMap<ExHash<Api>, NumberFor<Api>>,
block_ordered: BTreeMap<NumberFor<Api>, HashSet<ExtrinsicHash<Api>>>,
members: HashMap<ExtrinsicHash<Api>, NumberFor<Api>>,
}
impl<Api: ChainApi> Unpin for RevalidationWorker<Api> {}
@@ -63,7 +63,7 @@ async fn batch_revalidate<Api: ChainApi>(
pool: Arc<Pool<Api>>,
api: Arc<Api>,
at: NumberFor<Api>,
batch: impl IntoIterator<Item=ExHash<Api>>,
batch: impl IntoIterator<Item=ExtrinsicHash<Api>>,
) {
let mut invalid_hashes = Vec::new();
let mut revalidated = HashMap::new();
@@ -129,7 +129,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
}
}
fn prepare_batch(&mut self) -> Vec<ExHash<Api>> {
fn prepare_batch(&mut self) -> Vec<ExtrinsicHash<Api>> {
let mut queued_exts = Vec::new();
let mut left = BACKGROUND_REVALIDATION_BATCH_SIZE;
@@ -334,7 +334,7 @@ where
/// If queue configured with background worker, this will return immediately.
/// If queue configured without background worker, this will resolve after
/// revalidation is actually done.
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExHash<Api>>) {
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExtrinsicHash<Api>>) {
if transactions.len() > 0 {
log::debug!(target: "txpool", "Sent {} transactions to revalidation queue", transactions.len());
}
@@ -359,9 +359,7 @@ mod tests {
use sp_transaction_pool::TransactionSource;
use substrate_test_runtime_transaction_pool::{TestApi, uxt};
use futures::executor::block_on;
use substrate_test_runtime_client::{
AccountKeyring::*,
};
use substrate_test_runtime_client::AccountKeyring::*;
fn setup() -> (Arc<TestApi>, Pool<TestApi>) {
let test_api = Arc::new(TestApi::empty());
@@ -25,12 +25,12 @@ use sp_runtime::{
transaction_validity::{ValidTransaction, TransactionSource, InvalidTransaction},
};
use substrate_test_runtime_client::{
runtime::{Block, Hash, Index, Header, Extrinsic, Transfer},
AccountKeyring::*,
runtime::{Block, Hash, Index, Header, Extrinsic, Transfer}, AccountKeyring::*,
};
use substrate_test_runtime_transaction_pool::{TestApi, uxt};
use futures::{prelude::*, task::Poll};
use codec::Encode;
use std::collections::BTreeSet;
fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::with_alice_nonce(209).into())
@@ -42,7 +42,7 @@ fn maintained_pool() -> (
intervalier::BackSignalControl,
) {
let (pool, background_task, notifier) = BasicPool::new_test(
std::sync::Arc::new(TestApi::with_alice_nonce(209))
Arc::new(TestApi::with_alice_nonce(209)),
);
let thread_pool = futures::executor::ThreadPool::new().unwrap();
@@ -112,6 +112,7 @@ fn prune_tags_should_work() {
let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect();
assert_eq!(pending, vec![209, 210]);
pool.validated_pool().api().push_block(1, Vec::new());
block_on(
pool.prune_tags(
&BlockId::number(1),
@@ -140,6 +141,37 @@ fn should_ban_invalid_transactions() {
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt.clone())).unwrap_err();
}
#[test]
fn only_prune_on_new_best() {
let pool = maintained_pool().0;
let uxt = uxt(Alice, 209);
let _ = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, uxt.clone())
).expect("1. Imported");
let header = pool.api.push_block(1, vec![uxt.clone()]);
assert_eq!(pool.status().ready, 1);
let event = ChainEvent::NewBlock {
id: BlockId::Number(1),
is_new_best: false,
header: header.clone(),
tree_route: None,
};
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 1);
let header = pool.api.push_block(2, vec![uxt]);
let event = ChainEvent::NewBlock {
id: BlockId::Number(2),
is_new_best: true,
header: header.clone(),
tree_route: None,
};
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 0);
}
#[test]
fn should_correctly_prune_transactions_providing_more_than_one_tag() {
let api = Arc::new(TestApi::with_alice_nonce(209));
@@ -153,6 +185,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() {
// remove the transaction that just got imported.
api.increment_nonce(Alice.into());
api.push_block(1, Vec::new());
block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).expect("1. Pruned");
assert_eq!(pool.validated_pool().status().ready, 0);
// it's re-imported to future
@@ -160,6 +193,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() {
// so now let's insert another transaction that also provides the 155
api.increment_nonce(Alice.into());
api.push_block(2, Vec::new());
let xt = uxt(Alice, 211);
block_on(pool.submit_one(&BlockId::number(2), SOURCE, xt.clone())).expect("2. Imported");
assert_eq!(pool.validated_pool().status().ready, 1);
@@ -169,6 +203,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() {
// prune it and make sure the pool is empty
api.increment_nonce(Alice.into());
api.push_block(3, Vec::new());
block_on(pool.prune_tags(&BlockId::number(3), vec![vec![155]], vec![])).expect("2. Pruned");
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 2);
@@ -178,21 +213,26 @@ fn block_event(id: u64) -> ChainEvent<Block> {
ChainEvent::NewBlock {
id: BlockId::number(id),
is_new_best: true,
retracted: vec![],
tree_route: None,
header: header(id),
}
}
fn block_event_with_retracted(id: u64, retracted: Vec<Hash>) -> ChainEvent<Block> {
fn block_event_with_retracted(
header: Header,
retracted_start: Hash,
api: &TestApi,
) -> ChainEvent<Block> {
let tree_route = api.tree_route(retracted_start, header.hash()).expect("Tree route exists");
ChainEvent::NewBlock {
id: BlockId::number(id),
id: BlockId::hash(header.hash()),
is_new_best: true,
retracted: retracted,
header: header(id),
tree_route: Some(Arc::new(tree_route)),
header,
}
}
#[test]
fn should_prune_old_during_maintenance() {
let xt = uxt(Alice, 209);
@@ -232,17 +272,16 @@ fn should_revalidate_during_maintenance() {
#[test]
fn should_resubmit_from_retracted_during_maintenance() {
let xt = uxt(Alice, 209);
let retracted_hash = Hash::random();
let (pool, _guard, _notifier) = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
pool.api.push_block(1, vec![]);
pool.api.push_fork_block(retracted_hash, vec![xt.clone()]);
let header = pool.api.push_block(1, vec![]);
let fork_header = pool.api.push_block(1, vec![]);
let event = block_event_with_retracted(1, vec![retracted_hash]);
let event = block_event_with_retracted(header, fork_header.hash(), &*pool.api);
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 1);
@@ -251,18 +290,17 @@ fn should_resubmit_from_retracted_during_maintenance() {
#[test]
fn should_not_retain_invalid_hashes_from_retracted() {
let xt = uxt(Alice, 209);
let retracted_hash = Hash::random();
let (pool, _guard, mut notifier) = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
pool.api.push_block(1, vec![]);
pool.api.push_fork_block(retracted_hash, vec![xt.clone()]);
let header = pool.api.push_block(1, vec![]);
let fork_header = pool.api.push_block(1, vec![xt.clone()]);
pool.api.add_invalid(&xt);
let event = block_event_with_retracted(1, vec![retracted_hash]);
let event = block_event_with_retracted(header, fork_header.hash(), &*pool.api);
block_on(pool.maintain(event));
block_on(notifier.next());
@@ -389,15 +427,24 @@ fn should_push_watchers_during_maintaince() {
// events for hash2 are: Ready, InBlock
assert_eq!(
futures::executor::block_on_stream(watcher0).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::InBlock(header_hash.clone()), TransactionStatus::Finalized(header_hash.clone())],
vec![
TransactionStatus::Ready,
TransactionStatus::InBlock(header_hash.clone()),
TransactionStatus::Finalized(header_hash.clone())],
);
assert_eq!(
futures::executor::block_on_stream(watcher1).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::InBlock(header_hash.clone()), TransactionStatus::Finalized(header_hash.clone())],
vec![
TransactionStatus::Ready,
TransactionStatus::InBlock(header_hash.clone()),
TransactionStatus::Finalized(header_hash.clone())],
);
assert_eq!(
futures::executor::block_on_stream(watcher2).collect::<Vec<_>>(),
vec![TransactionStatus::Ready, TransactionStatus::InBlock(header_hash.clone()), TransactionStatus::Finalized(header_hash.clone())],
vec![
TransactionStatus::Ready,
TransactionStatus::InBlock(header_hash.clone()),
TransactionStatus::Finalized(header_hash.clone())],
);
}
@@ -423,12 +470,12 @@ fn finalization() {
).expect("1. Imported");
pool.api.push_block(2, vec![xt.clone()]);
let header = pool.api.chain().read().header_by_number.get(&2).cloned().unwrap();
let header = pool.api.chain().read().block_by_number.get(&2).unwrap()[0].header().clone();
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
is_new_best: true,
header: header.clone(),
retracted: vec![]
tree_route: None,
};
block_on(pool.maintain(event));
@@ -467,7 +514,6 @@ fn fork_aware_finalization() {
let c2;
let d2;
// block B1
{
let watcher = block_on(
@@ -481,7 +527,7 @@ fn fork_aware_finalization() {
id: BlockId::Number(2),
is_new_best: true,
header: header.clone(),
retracted: vec![],
tree_route: None,
};
b1 = header.hash();
block_on(pool.maintain(event));
@@ -492,7 +538,7 @@ fn fork_aware_finalization() {
// block C2
{
let header = pool.api.push_fork_block_with_parent(b1, vec![from_dave.clone()]);
let header = pool.api.push_block_with_parent(b1, vec![from_dave.clone()]);
from_dave_watcher = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, from_dave.clone())
).expect("1. Imported");
@@ -501,7 +547,7 @@ fn fork_aware_finalization() {
id: BlockId::Hash(header.hash()),
is_new_best: true,
header: header.clone(),
retracted: vec![]
tree_route: None,
};
c2 = header.hash();
block_on(pool.maintain(event));
@@ -514,13 +560,13 @@ fn fork_aware_finalization() {
pool.submit_and_watch(&BlockId::number(1), SOURCE, from_bob.clone())
).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
let header = pool.api.push_fork_block_with_parent(c2, vec![from_bob.clone()]);
let header = pool.api.push_block_with_parent(c2, vec![from_bob.clone()]);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
is_new_best: true,
header: header.clone(),
retracted: vec![]
tree_route: None,
};
d2 = header.hash();
block_on(pool.maintain(event));
@@ -536,14 +582,10 @@ fn fork_aware_finalization() {
let header = pool.api.push_block(3, vec![from_charlie.clone()]);
canon_watchers.push((watcher, header.hash()));
let event = ChainEvent::NewBlock {
id: BlockId::Number(3),
is_new_best: true,
header: header.clone(),
retracted: vec![c2, d2],
};
let event = block_event_with_retracted(header.clone(), d2, &*pool.api);
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 2);
let event = ChainEvent::Finalized { hash: header.hash() };
block_on(pool.maintain(event));
}
@@ -562,7 +604,7 @@ fn fork_aware_finalization() {
id: BlockId::Hash(header.hash()),
is_new_best: true,
header: header.clone(),
retracted: vec![]
tree_route: None,
};
d1 = header.hash();
block_on(pool.maintain(event));
@@ -581,7 +623,7 @@ fn fork_aware_finalization() {
id: BlockId::Hash(header.hash()),
is_new_best: true,
header: header.clone(),
retracted: vec![]
tree_route: None,
};
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 0);
@@ -599,7 +641,7 @@ fn fork_aware_finalization() {
{
let mut stream= futures::executor::block_on_stream(from_dave_watcher);
let mut stream = futures::executor::block_on_stream(from_dave_watcher);
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::InBlock(c2.clone())));
assert_eq!(stream.next(), Some(TransactionStatus::Retracted(c2)));
@@ -610,7 +652,7 @@ fn fork_aware_finalization() {
}
{
let mut stream= futures::executor::block_on_stream(from_bob_watcher);
let mut stream = futures::executor::block_on_stream(from_bob_watcher);
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::InBlock(d2.clone())));
assert_eq!(stream.next(), Some(TransactionStatus::Retracted(d2)));
@@ -621,6 +663,217 @@ fn fork_aware_finalization() {
}
}
/// This test ensures that transactions from a fork are re-submitted if
/// the forked block is not part of the retracted blocks. This happens as the
/// retracted block list only contains the route from the old best to the new
/// best, without any further forks.
///
/// Given the following:
///
/// -> D0 (old best, tx0)
/// /
/// C - -> D1 (tx1)
/// \
/// -> D2 (new best)
///
/// Retracted will contain `D0`, but we need to re-submit `tx0` and `tx1` as both
/// blocks are not part of the canonical chain.
#[test]
fn resubmit_tx_of_fork_that_is_not_part_of_retracted() {
let api = TestApi::empty();
// starting block A1 (last finalized.)
api.push_block(1, vec![]);
let (pool, _background, _) = BasicPool::new_test(api.into());
let tx0 = uxt(Alice, 1);
let tx1 = uxt(Dave, 2);
pool.api.increment_nonce(Alice.into());
pool.api.increment_nonce(Dave.into());
let d0;
// Block D0
{
let _ = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, tx0.clone())
).expect("1. Imported");
let header = pool.api.push_block(2, vec![tx0.clone()]);
assert_eq!(pool.status().ready, 1);
let event = ChainEvent::NewBlock {
id: BlockId::Number(2),
is_new_best: true,
header: header.clone(),
tree_route: None,
};
d0 = header.hash();
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 0);
}
// Block D1
{
let _ = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, tx1.clone())
).expect("1. Imported");
let header = pool.api.push_block(2, vec![tx1.clone()]);
assert_eq!(pool.status().ready, 1);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
is_new_best: false,
header: header.clone(),
tree_route: None,
};
block_on(pool.maintain(event));
// Only transactions from new best should be pruned
assert_eq!(pool.status().ready, 1);
}
// Block D2
{
let header = pool.api.push_block(2, vec![]);
let event = block_event_with_retracted(header, d0, &*pool.api);
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 2);
}
}
#[test]
fn resubmit_from_retracted_fork() {
let api = TestApi::empty();
// starting block A1 (last finalized.)
api.push_block(1, vec![]);
let (pool, _background, _) = BasicPool::new_test(api.into());
let tx0 = uxt(Alice, 1);
let tx1 = uxt(Dave, 2);
let tx2 = uxt(Bob, 3);
// Transactions of the fork that will be enacted later
let tx3 = uxt(Eve, 1);
let tx4 = uxt(Ferdie, 2);
let tx5 = uxt(One, 3);
pool.api.increment_nonce(Alice.into());
pool.api.increment_nonce(Dave.into());
pool.api.increment_nonce(Bob.into());
pool.api.increment_nonce(Eve.into());
pool.api.increment_nonce(Ferdie.into());
pool.api.increment_nonce(One.into());
// Block D0
{
let _ = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, tx0.clone())
).expect("1. Imported");
let header = pool.api.push_block(2, vec![tx0.clone()]);
assert_eq!(pool.status().ready, 1);
let event = ChainEvent::NewBlock {
id: BlockId::Number(2),
is_new_best: true,
header: header.clone(),
tree_route: None,
};
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 0);
}
// Block E0
{
let _ = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, tx1.clone())
).expect("1. Imported");
let header = pool.api.push_block(3, vec![tx1.clone()]);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
is_new_best: true,
header: header.clone(),
tree_route: None,
};
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 0);
}
// Block F0
let f0 = {
let _ = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, tx2.clone())
).expect("1. Imported");
let header = pool.api.push_block(4, vec![tx2.clone()]);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
is_new_best: true,
header: header.clone(),
tree_route: None,
};
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 0);
header.hash()
};
// Block D1
let d1 = {
let _ = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, tx3.clone())
).expect("1. Imported");
let header = pool.api.push_block(2, vec![tx3.clone()]);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
is_new_best: false,
header: header.clone(),
tree_route: None,
};
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 1);
header.hash()
};
// Block E1
let e1 = {
let _ = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, tx4.clone())
).expect("1. Imported");
let header = pool.api.push_block_with_parent(d1.clone(), vec![tx4.clone()]);
let event = ChainEvent::NewBlock {
id: BlockId::Hash(header.hash()),
is_new_best: false,
header: header.clone(),
tree_route: None,
};
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 2);
header.hash()
};
// Block F1
let f1_header = {
let _ = block_on(
pool.submit_and_watch(&BlockId::number(1), SOURCE, tx5.clone())
).expect("1. Imported");
let header = pool.api.push_block_with_parent(e1.clone(), vec![tx5.clone()]);
// Don't announce the block event to the pool directly, because we will
// re-org to this block.
assert_eq!(pool.status().ready, 3);
header
};
let ready = pool.ready().map(|t| t.data.encode()).collect::<BTreeSet<_>>();
let expected_ready = vec![tx3, tx4, tx5].iter().map(Encode::encode).collect::<BTreeSet<_>>();
assert_eq!(expected_ready, ready);
let event = block_event_with_retracted(f1_header, f0, &*pool.api);
block_on(pool.maintain(event));
assert_eq!(pool.status().ready, 3);
let ready = pool.ready().map(|t| t.data.encode()).collect::<BTreeSet<_>>();
let expected_ready = vec![tx0, tx1, tx2].iter().map(Encode::encode).collect::<BTreeSet<_>>();
assert_eq!(expected_ready, ready);
}
#[test]
fn ready_set_should_not_resolve_before_block_update() {
let (pool, _guard, _notifier) = maintained_pool();
@@ -678,6 +931,7 @@ fn should_not_accept_old_signatures() {
use std::convert::TryFrom;
let client = Arc::new(substrate_test_runtime_client::new());
let pool = Arc::new(
BasicPool::new_test(Arc::new(FullChainApi::new(client))).0
);