Pool: A bunch of tests and fixes (#914)

* Implement Ready/Future events.

* Trigger invalid notification.

* Clearing stale transactions test.

* Fix RPC test.
This commit is contained in:
Tomasz Drwięga
2018-10-17 16:25:30 +02:00
committed by Gav Wood
parent 9886d12c26
commit 0c7389e108
7 changed files with 342 additions and 19 deletions
+2
View File
@@ -3319,6 +3319,7 @@ dependencies = [
name = "substrate-transaction-graph"
version = "0.1.0"
dependencies = [
"assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3326,6 +3327,7 @@ dependencies = [
"serde 1.0.79 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.79 (registry+https://github.com/rust-lang/crates.io-index)",
"sr-primitives 0.1.0",
"substrate-test-runtime 0.1.0",
]
[[package]]
+5
View File
@@ -109,6 +109,11 @@ fn should_watch_extrinsic() {
Extrinsic { transfer: tx, signature }
};
AuthorApi::submit_rich_extrinsic(&p, replacement).unwrap();
let (res, data) = runtime.block_on(data.into_future()).unwrap();
assert_eq!(
res,
Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":"ready","subscription":1}}"#.into())
);
assert_eq!(
runtime.block_on(data.into_future()).unwrap().0,
Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":"0xed454dcee51431679c2559403187a56567fded1fc50b6ae3aada87c1d412df5c"},"subscription":1}}"#.into())
@@ -11,3 +11,7 @@ parking_lot = "0.4"
serde = "1.0"
serde_derive = "1.0"
sr-primitives = { path = "../../sr-primitives" }
[dev-dependencies]
assert_matches = "1.1"
substrate-test-runtime = { path = "../../test-runtime" }
@@ -37,6 +37,12 @@ extern crate sr_primitives;
#[macro_use] extern crate log;
#[macro_use] extern crate serde_derive;
#[cfg(test)]
extern crate substrate_test_runtime as test_runtime;
#[cfg(test)]
#[macro_use]
extern crate assert_matches;
mod future;
mod listener;
mod pool;
@@ -53,8 +53,8 @@ impl<H: hash::Hash + traits::Member, H2: Clone> Listener<H, H2> {
///
/// The watcher can be used to subscribe to lifecycle events of that extrinsic.
pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, H2> {
let sender = self.watchers.entry(hash).or_insert_with(watcher::Sender::default);
sender.new_watcher()
let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default);
sender.new_watcher(hash)
}
/// Notify the listeners about extrinsic broadcast.
@@ -64,13 +64,15 @@ impl<H: hash::Hash + traits::Member, H2: Clone> Listener<H, H2> {
/// New transaction was added to the ready pool or promoted from the future pool.
pub fn ready(&mut self, tx: &H, old: Option<&H>) {
self.fire(tx, |watcher| watcher.ready());
if let Some(old) = old {
self.fire(old, |watcher| watcher.usurped(tx.clone()));
}
}
/// New transaction was added to the future pool.
pub fn future(&mut self, _tx: &H) {
pub fn future(&mut self, tx: &H) {
self.fire(tx, |watcher| watcher.future());
}
/// Transaction was dropped from the pool because of the limit.
@@ -81,14 +83,10 @@ impl<H: hash::Hash + traits::Member, H2: Clone> Listener<H, H2> {
})
}
/// Transaction was rejected from the pool.
pub fn rejected(&mut self, tx: &H, is_invalid: bool) {
warn!(target: "transaction-pool", "Extrinsic rejected ({}): {:?}", is_invalid, tx);
}
/// Transaction was removed as invalid.
pub fn invalid(&mut self, tx: &H) {
warn!(target: "transaction-pool", "Extrinsic invalid: {:?}", tx);
self.fire(tx, |watcher| watcher.invalid());
}
/// Transaction was pruned from the pool.
@@ -101,7 +101,7 @@ impl<B: ChainApi> Pool<B> {
.map(|xt| -> Result<_, B::Error> {
let hash = self.api.hash(&xt);
if self.rotator.is_banned(&hash) {
return Err(error::ErrorKind::TemporarilyBanned.into())?;
bail!(error::Error::from(error::ErrorKind::TemporarilyBanned))
}
match self.api.validate_transaction(at, &xt)? {
@@ -119,7 +119,7 @@ impl<B: ChainApi> Pool<B> {
bail!(error::Error::from(error::ErrorKind::InvalidTransaction))
},
TransactionValidity::Unknown => {
self.listener.write().rejected(&hash, false);
self.listener.write().invalid(&hash);
bail!(error::Error::from(error::ErrorKind::UnknownTransactionValidity))
},
}
@@ -127,7 +127,9 @@ impl<B: ChainApi> Pool<B> {
.map(|tx| {
let imported = self.pool.write().import(tx?)?;
self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok());
if let base::Imported::Ready { .. } = imported {
self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok());
}
let mut listener = self.listener.write();
fire_events(&mut *listener, &imported);
@@ -143,8 +145,10 @@ impl<B: ChainApi> Pool<B> {
/// Import a single extrinsic and starts to watch their progress in the pool.
pub fn submit_and_watch(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Watcher<ExHash<B>, BlockHash<B>>, B::Error> {
let xt = self.submit_one(at, xt)?;
Ok(self.listener.write().create_watcher(xt))
let hash = self.api.hash(&xt);
let watcher = self.listener.write().create_watcher(hash);
self.submit_one(at, xt)?;
Ok(watcher)
}
/// Prunes ready transactions that provide given list of tags.
@@ -300,7 +304,7 @@ fn fire_events<H, H2, Ex>(
base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
listener.ready(hash, None);
for f in failed {
listener.rejected(f, true);
listener.invalid(f);
}
for r in removed {
listener.dropped(&r.hash, Some(hash));
@@ -317,9 +321,287 @@ fn fire_events<H, H2, Ex>(
#[cfg(test)]
mod tests {
use super::*;
use futures::Stream;
use test_runtime::{Block, Extrinsic, Transfer};
#[derive(Debug, Default)]
struct TestApi;
impl ChainApi for TestApi {
type Block = Block;
type Hash = u64;
type Error = error::Error;
/// Verify extrinsic at given block.
fn validate_transaction(&self, at: &BlockId<Self::Block>, uxt: &ExtrinsicFor<Self>) -> Result<TransactionValidity, Self::Error> {
let block_number = self.block_id_to_number(at)?.unwrap();
let nonce = uxt.transfer.nonce;
if nonce < block_number {
Ok(TransactionValidity::Invalid)
} else {
Ok(TransactionValidity::Valid(
4,
if nonce > block_number { vec![vec![nonce as u8 - 1]] } else { vec![] },
vec![vec![nonce as u8]],
3,
))
}
}
/// Returns a block number given the block id.
fn block_id_to_number(&self, at: &BlockId<Self::Block>) -> Result<Option<NumberFor<Self>>, Self::Error> {
Ok(match at {
BlockId::Number(num) => Some(*num),
BlockId::Hash(_) => None,
})
}
/// Returns a block hash given the block id.
fn block_id_to_hash(&self, at: &BlockId<Self::Block>) -> Result<Option<BlockHash<Self>>, Self::Error> {
Ok(match at {
BlockId::Number(num) => Some((*num).into()),
BlockId::Hash(_) => None,
})
}
/// Hash the extrinsic.
fn hash(&self, uxt: &ExtrinsicFor<Self>) -> Self::Hash {
(uxt.transfer.from.low_u64() << 5) + uxt.transfer.nonce
}
}
fn uxt(transfer: Transfer) -> Extrinsic {
Extrinsic {
transfer,
signature: Default::default(),
}
}
fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::default())
}
#[test]
#[ignore]
fn should_have_some_basic_tests() {
assert_eq!(true, false);
fn should_validate_and_import_transaction() {
// given
let pool = pool();
// when
let hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 0,
})).unwrap();
// then
assert_eq!(pool.ready(|pending| pending.map(|tx| tx.hash.clone()).collect::<Vec<_>>()), vec![hash]);
}
#[test]
fn should_reject_if_temporarily_banned() {
// given
let pool = pool();
let uxt = uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 0,
});
// when
pool.rotator.ban(&time::Instant::now(), &[pool.hash_of(&uxt)]);
let res = pool.submit_one(&BlockId::Number(0), uxt);
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 0);
// then
assert_matches!(res.unwrap_err().kind(), error::ErrorKind::TemporarilyBanned);
}
#[test]
fn should_notify_about_pool_events() {
let stream = {
// given
let pool = pool();
let stream = pool.import_notification_stream();
// when
let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 0,
})).unwrap();
let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 1,
})).unwrap();
// future doesn't count
let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 3,
})).unwrap();
assert_eq!(pool.status().ready, 2);
assert_eq!(pool.status().future, 1);
stream
};
// then
let mut it = stream.wait();
assert_eq!(it.next(), Some(Ok(())));
assert_eq!(it.next(), Some(Ok(())));
assert_eq!(it.next(), None);
}
#[test]
fn should_clear_stale_transactions() {
// given
let pool = pool();
let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 0,
})).unwrap();
let hash2 = pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 1,
})).unwrap();
let hash3 = pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 3,
})).unwrap();
// when
pool.clear_stale(&BlockId::Number(5)).unwrap();
// then
assert_eq!(pool.all(3).len(), 0);
assert_eq!(pool.status().future, 0);
assert_eq!(pool.status().ready, 0);
// make sure they are temporarily banned as well
assert!(pool.rotator.is_banned(&hash1));
assert!(pool.rotator.is_banned(&hash2));
assert!(pool.rotator.is_banned(&hash3));
}
mod listener {
use super::*;
#[test]
fn should_trigger_ready_and_finalised() {
// given
let pool = pool();
let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 0,
})).unwrap();
assert_eq!(pool.status().ready, 1);
assert_eq!(pool.status().future, 0);
// when
pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]]).unwrap();
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 0);
// then
let mut stream = watcher.into_stream().wait();
assert_eq!(stream.next(), Some(Ok(::watcher::Status::Ready)));
assert_eq!(stream.next(), Some(Ok(::watcher::Status::Finalised(2.into()))));
assert_eq!(stream.next(), None);
}
#[test]
fn should_trigger_future_and_ready_after_promoted() {
// given
let pool = pool();
let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 1,
})).unwrap();
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 1);
// when
pool.submit_one(&BlockId::Number(0), uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 0,
})).unwrap();
assert_eq!(pool.status().ready, 2);
// then
let mut stream = watcher.into_stream().wait();
assert_eq!(stream.next(), Some(Ok(::watcher::Status::Future)));
assert_eq!(stream.next(), Some(Ok(::watcher::Status::Ready)));
}
#[test]
fn should_trigger_invalid_and_ban() {
// given
let pool = pool();
let uxt = uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 0,
});
let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt).unwrap();
assert_eq!(pool.status().ready, 1);
// when
pool.remove_invalid(&[*watcher.hash()]);
// then
let mut stream = watcher.into_stream().wait();
assert_eq!(stream.next(), Some(Ok(::watcher::Status::Ready)));
assert_eq!(stream.next(), Some(Ok(::watcher::Status::Invalid)));
assert_eq!(stream.next(), None);
}
#[test]
fn should_trigger_broadcasted() {
// given
let pool = pool();
let uxt = uxt(Transfer {
from: 1.into(),
to: 2.into(),
amount: 5,
nonce: 0,
});
let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt).unwrap();
assert_eq!(pool.status().ready, 1);
// when
let mut map = HashMap::new();
let peers = vec!["a".into(), "b".into(), "c".into()];
map.insert(*watcher.hash(), peers.clone());
pool.on_broadcasted(map);
// then
let mut stream = watcher.into_stream().wait();
assert_eq!(stream.next(), Some(Ok(::watcher::Status::Ready)));
assert_eq!(stream.next(), Some(Ok(::watcher::Status::Broadcast(peers))));
}
}
}
@@ -22,7 +22,7 @@ use futures::{
};
/// Possible extrinsic status events
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Status<H, H2> {
/// Extrinsic is part of the future queue.
@@ -37,6 +37,8 @@ pub enum Status<H, H2> {
Broadcast(Vec<String>),
/// Extrinsic has been dropped from the pool because of the limit.
Dropped,
/// Extrinsic was detected as invalid.
Invalid,
}
/// Extrinsic watcher.
@@ -45,9 +47,15 @@ pub enum Status<H, H2> {
#[derive(Debug)]
pub struct Watcher<H, H2> {
receiver: mpsc::UnboundedReceiver<Status<H, H2>>,
hash: H,
}
impl<H, H2> Watcher<H, H2> {
/// Returns the transaction hash.
pub fn hash(&self) -> &H {
&self.hash
}
/// Pipe the notifications to given sink.
///
/// Make sure to drive the future to completion.
@@ -75,14 +83,25 @@ impl<H, H2> Default for Sender<H, H2> {
impl<H: Clone, H2: Clone> Sender<H, H2> {
/// Add a new watcher to this sender object.
pub fn new_watcher(&mut self) -> Watcher<H, H2> {
pub fn new_watcher(&mut self, hash: H) -> Watcher<H, H2> {
let (tx, receiver) = mpsc::unbounded();
self.receivers.push(tx);
Watcher {
receiver,
hash,
}
}
/// Transaction became ready.
pub fn ready(&mut self) {
self.send(Status::Ready)
}
/// Transaction was moved to future.
pub fn future(&mut self) {
self.send(Status::Future)
}
/// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid.
pub fn usurped(&mut self, hash: H) {
self.send(Status::Usurped(hash))
@@ -94,6 +113,13 @@ impl<H: Clone, H2: Clone> Sender<H, H2> {
self.finalised = true;
}
/// Extrinsic has been marked as invalid by the block builder.
pub fn invalid(&mut self) {
self.send(Status::Invalid);
// we mark as finalised as there are no more notifications
self.finalised = true;
}
/// Transaction has been dropped from the pool because of the limit.
pub fn dropped(&mut self) {
self.send(Status::Dropped);