Limit transaction pool size (#1676)

* Avoid excessive hashing. Store extrinsic len.

* Implement pool limits.

* Fix issues.

* Make sure we return error in case it doesn't make into the pool.

* Pass parameters from CLI.

* Remove redundant todo.

* Fix tests.
This commit is contained in:
Tomasz Drwięga
2019-02-06 19:03:05 +01:00
committed by Gav Wood
parent 461cd384fc
commit 4e3eace15f
14 changed files with 398 additions and 38 deletions
+178 -18
View File
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{
collections::HashMap,
collections::{HashSet, HashMap},
hash,
sync::Arc,
time,
@@ -38,6 +38,8 @@ use sr_primitives::{
transaction_validity::{TransactionValidity, TransactionTag as Tag},
};
pub use crate::base_pool::Limit;
/// Modification notification event stream type;
pub type EventStream = mpsc::UnboundedReceiver<()>;
@@ -70,17 +72,38 @@ pub trait ChainApi: Send + Sync {
/// Returns a block hash given the block id.
fn block_id_to_hash(&self, at: &BlockId<Self::Block>) -> Result<Option<BlockHash<Self>>, Self::Error>;
/// Hash the extrinsic.
fn hash(&self, uxt: &ExtrinsicFor<Self>) -> Self::Hash;
/// Returns hash and encoding length of the extrinsic.
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (Self::Hash, usize);
}
/// Pool configuration options.
#[derive(Debug, Clone, Default)]
pub struct Options;
#[derive(Debug, Clone)]
pub struct Options {
/// Ready queue limits.
pub ready: Limit,
/// Future queue limits.
pub future: Limit,
}
impl Default for Options {
fn default() -> Self {
Options {
ready: Limit {
count: 512,
total_bytes: 10 * 1024 * 1024,
},
future: Limit {
count: 128,
total_bytes: 1 * 1024 * 1024,
},
}
}
}
/// Extrinsics pool.
pub struct Pool<B: ChainApi> {
api: B,
options: Options,
listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
pool: RwLock<base::BasePool<
ExHash<B>,
@@ -91,7 +114,6 @@ pub struct Pool<B: ChainApi> {
}
impl<B: ChainApi> Pool<B> {
/// Imports a bunch of unverified extrinsics to the pool
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where
T: IntoIterator<Item=ExtrinsicFor<B>>
@@ -99,10 +121,10 @@ impl<B: ChainApi> Pool<B> {
let block_number = self.api.block_id_to_number(at)?
.ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?;
Ok(xts
let results = xts
.into_iter()
.map(|xt| -> Result<_, B::Error> {
let hash = self.api.hash(&xt);
let (hash, bytes) = self.api.hash_and_length(&xt);
if self.rotator.is_banned(&hash) {
bail!(error::Error::from(error::ErrorKind::TemporarilyBanned))
}
@@ -111,6 +133,7 @@ impl<B: ChainApi> Pool<B> {
TransactionValidity::Valid { priority, requires, provides, longevity } => {
Ok(base::Transaction {
data: xt,
bytes,
hash,
priority,
requires,
@@ -138,7 +161,42 @@ impl<B: ChainApi> Pool<B> {
fire_events(&mut *listener, &imported);
Ok(imported.hash().clone())
})
.collect())
.collect::<Vec<_>>();
let removed = self.enforce_limits();
Ok(results.into_iter().map(|res| match res {
Ok(ref hash) if removed.contains(hash) => Err(error::Error::from(error::ErrorKind::ImmediatelyDropped).into()),
other => other,
}).collect())
}
fn enforce_limits(&self) -> HashSet<ExHash<B>> {
let status = self.pool.read().status();
let ready_limit = &self.options.ready;
let future_limit = &self.options.future;
if ready_limit.is_exceeded(status.ready, status.ready_bytes)
|| future_limit.is_exceeded(status.future, status.future_bytes) {
// clean up the pool
let removed = {
let mut pool = self.pool.write();
let removed = pool.enforce_limits(ready_limit, future_limit)
.into_iter().map(|x| x.hash.clone()).collect::<HashSet<_>>();
// ban all removed transactions
self.rotator.ban(&std::time::Instant::now(), removed.iter().map(|x| x.clone()));
removed
};
// run notifications
let mut listener = self.listener.write();
for h in &removed {
listener.dropped(h, None);
}
removed
} else {
Default::default()
}
}
/// Imports one unverified extrinsic to the pool
@@ -148,7 +206,7 @@ impl<B: ChainApi> Pool<B> {
/// Import a single extrinsic and starts to watch their progress in the pool.
pub fn submit_and_watch(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Watcher<ExHash<B>, BlockHash<B>>, B::Error> {
let hash = self.api.hash(&xt);
let hash = self.api.hash_and_length(&xt).0;
let watcher = self.listener.write().create_watcher(hash);
self.submit_one(at, xt)?;
Ok(watcher)
@@ -163,7 +221,7 @@ impl<B: ChainApi> Pool<B> {
pub fn prune(&self, at: &BlockId<B::Block>, parent: &BlockId<B::Block>, extrinsics: &[ExtrinsicFor<B>]) -> Result<(), B::Error> {
let mut tags = Vec::with_capacity(extrinsics.len());
// Get details of all extrinsics that are already in the pool
let hashes = extrinsics.iter().map(|extrinsic| self.api.hash(extrinsic)).collect::<Vec<_>>();
let hashes = extrinsics.iter().map(|extrinsic| self.api.hash_and_length(extrinsic).0).collect::<Vec<_>>();
let in_pool = self.pool.read().by_hash(&hashes);
{
// Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option<TransactionDetails>)`)
@@ -303,13 +361,12 @@ impl<B: ChainApi> Pool<B> {
Ok(())
}
}
impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool.
pub fn new(_options: Options, api: B) -> Self {
pub fn new(options: Options, api: B) -> Self {
Pool {
api,
options,
listener: Default::default(),
pool: Default::default(),
import_notification_sinks: Default::default(),
@@ -359,8 +416,9 @@ impl<B: ChainApi> Pool<B> {
}
/// Returns transaction hash
pub fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExHash<B> {
self.api.hash(xt)
#[cfg(test)]
fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExHash<B> {
self.api.hash_and_length(xt).0
}
}
@@ -394,6 +452,7 @@ fn fire_events<H, H2, Ex>(
mod tests {
use super::*;
use futures::Stream;
use parity_codec::Encode;
use test_runtime::{Block, Extrinsic, Transfer, H256};
use assert_matches::assert_matches;
use crate::watcher;
@@ -440,8 +499,12 @@ mod tests {
}
/// Hash the extrinsic.
fn hash(&self, uxt: &ExtrinsicFor<Self>) -> Self::Hash {
(uxt.transfer().from.to_low_u64_be() << 5) + uxt.transfer().nonce
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (Self::Hash, usize) {
let len = uxt.encode().len();
(
(uxt.transfer().from.to_low_u64_be() << 5) + uxt.transfer().nonce,
len
)
}
}
@@ -586,6 +649,66 @@ mod tests {
assert!(pool.rotator.is_banned(&hash1));
}
#[test]
fn should_limit_futures() {
// given
let limit = Limit {
count: 100,
total_bytes: 200,
};
let pool = Pool::new(Options {
ready: limit.clone(),
future: limit.clone(),
}, TestApi::default());
let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: H256::from_low_u64_be(1),
to: H256::from_low_u64_be(2),
amount: 5,
nonce: 1,
})).unwrap();
assert_eq!(pool.status().future, 1);
// when
let hash2 = pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: H256::from_low_u64_be(2),
to: H256::from_low_u64_be(2),
amount: 5,
nonce: 10,
})).unwrap();
// then
assert_eq!(pool.status().future, 1);
assert!(pool.rotator.is_banned(&hash1));
assert!(!pool.rotator.is_banned(&hash2));
}
#[test]
fn should_error_if_reject_immediately() {
// given
let limit = Limit {
count: 100,
total_bytes: 10,
};
let pool = Pool::new(Options {
ready: limit.clone(),
future: limit.clone(),
}, TestApi::default());
// when
pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: H256::from_low_u64_be(1),
to: H256::from_low_u64_be(2),
amount: 5,
nonce: 1,
})).unwrap_err();
// then
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 0);
}
mod listener {
use super::*;
@@ -716,5 +839,42 @@ mod tests {
assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready)));
assert_eq!(stream.next(), Some(Ok(watcher::Status::Broadcast(peers))));
}
#[test]
fn should_trigger_dropped() {
// given
let limit = Limit {
count: 1,
total_bytes: 1000,
};
let pool = Pool::new(Options {
ready: limit.clone(),
future: limit.clone(),
}, TestApi::default());
let xt = uxt(Transfer {
from: H256::from_low_u64_be(1),
to: H256::from_low_u64_be(2),
amount: 5,
nonce: 0,
});
let watcher = pool.submit_and_watch(&BlockId::Number(0), xt).unwrap();
assert_eq!(pool.status().ready, 1);
// when
let xt = uxt(Transfer {
from: H256::from_low_u64_be(2),
to: H256::from_low_u64_be(1),
amount: 4,
nonce: 1,
});
pool.submit_one(&BlockId::Number(1), xt).unwrap();
assert_eq!(pool.status().ready, 1);
// then
let mut stream = watcher.into_stream().wait();
assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready)));
assert_eq!(stream.next(), Some(Ok(watcher::Status::Dropped)));
}
}
}