mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 16:57:58 +00:00
8ae1aa4c28
* Switch to bounded mpsc for txpool import notification stream * Update client/transaction-pool/graph/src/validated_pool.rs Co-authored-by: Nikolay Volf <nikvolf@gmail.com> Co-authored-by: Nikolay Volf <nikvolf@gmail.com>
1027 lines
32 KiB
Rust
1027 lines
32 KiB
Rust
// This file is part of Substrate.
|
|
|
|
// Copyright (C) 2018-2020 Parity Technologies (UK) Ltd.
|
|
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
|
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
use std::{
|
|
collections::HashMap,
|
|
sync::Arc,
|
|
};
|
|
|
|
use crate::{base_pool as base, watcher::Watcher};
|
|
|
|
use futures::Future;
|
|
use sp_runtime::{
|
|
generic::BlockId,
|
|
traits::{self, SaturatedConversion, Block as BlockT},
|
|
transaction_validity::{
|
|
TransactionValidity, TransactionTag as Tag, TransactionValidityError, TransactionSource,
|
|
},
|
|
};
|
|
use sp_transaction_pool::error;
|
|
use wasm_timer::Instant;
|
|
use futures::channel::mpsc::Receiver;
|
|
|
|
use crate::validated_pool::ValidatedPool;
|
|
pub use crate::validated_pool::ValidatedTransaction;
|
|
|
|
/// Modification notification event stream type;
|
|
pub type EventStream<H> = Receiver<H>;
|
|
|
|
/// Block hash type for a pool.
|
|
pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
|
|
/// Extrinsic hash type for a pool.
|
|
pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
|
|
/// Extrinsic type for a pool.
|
|
pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
|
|
/// Block number type for the ChainApi
|
|
pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
|
|
/// A type of transaction stored in the pool
|
|
pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
|
|
/// A type of validated transaction stored in the pool.
|
|
pub type ValidatedTransactionFor<A> = ValidatedTransaction<
|
|
ExtrinsicHash<A>,
|
|
ExtrinsicFor<A>,
|
|
<A as ChainApi>::Error,
|
|
>;
|
|
|
|
/// Concrete extrinsic validation and query logic.
|
|
pub trait ChainApi: Send + Sync {
|
|
/// Block type.
|
|
type Block: BlockT;
|
|
/// Error type.
|
|
type Error: From<error::Error> + error::IntoPoolError;
|
|
/// Validate transaction future.
|
|
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send + Unpin;
|
|
/// Body future (since block body might be remote)
|
|
type BodyFuture: Future<
|
|
Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>
|
|
> + Unpin + Send + 'static;
|
|
|
|
/// Verify extrinsic at given block.
|
|
fn validate_transaction(
|
|
&self,
|
|
at: &BlockId<Self::Block>,
|
|
source: TransactionSource,
|
|
uxt: ExtrinsicFor<Self>,
|
|
) -> Self::ValidationFuture;
|
|
|
|
/// Returns a block number given the block id.
|
|
fn block_id_to_number(
|
|
&self,
|
|
at: &BlockId<Self::Block>,
|
|
) -> Result<Option<NumberFor<Self>>, Self::Error>;
|
|
|
|
/// Returns a block hash given the block id.
|
|
fn block_id_to_hash(
|
|
&self,
|
|
at: &BlockId<Self::Block>,
|
|
) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
|
|
|
|
/// Returns hash and encoding length of the extrinsic.
|
|
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
|
|
|
|
/// Returns a block body given the block id.
|
|
fn block_body(&self, at: &BlockId<Self::Block>) -> Self::BodyFuture;
|
|
}
|
|
|
|
/// Pool configuration options.
|
|
#[derive(Debug, Clone)]
|
|
pub struct Options {
|
|
/// Ready queue limits.
|
|
pub ready: base::Limit,
|
|
/// Future queue limits.
|
|
pub future: base::Limit,
|
|
/// Reject future transactions.
|
|
pub reject_future_transactions: bool,
|
|
}
|
|
|
|
impl Default for Options {
|
|
fn default() -> Self {
|
|
Options {
|
|
ready: base::Limit {
|
|
count: 8192,
|
|
total_bytes: 20 * 1024 * 1024,
|
|
},
|
|
future: base::Limit {
|
|
count: 512,
|
|
total_bytes: 1 * 1024 * 1024,
|
|
},
|
|
reject_future_transactions: false,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Should we check that the transaction is banned
|
|
/// in the pool, before we verify it?
|
|
#[derive(Copy, Clone)]
|
|
enum CheckBannedBeforeVerify {
|
|
Yes,
|
|
No,
|
|
}
|
|
|
|
/// Extrinsics pool that performs validation.
|
|
pub struct Pool<B: ChainApi> {
|
|
validated_pool: Arc<ValidatedPool<B>>,
|
|
}
|
|
|
|
#[cfg(not(target_os = "unknown"))]
|
|
impl<B: ChainApi> parity_util_mem::MallocSizeOf for Pool<B>
|
|
where
|
|
ExtrinsicFor<B>: parity_util_mem::MallocSizeOf,
|
|
{
|
|
fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
|
|
self.validated_pool.size_of(ops)
|
|
}
|
|
}
|
|
|
|
impl<B: ChainApi> Pool<B> {
|
|
/// Create a new transaction pool.
|
|
pub fn new(options: Options, api: Arc<B>) -> Self {
|
|
Pool {
|
|
validated_pool: Arc::new(ValidatedPool::new(options, api)),
|
|
}
|
|
}
|
|
|
|
/// Imports a bunch of unverified extrinsics to the pool
|
|
pub async fn submit_at(
|
|
&self,
|
|
at: &BlockId<B::Block>,
|
|
source: TransactionSource,
|
|
xts: impl IntoIterator<Item=ExtrinsicFor<B>>,
|
|
) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
|
|
let xts = xts.into_iter().map(|xt| (source, xt));
|
|
let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::Yes).await?;
|
|
Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx)))
|
|
}
|
|
|
|
/// Resubmit the given extrinsics to the pool.
|
|
///
|
|
/// This does not check if a transaction is banned, before we verify it again.
|
|
pub async fn resubmit_at(
|
|
&self,
|
|
at: &BlockId<B::Block>,
|
|
source: TransactionSource,
|
|
xts: impl IntoIterator<Item=ExtrinsicFor<B>>,
|
|
) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
|
|
let xts = xts.into_iter().map(|xt| (source, xt));
|
|
let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::No).await?;
|
|
Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx)))
|
|
}
|
|
|
|
/// Imports one unverified extrinsic to the pool
|
|
pub async fn submit_one(
|
|
&self,
|
|
at: &BlockId<B::Block>,
|
|
source: TransactionSource,
|
|
xt: ExtrinsicFor<B>,
|
|
) -> Result<ExtrinsicHash<B>, B::Error> {
|
|
let res = self.submit_at(at, source, std::iter::once(xt)).await?.pop();
|
|
res.expect("One extrinsic passed; one result returned; qed")
|
|
}
|
|
|
|
/// Import a single extrinsic and starts to watch their progress in the pool.
|
|
pub async fn submit_and_watch(
|
|
&self,
|
|
at: &BlockId<B::Block>,
|
|
source: TransactionSource,
|
|
xt: ExtrinsicFor<B>,
|
|
) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
|
|
let block_number = self.resolve_block_number(at)?;
|
|
let (_, tx) = self.verify_one(
|
|
at,
|
|
block_number,
|
|
source,
|
|
xt,
|
|
CheckBannedBeforeVerify::Yes,
|
|
).await;
|
|
self.validated_pool.submit_and_watch(tx)
|
|
}
|
|
|
|
/// Resubmit some transaction that were validated elsewhere.
|
|
pub fn resubmit(
|
|
&self,
|
|
revalidated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
|
|
) {
|
|
|
|
let now = Instant::now();
|
|
self.validated_pool.resubmit(revalidated_transactions);
|
|
log::debug!(target: "txpool",
|
|
"Resubmitted. Took {} ms. Status: {:?}",
|
|
now.elapsed().as_millis(),
|
|
self.validated_pool.status()
|
|
);
|
|
}
|
|
|
|
/// Prunes known ready transactions.
|
|
///
|
|
/// Used to clear the pool from transactions that were part of recently imported block.
|
|
/// The main difference from the `prune` is that we do not revalidate any transactions
|
|
/// and ignore unknown passed hashes.
|
|
pub fn prune_known(
|
|
&self,
|
|
at: &BlockId<B::Block>,
|
|
hashes: &[ExtrinsicHash<B>],
|
|
) -> Result<(), B::Error> {
|
|
// Get details of all extrinsics that are already in the pool
|
|
let in_pool_tags = self.validated_pool.extrinsics_tags(hashes)
|
|
.into_iter().filter_map(|x| x).flat_map(|x| x);
|
|
|
|
// Prune all transactions that provide given tags
|
|
let prune_status = self.validated_pool.prune_tags(in_pool_tags)?;
|
|
let pruned_transactions = hashes.into_iter().cloned()
|
|
.chain(prune_status.pruned.iter().map(|tx| tx.hash.clone()));
|
|
self.validated_pool.fire_pruned(at, pruned_transactions)
|
|
}
|
|
|
|
/// Prunes ready transactions.
|
|
///
|
|
/// Used to clear the pool from transactions that were part of recently imported block.
|
|
/// To perform pruning we need the tags that each extrinsic provides and to avoid calling
|
|
/// into runtime too often we first lookup all extrinsics that are in the pool and get
|
|
/// their provided tags from there. Otherwise we query the runtime at the `parent` block.
|
|
pub async fn prune(
|
|
&self,
|
|
at: &BlockId<B::Block>,
|
|
parent: &BlockId<B::Block>,
|
|
extrinsics: &[ExtrinsicFor<B>],
|
|
) -> Result<(), B::Error> {
|
|
log::debug!(
|
|
target: "txpool",
|
|
"Starting pruning of block {:?} (extrinsics: {})",
|
|
at,
|
|
extrinsics.len()
|
|
);
|
|
// Get details of all extrinsics that are already in the pool
|
|
let in_pool_hashes = extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
|
|
let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
|
|
|
|
// Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option<Vec<Tag>>)`)
|
|
let all = extrinsics.iter().zip(in_pool_tags.into_iter());
|
|
|
|
let mut future_tags = Vec::new();
|
|
for (extrinsic, in_pool_tags) in all {
|
|
match in_pool_tags {
|
|
// reuse the tags for extrinsics that were found in the pool
|
|
Some(tags) => future_tags.extend(tags),
|
|
// if it's not found in the pool query the runtime at parent block
|
|
// to get validity info and tags that the extrinsic provides.
|
|
None => {
|
|
let validity = self.validated_pool.api()
|
|
.validate_transaction(parent, TransactionSource::InBlock, extrinsic.clone())
|
|
.await;
|
|
|
|
if let Ok(Ok(validity)) = validity {
|
|
future_tags.extend(validity.provides);
|
|
}
|
|
},
|
|
}
|
|
}
|
|
|
|
self.prune_tags(at, future_tags, in_pool_hashes).await
|
|
}
|
|
|
|
/// Prunes ready transactions that provide given list of tags.
|
|
///
|
|
/// Given tags are assumed to be always provided now, so all transactions
|
|
/// in the Future Queue that require that particular tag (and have other
|
|
/// requirements satisfied) are promoted to Ready Queue.
|
|
///
|
|
/// Moreover for each provided tag we remove transactions in the pool that:
|
|
/// 1. Provide that tag directly
|
|
/// 2. Are a dependency of pruned transaction.
|
|
///
|
|
/// Returns transactions that have been removed from the pool and must be reverified
|
|
/// before reinserting to the pool.
|
|
///
|
|
/// By removing predecessor transactions as well we might actually end up
|
|
/// pruning too much, so all removed transactions are reverified against
|
|
/// the runtime (`validate_transaction`) to make sure they are invalid.
|
|
///
|
|
/// However we avoid revalidating transactions that are contained within
|
|
/// the second parameter of `known_imported_hashes`. These transactions
|
|
/// (if pruned) are not revalidated and become temporarily banned to
|
|
/// prevent importing them in the (near) future.
|
|
pub async fn prune_tags(
|
|
&self,
|
|
at: &BlockId<B::Block>,
|
|
tags: impl IntoIterator<Item=Tag>,
|
|
known_imported_hashes: impl IntoIterator<Item=ExtrinsicHash<B>> + Clone,
|
|
) -> Result<(), B::Error> {
|
|
log::debug!(target: "txpool", "Pruning at {:?}", at);
|
|
// Prune all transactions that provide given tags
|
|
let prune_status = match self.validated_pool.prune_tags(tags) {
|
|
Ok(prune_status) => prune_status,
|
|
Err(e) => return Err(e),
|
|
};
|
|
|
|
// Make sure that we don't revalidate extrinsics that were part of the recently
|
|
// imported block. This is especially important for UTXO-like chains cause the
|
|
// inputs are pruned so such transaction would go to future again.
|
|
self.validated_pool.ban(&Instant::now(), known_imported_hashes.clone().into_iter());
|
|
|
|
// Try to re-validate pruned transactions since some of them might be still valid.
|
|
// note that `known_imported_hashes` will be rejected here due to temporary ban.
|
|
let pruned_hashes = prune_status.pruned
|
|
.iter()
|
|
.map(|tx| tx.hash.clone()).collect::<Vec<_>>();
|
|
let pruned_transactions = prune_status.pruned
|
|
.into_iter()
|
|
.map(|tx| (tx.source, tx.data.clone()));
|
|
|
|
let reverified_transactions = self.verify(
|
|
at,
|
|
pruned_transactions,
|
|
CheckBannedBeforeVerify::Yes,
|
|
).await?;
|
|
|
|
log::trace!(target: "txpool", "Pruning at {:?}. Resubmitting transactions.", at);
|
|
// And finally - submit reverified transactions back to the pool
|
|
|
|
self.validated_pool.resubmit_pruned(
|
|
&at,
|
|
known_imported_hashes,
|
|
pruned_hashes,
|
|
reverified_transactions.into_iter().map(|(_, xt)| xt).collect(),
|
|
)
|
|
}
|
|
|
|
/// Returns transaction hash
|
|
pub fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExtrinsicHash<B> {
|
|
self.validated_pool.api().hash_and_length(xt).0
|
|
}
|
|
|
|
/// Resolves block number by id.
|
|
fn resolve_block_number(&self, at: &BlockId<B::Block>) -> Result<NumberFor<B>, B::Error> {
|
|
self.validated_pool.api().block_id_to_number(at)
|
|
.and_then(|number| number.ok_or_else(||
|
|
error::Error::InvalidBlockId(format!("{:?}", at)).into()))
|
|
}
|
|
|
|
/// Returns future that validates a bunch of transactions at given block.
|
|
async fn verify(
|
|
&self,
|
|
at: &BlockId<B::Block>,
|
|
xts: impl IntoIterator<Item=(TransactionSource, ExtrinsicFor<B>)>,
|
|
check: CheckBannedBeforeVerify,
|
|
) -> Result<HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, B::Error> {
|
|
// we need a block number to compute tx validity
|
|
let block_number = self.resolve_block_number(at)?;
|
|
|
|
let res = futures::future::join_all(
|
|
xts.into_iter()
|
|
.map(|(source, xt)| self.verify_one(at, block_number, source, xt, check))
|
|
).await.into_iter().collect::<HashMap<_, _>>();
|
|
|
|
Ok(res)
|
|
}
|
|
|
|
/// Returns future that validates single transaction at given block.
|
|
async fn verify_one(
|
|
&self,
|
|
block_id: &BlockId<B::Block>,
|
|
block_number: NumberFor<B>,
|
|
source: TransactionSource,
|
|
xt: ExtrinsicFor<B>,
|
|
check: CheckBannedBeforeVerify,
|
|
) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
|
|
let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
|
|
|
|
let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
|
|
if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
|
|
return (hash.clone(), ValidatedTransaction::Invalid(hash, err.into()))
|
|
}
|
|
|
|
let validation_result = self.validated_pool.api().validate_transaction(
|
|
block_id,
|
|
source,
|
|
xt.clone(),
|
|
).await;
|
|
|
|
let status = match validation_result {
|
|
Ok(status) => status,
|
|
Err(e) => return (hash.clone(), ValidatedTransaction::Invalid(hash, e)),
|
|
};
|
|
|
|
let validity = match status {
|
|
Ok(validity) => {
|
|
if validity.provides.is_empty() {
|
|
ValidatedTransaction::Invalid(hash.clone(), error::Error::NoTagsProvided.into())
|
|
} else {
|
|
ValidatedTransaction::valid_at(
|
|
block_number.saturated_into::<u64>(),
|
|
hash.clone(),
|
|
source,
|
|
xt,
|
|
bytes,
|
|
validity,
|
|
)
|
|
}
|
|
},
|
|
Err(TransactionValidityError::Invalid(e)) =>
|
|
ValidatedTransaction::Invalid(hash.clone(), error::Error::InvalidTransaction(e).into()),
|
|
Err(TransactionValidityError::Unknown(e)) =>
|
|
ValidatedTransaction::Unknown(hash.clone(), error::Error::UnknownTransaction(e).into()),
|
|
};
|
|
|
|
(hash, validity)
|
|
}
|
|
|
|
/// get a reference to the underlying validated pool.
|
|
pub fn validated_pool(&self) -> &ValidatedPool<B> {
|
|
&self.validated_pool
|
|
}
|
|
}
|
|
|
|
impl<B: ChainApi> Clone for Pool<B> {
|
|
fn clone(&self) -> Self {
|
|
Self {
|
|
validated_pool: self.validated_pool.clone(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use std::collections::{HashMap, HashSet};
|
|
use parking_lot::Mutex;
|
|
use futures::executor::block_on;
|
|
use super::*;
|
|
use sp_transaction_pool::TransactionStatus;
|
|
use sp_runtime::{
|
|
traits::Hash,
|
|
transaction_validity::{ValidTransaction, InvalidTransaction, TransactionSource},
|
|
};
|
|
use codec::Encode;
|
|
use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId, Hashing};
|
|
use assert_matches::assert_matches;
|
|
use wasm_timer::Instant;
|
|
use crate::base_pool::Limit;
|
|
|
|
const INVALID_NONCE: u64 = 254;
|
|
const SOURCE: TransactionSource = TransactionSource::External;
|
|
|
|
#[derive(Clone, Debug, Default)]
|
|
struct TestApi {
|
|
delay: Arc<Mutex<Option<std::sync::mpsc::Receiver<()>>>>,
|
|
invalidate: Arc<Mutex<HashSet<H256>>>,
|
|
clear_requirements: Arc<Mutex<HashSet<H256>>>,
|
|
add_requirements: Arc<Mutex<HashSet<H256>>>,
|
|
}
|
|
|
|
impl ChainApi for TestApi {
|
|
type Block = Block;
|
|
type Error = error::Error;
|
|
type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
|
|
type BodyFuture = futures::future::Ready<error::Result<Option<Vec<Extrinsic>>>>;
|
|
|
|
/// Verify extrinsic at given block.
|
|
fn validate_transaction(
|
|
&self,
|
|
at: &BlockId<Self::Block>,
|
|
_source: TransactionSource,
|
|
uxt: ExtrinsicFor<Self>,
|
|
) -> Self::ValidationFuture {
|
|
let hash = self.hash_and_length(&uxt).0;
|
|
let block_number = self.block_id_to_number(at).unwrap().unwrap();
|
|
let nonce = uxt.transfer().nonce;
|
|
|
|
// This is used to control the test flow.
|
|
if nonce > 0 {
|
|
let opt = self.delay.lock().take();
|
|
if let Some(delay) = opt {
|
|
if delay.recv().is_err() {
|
|
println!("Error waiting for delay!");
|
|
}
|
|
}
|
|
}
|
|
|
|
if self.invalidate.lock().contains(&hash) {
|
|
return futures::future::ready(Ok(InvalidTransaction::Custom(0).into()));
|
|
}
|
|
|
|
futures::future::ready(if nonce < block_number {
|
|
Ok(InvalidTransaction::Stale.into())
|
|
} else {
|
|
let mut transaction = ValidTransaction {
|
|
priority: 4,
|
|
requires: if nonce > block_number { vec![vec![nonce as u8 - 1]] } else { vec![] },
|
|
provides: if nonce == INVALID_NONCE { vec![] } else { vec![vec![nonce as u8]] },
|
|
longevity: 3,
|
|
propagate: true,
|
|
};
|
|
|
|
if self.clear_requirements.lock().contains(&hash) {
|
|
transaction.requires.clear();
|
|
}
|
|
|
|
if self.add_requirements.lock().contains(&hash) {
|
|
transaction.requires.push(vec![128]);
|
|
}
|
|
|
|
Ok(Ok(transaction))
|
|
})
|
|
}
|
|
|
|
/// Returns a block number given the block id.
|
|
fn block_id_to_number(
|
|
&self,
|
|
at: &BlockId<Self::Block>,
|
|
) -> Result<Option<NumberFor<Self>>, Self::Error> {
|
|
Ok(match at {
|
|
BlockId::Number(num) => Some(*num),
|
|
BlockId::Hash(_) => None,
|
|
})
|
|
}
|
|
|
|
/// Returns a block hash given the block id.
|
|
fn block_id_to_hash(
|
|
&self,
|
|
at: &BlockId<Self::Block>,
|
|
) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error> {
|
|
Ok(match at {
|
|
BlockId::Number(num) => Some(H256::from_low_u64_be(*num)).into(),
|
|
BlockId::Hash(_) => None,
|
|
})
|
|
}
|
|
|
|
/// Hash the extrinsic.
|
|
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (BlockHash<Self>, usize) {
|
|
let encoded = uxt.encode();
|
|
let len = encoded.len();
|
|
(Hashing::hash(&encoded), len)
|
|
}
|
|
|
|
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
|
|
futures::future::ready(Ok(None))
|
|
}
|
|
}
|
|
|
|
fn uxt(transfer: Transfer) -> Extrinsic {
|
|
Extrinsic::Transfer {
|
|
transfer,
|
|
signature: Default::default(),
|
|
exhaust_resources_when_not_first: false,
|
|
}
|
|
}
|
|
|
|
fn pool() -> Pool<TestApi> {
|
|
Pool::new(Default::default(), TestApi::default().into())
|
|
}
|
|
|
|
#[test]
|
|
fn should_validate_and_import_transaction() {
|
|
// given
|
|
let pool = pool();
|
|
|
|
// when
|
|
let hash = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 0,
|
|
}))).unwrap();
|
|
|
|
// then
|
|
assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
|
|
}
|
|
|
|
#[test]
|
|
fn should_reject_if_temporarily_banned() {
|
|
// given
|
|
let pool = pool();
|
|
let uxt = uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 0,
|
|
});
|
|
|
|
// when
|
|
pool.validated_pool.rotator().ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
|
|
let res = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt));
|
|
assert_eq!(pool.validated_pool().status().ready, 0);
|
|
assert_eq!(pool.validated_pool().status().future, 0);
|
|
|
|
// then
|
|
assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
|
|
}
|
|
|
|
#[test]
|
|
fn should_notify_about_pool_events() {
|
|
let (stream, hash0, hash1) = {
|
|
// given
|
|
let pool = pool();
|
|
let stream = pool.validated_pool().import_notification_stream();
|
|
|
|
// when
|
|
let hash0 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 0,
|
|
}))).unwrap();
|
|
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 1,
|
|
}))).unwrap();
|
|
// future doesn't count
|
|
let _hash = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 3,
|
|
}))).unwrap();
|
|
|
|
assert_eq!(pool.validated_pool().status().ready, 2);
|
|
assert_eq!(pool.validated_pool().status().future, 1);
|
|
|
|
(stream, hash0, hash1)
|
|
};
|
|
|
|
// then
|
|
let mut it = futures::executor::block_on_stream(stream);
|
|
assert_eq!(it.next(), Some(hash0));
|
|
assert_eq!(it.next(), Some(hash1));
|
|
assert_eq!(it.next(), None);
|
|
}
|
|
|
|
#[test]
|
|
fn should_clear_stale_transactions() {
|
|
// given
|
|
let pool = pool();
|
|
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 0,
|
|
}))).unwrap();
|
|
let hash2 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 1,
|
|
}))).unwrap();
|
|
let hash3 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 3,
|
|
}))).unwrap();
|
|
|
|
// when
|
|
pool.validated_pool.clear_stale(&BlockId::Number(5)).unwrap();
|
|
|
|
// then
|
|
assert_eq!(pool.validated_pool().ready().count(), 0);
|
|
assert_eq!(pool.validated_pool().status().future, 0);
|
|
assert_eq!(pool.validated_pool().status().ready, 0);
|
|
// make sure they are temporarily banned as well
|
|
assert!(pool.validated_pool.rotator().is_banned(&hash1));
|
|
assert!(pool.validated_pool.rotator().is_banned(&hash2));
|
|
assert!(pool.validated_pool.rotator().is_banned(&hash3));
|
|
}
|
|
|
|
#[test]
|
|
fn should_ban_mined_transactions() {
|
|
// given
|
|
let pool = pool();
|
|
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 0,
|
|
}))).unwrap();
|
|
|
|
// when
|
|
block_on(pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()])).unwrap();
|
|
|
|
// then
|
|
assert!(pool.validated_pool.rotator().is_banned(&hash1));
|
|
}
|
|
|
|
#[test]
|
|
fn should_limit_futures() {
|
|
// given
|
|
let limit = Limit {
|
|
count: 100,
|
|
total_bytes: 200,
|
|
};
|
|
let pool = Pool::new(Options {
|
|
ready: limit.clone(),
|
|
future: limit.clone(),
|
|
..Default::default()
|
|
}, TestApi::default().into());
|
|
|
|
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 1,
|
|
}))).unwrap();
|
|
assert_eq!(pool.validated_pool().status().future, 1);
|
|
|
|
// when
|
|
let hash2 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 10,
|
|
}))).unwrap();
|
|
|
|
// then
|
|
assert_eq!(pool.validated_pool().status().future, 1);
|
|
assert!(pool.validated_pool.rotator().is_banned(&hash1));
|
|
assert!(!pool.validated_pool.rotator().is_banned(&hash2));
|
|
}
|
|
|
|
#[test]
|
|
fn should_error_if_reject_immediately() {
|
|
// given
|
|
let limit = Limit {
|
|
count: 100,
|
|
total_bytes: 10,
|
|
};
|
|
let pool = Pool::new(Options {
|
|
ready: limit.clone(),
|
|
future: limit.clone(),
|
|
..Default::default()
|
|
}, TestApi::default().into());
|
|
|
|
// when
|
|
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 1,
|
|
}))).unwrap_err();
|
|
|
|
// then
|
|
assert_eq!(pool.validated_pool().status().ready, 0);
|
|
assert_eq!(pool.validated_pool().status().future, 0);
|
|
}
|
|
|
|
#[test]
|
|
fn should_reject_transactions_with_no_provides() {
|
|
// given
|
|
let pool = pool();
|
|
|
|
// when
|
|
let err = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: INVALID_NONCE,
|
|
}))).unwrap_err();
|
|
|
|
// then
|
|
assert_eq!(pool.validated_pool().status().ready, 0);
|
|
assert_eq!(pool.validated_pool().status().future, 0);
|
|
assert_matches!(err, error::Error::NoTagsProvided);
|
|
}
|
|
|
|
mod listener {
|
|
use super::*;
|
|
|
|
#[test]
|
|
fn should_trigger_ready_and_finalized() {
|
|
// given
|
|
let pool = pool();
|
|
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 0,
|
|
}))).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 1);
|
|
assert_eq!(pool.validated_pool().status().future, 0);
|
|
|
|
// when
|
|
block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 0);
|
|
assert_eq!(pool.validated_pool().status().future, 0);
|
|
|
|
// then
|
|
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
|
|
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
|
|
assert_eq!(
|
|
stream.next(),
|
|
Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())),
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
|
|
// given
|
|
let pool = pool();
|
|
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 0,
|
|
}))).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 1);
|
|
assert_eq!(pool.validated_pool().status().future, 0);
|
|
|
|
// when
|
|
block_on(
|
|
pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![watcher.hash().clone()]),
|
|
).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 0);
|
|
assert_eq!(pool.validated_pool().status().future, 0);
|
|
|
|
// then
|
|
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
|
|
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
|
|
assert_eq!(
|
|
stream.next(),
|
|
Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())),
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn should_trigger_future_and_ready_after_promoted() {
|
|
// given
|
|
let pool = pool();
|
|
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 1,
|
|
}))).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 0);
|
|
assert_eq!(pool.validated_pool().status().future, 1);
|
|
|
|
// when
|
|
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 0,
|
|
}))).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 2);
|
|
|
|
// then
|
|
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
|
|
assert_eq!(stream.next(), Some(TransactionStatus::Future));
|
|
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
|
|
}
|
|
|
|
#[test]
|
|
fn should_trigger_invalid_and_ban() {
|
|
// given
|
|
let pool = pool();
|
|
let uxt = uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 0,
|
|
});
|
|
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt)).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 1);
|
|
|
|
// when
|
|
pool.validated_pool.remove_invalid(&[*watcher.hash()]);
|
|
|
|
|
|
// then
|
|
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
|
|
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
|
|
assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
|
|
assert_eq!(stream.next(), None);
|
|
}
|
|
|
|
#[test]
|
|
fn should_trigger_broadcasted() {
|
|
// given
|
|
let pool = pool();
|
|
let uxt = uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 0,
|
|
});
|
|
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt)).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 1);
|
|
|
|
// when
|
|
let mut map = HashMap::new();
|
|
let peers = vec!["a".into(), "b".into(), "c".into()];
|
|
map.insert(*watcher.hash(), peers.clone());
|
|
pool.validated_pool().on_broadcasted(map);
|
|
|
|
|
|
// then
|
|
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
|
|
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
|
|
assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
|
|
}
|
|
|
|
#[test]
|
|
fn should_trigger_dropped() {
|
|
// given
|
|
let limit = Limit {
|
|
count: 1,
|
|
total_bytes: 1000,
|
|
};
|
|
let pool = Pool::new(Options {
|
|
ready: limit.clone(),
|
|
future: limit.clone(),
|
|
..Default::default()
|
|
}, TestApi::default().into());
|
|
|
|
let xt = uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 0,
|
|
});
|
|
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 1);
|
|
|
|
// when
|
|
let xt = uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
amount: 4,
|
|
nonce: 1,
|
|
});
|
|
block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt)).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 1);
|
|
|
|
// then
|
|
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
|
|
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
|
|
assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
|
|
}
|
|
|
|
#[test]
|
|
fn should_handle_pruning_in_the_middle_of_import() {
|
|
// given
|
|
let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
|
|
let (tx, rx) = std::sync::mpsc::sync_channel(1);
|
|
let mut api = TestApi::default();
|
|
api.delay = Arc::new(Mutex::new(rx.into()));
|
|
let pool = Arc::new(Pool::new(Default::default(), api.into()));
|
|
|
|
// when
|
|
let xt = uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 5,
|
|
nonce: 1,
|
|
});
|
|
|
|
// This transaction should go to future, since we use `nonce: 1`
|
|
let pool2 = pool.clone();
|
|
std::thread::spawn(move || {
|
|
block_on(pool2.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
|
|
ready.send(()).unwrap();
|
|
});
|
|
|
|
// But now before the previous one is imported we import
|
|
// the one that it depends on.
|
|
let xt = uxt(Transfer {
|
|
from: AccountId::from_h256(H256::from_low_u64_be(1)),
|
|
to: AccountId::from_h256(H256::from_low_u64_be(2)),
|
|
amount: 4,
|
|
nonce: 0,
|
|
});
|
|
// The tag the above transaction provides (TestApi is using just nonce as u8)
|
|
let provides = vec![0_u8];
|
|
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 1);
|
|
|
|
// Now block import happens before the second transaction is able to finish verification.
|
|
block_on(pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap();
|
|
assert_eq!(pool.validated_pool().status().ready, 0);
|
|
|
|
|
|
// so when we release the verification of the previous one it will have
|
|
// something in `requires`, but should go to ready directly, since the previous transaction was imported
|
|
// correctly.
|
|
tx.send(()).unwrap();
|
|
|
|
// then
|
|
is_ready.recv().unwrap(); // wait for finish
|
|
assert_eq!(pool.validated_pool().status().ready, 1);
|
|
assert_eq!(pool.validated_pool().status().future, 0);
|
|
}
|
|
}
|
|
}
|