Tagged transaction queue integration (#893)

* Make the graph generic.

* Adapting pool API for the graph.

* Merge pool & graph.

* Restructure.

* Fix test of transaction pool.

* Get rid of node/transaction-pool.

* Compilation fixes.

* Test7

* Fix compilation of tests.

* Revert runtime changes.

* Add validate_transaction to test-runtime.

* Fix RPC tests.

* Add clearing of the old transactions.

* Trigger pool events.

* Use new queue API.

* Fix wasm build, re-export Hasher.

* No warning if validate transaction fails.

* Get rid of Into<u64> and use As
This commit is contained in:
Tomasz Drwięga
2018-10-12 13:09:35 +02:00
committed by Gav Wood
parent 2404d3c89f
commit 671b0e0007
48 changed files with 1234 additions and 1524 deletions
@@ -0,0 +1,709 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! A basic version of the dependency graph.
//!
//! For a more full-featured pool, have a look at the `pool` module.
use std::{
hash,
sync::Arc,
};
use sr_primitives::traits::Member;
use sr_primitives::transaction_validity::{
TransactionTag as Tag,
TransactionLongevity as Longevity,
TransactionPriority as Priority,
};
use error;
use future::{FutureTransactions, WaitingTransaction};
use ready::ReadyTransactions;
/// Block number type.
pub type BlockNumber = u64;
/// Successful import result.
#[derive(Debug, PartialEq, Eq)]
pub enum Imported<Hash, Ex> {
/// Transaction was successfuly imported to Ready queue.
Ready {
/// Hash of transaction that was successfuly imported.
hash: Hash,
/// Transactions that got promoted from the Future queue.
promoted: Vec<Hash>,
/// Transactions that failed to be promoted from the Future queue and are now discarded.
failed: Vec<Hash>,
/// Transactions removed from the Ready pool (replaced).
removed: Vec<Arc<Transaction<Hash, Ex>>>,
},
/// Transaction was successfuly imported to Future queue.
Future {
/// Hash of transaction that was successfuly imported.
hash: Hash,
}
}
impl<Hash, Ex> Imported<Hash, Ex> {
/// Returns the hash of imported transaction.
pub fn hash(&self) -> &Hash {
use self::Imported::*;
match *self {
Ready { ref hash, .. } => hash,
Future { ref hash, .. } => hash,
}
}
}
/// Status of pruning the queue.
#[derive(Debug)]
pub struct PruneStatus<Hash, Ex> {
/// A list of imports that satisfying the tag triggered.
pub promoted: Vec<Imported<Hash, Ex>>,
/// A list of transactions that failed to be promoted and now are discarded.
pub failed: Vec<Hash>,
/// A list of transactions that got pruned from the ready queue.
pub pruned: Vec<Arc<Transaction<Hash, Ex>>>,
}
/// Immutable transaction
#[cfg_attr(test, derive(Clone))]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Transaction<Hash, Extrinsic> {
/// Raw extrinsic representing that transaction.
pub data: Extrinsic,
/// Transaction hash (unique)
pub hash: Hash,
/// Transaction priority (higher = better)
pub priority: Priority,
/// How many blocks the transaction is valid for.
pub longevity: Longevity,
/// Tags required by the transaction.
pub requires: Vec<Tag>,
/// Tags that this transaction provides.
pub provides: Vec<Tag>,
}
/// Transaction pool.
///
/// Builds a dependency graph for all transactions in the pool and returns
/// the ones that are currently ready to be executed.
///
/// General note:
/// If function returns some transactions it usually means that importing them
/// as-is for the second time will fail or produce unwanted results.
/// Most likely it is required to revalidate them and recompute set of
/// required tags.
#[derive(Debug)]
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
future: FutureTransactions<Hash, Ex>,
ready: ReadyTransactions<Hash, Ex>,
}
impl<Hash: hash::Hash + Eq, Ex> Default for BasePool<Hash, Ex> {
fn default() -> Self {
BasePool {
future: Default::default(),
ready: Default::default(),
}
}
}
impl<Hash: hash::Hash + Member, Ex: ::std::fmt::Debug> BasePool<Hash, Ex> {
/// Imports transaction to the pool.
///
/// The pool consists of two parts: Future and Ready.
/// The former contains transactions that require some tags that are not yet provided by
/// other transactions in the pool.
/// The latter contains transactions that have all the requirements satisfied and are
/// ready to be included in the block.
pub fn import(
&mut self,
block_number: BlockNumber,
tx: Transaction<Hash, Ex>,
) -> error::Result<Imported<Hash, Ex>> {
if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) {
bail!(error::ErrorKind::AlreadyImported)
}
let tx = WaitingTransaction::new(tx, self.ready.provided_tags());
trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash, tx);
debug!(target: "txpool", "[{:?}] Importing to {}", tx.transaction.hash, if tx.is_ready() { "ready" } else { "future" });
// If all tags are not satisfied import to future.
if !tx.is_ready() {
let hash = tx.transaction.hash.clone();
self.future.import(tx);
return Ok(Imported::Future { hash });
}
self.import_to_ready(block_number, tx)
}
/// Imports transaction to ready queue.
///
/// NOTE the transaction has to have all requirements satisfied.
fn import_to_ready(&mut self, block_number: BlockNumber, tx: WaitingTransaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
let hash = tx.transaction.hash.clone();
let mut promoted = vec![];
let mut failed = vec![];
let mut removed = vec![];
let mut first = true;
let mut to_import = vec![tx];
loop {
// take first transaction from the list
let tx = match to_import.pop() {
Some(tx) => tx,
None => break,
};
// find transactions in Future that it unlocks
to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));
// import this transaction
let current_hash = tx.transaction.hash.clone();
match self.ready.import(block_number, tx) {
Ok(mut replaced) => {
if !first {
promoted.push(current_hash);
}
// The transactions were removed from the ready pool. We might attempt to re-import them.
removed.append(&mut replaced);
},
// transaction failed to be imported.
Err(e) => if first {
debug!(target: "txpool", "[{:?}] Error importing: {:?}", current_hash, e);
return Err(e)
} else {
failed.push(current_hash);
},
}
first = false;
}
// An edge case when importing transaction caused
// some future transactions to be imported and that
// future transactions pushed out current transaction.
// This means that there is a cycle and the transactions should
// be moved back to future, since we can't resolve it.
if removed.iter().any(|tx| tx.hash == hash) {
// We still need to remove all transactions that we promoted
// since they depend on each other and will never get to the best iterator.
self.ready.remove_invalid(&promoted);
debug!(target: "txpool", "[{:?}] Cycle detected, bailing.", hash);
bail!(error::ErrorKind::CycleDetected)
}
Ok(Imported::Ready {
hash,
promoted,
failed,
removed,
})
}
/// Returns an iterator over ready transactions in the pool.
pub fn ready<'a, 'b: 'a>(&'b self) -> impl Iterator<Item=Arc<Transaction<Hash, Ex>>> + 'a {
self.ready.get()
}
/// Removes all transactions represented by the hashes and all other transactions
/// that depend on them.
///
/// Returns a list of actually removed transactions.
/// NOTE some transactions might still be valid, but were just removed because
/// they were part of a chain, you may attempt to re-import them later.
/// NOTE If you want to remove ready transactions that were already used
/// and you don't want them to be stored in the pool use `prune_tags` method.
pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = self.ready.remove_invalid(hashes);
removed.extend(self.future.remove(hashes).into_iter().map(Arc::new));
removed
}
/// Prunes transactions that provide given list of tags.
///
/// This will cause all transactions that provide these tags to be removed from the pool,
/// but unlike `remove_invalid`, dependent transactions are not touched.
/// Additional transactions from future queue might be promoted to ready if you satisfy tags
/// that the pool didn't previously know about.
pub fn prune_tags(&mut self, block_number: BlockNumber, tags: impl IntoIterator<Item=Tag>) -> PruneStatus<Hash, Ex> {
let mut to_import = vec![];
let mut pruned = vec![];
for tag in tags {
// make sure to promote any future transactions that could be unlocked
to_import.append(&mut self.future.satisfy_tags(::std::iter::once(&tag)));
// and actually prune transactions in ready queue
pruned.append(&mut self.ready.prune_tags(tag));
}
let mut promoted = vec![];
let mut failed = vec![];
for tx in to_import {
let hash = tx.transaction.hash.clone();
match self.import_to_ready(block_number, tx) {
Ok(res) => promoted.push(res),
Err(e) => {
warn!(target: "txpool", "[{:?}] Failed to promote during pruning: {:?}", hash, e);
failed.push(hash)
},
}
}
PruneStatus {
pruned,
failed,
promoted,
}
}
/// Get pool status.
pub fn status(&self) -> Status {
Status {
ready: self.ready.len(),
future: self.future.len(),
}
}
}
/// Pool status
pub struct Status {
/// Number of transactions in the ready queue.
pub ready: usize,
/// Number of transactions in the future queue.
pub future: usize,
}
#[cfg(test)]
mod tests {
use super::*;
type Hash = u64;
fn pool() -> BasePool<Hash, Vec<u8>> {
BasePool::default()
}
#[test]
fn should_import_transaction_to_ready() {
// given
let mut pool = pool();
// when
pool.import(1, Transaction {
data: vec![1u8],
hash: 1u64,
priority: 5u64,
longevity: 64u64,
requires: vec![],
provides: vec![vec![1]],
}).unwrap();
// then
assert_eq!(pool.ready().count(), 1);
assert_eq!(pool.ready.len(), 1);
}
#[test]
fn should_not_import_same_transaction_twice() {
// given
let mut pool = pool();
// when
pool.import(1, Transaction {
data: vec![1u8],
hash: 1,
priority: 5u64,
longevity: 64u64,
requires: vec![],
provides: vec![vec![1]],
}).unwrap();
pool.import(1, Transaction {
data: vec![1u8],
hash: 1,
priority: 5u64,
longevity: 64u64,
requires: vec![],
provides: vec![vec![1]],
}).unwrap_err();
// then
assert_eq!(pool.ready().count(), 1);
assert_eq!(pool.ready.len(), 1);
}
#[test]
fn should_import_transaction_to_future_and_promote_it_later() {
// given
let mut pool = pool();
// when
pool.import(1, Transaction {
data: vec![1u8],
hash: 1,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![0]],
provides: vec![vec![1]],
}).unwrap();
assert_eq!(pool.ready().count(), 0);
assert_eq!(pool.ready.len(), 0);
pool.import(1, Transaction {
data: vec![2u8],
hash: 2,
priority: 5u64,
longevity: 64u64,
requires: vec![],
provides: vec![vec![0]],
}).unwrap();
// then
assert_eq!(pool.ready().count(), 2);
assert_eq!(pool.ready.len(), 2);
}
#[test]
fn should_promote_a_subgraph() {
// given
let mut pool = pool();
// when
pool.import(1, Transaction {
data: vec![1u8],
hash: 1,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![0]],
provides: vec![vec![1]],
}).unwrap();
pool.import(1, Transaction {
data: vec![3u8],
hash: 3,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![2]],
provides: vec![],
}).unwrap();
pool.import(1, Transaction {
data: vec![2u8],
hash: 2,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![1]],
provides: vec![vec![3], vec![2]],
}).unwrap();
pool.import(1, Transaction {
data: vec![4u8],
hash: 4,
priority: 1_000u64,
longevity: 64u64,
requires: vec![vec![3], vec![4]],
provides: vec![],
}).unwrap();
assert_eq!(pool.ready().count(), 0);
assert_eq!(pool.ready.len(), 0);
let res = pool.import(1, Transaction {
data: vec![5u8],
hash: 5,
priority: 5u64,
longevity: 64u64,
requires: vec![],
provides: vec![vec![0], vec![4]],
}).unwrap();
// then
let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
assert_eq!(it.next(), Some(5));
assert_eq!(it.next(), Some(1));
assert_eq!(it.next(), Some(2));
assert_eq!(it.next(), Some(4));
assert_eq!(it.next(), Some(3));
assert_eq!(it.next(), None);
assert_eq!(res, Imported::Ready {
hash: 5,
promoted: vec![1, 2, 3, 4],
failed: vec![],
removed: vec![],
});
}
#[test]
fn should_handle_a_cycle() {
// given
let mut pool = pool();
pool.import(1, Transaction {
data: vec![1u8],
hash: 1,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![0]],
provides: vec![vec![1]],
}).unwrap();
pool.import(1, Transaction {
data: vec![3u8],
hash: 3,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![1]],
provides: vec![vec![2]],
}).unwrap();
assert_eq!(pool.ready().count(), 0);
assert_eq!(pool.ready.len(), 0);
// when
pool.import(1, Transaction {
data: vec![2u8],
hash: 2,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![2]],
provides: vec![vec![0]],
}).unwrap();
// then
{
let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
assert_eq!(it.next(), None);
}
// all transactions occupy the Future queue - it's fine
assert_eq!(pool.future.len(), 3);
// let's close the cycle with one additional transaction
let res = pool.import(1, Transaction {
data: vec![4u8],
hash: 4,
priority: 50u64,
longevity: 64u64,
requires: vec![],
provides: vec![vec![0]],
}).unwrap();
let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
assert_eq!(it.next(), Some(4));
assert_eq!(it.next(), Some(1));
assert_eq!(it.next(), Some(3));
assert_eq!(it.next(), None);
assert_eq!(res, Imported::Ready {
hash: 4,
promoted: vec![1, 3],
failed: vec![2],
removed: vec![],
});
assert_eq!(pool.future.len(), 0);
}
#[test]
fn should_handle_a_cycle_with_low_priority() {
// given
let mut pool = pool();
pool.import(1, Transaction {
data: vec![1u8],
hash: 1,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![0]],
provides: vec![vec![1]],
}).unwrap();
pool.import(1, Transaction {
data: vec![3u8],
hash: 3,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![1]],
provides: vec![vec![2]],
}).unwrap();
assert_eq!(pool.ready().count(), 0);
assert_eq!(pool.ready.len(), 0);
// when
pool.import(1, Transaction {
data: vec![2u8],
hash: 2,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![2]],
provides: vec![vec![0]],
}).unwrap();
// then
{
let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
assert_eq!(it.next(), None);
}
// all transactions occupy the Future queue - it's fine
assert_eq!(pool.future.len(), 3);
// let's close the cycle with one additional transaction
let err = pool.import(1, Transaction {
data: vec![4u8],
hash: 4,
priority: 1u64, // lower priority than Tx(2)
longevity: 64u64,
requires: vec![],
provides: vec![vec![0]],
}).unwrap_err();
let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
assert_eq!(it.next(), None);
assert_eq!(pool.ready.len(), 0);
assert_eq!(pool.future.len(), 0);
if let error::ErrorKind::CycleDetected = *err.kind() {
} else {
assert!(false, "Invalid error kind: {:?}", err.kind());
}
}
#[test]
fn should_remove_invalid_transactions() {
// given
let mut pool = pool();
pool.import(1, Transaction {
data: vec![5u8],
hash: 5,
priority: 5u64,
longevity: 64u64,
requires: vec![],
provides: vec![vec![0], vec![4]],
}).unwrap();
pool.import(1, Transaction {
data: vec![1u8],
hash: 1,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![0]],
provides: vec![vec![1]],
}).unwrap();
pool.import(1, Transaction {
data: vec![3u8],
hash: 3,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![2]],
provides: vec![],
}).unwrap();
pool.import(1, Transaction {
data: vec![2u8],
hash: 2,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![1]],
provides: vec![vec![3], vec![2]],
}).unwrap();
pool.import(1, Transaction {
data: vec![4u8],
hash: 4,
priority: 1_000u64,
longevity: 64u64,
requires: vec![vec![3], vec![4]],
provides: vec![],
}).unwrap();
// future
pool.import(1, Transaction {
data: vec![6u8],
hash: 6,
priority: 1_000u64,
longevity: 64u64,
requires: vec![vec![11]],
provides: vec![],
}).unwrap();
assert_eq!(pool.ready().count(), 5);
assert_eq!(pool.future.len(), 1);
// when
pool.remove_invalid(&[6, 1]);
// then
assert_eq!(pool.ready().count(), 1);
assert_eq!(pool.future.len(), 0);
}
#[test]
fn should_prune_ready_transactions() {
// given
let mut pool = pool();
// future (waiting for 0)
pool.import(1, Transaction {
data: vec![5u8],
hash: 5,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![0]],
provides: vec![vec![100]],
}).unwrap();
// ready
pool.import(1, Transaction {
data: vec![1u8],
hash: 1,
priority: 5u64,
longevity: 64u64,
requires: vec![],
provides: vec![vec![1]],
}).unwrap();
pool.import(1, Transaction {
data: vec![2u8],
hash: 2,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![2]],
provides: vec![vec![3]],
}).unwrap();
pool.import(1, Transaction {
data: vec![3u8],
hash: 3,
priority: 5u64,
longevity: 64u64,
requires: vec![vec![1]],
provides: vec![vec![2]],
}).unwrap();
pool.import(1, Transaction {
data: vec![4u8],
hash: 4,
priority: 1_000u64,
longevity: 64u64,
requires: vec![vec![3], vec![2]],
provides: vec![vec![4]],
}).unwrap();
assert_eq!(pool.ready().count(), 4);
assert_eq!(pool.future.len(), 1);
// when
let result = pool.prune_tags(1, vec![vec![0], vec![2]]);
// then
assert_eq!(result.pruned.len(), 2);
assert_eq!(result.failed.len(), 0);
assert_eq!(result.promoted[0], Imported::Ready {
hash: 5,
promoted: vec![],
failed: vec![],
removed: vec![],
});
assert_eq!(result.promoted.len(), 1);
assert_eq!(pool.future.len(), 0);
assert_eq!(pool.ready.len(), 3);
assert_eq!(pool.ready().count(), 3);
}
}
@@ -0,0 +1,68 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Transaction pool errors.
use sr_primitives::transaction_validity::TransactionPriority as Priority;
error_chain! {
errors {
/// Transaction is not verifiable yet, but might be in the future.
UnknownTransactionValidity {
description("Runtime cannot determine validity of the transaction yet."),
display("Unkown Transaction Validity"),
}
/// Transaction is invalid
InvalidTransaction {
description("Runtime check for the transaction failed."),
display("Invalid Transaction"),
}
/// The transaction is temporarily baned
TemporarilyBanned {
description("Transaction is temporarily banned from importing to the pool."),
display("Temporarily Banned"),
}
/// The transaction is already in the pool.
AlreadyImported {
description("Transaction is already in the pool."),
display("Already imported"),
}
/// The transaction cannot be imported cause it's a replacement and has too low priority.
TooLowPriority(old: Priority, new: Priority) {
description("The priority is too low to replace transactions already in the pool."),
display("Too low priority ({} > {})", old, new)
}
/// Deps cycle detected and we couldn't import transaction.
CycleDetected {
description("Transaction was not imported because of detected cycle."),
display("Cycle Detected"),
}
}
}
/// Transaction pool error conversion.
pub trait IntoPoolError: ::std::error::Error + Send + Sized {
/// Try to extract original `Error`
///
/// This implementation is optional and used only to
/// provide more descriptive error messages for end users
/// of RPC API.
fn into_pool_error(self) -> ::std::result::Result<Error, Self> { Err(self) }
}
impl IntoPoolError for Error {
fn into_pool_error(self) -> ::std::result::Result<Error, Self> { Ok(self) }
}
@@ -0,0 +1,176 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{
collections::{HashMap, HashSet},
hash,
};
use sr_primitives::transaction_validity::{
TransactionTag as Tag,
};
use base_pool::Transaction;
/// Transaction with partially satisfied dependencies.
#[derive(Debug)]
pub struct WaitingTransaction<Hash, Ex> {
/// Transaction details.
pub transaction: Transaction<Hash, Ex>,
/// Tags that are required and have not been satisfied yet by other transactions in the pool.
pub missing_tags: HashSet<Tag>,
}
impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
/// Creates a new `WaitingTransaction`.
///
/// Computes the set of missing tags based on the requirements and tags that
/// are provided by all transactions in the ready queue.
pub fn new(transaction: Transaction<Hash, Ex>, provided: &HashMap<Tag, Hash>) -> Self {
let missing_tags = transaction.requires
.iter()
.filter(|tag| !provided.contains_key(&**tag))
.cloned()
.collect();
WaitingTransaction {
transaction,
missing_tags,
}
}
/// Marks the tag as satisfied.
pub fn satisfy_tag(&mut self, tag: &Tag) {
self.missing_tags.remove(tag);
}
/// Returns true if transaction has all requirements satisfied.
pub fn is_ready(&self) -> bool {
self.missing_tags.is_empty()
}
}
/// A pool of transactions that are not yet ready to be included in the block.
///
/// Contains transactions that are still awaiting for some other transactions that
/// could provide a tag that they require.
#[derive(Debug)]
pub struct FutureTransactions<Hash: hash::Hash + Eq, Ex> {
/// tags that are not yet provided by any transaction and we await for them
wanted_tags: HashMap<Tag, HashSet<Hash>>,
/// Transactions waiting for a particular other transaction
waiting: HashMap<Hash, WaitingTransaction<Hash, Ex>>,
}
impl<Hash: hash::Hash + Eq, Ex> Default for FutureTransactions<Hash, Ex> {
fn default() -> Self {
FutureTransactions {
wanted_tags: Default::default(),
waiting: Default::default(),
}
}
}
const WAITING_PROOF: &str = r"#
In import we always insert to `waiting` if we push to `wanted_tags`;
when removing from `waiting` we always clear `wanted_tags`;
every hash from `wanted_tags` is always present in `waiting`;
qed
#";
impl<Hash: hash::Hash + Eq + Clone, Ex> FutureTransactions<Hash, Ex> {
/// Import transaction to Future queue.
///
/// Only transactions that don't have all their tags satisfied should occupy
/// the Future queue.
/// As soon as required tags are provided by some other transactions that are ready
/// we should remove the transactions from here and move them to the Ready queue.
pub fn import(&mut self, tx: WaitingTransaction<Hash, Ex>) {
assert!(!tx.is_ready(), "Transaction is ready.");
assert!(!self.waiting.contains_key(&tx.transaction.hash), "Transaction is already imported.");
// Add all tags that are missing
for tag in &tx.missing_tags {
let mut entry = self.wanted_tags.entry(tag.clone()).or_insert_with(HashSet::new);
entry.insert(tx.transaction.hash.clone());
}
// Add the transaction to a by-hash waiting map
self.waiting.insert(tx.transaction.hash.clone(), tx);
}
/// Returns true if given hash is part of the queue.
pub fn contains(&self, hash: &Hash) -> bool {
self.waiting.contains_key(hash)
}
/// Satisfies provided tags in transactions that are waiting for them.
///
/// Returns (and removes) transactions that became ready after their last tag got
/// satisfied and now we can remove them from Future and move to Ready queue.
pub fn satisfy_tags<T: AsRef<Tag>>(&mut self, tags: impl IntoIterator<Item=T>) -> Vec<WaitingTransaction<Hash, Ex>> {
let mut became_ready = vec![];
for tag in tags {
if let Some(hashes) = self.wanted_tags.remove(tag.as_ref()) {
for hash in hashes {
let is_ready = {
let mut tx = self.waiting.get_mut(&hash)
.expect(WAITING_PROOF);
tx.satisfy_tag(tag.as_ref());
tx.is_ready()
};
if is_ready {
let tx = self.waiting.remove(&hash).expect(WAITING_PROOF);
became_ready.push(tx);
}
}
}
}
became_ready
}
/// Removes transactions for given list of hashes.
///
/// Returns a list of actually removed transactions.
pub fn remove(&mut self, hashes: &[Hash]) -> Vec<Transaction<Hash, Ex>> {
let mut removed = vec![];
for hash in hashes {
if let Some(waiting_tx) = self.waiting.remove(hash) {
// remove from wanted_tags as well
for tag in waiting_tx.missing_tags {
let remove = if let Some(mut wanted) = self.wanted_tags.get_mut(&tag) {
wanted.remove(hash);
wanted.is_empty()
} else { false };
if remove {
self.wanted_tags.remove(&tag);
}
}
// add to result
removed.push(waiting_tx.transaction)
}
}
removed
}
/// Returns number of transactions in the Future queue.
pub fn len(&self) -> usize {
self.waiting.len()
}
}
@@ -0,0 +1,53 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
// tag::description[]
//! Generic Transaction Pool
//!
//! The pool is based on dependency graph between transactions
//! and their priority.
//! The pool is able to return an iterator that traverses transaction
//! graph in the correct order taking into account priorities and dependencies.
//!
//! TODO [ToDr]
//! - [ ] Longevity handling (remove obsolete transactions periodically)
//! - [ ] Multi-threading (getting ready transactions should not block the pool)
// end::description[]
#![warn(missing_docs)]
#![warn(unused_extern_crates)]
extern crate futures;
extern crate parking_lot;
extern crate sr_primitives;
#[macro_use] extern crate error_chain;
#[macro_use] extern crate log;
#[macro_use] extern crate serde_derive;
mod future;
mod listener;
mod pool;
mod ready;
mod rotator;
pub mod base_pool;
pub mod error;
pub mod watcher;
pub use self::error::IntoPoolError;
pub use self::base_pool::{Transaction, Status};
pub use self::pool::{Pool, Options, ChainApi, EventStream, ExtrinsicFor, BlockHash, ExHash, NumberFor, TransactionFor};
@@ -0,0 +1,98 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{
collections::HashMap,
hash,
};
use watcher;
use sr_primitives::traits;
/// Extrinsic pool default listener.
pub struct Listener<H: hash::Hash + Eq, H2> {
watchers: HashMap<H, watcher::Sender<H, H2>>
}
impl<H: hash::Hash + Eq, H2> Default for Listener<H, H2> {
fn default() -> Self {
Listener {
watchers: Default::default(),
}
}
}
impl<H: hash::Hash + traits::Member, H2: Clone> Listener<H, H2> {
fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H, H2>) {
let clean = if let Some(h) = self.watchers.get_mut(hash) {
fun(h);
h.is_done()
} else {
false
};
if clean {
self.watchers.remove(hash);
}
}
/// Creates a new watcher for given verified extrinsic.
///
/// The watcher can be used to subscribe to lifecycle events of that extrinsic.
pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, H2> {
let sender = self.watchers.entry(hash).or_insert_with(watcher::Sender::default);
sender.new_watcher()
}
/// Notify the listeners about extrinsic broadcast.
pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) {
self.fire(hash, |watcher| watcher.broadcast(peers));
}
/// New transaction was added to the ready pool or promoted from the future pool.
pub fn ready(&mut self, tx: &H, old: Option<&H>) {
if let Some(old) = old {
self.fire(old, |watcher| watcher.usurped(tx.clone()));
}
}
/// New transaction was added to the future pool.
pub fn future(&mut self, _tx: &H) {
}
/// Transaction was dropped from the pool because of the limit.
pub fn dropped(&mut self, tx: &H, by: Option<&H>) {
self.fire(tx, |watcher| match by {
Some(t) => watcher.usurped(t.clone()),
None => watcher.dropped(),
})
}
/// Transaction was rejected from the pool.
pub fn rejected(&mut self, tx: &H, is_invalid: bool) {
warn!(target: "transaction-pool", "Extrinsic rejected ({}): {:?}", is_invalid, tx);
}
/// Transaction was removed as invalid.
pub fn invalid(&mut self, tx: &H) {
warn!(target: "transaction-pool", "Extrinsic invalid: {:?}", tx);
}
/// Transaction was pruned from the pool.
pub fn pruned(&mut self, header_hash: H2, tx: &H) {
self.fire(tx, |watcher| watcher.finalised(header_hash))
}
}
@@ -0,0 +1,333 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{
collections::HashMap,
hash,
sync::Arc,
time,
};
use base_pool as base;
use error;
use listener::Listener;
use rotator::PoolRotator;
use watcher::Watcher;
use futures::sync::mpsc;
use parking_lot::{Mutex, RwLock};
use sr_primitives::{
generic::BlockId,
traits::{self, As},
transaction_validity::{TransactionValidity, TransactionTag as Tag},
};
/// Modification notification event stream type;
pub type EventStream = mpsc::UnboundedReceiver<()>;
/// Extrinsic hash type for a pool.
pub type ExHash<A> = <A as ChainApi>::Hash;
/// Block hash type for a pool.
pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
/// Extrinsic type for a pool.
pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
/// Block number type for the ChainApi
pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
/// A type of transaction stored in the pool
pub type TransactionFor<A> = Arc<base::Transaction<ExHash<A>, TxData<ExtrinsicFor<A>>>>;
/// Concrete extrinsic validation and query logic.
pub trait ChainApi: Send + Sync {
/// Block type.
type Block: traits::Block;
/// Hash type
type Hash: hash::Hash + Eq + traits::Member;
/// Error type.
type Error: From<error::Error> + error::IntoPoolError;
/// Verify extrinsic at given block.
fn validate_transaction(&self, at: &BlockId<Self::Block>, uxt: &ExtrinsicFor<Self>) -> Result<TransactionValidity, Self::Error>;
/// Returns a block number given the block id.
fn block_id_to_number(&self, at: &BlockId<Self::Block>) -> Result<Option<NumberFor<Self>>, Self::Error>;
/// Returns a block hash given the block id.
fn block_id_to_hash(&self, at: &BlockId<Self::Block>) -> Result<Option<BlockHash<Self>>, Self::Error>;
/// Hash the extrinsic.
fn hash(&self, uxt: &ExtrinsicFor<Self>) -> Self::Hash;
}
/// Maximum time the transaction will be kept in the pool.
///
/// Transactions that don't get included within the limit are removed from the pool.
const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5);
/// Additional transaction data
#[derive(Debug, Serialize, Deserialize)]
pub struct TxData<Ex> {
/// Raw data stored by the user.
pub raw: Ex,
/// Transaction validity deadline.
/// TODO [ToDr] Should we use longevity instead?
#[serde(skip)]
pub valid_till: Option<time::Instant>,
}
/// Pool configuration options.
#[derive(Debug, Clone, Default)]
pub struct Options;
/// Extrinsics pool.
pub struct Pool<B: ChainApi> {
api: B,
listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
pool: RwLock<base::BasePool<
ExHash<B>,
TxData<ExtrinsicFor<B>>,
>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
rotator: PoolRotator<ExHash<B>>,
}
impl<B: ChainApi> Pool<B> {
/// Imports a bunch of unverified extrinsics to the pool
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where
T: IntoIterator<Item=ExtrinsicFor<B>>
{
let block_number = self.api.block_id_to_number(at)?
.ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?;
Ok(xts
.into_iter()
.map(|xt| -> Result<_, B::Error> {
let hash = self.api.hash(&xt);
if self.rotator.is_banned(&hash) {
return Err(error::ErrorKind::TemporarilyBanned.into())?;
}
match self.api.validate_transaction(at, &xt)? {
TransactionValidity::Valid(priority, requires, provides, longevity)=> {
Ok(base::Transaction {
data: TxData {
raw: xt,
valid_till: Some(time::Instant::now() + POOL_TIME),
},
hash,
priority,
requires,
provides,
longevity,
})
},
TransactionValidity::Invalid => {
bail!(error::Error::from(error::ErrorKind::InvalidTransaction))
},
TransactionValidity::Unknown => {
self.listener.write().rejected(&hash, false);
bail!(error::Error::from(error::ErrorKind::UnknownTransactionValidity))
},
}
})
.map(|tx| {
let imported = self.pool.write().import(block_number.as_(), tx?)?;
self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok());
let mut listener = self.listener.write();
fire_events(&mut *listener, &imported);
Ok(imported.hash().clone())
})
.collect())
}
/// Imports one unverified extrinsic to the pool
pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<ExHash<B>, B::Error> {
Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?)
}
/// Import a single extrinsic and starts to watch their progress in the pool.
pub fn submit_and_watch(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Watcher<ExHash<B>, BlockHash<B>>, B::Error> {
let xt = self.submit_one(at, xt)?;
Ok(self.listener.write().create_watcher(xt))
}
/// Prunes ready transactions that provide given list of tags.
pub fn prune_tags(&self, at: &BlockId<B::Block>, tags: impl IntoIterator<Item=Tag>) -> Result<(), B::Error> {
let block_number = self.api.block_id_to_number(at)?
.ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?;
let status = self.pool.write().prune_tags(block_number.as_(), tags);
{
let mut listener = self.listener.write();
for promoted in &status.promoted {
fire_events(&mut *listener, promoted);
}
for f in &status.failed {
listener.dropped(f, None);
}
}
// try to re-submit pruned transactions since some of them might be still valid.
let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::<Vec<_>>();
let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.raw.clone()))?;
// Fire mined event for transactions that became invalid.
let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) {
Err(Ok(err)) => match err.kind() {
error::ErrorKind::InvalidTransaction => Some(hashes[idx].clone()),
_ => None,
},
_ => None,
});
{
let header_hash = self.api.block_id_to_hash(at)?
.ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?;
let mut listener = self.listener.write();
for h in hashes {
listener.pruned(header_hash, &h)
}
}
// clear old transactions
self.clear_stale(at)?;
Ok(())
}
/// Removes stale transactions from the pool.
///
/// Stale transactions are transaction beyond their longevity period.
/// Note this function does not remove transactions that are already included in the chain.
/// See `prune_tags` ifyou want this.
pub fn clear_stale(&self, _at: &BlockId<B::Block>) -> Result<(), B::Error> {
let now = time::Instant::now();
let to_remove = self.ready(|pending| pending
.filter(|tx| self.rotator.ban_if_stale(&now, &tx))
.map(|tx| tx.hash.clone())
.collect::<Vec<_>>()
);
// removing old transactions
self.remove_invalid(&to_remove);
// clear banned transactions timeouts
self.rotator.clear_timeouts(&now);
Ok(())
}
}
impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool.
/// TODO [ToDr] Options
pub fn new(_options: Options, api: B) -> Self {
Pool {
api,
listener: Default::default(),
pool: Default::default(),
import_notification_sinks: Default::default(),
rotator: Default::default(),
}
}
/// Return an event stream of transactions imported to the pool.
pub fn import_notification_stream(&self) -> EventStream {
let (sink, stream) = mpsc::unbounded();
self.import_notification_sinks.lock().push(sink);
stream
}
/// Invoked when extrinsics are broadcasted.
pub fn on_broadcasted(&self, propagated: HashMap<ExHash<B>, Vec<String>>) {
let mut listener = self.listener.write();
for (hash, peers) in propagated.into_iter() {
listener.broadcasted(&hash, peers);
}
}
/// Remove from the pool.
pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> {
// temporarily ban invalid transactions
debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes);
self.rotator.ban(&time::Instant::now(), hashes);
let invalid = self.pool.write().remove_invalid(hashes);
let mut listener = self.listener.write();
for tx in &invalid {
listener.invalid(&tx.hash);
}
invalid
}
/// Get ready transactions ordered by priority
pub fn ready<F, X>(&self, f: F) -> X where
F: FnOnce(&mut Iterator<Item=TransactionFor<B>>) -> X,
{
let pool = self.pool.read();
let mut ready = pool.ready();
f(&mut ready)
}
/// Returns all transactions in the pool.
///
/// Be careful with large limit values, as querying the entire pool might be time consuming.
pub fn all(&self, limit: usize) -> Vec<ExtrinsicFor<B>> {
self.ready(|it| it.take(limit).map(|ex| ex.data.raw.clone()).collect())
}
/// Returns pool status.
pub fn status(&self) -> base::Status {
self.pool.read().status()
}
/// Returns transaction hash
pub fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExHash<B> {
self.api.hash(xt)
}
}
fn fire_events<H, H2, Ex>(
listener: &mut Listener<H, H2>,
imported: &base::Imported<H, Ex>,
) where
H: hash::Hash + Eq + traits::Member,
H2: Clone,
{
match *imported {
base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
listener.ready(hash, None);
for f in failed {
listener.rejected(f, true);
}
for r in removed {
listener.dropped(&r.hash, Some(hash));
}
for p in promoted {
listener.ready(p, None);
}
},
base::Imported::Future { ref hash } => {
listener.future(hash)
},
}
}
#[cfg(test)]
mod tests {
#[test]
#[ignore]
fn should_have_some_basic_tests() {
assert_eq!(true, false);
}
}
@@ -0,0 +1,579 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{
collections::{HashMap, HashSet, BTreeSet},
cmp,
hash,
sync::Arc,
};
use sr_primitives::traits::Member;
use sr_primitives::transaction_validity::{
TransactionTag as Tag,
};
use error;
use future::WaitingTransaction;
use base_pool::{BlockNumber, Transaction};
#[derive(Debug)]
pub struct TransactionRef<Hash, Ex> {
pub transaction: Arc<Transaction<Hash, Ex>>,
pub valid_till: BlockNumber,
pub insertion_id: u64,
}
impl<Hash, Ex> Clone for TransactionRef<Hash, Ex> {
fn clone(&self) -> Self {
TransactionRef {
transaction: self.transaction.clone(),
valid_till: self.valid_till,
insertion_id: self.insertion_id,
}
}
}
impl<Hash, Ex> Ord for TransactionRef<Hash, Ex> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.transaction.priority.cmp(&other.transaction.priority)
.then(other.valid_till.cmp(&self.valid_till))
.then(other.insertion_id.cmp(&self.insertion_id))
}
}
impl<Hash, Ex> PartialOrd for TransactionRef<Hash, Ex> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<Hash, Ex> PartialEq for TransactionRef<Hash, Ex> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == cmp::Ordering::Equal
}
}
impl<Hash, Ex> Eq for TransactionRef<Hash, Ex> {}
#[derive(Debug)]
struct ReadyTx<Hash, Ex> {
/// A reference to a transaction
pub transaction: TransactionRef<Hash, Ex>,
/// A list of transactions that get unlocked by this one
pub unlocks: Vec<Hash>,
/// How many required tags are provided inherently
///
/// Some transactions might be already pruned from the queue,
/// so when we compute ready set we may consider this transactions ready earlier.
pub requires_offset: usize,
}
const HASH_READY: &str = r#"
Every time transaction is imported its hash is placed in `ready` map and tags in `provided_tags`;
Every time transaction is removed from the queue we remove the hash from `ready` map and from `provided_tags`;
Hence every hash retrieved from `provided_tags` is always present in `ready`;
qed
"#;
#[derive(Debug)]
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {
/// Insertion id
insertion_id: u64,
/// tags that are provided by Ready transactions
provided_tags: HashMap<Tag, Hash>,
/// Transactions that are ready (i.e. don't have any requirements external to the pool)
ready: HashMap<Hash, ReadyTx<Hash, Ex>>,
// ^^ TODO [ToDr] Consider wrapping this into `Arc<RwLock<>>` and allow multiple concurrent iterators
/// Best transactions that are ready to be included to the block without any other previous transaction.
best: BTreeSet<TransactionRef<Hash, Ex>>,
}
impl<Hash: hash::Hash + Eq, Ex> Default for ReadyTransactions<Hash, Ex> {
fn default() -> Self {
ReadyTransactions {
insertion_id: Default::default(),
provided_tags: Default::default(),
ready: Default::default(),
best: Default::default(),
}
}
}
impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
/// Borrows a map of tags that are provided by transactions in this queue.
pub fn provided_tags(&self) -> &HashMap<Tag, Hash> {
&self.provided_tags
}
/// Returns an iterator of ready transactions.
///
/// Transactions are returned in order:
/// 1. First by the dependencies:
/// - never return transaction that requires a tag, which was not provided by one of the previously returned transactions
/// 2. Then by priority:
/// - If there are two transactions with all requirements satisfied the one with higher priority goes first.
/// 3. Then by the ttl that's left
/// - transactions that are valid for a shorter time go first
/// 4. Lastly we sort by the time in the queue
/// - transactions that are longer in the queue go first
pub fn get<'a>(&'a self) -> impl Iterator<Item=Arc<Transaction<Hash, Ex>>> + 'a {
BestIterator {
all: &self.ready,
best: self.best.clone(),
awaiting: Default::default(),
}
}
/// Imports transactions to the pool of ready transactions.
///
/// The transaction needs to have all tags satisfied (be ready) by transactions
/// that are in this queue.
pub fn import(
&mut self,
block_number: BlockNumber,
tx: WaitingTransaction<Hash, Ex>,
) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {
assert!(tx.is_ready(), "Only ready transactions can be imported.");
assert!(!self.ready.contains_key(&tx.transaction.hash), "Transaction is already imported.");
self.insertion_id += 1;
let insertion_id = self.insertion_id;
let hash = tx.transaction.hash.clone();
let tx = tx.transaction;
let replaced = self.replace_previous(&tx)?;
let mut goes_to_best = true;
// Add links to transactions that unlock the current one
for tag in &tx.requires {
// Check if the transaction that satisfies the tag is still in the queue.
if let Some(other) = self.provided_tags.get(tag) {
let mut tx = self.ready.get_mut(other).expect(HASH_READY);
tx.unlocks.push(hash.clone());
// this transaction depends on some other, so it doesn't go to best directly.
goes_to_best = false;
}
}
// update provided_tags
for tag in tx.provides.clone() {
self.provided_tags.insert(tag, hash.clone());
}
let transaction = TransactionRef {
insertion_id,
valid_till: block_number.saturating_add(tx.longevity),
transaction: Arc::new(tx),
};
// insert to best if it doesn't require any other transaction to be included before it
if goes_to_best {
self.best.insert(transaction.clone());
}
// insert to Ready
self.ready.insert(hash, ReadyTx {
transaction,
unlocks: vec![],
requires_offset: 0,
});
Ok(replaced)
}
/// Returns true if given hash is part of the queue.
pub fn contains(&self, hash: &Hash) -> bool {
self.ready.contains_key(hash)
}
/// Removes invalid transactions from the ready pool.
///
/// NOTE removing a transaction will also cause a removal of all transactions that depend on that one
/// (i.e. the entire subgraph that this transaction is a start of will be removed).
/// All removed transactions are returned.
pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![];
let mut to_remove = hashes.iter().cloned().collect::<Vec<_>>();
loop {
let hash = match to_remove.pop() {
Some(hash) => hash,
None => return removed,
};
if let Some(mut tx) = self.ready.remove(&hash) {
// remove entries from provided_tags
for tag in &tx.transaction.transaction.provides {
self.provided_tags.remove(tag);
}
// remove from unlocks
for tag in &tx.transaction.transaction.requires {
if let Some(hash) = self.provided_tags.get(tag) {
if let Some(tx) = self.ready.get_mut(hash) {
remove_item(&mut tx.unlocks, &hash);
}
}
}
// remove from best
self.best.remove(&tx.transaction);
// remove all transactions that the current one unlocks
to_remove.append(&mut tx.unlocks);
// add to removed
debug!(target: "txpool", "[{:?}] Removed as invalid: ", hash);
removed.push(tx.transaction.transaction);
}
}
}
/// Removes transactions that provide given tag.
///
/// All transactions that lead to a transaction, which provides this tag
/// are going to be removed from the queue, but no other transactions are touched -
/// i.e. all other subgraphs starting from given tag are still considered valid & ready.
pub fn prune_tags(&mut self, tag: Tag) -> Vec<Arc<Transaction<Hash, Ex>>> {
let mut removed = vec![];
let mut to_remove = vec![tag];
loop {
let tag = match to_remove.pop() {
Some(tag) => tag,
None => return removed,
};
let res = self.provided_tags.remove(&tag)
.and_then(|hash| self.ready.remove(&hash));
if let Some(tx) = res {
let unlocks = tx.unlocks;
let tx = tx.transaction.transaction;
// prune previous transactions as well
{
let hash = &tx.hash;
let mut find_previous = |tag| -> Option<Vec<Tag>> {
let prev_hash = self.provided_tags.get(tag)?;
let tx2 = self.ready.get_mut(&prev_hash)?;
remove_item(&mut tx2.unlocks, hash);
// We eagerly prune previous transactions as well.
// But it might not always be good.
// Possible edge case:
// - tx provides two tags
// - the second tag enables some subgraph we don't know of yet
// - we will prune the transaction
// - when we learn about the subgraph it will go to future
// - we will have to wait for re-propagation of that transaction
// Alternatively the caller may attempt to re-import these transactions.
if tx2.unlocks.is_empty() {
Some(tx2.transaction.transaction.provides.clone())
} else {
None
}
};
// find previous transactions
for tag in &tx.requires {
if let Some(mut tags_to_remove) = find_previous(tag) {
to_remove.append(&mut tags_to_remove);
}
}
}
// add the transactions that just got unlocked to `best`
for hash in unlocks {
if let Some(tx) = self.ready.get_mut(&hash) {
tx.requires_offset += 1;
// this transaction is ready
if tx.requires_offset == tx.transaction.transaction.requires.len() {
self.best.insert(tx.transaction.clone());
}
}
}
debug!(target: "txpool", "[{:?}] Pruned.", tx.hash);
removed.push(tx);
}
}
}
/// Checks if the transaction is providing the same tags as other transactions.
///
/// In case that's true it determines if the priority of transactions that
/// we are about to replace is lower than the priority of the replacement transaction.
/// We remove/replace old transactions in case they have lower priority.
///
/// In case replacement is succesful returns a list of removed transactions.
fn replace_previous(&mut self, tx: &Transaction<Hash, Ex>) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {
let mut to_remove = {
// check if we are replacing a transaction
let replace_hashes = tx.provides
.iter()
.filter_map(|tag| self.provided_tags.get(tag))
.collect::<HashSet<_>>();
// early exit if we are not replacing anything.
if replace_hashes.is_empty() {
return Ok(vec![]);
}
// now check if collective priority is lower than the replacement transaction.
let old_priority = replace_hashes
.iter()
.filter_map(|hash| self.ready.get(hash))
.fold(0u64, |total, tx| total.saturating_add(tx.transaction.transaction.priority));
// bail - the transaction has too low priority to replace the old ones
if old_priority >= tx.priority {
bail!(error::ErrorKind::TooLowPriority(old_priority, tx.priority))
}
replace_hashes.into_iter().cloned().collect::<Vec<_>>()
};
let new_provides = tx.provides.iter().cloned().collect::<HashSet<_>>();
let mut removed = vec![];
loop {
let hash = match to_remove.pop() {
Some(hash) => hash,
None => return Ok(removed),
};
let tx = self.ready.remove(&hash).expect(HASH_READY);
// check if this transaction provides stuff that is not provided by the new one.
let (mut unlocks, tx) = (tx.unlocks, tx.transaction.transaction);
{
let invalidated = tx.provides
.iter()
.filter(|tag| !new_provides.contains(&**tag));
for tag in invalidated {
// remove the tag since it's no longer provided by any transaction
self.provided_tags.remove(tag);
// add more transactions to remove
to_remove.append(&mut unlocks);
}
}
removed.push(tx);
}
}
/// Returns number of transactions in this queue.
pub fn len(&self) -> usize {
self.ready.len()
}
}
pub struct BestIterator<'a, Hash: 'a, Ex: 'a> {
all: &'a HashMap<Hash, ReadyTx<Hash, Ex>>,
awaiting: HashMap<Hash, (usize, TransactionRef<Hash, Ex>)>,
best: BTreeSet<TransactionRef<Hash, Ex>>,
}
impl<'a, Hash: 'a + hash::Hash + Member, Ex: 'a> BestIterator<'a, Hash, Ex> {
/// Depending on number of satisfied requirements insert given ref
/// either to awaiting set or to best set.
fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef<Hash, Ex>) {
if satisfied == tx_ref.transaction.requires.len() {
// If we have satisfied all deps insert to best
self.best.insert(tx_ref);
} else {
// otherwise we're still awaiting for some deps
self.awaiting.insert(tx_ref.transaction.hash.clone(), (satisfied, tx_ref));
}
}
}
impl<'a, Hash: 'a + hash::Hash + Member, Ex: 'a> Iterator for BestIterator<'a, Hash, Ex> {
type Item = Arc<Transaction<Hash, Ex>>;
fn next(&mut self) -> Option<Self::Item> {
let best = self.best.iter().next_back()?.clone();
let best = self.best.take(&best)?;
let ready = match self.all.get(&best.transaction.hash) {
Some(ready) => ready,
// The transaction is not in all, maybe it was removed in the meantime?
None => return self.next(),
};
// Insert transactions that just got unlocked.
for hash in &ready.unlocks {
// first check local awaiting transactions
if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
satisfied += 1;
self.best_or_awaiting(satisfied, tx_ref);
// then get from the pool
} else if let Some(next) = self.all.get(hash) {
self.best_or_awaiting(next.requires_offset + 1, next.transaction.clone());
}
}
Some(best.transaction.clone())
}
}
// See: https://github.com/rust-lang/rust/issues/40062
fn remove_item<T: PartialEq>(vec: &mut Vec<T>, item: &T) {
if let Some(idx) = vec.iter().position(|i| i == item) {
vec.swap_remove(idx);
}
}
#[cfg(test)]
mod tests {
use super::*;
fn tx(id: u8) -> Transaction<u64, Vec<u8>> {
Transaction {
data: vec![id],
hash: id as u64,
priority: 1,
longevity: 2,
requires: vec![vec![1], vec![2]],
provides: vec![vec![3], vec![4]],
}
}
#[test]
fn should_replace_transaction_that_provides_the_same_tag() {
// given
let mut ready = ReadyTransactions::default();
let block_number = 1;
let mut tx1 = tx(1);
tx1.requires.clear();
let mut tx2 = tx(2);
tx2.requires.clear();
tx2.provides = vec![vec![3]];
let mut tx3 = tx(3);
tx3.requires.clear();
tx3.provides = vec![vec![4]];
// when
let x = WaitingTransaction::new(tx2, &ready.provided_tags());
ready.import(block_number, x).unwrap();
let x = WaitingTransaction::new(tx3, &ready.provided_tags());
ready.import(block_number, x).unwrap();
assert_eq!(ready.get().count(), 2);
// too low priority
let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags());
ready.import(block_number, x).unwrap_err();
tx1.priority = 10;
let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags());
ready.import(block_number, x).unwrap();
// then
assert_eq!(ready.get().count(), 1);
}
#[test]
fn should_return_best_transactions_in_correct_order() {
// given
let mut ready = ReadyTransactions::default();
let mut tx1 = tx(1);
tx1.requires.clear();
let mut tx2 = tx(2);
tx2.requires = tx1.provides.clone();
tx2.provides = vec![vec![106]];
let mut tx3 = tx(3);
tx3.requires = vec![tx1.provides[0].clone(), vec![106]];
tx3.provides = vec![];
let mut tx4 = tx(4);
tx4.requires = vec![tx1.provides[0].clone()];
tx4.provides = vec![];
let block_number = 1;
let tx5 = Transaction {
data: vec![5],
hash: 5,
priority: 1,
longevity: u64::max_value(), // use the max_value() here for testing.
requires: vec![tx1.provides[0].clone()],
provides: vec![],
};
// when
let x = WaitingTransaction::new(tx1, &ready.provided_tags());
ready.import(block_number, x).unwrap();
let x = WaitingTransaction::new(tx2, &ready.provided_tags());
ready.import(block_number, x).unwrap();
let x = WaitingTransaction::new(tx3, &ready.provided_tags());
ready.import(block_number, x).unwrap();
let x = WaitingTransaction::new(tx4, &ready.provided_tags());
ready.import(block_number, x).unwrap();
let x = WaitingTransaction::new(tx5, &ready.provided_tags());
ready.import(block_number, x).unwrap();
// then
assert_eq!(ready.best.len(), 1);
let mut it = ready.get().map(|tx| tx.data[0]);
assert_eq!(it.next(), Some(1));
assert_eq!(it.next(), Some(2));
assert_eq!(it.next(), Some(3));
assert_eq!(it.next(), Some(4));
assert_eq!(it.next(), Some(5));
assert_eq!(it.next(), None);
}
#[test]
fn should_order_refs() {
let mut id = 1;
let mut with_priority = |priority| {
id += 1;
let mut tx = tx(id);
tx.priority = priority;
tx
};
// higher priority = better
assert!(TransactionRef {
transaction: Arc::new(with_priority(3)),
valid_till: 3,
insertion_id: 1,
} > TransactionRef {
transaction: Arc::new(with_priority(2)),
valid_till: 3,
insertion_id: 2,
});
// lower validity = better
assert!(TransactionRef {
transaction: Arc::new(with_priority(3)),
valid_till: 2,
insertion_id: 1,
} > TransactionRef {
transaction: Arc::new(with_priority(3)),
valid_till: 3,
insertion_id: 2,
});
// lower insertion_id = better
assert!(TransactionRef {
transaction: Arc::new(with_priority(3)),
valid_till: 3,
insertion_id: 1,
} > TransactionRef {
transaction: Arc::new(with_priority(3)),
valid_till: 3,
insertion_id: 2,
});
}
}
@@ -0,0 +1,215 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Rotate extrinsic inside the pool.
//!
//! Keeps only recent extrinsic and discard the ones kept for a significant amount of time.
//! Discarded extrinsics are banned so that they don't get re-imported again.
use std::{
collections::HashMap,
hash,
time::{Duration, Instant},
};
use parking_lot::RwLock;
use base_pool::Transaction;
use pool::TxData;
/// Expected size of the banned extrinsics cache.
const EXPECTED_SIZE: usize = 2048;
/// Pool rotator is responsible to only keep fresh extrinsics in the pool.
///
/// Extrinsics that occupy the pool for too long are culled and temporarily banned from entering
/// the pool again.
pub struct PoolRotator<Hash> {
/// How long the extrinsic is banned for.
ban_time: Duration,
/// Currently banned extrinsics.
banned_until: RwLock<HashMap<Hash, Instant>>,
}
impl<Hash: hash::Hash + Eq> Default for PoolRotator<Hash> {
fn default() -> Self {
PoolRotator {
ban_time: Duration::from_secs(60 * 30),
banned_until: Default::default(),
}
}
}
impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
/// Returns `true` if extrinsic hash is currently banned.
pub fn is_banned(&self, hash: &Hash) -> bool {
self.banned_until.read().contains_key(hash)
}
/// Bans given set of hashes.
pub fn ban(&self, now: &Instant, hashes: &[Hash]) {
let mut banned = self.banned_until.write();
for hash in hashes {
banned.insert(hash.clone(), *now + self.ban_time);
}
if banned.len() > 2 * EXPECTED_SIZE {
while banned.len() > EXPECTED_SIZE {
if let Some(key) = banned.keys().next().cloned() {
banned.remove(&key);
}
}
}
}
/// Bans extrinsic if it's stale.
///
/// Returns `true` if extrinsic is stale and got banned.
pub fn ban_if_stale<Ex>(&self, now: &Instant, xt: &Transaction<Hash, TxData<Ex>>) -> bool {
match xt.data.valid_till {
Some(ref valid_till) if valid_till > now => {
return false;
}
_ => {},
}
self.ban(now, &[xt.hash.clone()]);
true
}
/// Removes timed bans.
pub fn clear_timeouts(&self, now: &Instant) {
let mut banned = self.banned_until.write();
banned.retain(|_, &mut v| v >= *now);
}
}
#[cfg(test)]
mod tests {
use super::*;
type Hash = u64;
type Ex = ();
fn rotator() -> PoolRotator<Hash> {
PoolRotator {
ban_time: Duration::from_millis(10),
..Default::default()
}
}
fn tx() -> (Hash, Transaction<Hash, TxData<Ex>>) {
let hash = 5u64;
let tx = Transaction {
data: TxData {
raw: (),
valid_till: Some(Instant::now()),
},
hash: hash.clone(),
priority: 5,
longevity: 3,
requires: vec![],
provides: vec![],
};
(hash, tx)
}
#[test]
fn should_not_ban_if_not_stale() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(!rotator.is_banned(&hash));
let past = Instant::now() - Duration::from_millis(1000);
// when
assert!(!rotator.ban_if_stale(&past, &tx));
// then
assert!(!rotator.is_banned(&hash));
}
#[test]
fn should_ban_stale_extrinsic() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(!rotator.is_banned(&hash));
// when
assert!(rotator.ban_if_stale(&Instant::now(), &tx));
// then
assert!(rotator.is_banned(&hash));
}
#[test]
fn should_clear_banned() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(rotator.ban_if_stale(&Instant::now(), &tx));
assert!(rotator.is_banned(&hash));
// when
let future = Instant::now() + rotator.ban_time + rotator.ban_time;
rotator.clear_timeouts(&future);
// then
assert!(!rotator.is_banned(&hash));
}
#[test]
fn should_garbage_collect() {
// given
fn tx_with(i: u64, time: Instant) -> Transaction<Hash, TxData<Ex>> {
let hash = i;
Transaction {
data: TxData {
raw: (),
valid_till: Some(time),
},
hash,
priority: 5,
longevity: 3,
requires: vec![],
provides: vec![],
}
}
let rotator = rotator();
let now = Instant::now();
let past = now - Duration::from_secs(1);
// when
for i in 0..2*EXPECTED_SIZE {
let tx = tx_with(i as u64, past);
assert!(rotator.ban_if_stale(&now, &tx));
}
assert_eq!(rotator.banned_until.read().len(), 2*EXPECTED_SIZE);
// then
let tx = tx_with(2*EXPECTED_SIZE as u64, past);
// trigger a garbage collection
assert!(rotator.ban_if_stale(&now, &tx));
assert_eq!(rotator.banned_until.read().len(), EXPECTED_SIZE);
}
}
@@ -0,0 +1,116 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Extrinsics status updates.
use futures::{
Stream,
sync::mpsc,
};
/// Possible extrinsic status events
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Status<H, H2> {
/// Extrinsic is part of the future queue.
Future,
/// Extrinsic is part of the ready queue.
Ready,
/// Extrinsic has been finalised in block with given hash.
Finalised(H2),
/// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid.
Usurped(H),
/// The extrinsic has been broadcast to the given peers.
Broadcast(Vec<String>),
/// Extrinsic has been dropped from the pool because of the limit.
Dropped,
}
/// Extrinsic watcher.
///
/// Represents a stream of status updates for particular extrinsic.
#[derive(Debug)]
pub struct Watcher<H, H2> {
receiver: mpsc::UnboundedReceiver<Status<H, H2>>,
}
impl<H, H2> Watcher<H, H2> {
/// Pipe the notifications to given sink.
///
/// Make sure to drive the future to completion.
pub fn into_stream(self) -> impl Stream<Item=Status<H, H2>, Error=()> {
// we can safely ignore the error here, `UnboundedReceiver` never fails.
self.receiver.map_err(|_| ())
}
}
/// Sender part of the watcher. Exposed only for testing purposes.
#[derive(Debug)]
pub struct Sender<H, H2> {
receivers: Vec<mpsc::UnboundedSender<Status<H, H2>>>,
finalised: bool,
}
impl<H, H2> Default for Sender<H, H2> {
fn default() -> Self {
Sender {
receivers: Default::default(),
finalised: false,
}
}
}
impl<H: Clone, H2: Clone> Sender<H, H2> {
/// Add a new watcher to this sender object.
pub fn new_watcher(&mut self) -> Watcher<H, H2> {
let (tx, receiver) = mpsc::unbounded();
self.receivers.push(tx);
Watcher {
receiver,
}
}
/// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid.
pub fn usurped(&mut self, hash: H) {
self.send(Status::Usurped(hash))
}
/// Extrinsic has been finalised in block with given hash.
pub fn finalised(&mut self, hash: H2) {
self.send(Status::Finalised(hash));
self.finalised = true;
}
/// Transaction has been dropped from the pool because of the limit.
pub fn dropped(&mut self) {
self.send(Status::Dropped);
}
/// The extrinsic has been broadcast to the given peers.
pub fn broadcast(&mut self, peers: Vec<String>) {
self.send(Status::Broadcast(peers))
}
/// Returns true if the are no more listeners for this extrinsic or it was finalised.
pub fn is_done(&self) -> bool {
self.finalised || self.receivers.is_empty()
}
fn send(&mut self, status: Status<H, H2>) {
self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok())
}
}