// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
use std::{
collections::{HashMap, HashSet, BTreeSet},
cmp,
hash,
sync::Arc,
};
use serde::Serialize;
use log::trace;
use sp_runtime::traits::Member;
use sp_runtime::transaction_validity::{
TransactionTag as Tag,
};
use sp_transaction_pool::error;
use crate::{
base_pool::Transaction,
future::WaitingTransaction,
tracked_map::{self, ReadOnlyTrackedMap, TrackedMap},
};
/// An in-pool transaction reference.
///
/// Should be cheap to clone.
#[derive(Debug, parity_util_mem::MallocSizeOf)]
pub struct TransactionRef {
/// The actual transaction data.
pub transaction: Arc>,
/// Unique id when transaction was inserted into the pool.
pub insertion_id: u64,
}
impl Clone for TransactionRef {
fn clone(&self) -> Self {
Self {
transaction: self.transaction.clone(),
insertion_id: self.insertion_id,
}
}
}
impl Ord for TransactionRef {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.transaction.priority.cmp(&other.transaction.priority)
.then_with(|| other.transaction.valid_till.cmp(&self.transaction.valid_till))
.then_with(|| other.insertion_id.cmp(&self.insertion_id))
}
}
impl PartialOrd for TransactionRef {
fn partial_cmp(&self, other: &Self) -> Option {
Some(self.cmp(other))
}
}
impl PartialEq for TransactionRef {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == cmp::Ordering::Equal
}
}
impl Eq for TransactionRef {}
#[derive(Debug, parity_util_mem::MallocSizeOf)]
pub struct ReadyTx {
/// A reference to a transaction
pub transaction: TransactionRef,
/// A list of transactions that get unlocked by this one
pub unlocks: Vec,
/// How many required tags are provided inherently
///
/// Some transactions might be already pruned from the queue,
/// so when we compute ready set we may consider this transactions ready earlier.
pub requires_offset: usize,
}
impl Clone for ReadyTx {
fn clone(&self) -> Self {
Self {
transaction: self.transaction.clone(),
unlocks: self.unlocks.clone(),
requires_offset: self.requires_offset,
}
}
}
const HASH_READY: &str = r#"
Every time transaction is imported its hash is placed in `ready` map and tags in `provided_tags`;
Every time transaction is removed from the queue we remove the hash from `ready` map and from `provided_tags`;
Hence every hash retrieved from `provided_tags` is always present in `ready`;
qed
"#;
#[derive(Debug, parity_util_mem::MallocSizeOf)]
pub struct ReadyTransactions {
/// Insertion id
insertion_id: u64,
/// tags that are provided by Ready transactions
provided_tags: HashMap,
/// Transactions that are ready (i.e. don't have any requirements external to the pool)
ready: TrackedMap>,
/// Best transactions that are ready to be included to the block without any other previous transaction.
best: BTreeSet>,
}
impl tracked_map::Size for ReadyTx {
fn size(&self) -> usize {
self.transaction.transaction.bytes
}
}
impl Default for ReadyTransactions {
fn default() -> Self {
Self {
insertion_id: Default::default(),
provided_tags: Default::default(),
ready: Default::default(),
best: Default::default(),
}
}
}
impl ReadyTransactions {
/// Borrows a map of tags that are provided by transactions in this queue.
pub fn provided_tags(&self) -> &HashMap {
&self.provided_tags
}
/// Returns an iterator of ready transactions.
///
/// Transactions are returned in order:
/// 1. First by the dependencies:
/// - never return transaction that requires a tag, which was not provided by one of the previously returned transactions
/// 2. Then by priority:
/// - If there are two transactions with all requirements satisfied the one with higher priority goes first.
/// 3. Then by the ttl that's left
/// - transactions that are valid for a shorter time go first
/// 4. Lastly we sort by the time in the queue
/// - transactions that are longer in the queue go first
pub fn get(&self) -> impl Iterator>> {
BestIterator {
all: self.ready.clone(),
best: self.best.clone(),
awaiting: Default::default(),
}
}
/// Imports transactions to the pool of ready transactions.
///
/// The transaction needs to have all tags satisfied (be ready) by transactions
/// that are in this queue.
/// Returns transactions that were replaced by the one imported.
pub fn import(
&mut self,
tx: WaitingTransaction,
) -> error::Result>>> {
assert!(
tx.is_ready(),
"Only ready transactions can be imported. Missing: {:?}", tx.missing_tags
);
assert!(!self.ready.read().contains_key(&tx.transaction.hash), "Transaction is already imported.");
self.insertion_id += 1;
let insertion_id = self.insertion_id;
let hash = tx.transaction.hash.clone();
let transaction = tx.transaction;
let (replaced, unlocks) = self.replace_previous(&transaction)?;
let mut goes_to_best = true;
let mut ready = self.ready.write();
let mut requires_offset = 0;
// Add links to transactions that unlock the current one
for tag in &transaction.requires {
// Check if the transaction that satisfies the tag is still in the queue.
if let Some(other) = self.provided_tags.get(tag) {
let tx = ready.get_mut(other).expect(HASH_READY);
tx.unlocks.push(hash.clone());
// this transaction depends on some other, so it doesn't go to best directly.
goes_to_best = false;
} else {
requires_offset += 1;
}
}
// update provided_tags
// call to replace_previous guarantees that we will be overwriting
// only entries that have been removed.
for tag in &transaction.provides {
self.provided_tags.insert(tag.clone(), hash.clone());
}
let transaction = TransactionRef {
insertion_id,
transaction
};
// insert to best if it doesn't require any other transaction to be included before it
if goes_to_best {
self.best.insert(transaction.clone());
}
// insert to Ready
ready.insert(hash, ReadyTx {
transaction,
unlocks,
requires_offset,
});
Ok(replaced)
}
/// Fold a list of ready transactions to compute a single value.
pub fn fold, &ReadyTx) -> Option>(&mut self, f: F) -> Option {
self.ready
.read()
.values()
.fold(None, f)
}
/// Returns true if given hash is part of the queue.
pub fn contains(&self, hash: &Hash) -> bool {
self.ready.read().contains_key(hash)
}
/// Retrive transaction by hash
pub fn by_hash(&self, hash: &Hash) -> Option>> {
self.by_hashes(&[hash.clone()]).into_iter().next().unwrap_or(None)
}
/// Retrieve transactions by hash
pub fn by_hashes(&self, hashes: &[Hash]) -> Vec