sc-transcation-pool refactor (#9228)

* Use TransactionPool trait

* sc-transaction-pool-primitives

* sc-transaction-pool-api

* TP

* bye sc_transaction_graph

* fix line widths

* fix import errors

* fix import errors

* fix import errors 🤦🏾‍♂️

* fix import errors 🤦🏾‍♂️🤦🏾‍♂️🤦🏾‍♂️

* remove sp-keyring
This commit is contained in:
Seun Lanlege
2021-07-08 14:33:34 +01:00
committed by GitHub
parent 721a3b9e9c
commit 2ae9d36758
65 changed files with 384 additions and 388 deletions
@@ -1,8 +0,0 @@
Generic Transaction Pool
The pool is based on dependency graph between transactions
and their priority.
The pool is able to return an iterator that traverses transaction
graph in the correct order taking into account priorities and dependencies.
License: GPL-3.0-or-later WITH Classpath-exception-2.0
@@ -1,192 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use criterion::{criterion_group, criterion_main, Criterion};
use futures::{future::{ready, Ready}, executor::block_on};
use sc_transaction_graph::*;
use codec::Encode;
use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId};
use sp_runtime::{
generic::BlockId, traits::Block as BlockT,
transaction_validity::{
ValidTransaction, InvalidTransaction, TransactionValidity, TransactionTag as Tag,
TransactionSource,
},
};
use sp_core::blake2_256;
#[derive(Clone, Debug, Default)]
struct TestApi {
nonce_dependant: bool,
}
impl TestApi {
fn new_dependant() -> Self {
TestApi { nonce_dependant: true }
}
}
fn to_tag(nonce: u64, from: AccountId) -> Tag {
let mut data = [0u8; 40];
data[..8].copy_from_slice(&nonce.to_le_bytes()[..]);
data[8..].copy_from_slice(&from.0[..]);
data.to_vec()
}
impl ChainApi for TestApi {
type Block = Block;
type Error = sp_transaction_pool::error::Error;
type ValidationFuture = Ready<sp_transaction_pool::error::Result<TransactionValidity>>;
type BodyFuture = Ready<sp_transaction_pool::error::Result<Option<Vec<Extrinsic>>>>;
fn validate_transaction(
&self,
at: &BlockId<Self::Block>,
_source: TransactionSource,
uxt: ExtrinsicFor<Self>,
) -> Self::ValidationFuture {
let nonce = uxt.transfer().nonce;
let from = uxt.transfer().from.clone();
match self.block_id_to_number(at) {
Ok(Some(num)) if num > 5 => {
return ready(
Ok(Err(InvalidTransaction::Stale.into()))
)
},
_ => {},
}
ready(
Ok(Ok(ValidTransaction {
priority: 4,
requires: if nonce > 1 && self.nonce_dependant {
vec![to_tag(nonce-1, from.clone())]
} else { vec![] },
provides: vec![to_tag(nonce, from)],
longevity: 10,
propagate: true,
}))
)
}
fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<NumberFor<Self>>, Self::Error> {
Ok(match at {
BlockId::Number(num) => Some(*num),
BlockId::Hash(_) => None,
})
}
fn block_id_to_hash(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<BlockHash<Self>>, Self::Error> {
Ok(match at {
BlockId::Number(num) => Some(H256::from_low_u64_be(*num)).into(),
BlockId::Hash(_) => None,
})
}
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (H256, usize) {
let encoded = uxt.encode();
(blake2_256(&encoded).into(), encoded.len())
}
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
ready(Ok(None))
}
fn block_header(
&self,
_: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
Ok(None)
}
}
fn uxt(transfer: Transfer) -> Extrinsic {
Extrinsic::Transfer {
transfer,
signature: Default::default(),
exhaust_resources_when_not_first: false,
}
}
fn bench_configured(pool: Pool<TestApi>, number: u64) {
let source = TransactionSource::External;
let mut futures = Vec::new();
let mut tags = Vec::new();
for nonce in 1..=number {
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce,
});
tags.push(to_tag(nonce, AccountId::from_h256(H256::from_low_u64_be(1))));
futures.push(pool.submit_one(&BlockId::Number(1), source, xt));
}
let res = block_on(futures::future::join_all(futures.into_iter()));
assert!(res.iter().all(Result::is_ok));
assert_eq!(pool.validated_pool().status().future, 0);
assert_eq!(pool.validated_pool().status().ready, number as usize);
// Prune all transactions.
let block_num = 6;
block_on(pool.prune_tags(
&BlockId::Number(block_num),
tags,
vec![],
)).expect("Prune failed");
// pool is empty
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
}
fn benchmark_main(c: &mut Criterion) {
c.bench_function("sequential 50 tx", |b| {
b.iter(|| {
bench_configured(
Pool::new(Default::default(), true.into(), TestApi::new_dependant().into()),
50,
);
});
});
c.bench_function("random 100 tx", |b| {
b.iter(|| {
bench_configured(
Pool::new(Default::default(), true.into(), TestApi::default().into()),
100,
);
});
});
}
criterion_group!(benches, benchmark_main);
criterion_main!(benches);
File diff suppressed because it is too large Load Diff
@@ -1,276 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::{
collections::{HashMap, HashSet},
fmt,
hash,
sync::Arc,
};
use sp_core::hexdisplay::HexDisplay;
use sp_runtime::transaction_validity::{
TransactionTag as Tag,
};
use wasm_timer::Instant;
use crate::base_pool::Transaction;
#[cfg_attr(not(target_os = "unknown"), derive(parity_util_mem::MallocSizeOf))]
/// Transaction with partially satisfied dependencies.
pub struct WaitingTransaction<Hash, Ex> {
/// Transaction details.
pub transaction: Arc<Transaction<Hash, Ex>>,
/// Tags that are required and have not been satisfied yet by other transactions in the pool.
pub missing_tags: HashSet<Tag>,
/// Time of import to the Future Queue.
pub imported_at: Instant,
}
impl<Hash: fmt::Debug, Ex: fmt::Debug> fmt::Debug for WaitingTransaction<Hash, Ex> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "WaitingTransaction {{ ")?;
write!(fmt, "imported_at: {:?}, ", self.imported_at)?;
write!(fmt, "transaction: {:?}, ", self.transaction)?;
write!(
fmt,
"missing_tags: {{{}}}",
self.missing_tags.iter()
.map(|tag| HexDisplay::from(tag).to_string()).collect::<Vec<_>>().join(", "),
)?;
write!(fmt, "}}")
}
}
impl<Hash, Ex> Clone for WaitingTransaction<Hash, Ex> {
fn clone(&self) -> Self {
Self {
transaction: self.transaction.clone(),
missing_tags: self.missing_tags.clone(),
imported_at: self.imported_at,
}
}
}
impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
/// Creates a new `WaitingTransaction`.
///
/// Computes the set of missing tags based on the requirements and tags that
/// are provided by all transactions in the ready queue.
pub fn new(
transaction: Transaction<Hash, Ex>,
provided: &HashMap<Tag, Hash>,
recently_pruned: &[HashSet<Tag>],
) -> Self {
let missing_tags = transaction.requires
.iter()
.filter(|tag| {
// is true if the tag is already satisfied either via transaction in the pool
// or one that was recently included.
let is_provided = provided.contains_key(&**tag) || recently_pruned.iter().any(|x| x.contains(&**tag));
!is_provided
})
.cloned()
.collect();
Self {
transaction: Arc::new(transaction),
missing_tags,
imported_at: Instant::now(),
}
}
/// Marks the tag as satisfied.
pub fn satisfy_tag(&mut self, tag: &Tag) {
self.missing_tags.remove(tag);
}
/// Returns true if transaction has all requirements satisfied.
pub fn is_ready(&self) -> bool {
self.missing_tags.is_empty()
}
}
/// A pool of transactions that are not yet ready to be included in the block.
///
/// Contains transactions that are still awaiting for some other transactions that
/// could provide a tag that they require.
#[derive(Debug)]
#[cfg_attr(not(target_os = "unknown"), derive(parity_util_mem::MallocSizeOf))]
pub struct FutureTransactions<Hash: hash::Hash + Eq, Ex> {
/// tags that are not yet provided by any transaction and we await for them
wanted_tags: HashMap<Tag, HashSet<Hash>>,
/// Transactions waiting for a particular other transaction
waiting: HashMap<Hash, WaitingTransaction<Hash, Ex>>,
}
impl<Hash: hash::Hash + Eq, Ex> Default for FutureTransactions<Hash, Ex> {
fn default() -> Self {
Self {
wanted_tags: Default::default(),
waiting: Default::default(),
}
}
}
const WAITING_PROOF: &str = r"#
In import we always insert to `waiting` if we push to `wanted_tags`;
when removing from `waiting` we always clear `wanted_tags`;
every hash from `wanted_tags` is always present in `waiting`;
qed
#";
impl<Hash: hash::Hash + Eq + Clone, Ex> FutureTransactions<Hash, Ex> {
/// Import transaction to Future queue.
///
/// Only transactions that don't have all their tags satisfied should occupy
/// the Future queue.
/// As soon as required tags are provided by some other transactions that are ready
/// we should remove the transactions from here and move them to the Ready queue.
pub fn import(&mut self, tx: WaitingTransaction<Hash, Ex>) {
assert!(!tx.is_ready(), "Transaction is ready.");
assert!(!self.waiting.contains_key(&tx.transaction.hash), "Transaction is already imported.");
// Add all tags that are missing
for tag in &tx.missing_tags {
let entry = self.wanted_tags.entry(tag.clone()).or_insert_with(HashSet::new);
entry.insert(tx.transaction.hash.clone());
}
// Add the transaction to a by-hash waiting map
self.waiting.insert(tx.transaction.hash.clone(), tx);
}
/// Returns true if given hash is part of the queue.
pub fn contains(&self, hash: &Hash) -> bool {
self.waiting.contains_key(hash)
}
/// Returns a list of known transactions
pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
hashes.iter().map(|h| self.waiting.get(h).map(|x| x.transaction.clone())).collect()
}
/// Satisfies provided tags in transactions that are waiting for them.
///
/// Returns (and removes) transactions that became ready after their last tag got
/// satisfied and now we can remove them from Future and move to Ready queue.
pub fn satisfy_tags<T: AsRef<Tag>>(&mut self, tags: impl IntoIterator<Item=T>) -> Vec<WaitingTransaction<Hash, Ex>> {
let mut became_ready = vec![];
for tag in tags {
if let Some(hashes) = self.wanted_tags.remove(tag.as_ref()) {
for hash in hashes {
let is_ready = {
let tx = self.waiting.get_mut(&hash).expect(WAITING_PROOF);
tx.satisfy_tag(tag.as_ref());
tx.is_ready()
};
if is_ready {
let tx = self.waiting.remove(&hash).expect(WAITING_PROOF);
became_ready.push(tx);
}
}
}
}
became_ready
}
/// Removes transactions for given list of hashes.
///
/// Returns a list of actually removed transactions.
pub fn remove(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![];
for hash in hashes {
if let Some(waiting_tx) = self.waiting.remove(hash) {
// remove from wanted_tags as well
for tag in waiting_tx.missing_tags {
let remove = if let Some(wanted) = self.wanted_tags.get_mut(&tag) {
wanted.remove(hash);
wanted.is_empty()
} else { false };
if remove {
self.wanted_tags.remove(&tag);
}
}
// add to result
removed.push(waiting_tx.transaction)
}
}
removed
}
/// Fold a list of future transactions to compute a single value.
pub fn fold<R, F: FnMut(Option<R>, &WaitingTransaction<Hash, Ex>) -> Option<R>>(&mut self, f: F) -> Option<R> {
self.waiting
.values()
.fold(None, f)
}
/// Returns iterator over all future transactions
pub fn all(&self) -> impl Iterator<Item=&Transaction<Hash, Ex>> {
self.waiting.values().map(|waiting| &*waiting.transaction)
}
/// Removes and returns all future transactions.
pub fn clear(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
self.wanted_tags.clear();
self.waiting.drain().map(|(_, tx)| tx.transaction).collect()
}
/// Returns number of transactions in the Future queue.
pub fn len(&self) -> usize {
self.waiting.len()
}
/// Returns sum of encoding lengths of all transactions in this queue.
pub fn bytes(&self) -> usize {
self.waiting.values().fold(0, |acc, tx| acc + tx.transaction.bytes)
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_runtime::transaction_validity::TransactionSource;
#[test]
fn can_track_heap_size() {
let mut future = FutureTransactions::default();
future.import(WaitingTransaction {
transaction: Transaction {
data: vec![0u8; 1024],
bytes: 1,
hash: 1,
priority: 1,
valid_till: 2,
requires: vec![vec![1], vec![2]],
provides: vec![vec![3], vec![4]],
propagate: true,
source: TransactionSource::External,
}.into(),
missing_tags: vec![vec![1u8], vec![2u8]].into_iter().collect(),
imported_at: std::time::Instant::now(),
});
// data is at least 1024!
assert!(parity_util_mem::malloc_size(&future) > 1024);
}
}
@@ -1,44 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Generic Transaction Pool
//!
//! The pool is based on dependency graph between transactions
//! and their priority.
//! The pool is able to return an iterator that traverses transaction
//! graph in the correct order taking into account priorities and dependencies.
#![warn(missing_docs)]
#![warn(unused_extern_crates)]
mod future;
mod listener;
mod pool;
mod ready;
mod rotator;
mod validated_pool;
mod tracked_map;
pub mod base_pool;
pub mod watcher;
pub use self::base_pool::Transaction;
pub use self::pool::{
BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, IsValidator, NumberFor, Options,
Pool, TransactionFor, ValidatedTransaction,
};
@@ -1,140 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::{
collections::HashMap, hash, fmt::Debug,
};
use linked_hash_map::LinkedHashMap;
use serde::Serialize;
use log::{debug, trace};
use sp_runtime::traits;
use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash};
/// Extrinsic pool default listener.
pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
watchers: HashMap<H, watcher::Sender<H, ExtrinsicHash<C>>>,
finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>,
}
/// Maximum number of blocks awaiting finality at any time.
const MAX_FINALITY_WATCHERS: usize = 512;
impl<H: hash::Hash + Eq + Debug, C: ChainApi> Default for Listener<H, C> {
fn default() -> Self {
Self {
watchers: Default::default(),
finality_watchers: Default::default(),
}
}
}
impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> {
fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H, ExtrinsicHash<C>>) {
let clean = if let Some(h) = self.watchers.get_mut(hash) {
fun(h);
h.is_done()
} else {
false
};
if clean {
self.watchers.remove(hash);
}
}
/// Creates a new watcher for given verified extrinsic.
///
/// The watcher can be used to subscribe to life-cycle events of that extrinsic.
pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, ExtrinsicHash<C>> {
let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default);
sender.new_watcher(hash)
}
/// Notify the listeners about extrinsic broadcast.
pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) {
trace!(target: "txpool", "[{:?}] Broadcasted", hash);
self.fire(hash, |watcher| watcher.broadcast(peers));
}
/// New transaction was added to the ready pool or promoted from the future pool.
pub fn ready(&mut self, tx: &H, old: Option<&H>) {
trace!(target: "txpool", "[{:?}] Ready (replaced with {:?})", tx, old);
self.fire(tx, |watcher| watcher.ready());
if let Some(old) = old {
self.fire(old, |watcher| watcher.usurped(tx.clone()));
}
}
/// New transaction was added to the future pool.
pub fn future(&mut self, tx: &H) {
trace!(target: "txpool", "[{:?}] Future", tx);
self.fire(tx, |watcher| watcher.future());
}
/// Transaction was dropped from the pool because of the limit.
pub fn dropped(&mut self, tx: &H, by: Option<&H>) {
trace!(target: "txpool", "[{:?}] Dropped (replaced with {:?})", tx, by);
self.fire(tx, |watcher| match by {
Some(t) => watcher.usurped(t.clone()),
None => watcher.dropped(),
})
}
/// Transaction was removed as invalid.
pub fn invalid(&mut self, tx: &H) {
debug!(target: "txpool", "[{:?}] Extrinsic invalid", tx);
self.fire(tx, |watcher| watcher.invalid());
}
/// Transaction was pruned from the pool.
pub fn pruned(&mut self, block_hash: BlockHash<C>, tx: &H) {
debug!(target: "txpool", "[{:?}] Pruned at {:?}", tx, block_hash);
self.fire(tx, |s| s.in_block(block_hash));
self.finality_watchers.entry(block_hash).or_insert(vec![]).push(tx.clone());
while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
if let Some((hash, txs)) = self.finality_watchers.pop_front() {
for tx in txs {
self.fire(&tx, |s| s.finality_timeout(hash));
}
}
}
}
/// The block this transaction was included in has been retracted.
pub fn retracted(&mut self, block_hash: BlockHash<C>) {
if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
for hash in hashes {
self.fire(&hash, |s| s.retracted(block_hash))
}
}
}
/// Notify all watchers that transactions have been finalized
pub fn finalized(&mut self, block_hash: BlockHash<C>) {
if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
for hash in hashes {
log::debug!(target: "txpool", "[{:?}] Sent finalization event (block {:?})", hash, block_hash);
self.fire(&hash, |s| s.finalized(block_hash))
}
}
}
}
File diff suppressed because it is too large Load Diff
@@ -1,743 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::{
collections::{HashMap, HashSet, BTreeSet},
cmp,
hash,
sync::Arc,
};
use serde::Serialize;
use log::trace;
use sp_runtime::traits::Member;
use sp_runtime::transaction_validity::{
TransactionTag as Tag,
};
use sp_transaction_pool::error;
use crate::{
base_pool::Transaction,
future::WaitingTransaction,
tracked_map::{self, ReadOnlyTrackedMap, TrackedMap},
};
/// An in-pool transaction reference.
///
/// Should be cheap to clone.
#[derive(Debug, parity_util_mem::MallocSizeOf)]
pub struct TransactionRef<Hash, Ex> {
/// The actual transaction data.
pub transaction: Arc<Transaction<Hash, Ex>>,
/// Unique id when transaction was inserted into the pool.
pub insertion_id: u64,
}
impl<Hash, Ex> Clone for TransactionRef<Hash, Ex> {
fn clone(&self) -> Self {
Self {
transaction: self.transaction.clone(),
insertion_id: self.insertion_id,
}
}
}
impl<Hash, Ex> Ord for TransactionRef<Hash, Ex> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.transaction.priority.cmp(&other.transaction.priority)
.then_with(|| other.transaction.valid_till.cmp(&self.transaction.valid_till))
.then_with(|| other.insertion_id.cmp(&self.insertion_id))
}
}
impl<Hash, Ex> PartialOrd for TransactionRef<Hash, Ex> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<Hash, Ex> PartialEq for TransactionRef<Hash, Ex> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == cmp::Ordering::Equal
}
}
impl<Hash, Ex> Eq for TransactionRef<Hash, Ex> {}
#[derive(Debug, parity_util_mem::MallocSizeOf)]
pub struct ReadyTx<Hash, Ex> {
/// A reference to a transaction
pub transaction: TransactionRef<Hash, Ex>,
/// A list of transactions that get unlocked by this one
pub unlocks: Vec<Hash>,
/// How many required tags are provided inherently
///
/// Some transactions might be already pruned from the queue,
/// so when we compute ready set we may consider this transactions ready earlier.
pub requires_offset: usize,
}
impl<Hash: Clone, Ex> Clone for ReadyTx<Hash, Ex> {
fn clone(&self) -> Self {
Self {
transaction: self.transaction.clone(),
unlocks: self.unlocks.clone(),
requires_offset: self.requires_offset,
}
}
}
const HASH_READY: &str = r#"
Every time transaction is imported its hash is placed in `ready` map and tags in `provided_tags`;
Every time transaction is removed from the queue we remove the hash from `ready` map and from `provided_tags`;
Hence every hash retrieved from `provided_tags` is always present in `ready`;
qed
"#;
/// Validated transactions that are block ready with all their dependencies met.
#[derive(Debug, parity_util_mem::MallocSizeOf)]
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {
/// Next free insertion id (used to indicate when a transaction was inserted into the pool).
insertion_id: u64,
/// tags that are provided by Ready transactions
/// (only a single transaction can provide a specific tag)
provided_tags: HashMap<Tag, Hash>,
/// Transactions that are ready (i.e. don't have any requirements external to the pool)
ready: TrackedMap<Hash, ReadyTx<Hash, Ex>>,
/// Best transactions that are ready to be included to the block without any other previous transaction.
best: BTreeSet<TransactionRef<Hash, Ex>>,
}
impl<Hash, Ex> tracked_map::Size for ReadyTx<Hash, Ex> {
fn size(&self) -> usize {
self.transaction.transaction.bytes
}
}
impl<Hash: hash::Hash + Eq, Ex> Default for ReadyTransactions<Hash, Ex> {
fn default() -> Self {
Self {
insertion_id: Default::default(),
provided_tags: Default::default(),
ready: Default::default(),
best: Default::default(),
}
}
}
impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
/// Borrows a map of tags that are provided by transactions in this queue.
pub fn provided_tags(&self) -> &HashMap<Tag, Hash> {
&self.provided_tags
}
/// Returns an iterator of ready transactions.
///
/// Transactions are returned in order:
/// 1. First by the dependencies:
/// - never return transaction that requires a tag, which was not provided by one of the previously returned transactions
/// 2. Then by priority:
/// - If there are two transactions with all requirements satisfied the one with higher priority goes first.
/// 3. Then by the ttl that's left
/// - transactions that are valid for a shorter time go first
/// 4. Lastly we sort by the time in the queue
/// - transactions that are longer in the queue go first
pub fn get(&self) -> impl Iterator<Item=Arc<Transaction<Hash, Ex>>> {
BestIterator {
all: self.ready.clone(),
best: self.best.clone(),
awaiting: Default::default(),
}
}
/// Imports transactions to the pool of ready transactions.
///
/// The transaction needs to have all tags satisfied (be ready) by transactions
/// that are in this queue.
/// Returns transactions that were replaced by the one imported.
pub fn import(
&mut self,
tx: WaitingTransaction<Hash, Ex>,
) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {
assert!(
tx.is_ready(),
"Only ready transactions can be imported. Missing: {:?}", tx.missing_tags
);
assert!(!self.ready.read().contains_key(&tx.transaction.hash), "Transaction is already imported.");
self.insertion_id += 1;
let insertion_id = self.insertion_id;
let hash = tx.transaction.hash.clone();
let transaction = tx.transaction;
let (replaced, unlocks) = self.replace_previous(&transaction)?;
let mut goes_to_best = true;
let mut ready = self.ready.write();
let mut requires_offset = 0;
// Add links to transactions that unlock the current one
for tag in &transaction.requires {
// Check if the transaction that satisfies the tag is still in the queue.
if let Some(other) = self.provided_tags.get(tag) {
let tx = ready.get_mut(other).expect(HASH_READY);
tx.unlocks.push(hash.clone());
// this transaction depends on some other, so it doesn't go to best directly.
goes_to_best = false;
} else {
requires_offset += 1;
}
}
// update provided_tags
// call to replace_previous guarantees that we will be overwriting
// only entries that have been removed.
for tag in &transaction.provides {
self.provided_tags.insert(tag.clone(), hash.clone());
}
let transaction = TransactionRef {
insertion_id,
transaction
};
// insert to best if it doesn't require any other transaction to be included before it
if goes_to_best {
self.best.insert(transaction.clone());
}
// insert to Ready
ready.insert(hash, ReadyTx {
transaction,
unlocks,
requires_offset,
});
Ok(replaced)
}
/// Fold a list of ready transactions to compute a single value.
pub fn fold<R, F: FnMut(Option<R>, &ReadyTx<Hash, Ex>) -> Option<R>>(&mut self, f: F) -> Option<R> {
self.ready
.read()
.values()
.fold(None, f)
}
/// Returns true if given transaction is part of the queue.
pub fn contains(&self, hash: &Hash) -> bool {
self.ready.read().contains_key(hash)
}
/// Retrieve transaction by hash
pub fn by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
self.by_hashes(&[hash.clone()]).into_iter().next().unwrap_or(None)
}
/// Retrieve transactions by hash
pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
let ready = self.ready.read();
hashes.iter().map(|hash| {
ready.get(hash).map(|x| x.transaction.transaction.clone())
}).collect()
}
/// Removes a subtree of transactions from the ready pool.
///
/// NOTE removing a transaction will also cause a removal of all transactions that depend on that one
/// (i.e. the entire subgraph that this transaction is a start of will be removed).
/// All removed transactions are returned.
pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let to_remove = hashes.to_vec();
self.remove_subtree_with_tag_filter(to_remove, None)
}
/// Removes a subtrees of transactions trees starting from roots given in `to_remove`.
///
/// We proceed with a particular branch only if there is at least one provided tag
/// that is not part of `provides_tag_filter`. I.e. the filter contains tags
/// that will stay in the pool, so that we can early exit and avoid descending.
fn remove_subtree_with_tag_filter(
&mut self,
mut to_remove: Vec<Hash>,
provides_tag_filter: Option<HashSet<Tag>>,
) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![];
let mut ready = self.ready.write();
while let Some(hash) = to_remove.pop() {
if let Some(mut tx) = ready.remove(&hash) {
let invalidated = tx.transaction.transaction.provides
.iter()
.filter(|tag| provides_tag_filter
.as_ref()
.map(|filter| !filter.contains(&**tag))
.unwrap_or(true)
);
let mut removed_some_tags = false;
// remove entries from provided_tags
for tag in invalidated {
removed_some_tags = true;
self.provided_tags.remove(tag);
}
// remove from unlocks
for tag in &tx.transaction.transaction.requires {
if let Some(hash) = self.provided_tags.get(tag) {
if let Some(tx) = ready.get_mut(hash) {
remove_item(&mut tx.unlocks, &hash);
}
}
}
// remove from best
self.best.remove(&tx.transaction);
if removed_some_tags {
// remove all transactions that the current one unlocks
to_remove.append(&mut tx.unlocks);
}
// add to removed
trace!(target: "txpool", "[{:?}] Removed as part of the subtree.", hash);
removed.push(tx.transaction.transaction);
}
}
removed
}
/// Removes transactions that provide given tag.
///
/// All transactions that lead to a transaction, which provides this tag
/// are going to be removed from the queue, but no other transactions are touched -
/// i.e. all other subgraphs starting from given tag are still considered valid & ready.
pub fn prune_tags(&mut self, tag: Tag) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![];
let mut to_remove = vec![tag];
while let Some(tag) = to_remove.pop() {
let res = self.provided_tags.remove(&tag)
.and_then(|hash| self.ready.write().remove(&hash));
if let Some(tx) = res {
let unlocks = tx.unlocks;
// Make sure we remove it from best txs
self.best.remove(&tx.transaction);
let tx = tx.transaction.transaction;
// prune previous transactions as well
{
let hash = &tx.hash;
let mut ready = self.ready.write();
let mut find_previous = |tag| -> Option<Vec<Tag>> {
let prev_hash = self.provided_tags.get(tag)?;
let tx2 = ready.get_mut(&prev_hash)?;
remove_item(&mut tx2.unlocks, hash);
// We eagerly prune previous transactions as well.
// But it might not always be good.
// Possible edge case:
// - tx provides two tags
// - the second tag enables some subgraph we don't know of yet
// - we will prune the transaction
// - when we learn about the subgraph it will go to future
// - we will have to wait for re-propagation of that transaction
// Alternatively the caller may attempt to re-import these transactions.
if tx2.unlocks.is_empty() {
Some(tx2.transaction.transaction.provides.clone())
} else {
None
}
};
// find previous transactions
for tag in &tx.requires {
if let Some(mut tags_to_remove) = find_previous(tag) {
to_remove.append(&mut tags_to_remove);
}
}
}
// add the transactions that just got unlocked to `best`
for hash in unlocks {
if let Some(tx) = self.ready.write().get_mut(&hash) {
tx.requires_offset += 1;
// this transaction is ready
if tx.requires_offset == tx.transaction.transaction.requires.len() {
self.best.insert(tx.transaction.clone());
}
}
}
// we also need to remove all other tags that this transaction provides,
// but since all the hard work is done, we only clear the provided_tag -> hash
// mapping.
let current_tag = &tag;
for tag in &tx.provides {
let removed = self.provided_tags.remove(tag);
assert_eq!(
removed.as_ref(),
if current_tag == tag { None } else { Some(&tx.hash) },
"The pool contains exactly one transaction providing given tag; the removed transaction
claims to provide that tag, so it has to be mapped to it's hash; qed"
);
}
removed.push(tx);
}
}
removed
}
/// Checks if the transaction is providing the same tags as other transactions.
///
/// In case that's true it determines if the priority of transactions that
/// we are about to replace is lower than the priority of the replacement transaction.
/// We remove/replace old transactions in case they have lower priority.
///
/// In case replacement is successful returns a list of removed transactions
/// and a list of hashes that are still in pool and gets unlocked by the new transaction.
fn replace_previous(
&mut self,
tx: &Transaction<Hash, Ex>,
) -> error::Result<
(Vec<Arc<Transaction<Hash, Ex>>>, Vec<Hash>)
> {
let (to_remove, unlocks) = {
// check if we are replacing a transaction
let replace_hashes = tx.provides
.iter()
.filter_map(|tag| self.provided_tags.get(tag))
.collect::<HashSet<_>>();
// early exit if we are not replacing anything.
if replace_hashes.is_empty() {
return Ok((vec![], vec![]));
}
// now check if collective priority is lower than the replacement transaction.
let old_priority = {
let ready = self.ready.read();
replace_hashes
.iter()
.filter_map(|hash| ready.get(hash))
.fold(0u64, |total, tx|
total.saturating_add(tx.transaction.transaction.priority)
)
};
// bail - the transaction has too low priority to replace the old ones
if old_priority >= tx.priority {
return Err(error::Error::TooLowPriority { old: old_priority, new: tx.priority })
}
// construct a list of unlocked transactions
let unlocks = {
let ready = self.ready.read();
replace_hashes
.iter()
.filter_map(|hash| ready.get(hash))
.fold(vec![], |mut list, tx| {
list.extend(tx.unlocks.iter().cloned());
list
})
};
(
replace_hashes.into_iter().cloned().collect::<Vec<_>>(),
unlocks
)
};
let new_provides = tx.provides.iter().cloned().collect::<HashSet<_>>();
let removed = self.remove_subtree_with_tag_filter(to_remove, Some(new_provides));
Ok((
removed,
unlocks
))
}
/// Returns number of transactions in this queue.
pub fn len(&self) -> usize {
self.ready.len()
}
/// Returns sum of encoding lengths of all transactions in this queue.
pub fn bytes(&self) -> usize {
self.ready.bytes()
}
}
/// Iterator of ready transactions ordered by priority.
pub struct BestIterator<Hash, Ex> {
all: ReadOnlyTrackedMap<Hash, ReadyTx<Hash, Ex>>,
awaiting: HashMap<Hash, (usize, TransactionRef<Hash, Ex>)>,
best: BTreeSet<TransactionRef<Hash, Ex>>,
}
impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
/// Depending on number of satisfied requirements insert given ref
/// either to awaiting set or to best set.
fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef<Hash, Ex>) {
if satisfied >= tx_ref.transaction.requires.len() {
// If we have satisfied all deps insert to best
self.best.insert(tx_ref);
} else {
// otherwise we're still awaiting for some deps
self.awaiting.insert(tx_ref.transaction.hash.clone(), (satisfied, tx_ref));
}
}
}
impl<Hash: hash::Hash + Member, Ex> Iterator for BestIterator<Hash, Ex> {
type Item = Arc<Transaction<Hash, Ex>>;
fn next(&mut self) -> Option<Self::Item> {
loop {
let best = self.best.iter().next_back()?.clone();
let best = self.best.take(&best)?;
let next = self.all.read().get(&best.transaction.hash).cloned();
let ready = match next {
Some(ready) => ready,
// The transaction is not in all, maybe it was removed in the meantime?
None => continue,
};
// Insert transactions that just got unlocked.
for hash in &ready.unlocks {
// first check local awaiting transactions
let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
satisfied += 1;
Some((satisfied, tx_ref))
// then get from the pool
} else {
self.all.read().get(hash).map(|next| (next.requires_offset + 1, next.transaction.clone()))
};
if let Some((satisfied, tx_ref)) = res {
self.best_or_awaiting(satisfied, tx_ref)
}
}
return Some(best.transaction)
}
}
}
// See: https://github.com/rust-lang/rust/issues/40062
fn remove_item<T: PartialEq>(vec: &mut Vec<T>, item: &T) {
if let Some(idx) = vec.iter().position(|i| i == item) {
vec.swap_remove(idx);
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_runtime::transaction_validity::TransactionSource as Source;
fn tx(id: u8) -> Transaction<u64, Vec<u8>> {
Transaction {
data: vec![id],
bytes: 1,
hash: id as u64,
priority: 1,
valid_till: 2,
requires: vec![vec![1], vec![2]],
provides: vec![vec![3], vec![4]],
propagate: true,
source: Source::External,
}
}
fn import<H: hash::Hash + Eq + Member + Serialize, Ex>(
ready: &mut ReadyTransactions<H, Ex>,
tx: Transaction<H, Ex>
) -> error::Result<Vec<Arc<Transaction<H, Ex>>>> {
let x = WaitingTransaction::new(tx, ready.provided_tags(), &[]);
ready.import(x)
}
#[test]
fn should_replace_transaction_that_provides_the_same_tag() {
// given
let mut ready = ReadyTransactions::default();
let mut tx1 = tx(1);
tx1.requires.clear();
let mut tx2 = tx(2);
tx2.requires.clear();
tx2.provides = vec![vec![3]];
let mut tx3 = tx(3);
tx3.requires.clear();
tx3.provides = vec![vec![4]];
// when
import(&mut ready, tx2).unwrap();
import(&mut ready, tx3).unwrap();
assert_eq!(ready.get().count(), 2);
// too low priority
import(&mut ready, tx1.clone()).unwrap_err();
tx1.priority = 10;
import(&mut ready, tx1).unwrap();
// then
assert_eq!(ready.get().count(), 1);
}
#[test]
fn should_replace_multiple_transactions_correctly() {
// given
let mut ready = ReadyTransactions::default();
let mut tx0 = tx(0);
tx0.requires = vec![];
tx0.provides = vec![vec![0]];
let mut tx1 = tx(1);
tx1.requires = vec![];
tx1.provides = vec![vec![1]];
let mut tx2 = tx(2);
tx2.requires = vec![vec![0], vec![1]];
tx2.provides = vec![vec![2], vec![3]];
let mut tx3 = tx(3);
tx3.requires = vec![vec![2]];
tx3.provides = vec![vec![4]];
let mut tx4 = tx(4);
tx4.requires = vec![vec![3]];
tx4.provides = vec![vec![5]];
// replacement
let mut tx2_2 = tx(5);
tx2_2.requires = vec![vec![0], vec![1]];
tx2_2.provides = vec![vec![2]];
tx2_2.priority = 10;
for tx in vec![tx0, tx1, tx2, tx3, tx4] {
import(&mut ready, tx).unwrap();
}
assert_eq!(ready.get().count(), 5);
// when
import(&mut ready, tx2_2).unwrap();
// then
assert_eq!(ready.get().count(), 3);
}
#[test]
fn should_return_best_transactions_in_correct_order() {
// given
let mut ready = ReadyTransactions::default();
let mut tx1 = tx(1);
tx1.requires.clear();
let mut tx2 = tx(2);
tx2.requires = tx1.provides.clone();
tx2.provides = vec![vec![106]];
let mut tx3 = tx(3);
tx3.requires = vec![tx1.provides[0].clone(), vec![106]];
tx3.provides = vec![];
let mut tx4 = tx(4);
tx4.requires = vec![tx1.provides[0].clone()];
tx4.provides = vec![];
let tx5 = Transaction {
data: vec![5],
bytes: 1,
hash: 5,
priority: 1,
valid_till: u64::MAX, // use the max here for testing.
requires: vec![tx1.provides[0].clone()],
provides: vec![],
propagate: true,
source: Source::External,
};
// when
for tx in vec![tx1, tx2, tx3, tx4, tx5] {
import(&mut ready, tx).unwrap();
}
// then
assert_eq!(ready.best.len(), 1);
let mut it = ready.get().map(|tx| tx.data[0]);
assert_eq!(it.next(), Some(1));
assert_eq!(it.next(), Some(2));
assert_eq!(it.next(), Some(3));
assert_eq!(it.next(), Some(4));
assert_eq!(it.next(), Some(5));
assert_eq!(it.next(), None);
}
#[test]
fn can_report_heap_size() {
let mut ready = ReadyTransactions::default();
let tx = Transaction {
data: vec![5],
bytes: 1,
hash: 5,
priority: 1,
valid_till: u64::MAX, // use the max here for testing.
requires: vec![],
provides: vec![],
propagate: true,
source: Source::External,
};
import(&mut ready, tx).unwrap();
assert!(parity_util_mem::malloc_size(&ready) > 200);
}
#[test]
fn should_order_refs() {
let mut id = 1;
let mut with_priority = |priority, longevity| {
id += 1;
let mut tx = tx(id);
tx.priority = priority;
tx.valid_till = longevity;
tx
};
// higher priority = better
assert!(TransactionRef {
transaction: Arc::new(with_priority(3, 3)),
insertion_id: 1,
} > TransactionRef {
transaction: Arc::new(with_priority(2, 3)),
insertion_id: 2,
});
// lower validity = better
assert!(TransactionRef {
transaction: Arc::new(with_priority(3, 2)),
insertion_id: 1,
} > TransactionRef {
transaction: Arc::new(with_priority(3, 3)),
insertion_id: 2,
});
// lower insertion_id = better
assert!(TransactionRef {
transaction: Arc::new(with_priority(3, 3)),
insertion_id: 1,
} > TransactionRef {
transaction: Arc::new(with_priority(3, 3)),
insertion_id: 2,
});
}
}
@@ -1,216 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Rotate extrinsic inside the pool.
//!
//! Keeps only recent extrinsic and discard the ones kept for a significant amount of time.
//! Discarded extrinsics are banned so that they don't get re-imported again.
use std::{
collections::HashMap,
hash,
iter,
time::Duration,
};
use parking_lot::RwLock;
use wasm_timer::Instant;
use crate::base_pool::Transaction;
/// Expected size of the banned extrinsics cache.
const EXPECTED_SIZE: usize = 2048;
/// Pool rotator is responsible to only keep fresh extrinsics in the pool.
///
/// Extrinsics that occupy the pool for too long are culled and temporarily banned from entering
/// the pool again.
pub struct PoolRotator<Hash> {
/// How long the extrinsic is banned for.
ban_time: Duration,
/// Currently banned extrinsics.
banned_until: RwLock<HashMap<Hash, Instant>>,
}
impl<Hash: hash::Hash + Eq> Default for PoolRotator<Hash> {
fn default() -> Self {
Self {
ban_time: Duration::from_secs(60 * 30),
banned_until: Default::default(),
}
}
}
impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
/// Returns `true` if extrinsic hash is currently banned.
pub fn is_banned(&self, hash: &Hash) -> bool {
self.banned_until.read().contains_key(hash)
}
/// Bans given set of hashes.
pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item=Hash>) {
let mut banned = self.banned_until.write();
for hash in hashes {
banned.insert(hash, *now + self.ban_time);
}
if banned.len() > 2 * EXPECTED_SIZE {
while banned.len() > EXPECTED_SIZE {
if let Some(key) = banned.keys().next().cloned() {
banned.remove(&key);
}
}
}
}
/// Bans extrinsic if it's stale.
///
/// Returns `true` if extrinsic is stale and got banned.
pub fn ban_if_stale<Ex>(&self, now: &Instant, current_block: u64, xt: &Transaction<Hash, Ex>) -> bool {
if xt.valid_till > current_block {
return false;
}
self.ban(now, iter::once(xt.hash.clone()));
true
}
/// Removes timed bans.
pub fn clear_timeouts(&self, now: &Instant) {
let mut banned = self.banned_until.write();
banned.retain(|_, &mut v| v >= *now);
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_runtime::transaction_validity::TransactionSource;
type Hash = u64;
type Ex = ();
fn rotator() -> PoolRotator<Hash> {
PoolRotator {
ban_time: Duration::from_millis(10),
..Default::default()
}
}
fn tx() -> (Hash, Transaction<Hash, Ex>) {
let hash = 5u64;
let tx = Transaction {
data: (),
bytes: 1,
hash: hash.clone(),
priority: 5,
valid_till: 1,
requires: vec![],
provides: vec![],
propagate: true,
source: TransactionSource::External,
};
(hash, tx)
}
#[test]
fn should_not_ban_if_not_stale() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(!rotator.is_banned(&hash));
let now = Instant::now();
let past_block = 0;
// when
assert!(!rotator.ban_if_stale(&now, past_block, &tx));
// then
assert!(!rotator.is_banned(&hash));
}
#[test]
fn should_ban_stale_extrinsic() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(!rotator.is_banned(&hash));
// when
assert!(rotator.ban_if_stale(&Instant::now(), 1, &tx));
// then
assert!(rotator.is_banned(&hash));
}
#[test]
fn should_clear_banned() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(rotator.ban_if_stale(&Instant::now(), 1, &tx));
assert!(rotator.is_banned(&hash));
// when
let future = Instant::now() + rotator.ban_time + rotator.ban_time;
rotator.clear_timeouts(&future);
// then
assert!(!rotator.is_banned(&hash));
}
#[test]
fn should_garbage_collect() {
// given
fn tx_with(i: u64, valid_till: u64) -> Transaction<Hash, Ex> {
let hash = i;
Transaction {
data: (),
bytes: 2,
hash,
priority: 5,
valid_till,
requires: vec![],
provides: vec![],
propagate: true,
source: TransactionSource::External,
}
}
let rotator = rotator();
let now = Instant::now();
let past_block = 0;
// when
for i in 0..2*EXPECTED_SIZE {
let tx = tx_with(i as u64, past_block);
assert!(rotator.ban_if_stale(&now, past_block, &tx));
}
assert_eq!(rotator.banned_until.read().len(), 2*EXPECTED_SIZE);
// then
let tx = tx_with(2*EXPECTED_SIZE as u64, past_block);
// trigger a garbage collection
assert!(rotator.ban_if_stale(&now, past_block, &tx));
assert_eq!(rotator.banned_until.read().len(), EXPECTED_SIZE);
}
}
@@ -1,189 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::{
collections::HashMap,
sync::{Arc, atomic::{AtomicIsize, Ordering as AtomicOrdering}},
};
use parking_lot::{RwLock, RwLockWriteGuard, RwLockReadGuard};
/// Something that can report its size.
pub trait Size {
fn size(&self) -> usize;
}
/// Map with size tracking.
///
/// Size reported might be slightly off and only approximately true.
#[derive(Debug, parity_util_mem::MallocSizeOf)]
pub struct TrackedMap<K, V> {
index: Arc<RwLock<HashMap<K, V>>>,
bytes: AtomicIsize,
length: AtomicIsize,
}
impl<K, V> Default for TrackedMap<K, V> {
fn default() -> Self {
Self {
index: Arc::new(HashMap::default().into()),
bytes: 0.into(),
length: 0.into(),
}
}
}
impl<K, V> TrackedMap<K, V> {
/// Current tracked length of the content.
pub fn len(&self) -> usize {
std::cmp::max(self.length.load(AtomicOrdering::Relaxed), 0) as usize
}
/// Current sum of content length.
pub fn bytes(&self) -> usize {
std::cmp::max(self.bytes.load(AtomicOrdering::Relaxed), 0) as usize
}
/// Read-only clone of the interior.
pub fn clone(&self) -> ReadOnlyTrackedMap<K, V> {
ReadOnlyTrackedMap(self.index.clone())
}
/// Lock map for read.
pub fn read(&self) -> TrackedMapReadAccess<K, V> {
TrackedMapReadAccess {
inner_guard: self.index.read(),
}
}
/// Lock map for write.
pub fn write(&self) -> TrackedMapWriteAccess<K, V> {
TrackedMapWriteAccess {
inner_guard: self.index.write(),
bytes: &self.bytes,
length: &self.length,
}
}
}
/// Read-only access to map.
///
/// The only thing can be done is .read().
pub struct ReadOnlyTrackedMap<K, V>(Arc<RwLock<HashMap<K, V>>>);
impl<K, V> ReadOnlyTrackedMap<K, V>
where
K: Eq + std::hash::Hash
{
/// Lock map for read.
pub fn read(&self) -> TrackedMapReadAccess<K, V> {
TrackedMapReadAccess {
inner_guard: self.0.read(),
}
}
}
pub struct TrackedMapReadAccess<'a, K, V> {
inner_guard: RwLockReadGuard<'a, HashMap<K, V>>,
}
impl<'a, K, V> TrackedMapReadAccess<'a, K, V>
where
K: Eq + std::hash::Hash
{
/// Returns true if map contains key.
pub fn contains_key(&self, key: &K) -> bool {
self.inner_guard.contains_key(key)
}
/// Returns reference to the contained value by key, if exists.
pub fn get(&self, key: &K) -> Option<&V> {
self.inner_guard.get(key)
}
/// Returns iterator over all values.
pub fn values(&self) -> std::collections::hash_map::Values<K, V> {
self.inner_guard.values()
}
}
pub struct TrackedMapWriteAccess<'a, K, V> {
bytes: &'a AtomicIsize,
length: &'a AtomicIsize,
inner_guard: RwLockWriteGuard<'a, HashMap<K, V>>,
}
impl<'a, K, V> TrackedMapWriteAccess<'a, K, V>
where
K: Eq + std::hash::Hash, V: Size
{
/// Insert value and return previous (if any).
pub fn insert(&mut self, key: K, val: V) -> Option<V> {
let new_bytes = val.size();
self.bytes.fetch_add(new_bytes as isize, AtomicOrdering::Relaxed);
self.length.fetch_add(1, AtomicOrdering::Relaxed);
self.inner_guard.insert(key, val).map(|old_val| {
self.bytes.fetch_sub(old_val.size() as isize, AtomicOrdering::Relaxed);
self.length.fetch_sub(1, AtomicOrdering::Relaxed);
old_val
})
}
/// Remove value by key.
pub fn remove(&mut self, key: &K) -> Option<V> {
let val = self.inner_guard.remove(key);
if let Some(size) = val.as_ref().map(Size::size) {
self.bytes.fetch_sub(size as isize, AtomicOrdering::Relaxed);
self.length.fetch_sub(1, AtomicOrdering::Relaxed);
}
val
}
/// Returns mutable reference to the contained value by key, if exists.
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
self.inner_guard.get_mut(key)
}
}
#[cfg(test)]
mod tests {
use super::*;
impl Size for i32 {
fn size(&self) -> usize { *self as usize / 10 }
}
#[test]
fn basic() {
let map = TrackedMap::default();
map.write().insert(5, 10);
map.write().insert(6, 20);
assert_eq!(map.bytes(), 3);
assert_eq!(map.len(), 2);
map.write().insert(6, 30);
assert_eq!(map.bytes(), 4);
assert_eq!(map.len(), 2);
map.write().remove(&6);
assert_eq!(map.bytes(), 1);
assert_eq!(map.len(), 1);
}
}
@@ -1,658 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::{
collections::{HashSet, HashMap},
hash,
sync::Arc,
};
use serde::Serialize;
use parking_lot::{Mutex, RwLock};
use sp_runtime::{
generic::BlockId,
traits::{self, SaturatedConversion},
transaction_validity::{TransactionTag as Tag, ValidTransaction, TransactionSource},
};
use sp_transaction_pool::{error, PoolStatus};
use wasm_timer::Instant;
use futures::channel::mpsc::{channel, Sender};
use retain_mut::RetainMut;
use crate::base_pool::{self as base, PruneStatus};
use crate::listener::Listener;
use crate::rotator::PoolRotator;
use crate::watcher::Watcher;
use crate::pool::{
EventStream, Options, ChainApi, BlockHash, ExtrinsicHash, ExtrinsicFor, TransactionFor,
};
/// Pre-validated transaction. Validated pool only accepts transactions wrapped in this enum.
#[derive(Debug)]
pub enum ValidatedTransaction<Hash, Ex, Error> {
/// Transaction that has been validated successfully.
Valid(base::Transaction<Hash, Ex>),
/// Transaction that is invalid.
Invalid(Hash, Error),
/// Transaction which validity can't be determined.
///
/// We're notifying watchers about failure, if 'unknown' transaction is submitted.
Unknown(Hash, Error),
}
impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
/// Consume validity result, transaction data and produce ValidTransaction.
pub fn valid_at(
at: u64,
hash: Hash,
source: TransactionSource,
data: Ex,
bytes: usize,
validity: ValidTransaction,
) -> Self {
Self::Valid(base::Transaction {
data,
bytes,
hash,
source,
priority: validity.priority,
requires: validity.requires,
provides: validity.provides,
propagate: validity.propagate,
valid_till: at
.saturated_into::<u64>()
.saturating_add(validity.longevity),
})
}
}
/// A type of validated transaction stored in the pool.
pub type ValidatedTransactionFor<B> = ValidatedTransaction<
ExtrinsicHash<B>,
ExtrinsicFor<B>,
<B as ChainApi>::Error,
>;
/// A closure that returns true if the local node is a validator that can author blocks.
pub struct IsValidator(Box<dyn Fn() -> bool + Send + Sync>);
impl From<bool> for IsValidator {
fn from(is_validator: bool) -> Self {
Self(Box::new(move || is_validator))
}
}
impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
fn from(is_validator: Box<dyn Fn() -> bool + Send + Sync>) -> Self {
Self(is_validator)
}
}
/// Pool that deals with validated transactions.
pub struct ValidatedPool<B: ChainApi> {
api: Arc<B>,
is_validator: IsValidator,
options: Options,
listener: RwLock<Listener<ExtrinsicHash<B>, B>>,
pool: RwLock<base::BasePool<
ExtrinsicHash<B>,
ExtrinsicFor<B>,
>>,
import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
rotator: PoolRotator<ExtrinsicHash<B>>,
}
#[cfg(not(target_os = "unknown"))]
impl<B: ChainApi> parity_util_mem::MallocSizeOf for ValidatedPool<B>
where
ExtrinsicFor<B>: parity_util_mem::MallocSizeOf,
{
fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
// other entries insignificant or non-primary references
self.pool.size_of(ops)
}
}
impl<B: ChainApi> ValidatedPool<B> {
/// Create a new transaction pool.
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
Self {
is_validator,
options,
listener: Default::default(),
api,
pool: RwLock::new(base_pool),
import_notification_sinks: Default::default(),
rotator: Default::default(),
}
}
/// Bans given set of hashes.
pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item=ExtrinsicHash<B>>) {
self.rotator.ban(now, hashes)
}
/// Returns true if transaction with given hash is currently banned from the pool.
pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
self.rotator.is_banned(hash)
}
/// A fast check before doing any further processing of a transaction, like validation.
///
/// If `ignore_banned` is `true`, it will not check if the transaction is banned.
///
/// It checks if the transaction is already imported or banned. If so, it returns an error.
pub fn check_is_known(
&self,
tx_hash: &ExtrinsicHash<B>,
ignore_banned: bool,
) -> Result<(), B::Error> {
if !ignore_banned && self.is_banned(tx_hash) {
Err(error::Error::TemporarilyBanned.into())
} else if self.pool.read().is_imported(tx_hash) {
Err(error::Error::AlreadyImported(Box::new(*tx_hash)).into())
} else {
Ok(())
}
}
/// Imports a bunch of pre-validated transactions to the pool.
pub fn submit(
&self,
txs: impl IntoIterator<Item=ValidatedTransactionFor<B>>,
) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
let results = txs.into_iter()
.map(|validated_tx| self.submit_one(validated_tx))
.collect::<Vec<_>>();
// only enforce limits if there is at least one imported transaction
let removed = if results.iter().any(|res| res.is_ok()) {
self.enforce_limits()
} else {
Default::default()
};
results.into_iter().map(|res| match res {
Ok(ref hash) if removed.contains(hash) => Err(error::Error::ImmediatelyDropped.into()),
other => other,
}).collect()
}
/// Submit single pre-validated transaction to the pool.
fn submit_one(&self, tx: ValidatedTransactionFor<B>) -> Result<ExtrinsicHash<B>, B::Error> {
match tx {
ValidatedTransaction::Valid(tx) => {
if !tx.propagate && !(self.is_validator.0)() {
return Err(error::Error::Unactionable.into());
}
let imported = self.pool.write().import(tx)?;
if let base::Imported::Ready { ref hash, .. } = imported {
self.import_notification_sinks.lock()
.retain_mut(|sink| {
match sink.try_send(*hash) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
log::warn!(target: "txpool", "[{:?}] Trying to notify an import but the channel is full", hash);
true
} else {
false
}
},
}
});
}
let mut listener = self.listener.write();
fire_events(&mut *listener, &imported);
Ok(*imported.hash())
},
ValidatedTransaction::Invalid(hash, err) => {
self.rotator.ban(&Instant::now(), std::iter::once(hash));
Err(err)
},
ValidatedTransaction::Unknown(hash, err) => {
self.listener.write().invalid(&hash);
Err(err)
},
}
}
fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
let status = self.pool.read().status();
let ready_limit = &self.options.ready;
let future_limit = &self.options.future;
log::debug!(target: "txpool", "Pool Status: {:?}", status);
if ready_limit.is_exceeded(status.ready, status.ready_bytes)
|| future_limit.is_exceeded(status.future, status.future_bytes)
{
log::debug!(
target: "txpool",
"Enforcing limits ({}/{}kB ready, {}/{}kB future",
ready_limit.count, ready_limit.total_bytes / 1024,
future_limit.count, future_limit.total_bytes / 1024,
);
// clean up the pool
let removed = {
let mut pool = self.pool.write();
let removed = pool.enforce_limits(ready_limit, future_limit)
.into_iter().map(|x| x.hash).collect::<HashSet<_>>();
// ban all removed transactions
self.rotator.ban(&Instant::now(), removed.iter().copied());
removed
};
if !removed.is_empty() {
log::debug!(target: "txpool", "Enforcing limits: {} dropped", removed.len());
}
// run notifications
let mut listener = self.listener.write();
for h in &removed {
listener.dropped(h, None);
}
removed
} else {
Default::default()
}
}
/// Import a single extrinsic and starts to watch their progress in the pool.
pub fn submit_and_watch(
&self,
tx: ValidatedTransactionFor<B>,
) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
match tx {
ValidatedTransaction::Valid(tx) => {
let hash = self.api.hash_and_length(&tx.data).0;
let watcher = self.listener.write().create_watcher(hash);
self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
.pop()
.expect("One extrinsic passed; one result returned; qed")
.map(|_| watcher)
},
ValidatedTransaction::Invalid(hash, err) => {
self.rotator.ban(&Instant::now(), std::iter::once(hash));
Err(err)
},
ValidatedTransaction::Unknown(_, err) => Err(err),
}
}
/// Resubmits revalidated transactions back to the pool.
///
/// Removes and then submits passed transactions and all dependent transactions.
/// Transactions that are missing from the pool are not submitted.
pub fn resubmit(&self, mut updated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>) {
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status { Future, Ready, Failed, Dropped }
let (mut initial_statuses, final_statuses) = {
let mut pool = self.pool.write();
// remove all passed transactions from the ready/future queues
// (this may remove additional transactions as well)
//
// for every transaction that has an entry in the `updated_transactions`,
// we store updated validation result in txs_to_resubmit
// for every transaction that has no entry in the `updated_transactions`,
// we store last validation result (i.e. the pool entry) in txs_to_resubmit
let mut initial_statuses = HashMap::new();
let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
while !updated_transactions.is_empty() {
let hash = updated_transactions.keys().next().cloned().expect("transactions is not empty; qed");
// note we are not considering tx with hash invalid here - we just want
// to remove it along with dependent transactions and `remove_subtree()`
// does exactly what we need
let removed = pool.remove_subtree(&[hash]);
for removed_tx in removed {
let removed_hash = removed_tx.hash;
let updated_transaction = updated_transactions.remove(&removed_hash);
let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
updated_tx
} else {
// in most cases we'll end up in successful `try_unwrap`, but if not
// we still need to reinsert transaction back to the pool => duplicate call
let transaction = match Arc::try_unwrap(removed_tx) {
Ok(transaction) => transaction,
Err(transaction) => transaction.duplicate(),
};
ValidatedTransaction::Valid(transaction)
};
initial_statuses.insert(removed_hash, Status::Ready);
txs_to_resubmit.push((removed_hash, tx_to_resubmit));
}
// make sure to remove the hash even if it's not present in the pool any more.
updated_transactions.remove(&hash);
}
// if we're rejecting future transactions, then insertion order matters here:
// if tx1 depends on tx2, then if tx1 is inserted before tx2, then it goes
// to the future queue and gets rejected immediately
// => let's temporary stop rejection and clear future queue before return
pool.with_futures_enabled(|pool, reject_future_transactions| {
// now resubmit all removed transactions back to the pool
let mut final_statuses = HashMap::new();
for (hash, tx_to_resubmit) in txs_to_resubmit {
match tx_to_resubmit {
ValidatedTransaction::Valid(tx) => match pool.import(tx) {
Ok(imported) => match imported {
base::Imported::Ready { promoted, failed, removed, .. } => {
final_statuses.insert(hash, Status::Ready);
for hash in promoted {
final_statuses.insert(hash, Status::Ready);
}
for hash in failed {
final_statuses.insert(hash, Status::Failed);
}
for tx in removed {
final_statuses.insert(tx.hash, Status::Dropped);
}
},
base::Imported::Future { .. } => {
final_statuses.insert(hash, Status::Future);
},
},
Err(err) => {
// we do not want to fail if single transaction import has failed
// nor we do want to propagate this error, because it could tx unknown to caller
// => let's just notify listeners (and issue debug message)
log::warn!(
target: "txpool",
"[{:?}] Removing invalid transaction from update: {}",
hash,
err,
);
final_statuses.insert(hash, Status::Failed);
},
},
ValidatedTransaction::Invalid(_, _) | ValidatedTransaction::Unknown(_, _) => {
final_statuses.insert(hash, Status::Failed);
},
}
}
// if the pool is configured to reject future transactions, let's clear the future
// queue, updating final statuses as required
if reject_future_transactions {
for future_tx in pool.clear_future() {
final_statuses.insert(future_tx.hash, Status::Dropped);
}
}
(initial_statuses, final_statuses)
})
};
// and now let's notify listeners about status changes
let mut listener = self.listener.write();
for (hash, final_status) in final_statuses {
let initial_status = initial_statuses.remove(&hash);
if initial_status.is_none() || Some(final_status) != initial_status {
match final_status {
Status::Future => listener.future(&hash),
Status::Ready => listener.ready(&hash, None),
Status::Dropped => listener.dropped(&hash, None),
Status::Failed => listener.invalid(&hash),
}
}
}
}
/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
self.pool.read()
.by_hashes(&hashes)
.into_iter()
.map(|existing_in_pool|
existing_in_pool.map(|transaction| transaction.provides.to_vec())
)
.collect()
}
/// Get ready transaction by hash
pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
self.pool.read().ready_by_hash(hash)
}
/// Prunes ready transactions that provide given list of tags.
pub fn prune_tags(
&self,
tags: impl IntoIterator<Item=Tag>,
) -> Result<PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>>, B::Error> {
// Perform tag-based pruning in the base pool
let status = self.pool.write().prune_tags(tags);
// Notify event listeners of all transactions
// that were promoted to `Ready` or were dropped.
{
let mut listener = self.listener.write();
for promoted in &status.promoted {
fire_events(&mut *listener, promoted);
}
for f in &status.failed {
listener.dropped(f, None);
}
}
Ok(status)
}
/// Resubmit transactions that have been revalidated after prune_tags call.
pub fn resubmit_pruned(
&self,
at: &BlockId<B::Block>,
known_imported_hashes: impl IntoIterator<Item=ExtrinsicHash<B>> + Clone,
pruned_hashes: Vec<ExtrinsicHash<B>>,
pruned_xts: Vec<ValidatedTransactionFor<B>>,
) -> Result<(), B::Error> {
debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());
// Resubmit pruned transactions
let results = self.submit(pruned_xts);
// Collect the hashes of transactions that now became invalid (meaning that they are successfully pruned).
let hashes = results
.into_iter()
.enumerate()
.filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) {
Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx]),
_ => None,
});
// Fire `pruned` notifications for collected hashes and make sure to include
// `known_imported_hashes` since they were just imported as part of the block.
let hashes = hashes.chain(known_imported_hashes.into_iter());
self.fire_pruned(at, hashes)?;
// perform regular cleanup of old transactions in the pool
// and update temporary bans.
self.clear_stale(at)?;
Ok(())
}
/// Fire notifications for pruned transactions.
pub fn fire_pruned(
&self,
at: &BlockId<B::Block>,
hashes: impl Iterator<Item=ExtrinsicHash<B>>,
) -> Result<(), B::Error> {
let header_hash = self.api.block_id_to_hash(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)))?;
let mut listener = self.listener.write();
let mut set = HashSet::with_capacity(hashes.size_hint().0);
for h in hashes {
// `hashes` has possibly duplicate hashes.
// we'd like to send out the `InBlock` notification only once.
if !set.contains(&h) {
listener.pruned(header_hash, &h);
set.insert(h);
}
}
Ok(())
}
/// Removes stale transactions from the pool.
///
/// Stale transactions are transaction beyond their longevity period.
/// Note this function does not remove transactions that are already included in the chain.
/// See `prune_tags` if you want this.
pub fn clear_stale(&self, at: &BlockId<B::Block>) -> Result<(), B::Error> {
let block_number = self.api.block_id_to_number(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)))?
.saturated_into::<u64>();
let now = Instant::now();
let to_remove = {
self.ready()
.filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx))
.map(|tx| tx.hash)
.collect::<Vec<_>>()
};
let futures_to_remove: Vec<ExtrinsicHash<B>> = {
let p = self.pool.read();
let mut hashes = Vec::new();
for tx in p.futures() {
if self.rotator.ban_if_stale(&now, block_number, &tx) {
hashes.push(tx.hash);
}
}
hashes
};
// removing old transactions
self.remove_invalid(&to_remove);
self.remove_invalid(&futures_to_remove);
// clear banned transactions timeouts
self.rotator.clear_timeouts(&now);
Ok(())
}
/// Get rotator reference.
#[cfg(test)]
pub fn rotator(&self) -> &PoolRotator<ExtrinsicHash<B>> {
&self.rotator
}
/// Get api reference.
pub fn api(&self) -> &B {
&self.api
}
/// Return an event stream of notifications for when transactions are imported to the pool.
///
/// Consumers of this stream should use the `ready` method to actually get the
/// pending transactions in the right order.
pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
const CHANNEL_BUFFER_SIZE: usize = 1024;
let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
self.import_notification_sinks.lock().push(sink);
stream
}
/// Invoked when extrinsics are broadcasted.
pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
let mut listener = self.listener.write();
for (hash, peers) in propagated.into_iter() {
listener.broadcasted(&hash, peers);
}
}
/// Remove a subtree of transactions from the pool and mark them invalid.
///
/// The transactions passed as an argument will be additionally banned
/// to prevent them from entering the pool right away.
/// Note this is not the case for the dependent transactions - those may
/// still be valid so we want to be able to re-import them.
pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
// early exit in case there is no invalid transactions.
if hashes.is_empty() {
return vec![];
}
log::debug!(target: "txpool", "Removing invalid transactions: {:?}", hashes);
// temporarily ban invalid transactions
self.rotator.ban(&Instant::now(), hashes.iter().cloned());
let invalid = self.pool.write().remove_subtree(hashes);
log::debug!(target: "txpool", "Removed invalid transactions: {:?}", invalid);
let mut listener = self.listener.write();
for tx in &invalid {
listener.invalid(&tx.hash);
}
invalid
}
/// Get an iterator for ready transactions ordered by priority
pub fn ready(&self) -> impl Iterator<Item=TransactionFor<B>> + Send {
self.pool.read().ready()
}
/// Returns a Vec of hashes and extrinsics in the future pool.
pub fn futures(&self) -> Vec<(ExtrinsicHash<B>, ExtrinsicFor<B>)> {
self.pool.read().futures()
.map(|tx| (tx.hash.clone(), tx.data.clone()))
.collect()
}
/// Returns pool status.
pub fn status(&self) -> PoolStatus {
self.pool.read().status()
}
/// Notify all watchers that transactions in the block with hash have been finalized
pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
log::trace!(target: "txpool", "Attempting to notify watchers of finalization for {}", block_hash);
self.listener.write().finalized(block_hash);
Ok(())
}
/// Notify the listener of retracted blocks
pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
self.listener.write().retracted(block_hash)
}
}
fn fire_events<H, B, Ex>(
listener: &mut Listener<H, B>,
imported: &base::Imported<H, Ex>,
) where
H: hash::Hash + Eq + traits::Member + Serialize,
B: ChainApi,
{
match *imported {
base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
listener.ready(hash, None);
failed.into_iter().for_each(|f| listener.invalid(f));
removed.into_iter().for_each(|r| listener.dropped(&r.hash, Some(hash)));
promoted.into_iter().for_each(|p| listener.ready(p, None));
},
base::Imported::Future { ref hash } => {
listener.future(hash)
},
}
}
@@ -1,140 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Extrinsics status updates.
use futures::Stream;
use sp_transaction_pool::TransactionStatus;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
/// Extrinsic watcher.
///
/// Represents a stream of status updates for a particular extrinsic.
#[derive(Debug)]
pub struct Watcher<H, BH> {
receiver: TracingUnboundedReceiver<TransactionStatus<H, BH>>,
/// transaction hash of watched extrinsic
hash: H,
}
impl<H, BH> Watcher<H, BH> {
/// Returns the transaction hash.
pub fn hash(&self) -> &H {
&self.hash
}
/// Pipe the notifications to given sink.
///
/// Make sure to drive the future to completion.
pub fn into_stream(self) -> impl Stream<Item=TransactionStatus<H, BH>> {
self.receiver
}
}
/// Sender part of the watcher. Exposed only for testing purposes.
#[derive(Debug)]
pub struct Sender<H, BH> {
receivers: Vec<TracingUnboundedSender<TransactionStatus<H, BH>>>,
is_finalized: bool,
}
impl<H, BH> Default for Sender<H, BH> {
fn default() -> Self {
Sender {
receivers: Default::default(),
is_finalized: false,
}
}
}
impl<H: Clone, BH: Clone> Sender<H, BH> {
/// Add a new watcher to this sender object.
pub fn new_watcher(&mut self, hash: H) -> Watcher<H, BH> {
let (tx, receiver) = tracing_unbounded("mpsc_txpool_watcher");
self.receivers.push(tx);
Watcher {
receiver,
hash,
}
}
/// Transaction became ready.
pub fn ready(&mut self) {
self.send(TransactionStatus::Ready)
}
/// Transaction was moved to future.
pub fn future(&mut self) {
self.send(TransactionStatus::Future)
}
/// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid.
pub fn usurped(&mut self, hash: H) {
self.send(TransactionStatus::Usurped(hash));
self.is_finalized = true;
}
/// Extrinsic has been included in block with given hash.
pub fn in_block(&mut self, hash: BH) {
self.send(TransactionStatus::InBlock(hash));
}
/// Extrinsic has been finalized by a finality gadget.
pub fn finalized(&mut self, hash: BH) {
self.send(TransactionStatus::Finalized(hash));
self.is_finalized = true;
}
/// The block this extrinsic was included in has been retracted
pub fn finality_timeout(&mut self, hash: BH) {
self.send(TransactionStatus::FinalityTimeout(hash));
self.is_finalized = true;
}
/// The block this extrinsic was included in has been retracted
pub fn retracted(&mut self, hash: BH) {
self.send(TransactionStatus::Retracted(hash));
}
/// Extrinsic has been marked as invalid by the block builder.
pub fn invalid(&mut self) {
self.send(TransactionStatus::Invalid);
// we mark as finalized as there are no more notifications
self.is_finalized = true;
}
/// Transaction has been dropped from the pool because of the limit.
pub fn dropped(&mut self) {
self.send(TransactionStatus::Dropped);
self.is_finalized = true;
}
/// The extrinsic has been broadcast to the given peers.
pub fn broadcast(&mut self, peers: Vec<String>) {
self.send(TransactionStatus::Broadcast(peers))
}
/// Returns true if the are no more listeners for this extrinsic or it was finalized.
pub fn is_done(&self) -> bool {
self.is_finalized || self.receivers.is_empty()
}
fn send(&mut self, status: TransactionStatus<H, BH>) {
self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok())
}
}