mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-17 23:01:01 +00:00
* Allow replacing transactions. * Clear old transactions and ban them temporarily. * Move to a separate module and add some tests. * Add bound to banned transactions. * Remove unnecessary block and double PoolRotator.
This commit is contained in:
committed by
Robert Habermeier
parent
134b8f47dd
commit
d965e8f08e
@@ -37,6 +37,7 @@ pub mod watcher;
|
|||||||
mod error;
|
mod error;
|
||||||
mod listener;
|
mod listener;
|
||||||
mod pool;
|
mod pool;
|
||||||
|
mod rotator;
|
||||||
|
|
||||||
pub use listener::Listener;
|
pub use listener::Listener;
|
||||||
pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics};
|
pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics};
|
||||||
|
|||||||
@@ -14,15 +14,21 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use std::{ collections::HashMap, fmt, sync::Arc, collections::BTreeMap};
|
use std::{
|
||||||
|
collections::{BTreeMap, HashMap},
|
||||||
|
fmt,
|
||||||
|
sync::Arc,
|
||||||
|
time,
|
||||||
|
};
|
||||||
use futures::sync::mpsc;
|
use futures::sync::mpsc;
|
||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::{Mutex, RwLock};
|
||||||
use serde::{Serialize, de::DeserializeOwned};
|
use serde::{Serialize, de::DeserializeOwned};
|
||||||
use txpool::{self, Scoring, Readiness};
|
use txpool::{self, Scoring, Readiness};
|
||||||
|
|
||||||
use listener::Listener;
|
|
||||||
use watcher::Watcher;
|
|
||||||
use error::IntoPoolError;
|
use error::IntoPoolError;
|
||||||
|
use listener::Listener;
|
||||||
|
use rotator::PoolRotator;
|
||||||
|
use watcher::Watcher;
|
||||||
|
|
||||||
use runtime_primitives::{generic::BlockId, traits::Block as BlockT};
|
use runtime_primitives::{generic::BlockId, traits::Block as BlockT};
|
||||||
|
|
||||||
@@ -40,16 +46,18 @@ pub type AllExtrinsics<A> = BTreeMap<<<A as ChainApi>::VEx as txpool::VerifiedTr
|
|||||||
|
|
||||||
/// Verified extrinsic struct. Wraps original extrinsic and verification info.
|
/// Verified extrinsic struct. Wraps original extrinsic and verification info.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Verified<Ex: ::std::fmt::Debug, VEx: txpool::VerifiedTransaction> {
|
pub struct Verified<Ex, VEx> {
|
||||||
/// Original extrinsic.
|
/// Original extrinsic.
|
||||||
pub original: Ex,
|
pub original: Ex,
|
||||||
/// Verification data.
|
/// Verification data.
|
||||||
pub verified: VEx,
|
pub verified: VEx,
|
||||||
|
/// Pool deadline, after it's reached we remove the extrinsic from the pool.
|
||||||
|
pub valid_till: time::Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Ex, VEx> txpool::VerifiedTransaction for Verified<Ex, VEx>
|
impl<Ex, VEx> txpool::VerifiedTransaction for Verified<Ex, VEx>
|
||||||
where
|
where
|
||||||
Ex: ::std::fmt::Debug,
|
Ex: fmt::Debug,
|
||||||
VEx: txpool::VerifiedTransaction,
|
VEx: txpool::VerifiedTransaction,
|
||||||
{
|
{
|
||||||
type Hash = <VEx as txpool::VerifiedTransaction>::Hash;
|
type Hash = <VEx as txpool::VerifiedTransaction>::Hash;
|
||||||
@@ -118,10 +126,17 @@ pub struct Ready<'a, 'b, B: 'a + ChainApi> {
|
|||||||
api: &'a B,
|
api: &'a B,
|
||||||
at: &'b BlockId<B::Block>,
|
at: &'b BlockId<B::Block>,
|
||||||
context: B::Ready,
|
context: B::Ready,
|
||||||
|
rotator: &'a PoolRotator<B::Hash>,
|
||||||
|
now: time::Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, 'b, B: ChainApi> txpool::Ready<VerifiedFor<B>> for Ready<'a, 'b, B> {
|
impl<'a, 'b, B: ChainApi> txpool::Ready<VerifiedFor<B>> for Ready<'a, 'b, B> {
|
||||||
fn is_ready(&mut self, xt: &VerifiedFor<B>) -> Readiness {
|
fn is_ready(&mut self, xt: &VerifiedFor<B>) -> Readiness {
|
||||||
|
if self.rotator.ban_if_stale(&self.now, xt) {
|
||||||
|
debug!(target: "extrinsic-pool", "[{:?}] Banning as stale.", txpool::VerifiedTransaction::hash(xt));
|
||||||
|
return Readiness::Stale;
|
||||||
|
}
|
||||||
|
|
||||||
self.api.is_ready(self.at, &mut self.context, xt)
|
self.api.is_ready(self.at, &mut self.context, xt)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -155,6 +170,11 @@ impl<T: ChainApi> Scoring<VerifiedFor<T>> for ScoringAdapter<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Maximum time the transaction will be kept in the pool.
|
||||||
|
///
|
||||||
|
/// Transactions that don't get included within the limit are removed from the pool.
|
||||||
|
const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5);
|
||||||
|
|
||||||
/// Extrinsics pool.
|
/// Extrinsics pool.
|
||||||
pub struct Pool<B: ChainApi> {
|
pub struct Pool<B: ChainApi> {
|
||||||
api: B,
|
api: B,
|
||||||
@@ -164,6 +184,7 @@ pub struct Pool<B: ChainApi> {
|
|||||||
Listener<B::Hash>,
|
Listener<B::Hash>,
|
||||||
>>,
|
>>,
|
||||||
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
|
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
|
||||||
|
rotator: PoolRotator<B::Hash>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: ChainApi> Pool<B> {
|
impl<B: ChainApi> Pool<B> {
|
||||||
@@ -173,6 +194,7 @@ impl<B: ChainApi> Pool<B> {
|
|||||||
pool: RwLock::new(txpool::Pool::new(Listener::default(), ScoringAdapter::<B>(Default::default()), options)),
|
pool: RwLock::new(txpool::Pool::new(Listener::default(), ScoringAdapter::<B>(Default::default()), options)),
|
||||||
import_notification_sinks: Default::default(),
|
import_notification_sinks: Default::default(),
|
||||||
api,
|
api,
|
||||||
|
rotator: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -206,9 +228,20 @@ impl<B: ChainApi> Pool<B> {
|
|||||||
{
|
{
|
||||||
xts
|
xts
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|xt| (self.api.verify_transaction(at, &xt), xt))
|
.map(|xt| {
|
||||||
|
match self.api.verify_transaction(at, &xt) {
|
||||||
|
Ok(ref verified) if self.rotator.is_banned(txpool::VerifiedTransaction::hash(verified)) => {
|
||||||
|
return (Err(txpool::Error::from("Temporarily Banned".to_owned()).into()), xt)
|
||||||
|
},
|
||||||
|
result => (result, xt),
|
||||||
|
}
|
||||||
|
})
|
||||||
.map(|(v, xt)| {
|
.map(|(v, xt)| {
|
||||||
let xt = Verified { original: xt, verified: v? };
|
let xt = Verified {
|
||||||
|
original: xt,
|
||||||
|
verified: v?,
|
||||||
|
valid_till: time::Instant::now() + POOL_TIME,
|
||||||
|
};
|
||||||
Ok(self.pool.write().import(xt)?)
|
Ok(self.pool.write().import(xt)?)
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
@@ -216,9 +249,7 @@ impl<B: ChainApi> Pool<B> {
|
|||||||
|
|
||||||
/// Imports one unverified extrinsic to the pool
|
/// Imports one unverified extrinsic to the pool
|
||||||
pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Arc<VerifiedFor<B>>, B::Error> {
|
pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Arc<VerifiedFor<B>>, B::Error> {
|
||||||
let v = self.api.verify_transaction(at, &xt)?;
|
Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed"))
|
||||||
let xt = Verified { original: xt, verified: v };
|
|
||||||
Ok(self.pool.write().import(xt)?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Import a single extrinsic and starts to watch their progress in the pool.
|
/// Import a single extrinsic and starts to watch their progress in the pool.
|
||||||
@@ -244,7 +275,8 @@ impl<B: ChainApi> Pool<B> {
|
|||||||
senders: Option<&[<B::VEx as txpool::VerifiedTransaction>::Sender]>,
|
senders: Option<&[<B::VEx as txpool::VerifiedTransaction>::Sender]>,
|
||||||
) -> usize
|
) -> usize
|
||||||
{
|
{
|
||||||
let ready = Ready { api: &self.api, context: self.api.ready(), at };
|
self.rotator.clear_timeouts(&time::Instant::now());
|
||||||
|
let ready = self.ready(at);
|
||||||
self.pool.write().cull(senders, ready)
|
self.pool.write().cull(senders, ready)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -284,7 +316,7 @@ impl<B: ChainApi> Pool<B> {
|
|||||||
pub fn pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> T where
|
pub fn pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> T where
|
||||||
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T,
|
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T,
|
||||||
{
|
{
|
||||||
let ready = Ready { api: &self.api, context: self.api.ready(), at };
|
let ready = self.ready(at);
|
||||||
f(self.pool.read().pending(ready))
|
f(self.pool.read().pending(ready))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -326,6 +358,16 @@ impl<B: ChainApi> Pool<B> {
|
|||||||
map
|
map
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn ready<'a, 'b>(&'a self, at: &'b BlockId<B::Block>) -> Ready<'a, 'b, B> {
|
||||||
|
Ready {
|
||||||
|
api: &self.api,
|
||||||
|
rotator: &self.rotator,
|
||||||
|
context: self.api.ready(),
|
||||||
|
at,
|
||||||
|
now: time::Instant::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A Readiness implementation that returns `Ready` for all transactions.
|
/// A Readiness implementation that returns `Ready` for all transactions.
|
||||||
@@ -337,7 +379,7 @@ impl<VEx> txpool::Ready<VEx> for AlwaysReady {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
pub mod tests {
|
||||||
use txpool;
|
use txpool;
|
||||||
use super::{VerifiedFor, ExtrinsicFor};
|
use super::{VerifiedFor, ExtrinsicFor};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@@ -353,9 +395,9 @@ mod tests {
|
|||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct VerifiedTransaction {
|
pub struct VerifiedTransaction {
|
||||||
hash: Hash,
|
pub hash: Hash,
|
||||||
sender: AccountId,
|
pub sender: AccountId,
|
||||||
nonce: u64,
|
pub nonce: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl txpool::VerifiedTransaction for VerifiedTransaction {
|
impl txpool::VerifiedTransaction for VerifiedTransaction {
|
||||||
|
|||||||
@@ -0,0 +1,177 @@
|
|||||||
|
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Polkadot.
|
||||||
|
|
||||||
|
// Polkadot is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Polkadot is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! Rotate extrinsic inside the pool.
|
||||||
|
//!
|
||||||
|
//! Keeps only recent extrinsic and discard the ones kept for a significant amount of time.
|
||||||
|
//! Discarded extrinsics are banned so that they don't get re-imported again.
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
fmt,
|
||||||
|
hash,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
use parking_lot::RwLock;
|
||||||
|
use txpool::VerifiedTransaction;
|
||||||
|
use Verified;
|
||||||
|
|
||||||
|
/// Expected size of the banned extrinsics cache.
|
||||||
|
const EXPECTED_SIZE: usize = 2048;
|
||||||
|
|
||||||
|
/// Pool rotator is responsible to only keep fresh extrinsics in the pool.
|
||||||
|
///
|
||||||
|
/// Extrinsics that occupy the pool for too long are culled and temporarily banned from entering
|
||||||
|
/// the pool again.
|
||||||
|
pub struct PoolRotator<Hash> {
|
||||||
|
/// How long the extrinsic is banned for.
|
||||||
|
ban_time: Duration,
|
||||||
|
/// Currently banned extrinsics.
|
||||||
|
banned_until: RwLock<HashMap<Hash, Instant>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Hash: hash::Hash + Eq> Default for PoolRotator<Hash> {
|
||||||
|
fn default() -> Self {
|
||||||
|
PoolRotator {
|
||||||
|
ban_time: Duration::from_secs(60 * 30),
|
||||||
|
banned_until: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
|
||||||
|
/// Returns `true` if extrinsic hash is currently banned.
|
||||||
|
pub fn is_banned(&self, hash: &Hash) -> bool {
|
||||||
|
self.banned_until.read().contains_key(hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bans extrinsic if it's stale.
|
||||||
|
///
|
||||||
|
/// Returns `true` if extrinsic is stale and got banned.
|
||||||
|
pub fn ban_if_stale<Ex, VEx>(&self, now: &Instant, xt: &Verified<Ex, VEx>) -> bool where
|
||||||
|
VEx: VerifiedTransaction<Hash=Hash>,
|
||||||
|
Hash: fmt::Debug + fmt::LowerHex,
|
||||||
|
{
|
||||||
|
if &xt.valid_till > now {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut banned = self.banned_until.write();
|
||||||
|
banned.insert(xt.verified.hash().clone(), *now + self.ban_time);
|
||||||
|
|
||||||
|
if banned.len() > 2 * EXPECTED_SIZE {
|
||||||
|
while banned.len() > EXPECTED_SIZE {
|
||||||
|
if let Some(key) = banned.keys().next().cloned() {
|
||||||
|
banned.remove(&key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes timed bans.
|
||||||
|
pub fn clear_timeouts(&self, now: &Instant) {
|
||||||
|
let mut banned = self.banned_until.write();
|
||||||
|
|
||||||
|
let to_remove = banned
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(k, v)| if v < now {
|
||||||
|
Some(k.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}).collect::<Vec<_>>();
|
||||||
|
|
||||||
|
for k in to_remove {
|
||||||
|
banned.remove(&k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use pool::tests::VerifiedTransaction;
|
||||||
|
use test_client::runtime::Hash;
|
||||||
|
|
||||||
|
fn rotator() -> PoolRotator<Hash> {
|
||||||
|
PoolRotator {
|
||||||
|
ban_time: Duration::from_millis(10),
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn tx() -> (Hash, Verified<u64, VerifiedTransaction>) {
|
||||||
|
let hash = 5.into();
|
||||||
|
let tx = Verified {
|
||||||
|
original: 5,
|
||||||
|
verified: VerifiedTransaction {
|
||||||
|
hash,
|
||||||
|
sender: Default::default(),
|
||||||
|
nonce: Default::default(),
|
||||||
|
},
|
||||||
|
valid_till: Instant::now(),
|
||||||
|
};
|
||||||
|
|
||||||
|
(hash, tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_not_ban_if_not_stale() {
|
||||||
|
// given
|
||||||
|
let (hash, tx) = tx();
|
||||||
|
let rotator = rotator();
|
||||||
|
assert!(!rotator.is_banned(&hash));
|
||||||
|
let past = Instant::now() - Duration::from_millis(1000);
|
||||||
|
|
||||||
|
// when
|
||||||
|
assert!(!rotator.ban_if_stale(&past, &tx));
|
||||||
|
|
||||||
|
// then
|
||||||
|
assert!(!rotator.is_banned(&hash));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_ban_stale_extrinsic() {
|
||||||
|
// given
|
||||||
|
let (hash, tx) = tx();
|
||||||
|
let rotator = rotator();
|
||||||
|
assert!(!rotator.is_banned(&hash));
|
||||||
|
|
||||||
|
// when
|
||||||
|
assert!(rotator.ban_if_stale(&Instant::now(), &tx));
|
||||||
|
|
||||||
|
// then
|
||||||
|
assert!(rotator.is_banned(&hash));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_clear_banned() {
|
||||||
|
// given
|
||||||
|
let (hash, tx) = tx();
|
||||||
|
let rotator = rotator();
|
||||||
|
assert!(rotator.ban_if_stale(&Instant::now(), &tx));
|
||||||
|
assert!(rotator.is_banned(&hash));
|
||||||
|
|
||||||
|
// when
|
||||||
|
let future = Instant::now() + rotator.ban_time + rotator.ban_time;
|
||||||
|
rotator.clear_timeouts(&future);
|
||||||
|
|
||||||
|
// then
|
||||||
|
assert!(!rotator.is_banned(&hash));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user