// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
//! A basic version of the dependency graph.
//!
//! For a more full-featured pool, have a look at the `pool` module.
use std::{
collections::HashSet,
fmt,
hash,
sync::Arc,
};
use log::{trace, debug, warn};
use serde::Serialize;
use sp_core::hexdisplay::HexDisplay;
use sp_runtime::traits::Member;
use sp_runtime::transaction_validity::{
TransactionTag as Tag,
TransactionLongevity as Longevity,
TransactionPriority as Priority,
TransactionSource as Source,
};
use sp_transaction_pool::{error, PoolStatus, InPoolTransaction};
use crate::future::{FutureTransactions, WaitingTransaction};
use crate::ready::ReadyTransactions;
/// Successful import result.
#[derive(Debug, PartialEq, Eq)]
pub enum Imported {
/// Transaction was successfully imported to Ready queue.
Ready {
/// Hash of transaction that was successfully imported.
hash: Hash,
/// Transactions that got promoted from the Future queue.
promoted: Vec,
/// Transactions that failed to be promoted from the Future queue and are now discarded.
failed: Vec,
/// Transactions removed from the Ready pool (replaced).
removed: Vec>>,
},
/// Transaction was successfully imported to Future queue.
Future {
/// Hash of transaction that was successfully imported.
hash: Hash,
}
}
impl Imported {
/// Returns the hash of imported transaction.
pub fn hash(&self) -> &Hash {
use self::Imported::*;
match *self {
Ready { ref hash, .. } => hash,
Future { ref hash, .. } => hash,
}
}
}
/// Status of pruning the queue.
#[derive(Debug)]
pub struct PruneStatus {
/// A list of imports that satisfying the tag triggered.
pub promoted: Vec>,
/// A list of transactions that failed to be promoted and now are discarded.
pub failed: Vec,
/// A list of transactions that got pruned from the ready queue.
pub pruned: Vec>>,
}
/// Immutable transaction
#[cfg_attr(test, derive(Clone))]
#[derive(PartialEq, Eq, parity_util_mem::MallocSizeOf)]
pub struct Transaction {
/// Raw extrinsic representing that transaction.
pub data: Extrinsic,
/// Number of bytes encoding of the transaction requires.
pub bytes: usize,
/// Transaction hash (unique)
pub hash: Hash,
/// Transaction priority (higher = better)
pub priority: Priority,
/// At which block the transaction becomes invalid?
pub valid_till: Longevity,
/// Tags required by the transaction.
pub requires: Vec,
/// Tags that this transaction provides.
pub provides: Vec,
/// Should that transaction be propagated.
pub propagate: bool,
/// Source of that transaction.
pub source: Source,
}
impl AsRef for Transaction {
fn as_ref(&self) -> &Extrinsic {
&self.data
}
}
impl InPoolTransaction for Transaction {
type Transaction = Extrinsic;
type Hash = Hash;
fn data(&self) -> &Extrinsic {
&self.data
}
fn hash(&self) -> &Hash {
&self.hash
}
fn priority(&self) -> &Priority {
&self.priority
}
fn longevity(&self) ->&Longevity {
&self.valid_till
}
fn requires(&self) -> &[Tag] {
&self.requires
}
fn provides(&self) -> &[Tag] {
&self.provides
}
fn is_propagable(&self) -> bool {
self.propagate
}
}
impl Transaction {
/// Explicit transaction clone.
///
/// Transaction should be cloned only if absolutely necessary && we want
/// every reason to be commented. That's why we `Transaction` is not `Clone`,
/// but there's explicit `duplicate` method.
pub fn duplicate(&self) -> Self {
Self {
data: self.data.clone(),
bytes: self.bytes,
hash: self.hash.clone(),
priority: self.priority,
source: self.source,
valid_till: self.valid_till,
requires: self.requires.clone(),
provides: self.provides.clone(),
propagate: self.propagate,
}
}
}
impl fmt::Debug for Transaction where
Hash: fmt::Debug,
Extrinsic: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let join_tags = |tags: &[Tag]| {
tags.iter().map(|tag| HexDisplay::from(tag).to_string()).collect::>().join(", ")
};
write!(fmt, "Transaction {{ ")?;
write!(fmt, "hash: {:?}, ", &self.hash)?;
write!(fmt, "priority: {:?}, ", &self.priority)?;
write!(fmt, "valid_till: {:?}, ", &self.valid_till)?;
write!(fmt, "bytes: {:?}, ", &self.bytes)?;
write!(fmt, "propagate: {:?}, ", &self.propagate)?;
write!(fmt, "source: {:?}, ", &self.source)?;
write!(fmt, "requires: [{}], ", join_tags(&self.requires))?;
write!(fmt, "provides: [{}], ", join_tags(&self.provides))?;
write!(fmt, "data: {:?}", &self.data)?;
write!(fmt, "}}")?;
Ok(())
}
}
/// Store last pruned tags for given number of invocations.
const RECENTLY_PRUNED_TAGS: usize = 2;
/// Transaction pool.
///
/// Builds a dependency graph for all transactions in the pool and returns
/// the ones that are currently ready to be executed.
///
/// General note:
/// If function returns some transactions it usually means that importing them
/// as-is for the second time will fail or produce unwanted results.
/// Most likely it is required to revalidate them and recompute set of
/// required tags.
#[derive(Debug)]
#[cfg_attr(not(target_os = "unknown"), derive(parity_util_mem::MallocSizeOf))]
pub struct BasePool {
reject_future_transactions: bool,
future: FutureTransactions,
ready: ReadyTransactions,
/// Store recently pruned tags (for last two invocations).
///
/// This is used to make sure we don't accidentally put
/// transactions to future in case they were just stuck in verification.
recently_pruned: [HashSet; RECENTLY_PRUNED_TAGS],
recently_pruned_index: usize,
}
impl Default for BasePool {
fn default() -> Self {
Self::new(false)
}
}
impl BasePool {
/// Create new pool given reject_future_transactions flag.
pub fn new(reject_future_transactions: bool) -> Self {
Self {
reject_future_transactions,
future: Default::default(),
ready: Default::default(),
recently_pruned: Default::default(),
recently_pruned_index: 0,
}
}
/// Temporary enables future transactions, runs closure and then restores
/// `reject_future_transactions` flag back to previous value.
///
/// The closure accepts the mutable reference to the pool and original value
/// of the `reject_future_transactions` flag.
pub(crate) fn with_futures_enabled(&mut self, closure: impl FnOnce(&mut Self, bool) -> T) -> T {
let previous = self.reject_future_transactions;
self.reject_future_transactions = false;
let return_value = closure(self, previous);
self.reject_future_transactions = previous;
return_value
}
/// Returns if the transaction for the given hash is already imported.
pub fn is_imported(&self, tx_hash: &Hash) -> bool {
self.future.contains(tx_hash) || self.ready.contains(tx_hash)
}
/// Imports transaction to the pool.
///
/// The pool consists of two parts: Future and Ready.
/// The former contains transactions that require some tags that are not yet provided by
/// other transactions in the pool.
/// The latter contains transactions that have all the requirements satisfied and are
/// ready to be included in the block.
pub fn import(
&mut self,
tx: Transaction,
) -> error::Result> {
if self.is_imported(&tx.hash) {
return Err(error::Error::AlreadyImported(Box::new(tx.hash)))
}
let tx = WaitingTransaction::new(
tx,
self.ready.provided_tags(),
&self.recently_pruned,
);
trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash, tx);
debug!(
target: "txpool",
"[{:?}] Importing to {}",
tx.transaction.hash,
if tx.is_ready() { "ready" } else { "future" }
);
// If all tags are not satisfied import to future.
if !tx.is_ready() {
if self.reject_future_transactions {
return Err(error::Error::RejectedFutureTransaction);
}
let hash = tx.transaction.hash.clone();
self.future.import(tx);
return Ok(Imported::Future { hash });
}
self.import_to_ready(tx)
}
/// Imports transaction to ready queue.
///
/// NOTE the transaction has to have all requirements satisfied.
fn import_to_ready(&mut self, tx: WaitingTransaction) -> error::Result> {
let hash = tx.transaction.hash.clone();
let mut promoted = vec![];
let mut failed = vec![];
let mut removed = vec![];
let mut first = true;
let mut to_import = vec![tx];
// take first transaction from the list
while let Some(tx) = to_import.pop() {
// find transactions in Future that it unlocks
to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));
// import this transaction
let current_hash = tx.transaction.hash.clone();
match self.ready.import(tx) {
Ok(mut replaced) => {
if !first {
promoted.push(current_hash);
}
// The transactions were removed from the ready pool. We might attempt to re-import them.
removed.append(&mut replaced);
},
// transaction failed to be imported.
Err(e) => if first {
debug!(target: "txpool", "[{:?}] Error importing: {:?}", current_hash, e);
return Err(e)
} else {
failed.push(current_hash);
},
}
first = false;
}
// An edge case when importing transaction caused
// some future transactions to be imported and that
// future transactions pushed out current transaction.
// This means that there is a cycle and the transactions should
// be moved back to future, since we can't resolve it.
if removed.iter().any(|tx| tx.hash == hash) {
// We still need to remove all transactions that we promoted
// since they depend on each other and will never get to the best iterator.
self.ready.remove_subtree(&promoted);
debug!(target: "txpool", "[{:?}] Cycle detected, bailing.", hash);
return Err(error::Error::CycleDetected)
}
Ok(Imported::Ready {
hash,
promoted,
failed,
removed,
})
}
/// Returns an iterator over ready transactions in the pool.
pub fn ready(&self) -> impl Iterator>> {
self.ready.get()
}
/// Returns an iterator over future transactions in the pool.
pub fn futures(&self) -> impl Iterator> {
self.future.all()
}
/// Returns pool transactions given list of hashes.
///
/// Includes both ready and future pool. For every hash in the `hashes`
/// iterator an `Option` is produced (so the resulting `Vec` always have the same length).
pub fn by_hashes(&self, hashes: &[Hash]) -> Vec