Transaction eras (#758)

* Initial groundwork

* A mess.

* Integrate

* Fix tests

* Unit tests

* Tests for unchecked_extrisnic

* fix tab

* Improve binary format.

* fix tests

* Rename extrinsic-pool -> transaction-pool

Closes #770

* Implement unimplemented.

* typo
This commit is contained in:
Gav Wood
2018-09-20 14:01:01 +02:00
committed by GitHub
parent 43068f8fc3
commit 67bf1a6eaa
42 changed files with 782 additions and 232 deletions
@@ -0,0 +1,33 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! External Error trait for extrinsic pool.
use txpool;
/// Extrinsic pool error.
pub trait IntoPoolError: ::std::error::Error + Send + Sized {
/// Try to extract original `txpool::Error`
///
/// This implementation is optional and used only to
/// provide more descriptive error messages for end users
/// of RPC API.
fn into_pool_error(self) -> Result<txpool::Error, Self> { Err(self) }
}
impl IntoPoolError for txpool::Error {
fn into_pool_error(self) -> Result<txpool::Error, Self> { Ok(self) }
}
@@ -0,0 +1,49 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
// tag::description[]
//! Generic extrinsic pool.
// end::description[]
#![warn(missing_docs)]
#![warn(unused_extern_crates)]
extern crate futures;
extern crate parking_lot;
extern crate sr_primitives as runtime_primitives;
#[macro_use]
extern crate log;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate transaction_pool as txpool;
#[cfg(test)] extern crate substrate_test_client as test_client;
#[cfg(test)] extern crate substrate_keyring as keyring;
#[cfg(test)] extern crate parity_codec as codec;
pub mod watcher;
mod error;
mod listener;
mod pool;
mod rotator;
pub use listener::Listener;
pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics};
pub use txpool::scoring;
pub use txpool::{Error, ErrorKind};
pub use error::IntoPoolError;
pub use txpool::{Options, Status, LightStatus, VerifiedTransaction, Readiness, Transaction};
@@ -0,0 +1,95 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{
sync::Arc,
fmt,
collections::HashMap,
};
use txpool;
use watcher;
/// Extrinsic pool default listener.
#[derive(Default)]
pub struct Listener<H: ::std::hash::Hash + Eq> {
watchers: HashMap<H, watcher::Sender<H>>
}
impl<H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default> Listener<H> {
/// Creates a new watcher for given verified extrinsic.
///
/// The watcher can be used to subscribe to lifecycle events of that extrinsic.
pub fn create_watcher<T: txpool::VerifiedTransaction<Hash=H>>(&mut self, xt: Arc<T>) -> watcher::Watcher<H> {
let sender = self.watchers.entry(*xt.hash()).or_insert_with(watcher::Sender::default);
sender.new_watcher()
}
/// Notify the listeners about extrinsic broadcast.
pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) {
self.fire(hash, |watcher| watcher.broadcast(peers));
}
fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H>) {
let clean = if let Some(h) = self.watchers.get_mut(hash) {
fun(h);
h.is_done()
} else {
false
};
if clean {
self.watchers.remove(hash);
}
}
}
impl<H, T> txpool::Listener<T> for Listener<H> where
H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default,
T: txpool::VerifiedTransaction<Hash=H>,
{
fn added(&mut self, tx: &Arc<T>, old: Option<&Arc<T>>) {
if let Some(old) = old {
let hash = tx.hash();
self.fire(old.hash(), |watcher| watcher.usurped(*hash));
}
}
fn dropped(&mut self, tx: &Arc<T>, by: Option<&T>) {
self.fire(tx.hash(), |watcher| match by {
Some(t) => watcher.usurped(*t.hash()),
None => watcher.dropped(),
})
}
fn rejected(&mut self, tx: &Arc<T>, reason: &txpool::ErrorKind) {
warn!(target: "transaction-pool", "Extrinsic rejected ({}): {:?}", reason, tx);
}
fn invalid(&mut self, tx: &Arc<T>) {
warn!(target: "transaction-pool", "Extrinsic invalid: {:?}", tx);
}
fn canceled(&mut self, tx: &Arc<T>) {
debug!(target: "transaction-pool", "Extrinsic canceled: {:?}", tx);
}
fn culled(&mut self, tx: &Arc<T>) {
// TODO [ToDr] latest block number?
let header_hash = Default::default();
self.fire(tx.hash(), |watcher| watcher.finalised(header_hash))
}
}
+606
View File
@@ -0,0 +1,606 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{
collections::{BTreeMap, HashMap},
fmt,
sync::Arc,
time,
};
use futures::sync::mpsc;
use parking_lot::{Mutex, RwLock};
use serde::{Serialize, de::DeserializeOwned};
use txpool::{self, Scoring, Readiness};
use error::IntoPoolError;
use listener::Listener;
use rotator::PoolRotator;
use watcher::Watcher;
use runtime_primitives::{generic::BlockId, traits::Block as BlockT};
/// Modification notification event stream type;
pub type EventStream = mpsc::UnboundedReceiver<()>;
/// Extrinsic hash type for a pool.
pub type ExHash<A> = <A as ChainApi>::Hash;
/// Extrinsic type for a pool.
pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as BlockT>::Extrinsic;
/// Verified extrinsic data for `ChainApi`.
pub type VerifiedFor<A> = Verified<ExtrinsicFor<A>, <A as ChainApi>::VEx>;
/// A collection of all extrinsics.
pub type AllExtrinsics<A> = BTreeMap<<<A as ChainApi>::VEx as txpool::VerifiedTransaction>::Sender, Vec<ExtrinsicFor<A>>>;
/// Verified extrinsic struct. Wraps original extrinsic and verification info.
#[derive(Debug)]
pub struct Verified<Ex, VEx> {
/// Original extrinsic.
pub original: Ex,
/// Verification data.
pub verified: VEx,
/// Pool deadline, after it's reached we remove the extrinsic from the pool.
pub valid_till: time::Instant,
}
impl<Ex, VEx> txpool::VerifiedTransaction for Verified<Ex, VEx>
where
Ex: fmt::Debug,
VEx: txpool::VerifiedTransaction,
{
type Hash = <VEx as txpool::VerifiedTransaction>::Hash;
type Sender = <VEx as txpool::VerifiedTransaction>::Sender;
fn hash(&self) -> &Self::Hash {
self.verified.hash()
}
fn sender(&self) -> &Self::Sender {
self.verified.sender()
}
fn mem_usage(&self) -> usize {
// TODO: add `original` mem usage.
self.verified.mem_usage()
}
}
/// Concrete extrinsic validation and query logic.
pub trait ChainApi: Send + Sync {
/// Block type.
type Block: BlockT;
/// Extrinsic hash type.
type Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Serialize + DeserializeOwned + ::std::str::FromStr + Send + Sync + Default + 'static;
/// Extrinsic sender type.
type Sender: ::std::hash::Hash + fmt::Debug + Serialize + DeserializeOwned + Eq + Clone + Send + Sync + Ord + Default;
/// Unchecked extrinsic type.
/// Verified extrinsic type.
type VEx: txpool::VerifiedTransaction<Hash=Self::Hash, Sender=Self::Sender> + Send + Sync + Clone;
/// Readiness evaluator
type Ready;
/// Error type.
type Error: From<txpool::Error> + IntoPoolError;
/// Score type.
type Score: ::std::cmp::Ord + Clone + Default + fmt::Debug + Send + Send + Sync + fmt::LowerHex;
/// Custom scoring update event type.
type Event: ::std::fmt::Debug;
/// Verify extrinsic at given block.
fn verify_transaction(&self, at: &BlockId<Self::Block>, uxt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error>;
/// Create new readiness evaluator.
fn ready(&self) -> Self::Ready;
/// Check readiness for verified extrinsic at given block.
fn is_ready(&self, at: &BlockId<Self::Block>, context: &mut Self::Ready, xt: &VerifiedFor<Self>) -> Readiness;
/// Decides on ordering of `T`s from a particular sender.
fn compare(old: &VerifiedFor<Self>, other: &VerifiedFor<Self>) -> ::std::cmp::Ordering;
/// Decides how to deal with two transactions from a sender that seem to occupy the same slot in the queue.
fn choose(old: &VerifiedFor<Self>, new: &VerifiedFor<Self>) -> txpool::scoring::Choice;
/// Updates the transaction scores given a list of transactions and a change to previous scoring.
/// NOTE: you can safely assume that both slices have the same length.
/// (i.e. score at index `i` represents transaction at the same index)
fn update_scores(xts: &[txpool::Transaction<VerifiedFor<Self>>], scores: &mut [Self::Score], change: txpool::scoring::Change<Self::Event>);
/// Decides if `new` should push out `old` transaction from the pool.
///
/// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits.
fn should_replace(old: &VerifiedFor<Self>, new: &VerifiedFor<Self>) -> txpool::scoring::Choice;
}
pub struct Ready<'a, 'b, B: 'a + ChainApi> {
api: &'a B,
at: &'b BlockId<B::Block>,
context: B::Ready,
rotator: &'a PoolRotator<B::Hash>,
now: time::Instant,
}
impl<'a, 'b, B: ChainApi> txpool::Ready<VerifiedFor<B>> for Ready<'a, 'b, B> {
fn is_ready(&mut self, xt: &VerifiedFor<B>) -> Readiness {
if self.rotator.ban_if_stale(&self.now, xt) {
debug!(target: "transaction-pool", "[{:?}] Banning as stale.", txpool::VerifiedTransaction::hash(xt));
return Readiness::Stale;
}
self.api.is_ready(self.at, &mut self.context, xt)
}
}
pub struct ScoringAdapter<T>(::std::marker::PhantomData<T>);
impl<T> ::std::fmt::Debug for ScoringAdapter<T> {
fn fmt(&self, _f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
Ok(())
}
}
impl<T: ChainApi> Scoring<VerifiedFor<T>> for ScoringAdapter<T> {
type Score = <T as ChainApi>::Score;
type Event = <T as ChainApi>::Event;
fn compare(&self, old: &VerifiedFor<T>, other: &VerifiedFor<T>) -> ::std::cmp::Ordering {
T::compare(old, other)
}
fn choose(&self, old: &VerifiedFor<T>, new: &VerifiedFor<T>) -> txpool::scoring::Choice {
T::choose(old, new)
}
fn update_scores(&self, xts: &[txpool::Transaction<VerifiedFor<T>>], scores: &mut [Self::Score], change: txpool::scoring::Change<Self::Event>) {
T::update_scores(xts, scores, change)
}
fn should_replace(&self, old: &VerifiedFor<T>, new: &VerifiedFor<T>) -> txpool::scoring::Choice {
T::should_replace(old, new)
}
}
/// Maximum time the transaction will be kept in the pool.
///
/// Transactions that don't get included within the limit are removed from the pool.
const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5);
/// Extrinsics pool.
pub struct Pool<B: ChainApi> {
api: B,
pool: RwLock<txpool::Pool<
VerifiedFor<B>,
ScoringAdapter<B>,
Listener<B::Hash>,
>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
rotator: PoolRotator<B::Hash>,
}
impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool.
pub fn new(options: txpool::Options, api: B) -> Self {
Pool {
pool: RwLock::new(txpool::Pool::new(Listener::default(), ScoringAdapter::<B>(Default::default()), options)),
import_notification_sinks: Default::default(),
api,
rotator: Default::default(),
}
}
/// Imports a pre-verified extrinsic to the pool.
pub fn import(&self, xt: VerifiedFor<B>) -> Result<Arc<VerifiedFor<B>>, B::Error> {
let result = self.pool.write().import(xt)?;
self.import_notification_sinks.lock()
.retain(|sink| sink.unbounded_send(()).is_ok());
Ok(result)
}
/// Return an event stream of transactions imported to the pool.
pub fn import_notification_stream(&self) -> EventStream {
let (sink, stream) = mpsc::unbounded();
self.import_notification_sinks.lock().push(sink);
stream
}
/// Invoked when extrinsics are broadcasted.
pub fn on_broadcasted(&self, propagated: HashMap<B::Hash, Vec<String>>) {
for (hash, peers) in propagated.into_iter() {
self.pool.write().listener_mut().broadcasted(&hash, peers);
}
}
/// Imports a bunch of unverified extrinsics to the pool
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Arc<VerifiedFor<B>>>, B::Error> where
T: IntoIterator<Item=ExtrinsicFor<B>>
{
xts
.into_iter()
.map(|xt| {
match self.api.verify_transaction(at, &xt) {
Ok(ref verified) if self.rotator.is_banned(txpool::VerifiedTransaction::hash(verified)) => {
return (Err(txpool::Error::from("Temporarily Banned".to_owned()).into()), xt)
},
result => (result, xt),
}
})
.map(|(v, xt)| {
let xt = Verified {
original: xt,
verified: v?,
valid_till: time::Instant::now() + POOL_TIME,
};
Ok(self.pool.write().import(xt)?)
})
.collect()
}
/// Imports one unverified extrinsic to the pool
pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Arc<VerifiedFor<B>>, B::Error> {
Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed"))
}
/// Import a single extrinsic and starts to watch their progress in the pool.
pub fn submit_and_watch(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Watcher<B::Hash>, B::Error> {
let xt = self.submit_at(at, Some(xt))?.pop().expect("One extrinsic passed; one result returned; qed");
Ok(self.pool.write().listener_mut().create_watcher(xt))
}
/// Remove from the pool.
pub fn remove(&self, hashes: &[B::Hash], is_valid: bool) -> Vec<Option<Arc<VerifiedFor<B>>>> {
let mut pool = self.pool.write();
let mut results = Vec::with_capacity(hashes.len());
// temporarily ban invalid transactions
if !is_valid {
debug!(target: "transaction-pool", "Banning invalid transactions: {:?}", hashes);
self.rotator.ban(&time::Instant::now(), hashes);
}
for hash in hashes {
results.push(pool.remove(hash, is_valid));
}
results
}
/// Cull transactions from the queue.
pub fn cull_from(
&self,
at: &BlockId<B::Block>,
senders: Option<&[<B::VEx as txpool::VerifiedTransaction>::Sender]>,
) -> usize
{
self.rotator.clear_timeouts(&time::Instant::now());
let ready = self.ready(at);
self.pool.write().cull(senders, ready)
}
/// Cull old transactions from the queue.
pub fn cull(&self, at: &BlockId<B::Block>) -> Result<usize, B::Error> {
Ok(self.cull_from(at, None))
}
/// Cull transactions from the queue and then compute the pending set.
pub fn cull_and_get_pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> Result<T, B::Error> where
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T,
{
self.cull_from(at, None);
Ok(self.pending(at, f))
}
/// Get the full status of the queue (including readiness)
pub fn status<R: txpool::Ready<VerifiedFor<B>>>(&self, ready: R) -> txpool::Status {
self.pool.read().status(ready)
}
/// Returns light status of the pool.
pub fn light_status(&self) -> txpool::LightStatus {
self.pool.read().light_status()
}
/// Removes all transactions from given sender
pub fn remove_sender(&self, sender: <B::VEx as txpool::VerifiedTransaction>::Sender) -> Vec<Arc<VerifiedFor<B>>> {
let mut pool = self.pool.write();
let pending = pool.pending_from_sender(|_: &VerifiedFor<B>| txpool::Readiness::Ready, &sender).collect();
// remove all transactions from this sender
pool.cull(Some(&[sender]), |_: &VerifiedFor<B>| txpool::Readiness::Stale);
pending
}
/// Retrieve the pending set. Be careful to not leak the pool `ReadGuard` to prevent deadlocks.
pub fn pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> T where
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T,
{
let ready = self.ready(at);
f(self.pool.read().pending(ready))
}
/// Retry to import all verified transactions from given sender.
pub fn retry_verification(&self, at: &BlockId<B::Block>, sender: <B::VEx as txpool::VerifiedTransaction>::Sender) -> Result<(), B::Error> {
let to_reverify = self.remove_sender(sender);
self.submit_at(at, to_reverify.into_iter().map(|ex| Arc::try_unwrap(ex).expect("Removed items have no references").original))?;
Ok(())
}
/// Reverify transaction that has been reported incorrect.
///
/// Returns `Ok(None)` in case the hash is missing, `Err(e)` in case of verification error and new transaction
/// reference otherwise.
///
/// TODO [ToDr] That method is currently unused, should be used together with BlockBuilder
/// when we detect that particular transaction has failed.
/// In such case we will attempt to remove or re-verify it.
pub fn reverify_transaction(&self, at: &BlockId<B::Block>, hash: B::Hash) -> Result<Option<Arc<VerifiedFor<B>>>, B::Error> {
let result = self.remove(&[hash], false).pop().expect("One hash passed; one result received; qed");
if let Some(ex) = result {
self.submit_one(at, Arc::try_unwrap(ex).expect("Removed items have no references").original).map(Some)
} else {
Ok(None)
}
}
/// Retrieve all transactions in the pool grouped by sender.
pub fn all(&self) -> AllExtrinsics<B> {
use txpool::VerifiedTransaction;
let pool = self.pool.read();
let all = pool.unordered_pending(AlwaysReady);
all.fold(Default::default(), |mut map: AllExtrinsics<B>, tx| {
// Map with `null` key is not serializable, so we fallback to default accountId.
map.entry(tx.verified.sender().clone())
.or_insert_with(Vec::new)
// use bytes type to make it serialize nicer.
.push(tx.original.clone());
map
})
}
fn ready<'a, 'b>(&'a self, at: &'b BlockId<B::Block>) -> Ready<'a, 'b, B> {
Ready {
api: &self.api,
rotator: &self.rotator,
context: self.api.ready(),
at,
now: time::Instant::now(),
}
}
}
/// A Readiness implementation that returns `Ready` for all transactions.
pub struct AlwaysReady;
impl<VEx> txpool::Ready<VEx> for AlwaysReady {
fn is_ready(&mut self, _tx: &VEx) -> txpool::Readiness {
txpool::Readiness::Ready
}
}
#[cfg(test)]
pub mod tests {
use txpool;
use super::{VerifiedFor, ExtrinsicFor};
use std::collections::HashMap;
use std::cmp::Ordering;
use {Pool, ChainApi, scoring, Readiness};
use keyring::Keyring::{self, *};
use codec::Encode;
use test_client::runtime::{AccountId, Block, Hash, Index, Extrinsic, Transfer};
use runtime_primitives::{generic, traits::{Hash as HashT, BlindCheckable, BlakeTwo256}};
use VerifiedTransaction as VerifiedExtrinsic;
type BlockId = generic::BlockId<Block>;
#[derive(Clone, Debug)]
pub struct VerifiedTransaction {
pub hash: Hash,
pub sender: AccountId,
pub nonce: u64,
}
impl txpool::VerifiedTransaction for VerifiedTransaction {
type Hash = Hash;
type Sender = AccountId;
fn hash(&self) -> &Self::Hash {
&self.hash
}
fn sender(&self) -> &Self::Sender {
&self.sender
}
fn mem_usage(&self) -> usize {
256
}
}
struct TestApi;
impl TestApi {
fn default() -> Self {
TestApi
}
}
impl ChainApi for TestApi {
type Block = Block;
type Hash = Hash;
type Sender = AccountId;
type Error = txpool::Error;
type VEx = VerifiedTransaction;
type Ready = HashMap<AccountId, u64>;
type Score = u64;
type Event = ();
fn verify_transaction(&self, _at: &BlockId, uxt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error> {
let hash = BlakeTwo256::hash(&uxt.encode());
let xt = uxt.clone().check()?;
Ok(VerifiedTransaction {
hash,
sender: xt.transfer.from,
nonce: xt.transfer.nonce,
})
}
fn is_ready(&self, at: &BlockId, nonce_cache: &mut Self::Ready, xt: &VerifiedFor<Self>) -> Readiness {
let sender = xt.verified.sender;
let next_index = nonce_cache.entry(sender)
.or_insert_with(|| index(at, sender));
let result = match xt.original.transfer.nonce.cmp(&next_index) {
Ordering::Greater => Readiness::Future,
Ordering::Equal => Readiness::Ready,
Ordering::Less => Readiness::Stale,
};
// remember to increment `next_index`
*next_index = next_index.saturating_add(1);
result
}
fn ready(&self) -> Self::Ready {
HashMap::default()
}
fn compare(old: &VerifiedFor<Self>, other: &VerifiedFor<Self>) -> Ordering {
old.original.transfer.nonce.cmp(&other.original.transfer.nonce)
}
fn choose(old: &VerifiedFor<Self>, new: &VerifiedFor<Self>) -> scoring::Choice {
assert!(new.verified.sender == old.verified.sender, "Scoring::choose called with transactions from different senders");
if old.original.transfer.nonce == new.original.transfer.nonce {
return scoring::Choice::RejectNew;
}
scoring::Choice::InsertNew
}
fn update_scores(
xts: &[txpool::Transaction<VerifiedFor<Self>>],
scores: &mut [Self::Score],
_change: scoring::Change<()>
) {
for i in 0..xts.len() {
scores[i] = xts[i].original.transfer.amount;
}
}
fn should_replace(_old: &VerifiedFor<Self>, _new: &VerifiedFor<Self>) -> scoring::Choice {
scoring::Choice::InsertNew
}
}
fn index(at: &BlockId, _account: AccountId) -> u64 {
(_account[0] as u64) + number_of(at)
}
fn number_of(at: &BlockId) -> u64 {
match at {
generic::BlockId::Number(n) => *n as u64,
_ => 0,
}
}
fn uxt(who: Keyring, nonce: Index) -> Extrinsic {
let transfer = Transfer {
from: who.to_raw_public().into(),
to: AccountId::default(),
nonce,
amount: 1,
};
let signature = transfer.using_encoded(|e| who.sign(e));
Extrinsic {
transfer,
signature: signature.into(),
}
}
fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::default())
}
#[test]
fn submission_should_work() {
let pool = pool();
assert_eq!(209, index(&BlockId::number(0), Alice.to_raw_public().into()));
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209)]);
}
#[test]
fn multiple_submission_should_work() {
let pool = pool();
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]);
}
#[test]
fn early_nonce_should_be_culled() {
let pool = pool();
pool.submit_one(&BlockId::number(0), uxt(Alice, 208)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![]);
}
#[test]
fn late_nonce_should_be_queued() {
let pool = pool();
pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![]);
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]);
}
#[test]
fn retrying_verification_might_not_change_anything() {
let pool = pool();
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]);
pool.retry_verification(&BlockId::number(1), Alice.to_raw_public().into()).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]);
}
#[test]
fn should_ban_invalid_transactions() {
let pool = pool();
let uxt = uxt(Alice, 209);
let hash = *pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap().hash();
pool.remove(&[hash], true);
pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap();
// when
pool.remove(&[hash], false);
let pending: Vec<AccountId> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| *a.sender()).collect()).unwrap();
assert_eq!(pending, vec![]);
// then
pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap_err();
}
}
@@ -0,0 +1,209 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Rotate extrinsic inside the pool.
//!
//! Keeps only recent extrinsic and discard the ones kept for a significant amount of time.
//! Discarded extrinsics are banned so that they don't get re-imported again.
use std::{
collections::HashMap,
fmt,
hash,
time::{Duration, Instant},
};
use parking_lot::RwLock;
use txpool::VerifiedTransaction;
use Verified;
/// Expected size of the banned extrinsics cache.
const EXPECTED_SIZE: usize = 2048;
/// Pool rotator is responsible to only keep fresh extrinsics in the pool.
///
/// Extrinsics that occupy the pool for too long are culled and temporarily banned from entering
/// the pool again.
pub struct PoolRotator<Hash> {
/// How long the extrinsic is banned for.
ban_time: Duration,
/// Currently banned extrinsics.
banned_until: RwLock<HashMap<Hash, Instant>>,
}
impl<Hash: hash::Hash + Eq> Default for PoolRotator<Hash> {
fn default() -> Self {
PoolRotator {
ban_time: Duration::from_secs(60 * 30),
banned_until: Default::default(),
}
}
}
impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
/// Returns `true` if extrinsic hash is currently banned.
pub fn is_banned(&self, hash: &Hash) -> bool {
self.banned_until.read().contains_key(hash)
}
/// Bans given set of hashes.
pub fn ban(&self, now: &Instant, hashes: &[Hash]) {
let mut banned = self.banned_until.write();
for hash in hashes {
banned.insert(hash.clone(), *now + self.ban_time);
}
if banned.len() > 2 * EXPECTED_SIZE {
while banned.len() > EXPECTED_SIZE {
if let Some(key) = banned.keys().next().cloned() {
banned.remove(&key);
}
}
}
}
/// Bans extrinsic if it's stale.
///
/// Returns `true` if extrinsic is stale and got banned.
pub fn ban_if_stale<Ex, VEx>(&self, now: &Instant, xt: &Verified<Ex, VEx>) -> bool where
VEx: VerifiedTransaction<Hash=Hash>,
Hash: fmt::Debug + fmt::LowerHex,
{
if &xt.valid_till > now {
return false;
}
self.ban(now, &[xt.verified.hash().clone()]);
true
}
/// Removes timed bans.
pub fn clear_timeouts(&self, now: &Instant) {
let mut banned = self.banned_until.write();
banned.retain(|_, &mut v| v >= *now);
}
}
#[cfg(test)]
mod tests {
use super::*;
use pool::tests::VerifiedTransaction;
use test_client::runtime::Hash;
fn rotator() -> PoolRotator<Hash> {
PoolRotator {
ban_time: Duration::from_millis(10),
..Default::default()
}
}
fn tx() -> (Hash, Verified<u64, VerifiedTransaction>) {
let hash = 5.into();
let tx = Verified {
original: 5,
verified: VerifiedTransaction {
hash,
sender: Default::default(),
nonce: Default::default(),
},
valid_till: Instant::now(),
};
(hash, tx)
}
#[test]
fn should_not_ban_if_not_stale() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(!rotator.is_banned(&hash));
let past = Instant::now() - Duration::from_millis(1000);
// when
assert!(!rotator.ban_if_stale(&past, &tx));
// then
assert!(!rotator.is_banned(&hash));
}
#[test]
fn should_ban_stale_extrinsic() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(!rotator.is_banned(&hash));
// when
assert!(rotator.ban_if_stale(&Instant::now(), &tx));
// then
assert!(rotator.is_banned(&hash));
}
#[test]
fn should_clear_banned() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(rotator.ban_if_stale(&Instant::now(), &tx));
assert!(rotator.is_banned(&hash));
// when
let future = Instant::now() + rotator.ban_time + rotator.ban_time;
rotator.clear_timeouts(&future);
// then
assert!(!rotator.is_banned(&hash));
}
#[test]
fn should_garbage_collect() {
// given
fn tx_with(i: u64, time: Instant) -> Verified<u64, VerifiedTransaction> {
let hash = i.into();
Verified {
original: i,
verified: VerifiedTransaction {
hash,
sender: Default::default(),
nonce: Default::default(),
},
valid_till: time,
}
}
let rotator = rotator();
let now = Instant::now();
let past = now - Duration::from_secs(1);
// when
for i in 0..2*EXPECTED_SIZE {
let tx = tx_with(i as u64, past);
assert!(rotator.ban_if_stale(&now, &tx));
}
assert_eq!(rotator.banned_until.read().len(), 2*EXPECTED_SIZE);
// then
let tx = tx_with(2*EXPECTED_SIZE as u64, past);
// trigger a garbage collection
assert!(rotator.ban_if_stale(&now, &tx));
assert_eq!(rotator.banned_until.read().len(), EXPECTED_SIZE);
}
}
@@ -0,0 +1,103 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Extrinsics status updates.
use futures::{
Stream,
sync::mpsc,
};
/// Possible extrinsic status events
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Status<H> {
/// Extrinsic has been finalised in block with given hash.
Finalised(H),
/// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid.
Usurped(H),
/// The extrinsic has been broadcast to the given peers.
Broadcast(Vec<String>),
/// Extrinsic has been dropped from the pool because of the limit.
Dropped,
}
/// Extrinsic watcher.
///
/// Represents a stream of status updates for particular extrinsic.
#[derive(Debug)]
pub struct Watcher<H> {
receiver: mpsc::UnboundedReceiver<Status<H>>,
}
impl<H> Watcher<H> {
/// Pipe the notifications to given sink.
///
/// Make sure to drive the future to completion.
pub fn into_stream(self) -> impl Stream<Item=Status<H>, Error=()> {
// we can safely ignore the error here, `UnboundedReceiver` never fails.
self.receiver.map_err(|_| ())
}
}
/// Sender part of the watcher. Exposed only for testing purposes.
#[derive(Debug, Default)]
pub struct Sender<H> {
receivers: Vec<mpsc::UnboundedSender<Status<H>>>,
finalised: bool,
}
impl<H: Clone> Sender<H> {
/// Add a new watcher to this sender object.
pub fn new_watcher(&mut self) -> Watcher<H> {
let (tx, receiver) = mpsc::unbounded();
self.receivers.push(tx);
Watcher {
receiver,
}
}
/// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid.
pub fn usurped(&mut self, hash: H) {
self.send(Status::Usurped(hash))
}
/// Extrinsic has been finalised in block with given hash.
pub fn finalised(&mut self, hash: H) {
self.send(Status::Finalised(hash));
self.finalised = true;
}
/// Transaction has been dropped from the pool because of the limit.
pub fn dropped(&mut self) {
self.send(Status::Dropped);
}
/// The extrinsic has been broadcast to the given peers.
pub fn broadcast(&mut self, peers: Vec<String>) {
self.send(Status::Broadcast(peers))
}
/// Returns true if the are no more listeners for this extrinsic or it was finalised.
pub fn is_done(&self) -> bool {
self.finalised || self.receivers.is_empty()
}
fn send(&mut self, status: Status<H>) {
self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok())
}
}