diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 24ab0eab93..34c1f606c9 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -3319,6 +3319,7 @@ dependencies = [ name = "substrate-transaction-graph" version = "0.1.0" dependencies = [ + "assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3326,6 +3327,7 @@ dependencies = [ "serde 1.0.79 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.79 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0", + "substrate-test-runtime 0.1.0", ] [[package]] diff --git a/substrate/core/rpc/src/author/tests.rs b/substrate/core/rpc/src/author/tests.rs index 2b79804014..249d3187c7 100644 --- a/substrate/core/rpc/src/author/tests.rs +++ b/substrate/core/rpc/src/author/tests.rs @@ -109,6 +109,11 @@ fn should_watch_extrinsic() { Extrinsic { transfer: tx, signature } }; AuthorApi::submit_rich_extrinsic(&p, replacement).unwrap(); + let (res, data) = runtime.block_on(data.into_future()).unwrap(); + assert_eq!( + res, + Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":"ready","subscription":1}}"#.into()) + ); assert_eq!( runtime.block_on(data.into_future()).unwrap().0, Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":"0xed454dcee51431679c2559403187a56567fded1fc50b6ae3aada87c1d412df5c"},"subscription":1}}"#.into()) diff --git a/substrate/core/transaction-pool/graph/Cargo.toml b/substrate/core/transaction-pool/graph/Cargo.toml index d537be2bb4..9c154d4b3a 100644 --- a/substrate/core/transaction-pool/graph/Cargo.toml +++ b/substrate/core/transaction-pool/graph/Cargo.toml @@ -11,3 +11,7 @@ parking_lot = "0.4" serde = "1.0" serde_derive = "1.0" sr-primitives = { path = "../../sr-primitives" } + +[dev-dependencies] +assert_matches = "1.1" +substrate-test-runtime = { path = "../../test-runtime" } diff --git a/substrate/core/transaction-pool/graph/src/lib.rs b/substrate/core/transaction-pool/graph/src/lib.rs index 7e8d479c08..e41284efe3 100644 --- a/substrate/core/transaction-pool/graph/src/lib.rs +++ b/substrate/core/transaction-pool/graph/src/lib.rs @@ -37,6 +37,12 @@ extern crate sr_primitives; #[macro_use] extern crate log; #[macro_use] extern crate serde_derive; +#[cfg(test)] +extern crate substrate_test_runtime as test_runtime; +#[cfg(test)] +#[macro_use] +extern crate assert_matches; + mod future; mod listener; mod pool; diff --git a/substrate/core/transaction-pool/graph/src/listener.rs b/substrate/core/transaction-pool/graph/src/listener.rs index 14f540cbc4..1947bfb93b 100644 --- a/substrate/core/transaction-pool/graph/src/listener.rs +++ b/substrate/core/transaction-pool/graph/src/listener.rs @@ -53,8 +53,8 @@ impl Listener { /// /// The watcher can be used to subscribe to lifecycle events of that extrinsic. pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher { - let sender = self.watchers.entry(hash).or_insert_with(watcher::Sender::default); - sender.new_watcher() + let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default); + sender.new_watcher(hash) } /// Notify the listeners about extrinsic broadcast. @@ -64,13 +64,15 @@ impl Listener { /// New transaction was added to the ready pool or promoted from the future pool. pub fn ready(&mut self, tx: &H, old: Option<&H>) { + self.fire(tx, |watcher| watcher.ready()); if let Some(old) = old { self.fire(old, |watcher| watcher.usurped(tx.clone())); } } /// New transaction was added to the future pool. - pub fn future(&mut self, _tx: &H) { + pub fn future(&mut self, tx: &H) { + self.fire(tx, |watcher| watcher.future()); } /// Transaction was dropped from the pool because of the limit. @@ -81,14 +83,10 @@ impl Listener { }) } - /// Transaction was rejected from the pool. - pub fn rejected(&mut self, tx: &H, is_invalid: bool) { - warn!(target: "transaction-pool", "Extrinsic rejected ({}): {:?}", is_invalid, tx); - } - /// Transaction was removed as invalid. pub fn invalid(&mut self, tx: &H) { warn!(target: "transaction-pool", "Extrinsic invalid: {:?}", tx); + self.fire(tx, |watcher| watcher.invalid()); } /// Transaction was pruned from the pool. diff --git a/substrate/core/transaction-pool/graph/src/pool.rs b/substrate/core/transaction-pool/graph/src/pool.rs index 6e89df581c..d315425738 100644 --- a/substrate/core/transaction-pool/graph/src/pool.rs +++ b/substrate/core/transaction-pool/graph/src/pool.rs @@ -101,7 +101,7 @@ impl Pool { .map(|xt| -> Result<_, B::Error> { let hash = self.api.hash(&xt); if self.rotator.is_banned(&hash) { - return Err(error::ErrorKind::TemporarilyBanned.into())?; + bail!(error::Error::from(error::ErrorKind::TemporarilyBanned)) } match self.api.validate_transaction(at, &xt)? { @@ -119,7 +119,7 @@ impl Pool { bail!(error::Error::from(error::ErrorKind::InvalidTransaction)) }, TransactionValidity::Unknown => { - self.listener.write().rejected(&hash, false); + self.listener.write().invalid(&hash); bail!(error::Error::from(error::ErrorKind::UnknownTransactionValidity)) }, } @@ -127,7 +127,9 @@ impl Pool { .map(|tx| { let imported = self.pool.write().import(tx?)?; - self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok()); + if let base::Imported::Ready { .. } = imported { + self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok()); + } let mut listener = self.listener.write(); fire_events(&mut *listener, &imported); @@ -143,8 +145,10 @@ impl Pool { /// Import a single extrinsic and starts to watch their progress in the pool. pub fn submit_and_watch(&self, at: &BlockId, xt: ExtrinsicFor) -> Result, BlockHash>, B::Error> { - let xt = self.submit_one(at, xt)?; - Ok(self.listener.write().create_watcher(xt)) + let hash = self.api.hash(&xt); + let watcher = self.listener.write().create_watcher(hash); + self.submit_one(at, xt)?; + Ok(watcher) } /// Prunes ready transactions that provide given list of tags. @@ -300,7 +304,7 @@ fn fire_events( base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => { listener.ready(hash, None); for f in failed { - listener.rejected(f, true); + listener.invalid(f); } for r in removed { listener.dropped(&r.hash, Some(hash)); @@ -317,9 +321,287 @@ fn fire_events( #[cfg(test)] mod tests { + use super::*; + use futures::Stream; + use test_runtime::{Block, Extrinsic, Transfer}; + + #[derive(Debug, Default)] + struct TestApi; + + impl ChainApi for TestApi { + type Block = Block; + type Hash = u64; + type Error = error::Error; + + /// Verify extrinsic at given block. + fn validate_transaction(&self, at: &BlockId, uxt: &ExtrinsicFor) -> Result { + let block_number = self.block_id_to_number(at)?.unwrap(); + let nonce = uxt.transfer.nonce; + + if nonce < block_number { + Ok(TransactionValidity::Invalid) + } else { + Ok(TransactionValidity::Valid( + 4, + if nonce > block_number { vec![vec![nonce as u8 - 1]] } else { vec![] }, + vec![vec![nonce as u8]], + 3, + )) + } + } + + /// Returns a block number given the block id. + fn block_id_to_number(&self, at: &BlockId) -> Result>, Self::Error> { + Ok(match at { + BlockId::Number(num) => Some(*num), + BlockId::Hash(_) => None, + }) + } + + /// Returns a block hash given the block id. + fn block_id_to_hash(&self, at: &BlockId) -> Result>, Self::Error> { + Ok(match at { + BlockId::Number(num) => Some((*num).into()), + BlockId::Hash(_) => None, + }) + } + + /// Hash the extrinsic. + fn hash(&self, uxt: &ExtrinsicFor) -> Self::Hash { + (uxt.transfer.from.low_u64() << 5) + uxt.transfer.nonce + } + } + + fn uxt(transfer: Transfer) -> Extrinsic { + Extrinsic { + transfer, + signature: Default::default(), + } + } + + fn pool() -> Pool { + Pool::new(Default::default(), TestApi::default()) + } + + #[test] - #[ignore] - fn should_have_some_basic_tests() { - assert_eq!(true, false); + fn should_validate_and_import_transaction() { + // given + let pool = pool(); + + // when + let hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 0, + })).unwrap(); + + // then + assert_eq!(pool.ready(|pending| pending.map(|tx| tx.hash.clone()).collect::>()), vec![hash]); + } + + #[test] + fn should_reject_if_temporarily_banned() { + // given + let pool = pool(); + let uxt = uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 0, + }); + + // when + pool.rotator.ban(&time::Instant::now(), &[pool.hash_of(&uxt)]); + let res = pool.submit_one(&BlockId::Number(0), uxt); + assert_eq!(pool.status().ready, 0); + assert_eq!(pool.status().future, 0); + + // then + assert_matches!(res.unwrap_err().kind(), error::ErrorKind::TemporarilyBanned); + } + + #[test] + fn should_notify_about_pool_events() { + let stream = { + // given + let pool = pool(); + let stream = pool.import_notification_stream(); + + // when + let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 0, + })).unwrap(); + let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 1, + })).unwrap(); + // future doesn't count + let _hash = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 3, + })).unwrap(); + + assert_eq!(pool.status().ready, 2); + assert_eq!(pool.status().future, 1); + stream + }; + + // then + let mut it = stream.wait(); + assert_eq!(it.next(), Some(Ok(()))); + assert_eq!(it.next(), Some(Ok(()))); + assert_eq!(it.next(), None); + } + + #[test] + fn should_clear_stale_transactions() { + // given + let pool = pool(); + let hash1 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 0, + })).unwrap(); + let hash2 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 1, + })).unwrap(); + let hash3 = pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 3, + })).unwrap(); + + // when + pool.clear_stale(&BlockId::Number(5)).unwrap(); + + // then + assert_eq!(pool.all(3).len(), 0); + assert_eq!(pool.status().future, 0); + assert_eq!(pool.status().ready, 0); + // make sure they are temporarily banned as well + assert!(pool.rotator.is_banned(&hash1)); + assert!(pool.rotator.is_banned(&hash2)); + assert!(pool.rotator.is_banned(&hash3)); + } + + mod listener { + use super::*; + + #[test] + fn should_trigger_ready_and_finalised() { + // given + let pool = pool(); + let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 0, + })).unwrap(); + assert_eq!(pool.status().ready, 1); + assert_eq!(pool.status().future, 0); + + // when + pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]]).unwrap(); + assert_eq!(pool.status().ready, 0); + assert_eq!(pool.status().future, 0); + + // then + let mut stream = watcher.into_stream().wait(); + assert_eq!(stream.next(), Some(Ok(::watcher::Status::Ready))); + assert_eq!(stream.next(), Some(Ok(::watcher::Status::Finalised(2.into())))); + assert_eq!(stream.next(), None); + } + + #[test] + fn should_trigger_future_and_ready_after_promoted() { + // given + let pool = pool(); + let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 1, + })).unwrap(); + assert_eq!(pool.status().ready, 0); + assert_eq!(pool.status().future, 1); + + // when + pool.submit_one(&BlockId::Number(0), uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 0, + })).unwrap(); + assert_eq!(pool.status().ready, 2); + + // then + let mut stream = watcher.into_stream().wait(); + assert_eq!(stream.next(), Some(Ok(::watcher::Status::Future))); + assert_eq!(stream.next(), Some(Ok(::watcher::Status::Ready))); + } + + #[test] + fn should_trigger_invalid_and_ban() { + // given + let pool = pool(); + let uxt = uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 0, + }); + let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt).unwrap(); + assert_eq!(pool.status().ready, 1); + + // when + pool.remove_invalid(&[*watcher.hash()]); + + + // then + let mut stream = watcher.into_stream().wait(); + assert_eq!(stream.next(), Some(Ok(::watcher::Status::Ready))); + assert_eq!(stream.next(), Some(Ok(::watcher::Status::Invalid))); + assert_eq!(stream.next(), None); + } + + #[test] + fn should_trigger_broadcasted() { + // given + let pool = pool(); + let uxt = uxt(Transfer { + from: 1.into(), + to: 2.into(), + amount: 5, + nonce: 0, + }); + let watcher = pool.submit_and_watch(&BlockId::Number(0), uxt).unwrap(); + assert_eq!(pool.status().ready, 1); + + // when + let mut map = HashMap::new(); + let peers = vec!["a".into(), "b".into(), "c".into()]; + map.insert(*watcher.hash(), peers.clone()); + pool.on_broadcasted(map); + + + // then + let mut stream = watcher.into_stream().wait(); + assert_eq!(stream.next(), Some(Ok(::watcher::Status::Ready))); + assert_eq!(stream.next(), Some(Ok(::watcher::Status::Broadcast(peers)))); + } } } diff --git a/substrate/core/transaction-pool/graph/src/watcher.rs b/substrate/core/transaction-pool/graph/src/watcher.rs index bfbd384482..7ca4a66283 100644 --- a/substrate/core/transaction-pool/graph/src/watcher.rs +++ b/substrate/core/transaction-pool/graph/src/watcher.rs @@ -22,7 +22,7 @@ use futures::{ }; /// Possible extrinsic status events -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum Status { /// Extrinsic is part of the future queue. @@ -37,6 +37,8 @@ pub enum Status { Broadcast(Vec), /// Extrinsic has been dropped from the pool because of the limit. Dropped, + /// Extrinsic was detected as invalid. + Invalid, } /// Extrinsic watcher. @@ -45,9 +47,15 @@ pub enum Status { #[derive(Debug)] pub struct Watcher { receiver: mpsc::UnboundedReceiver>, + hash: H, } impl Watcher { + /// Returns the transaction hash. + pub fn hash(&self) -> &H { + &self.hash + } + /// Pipe the notifications to given sink. /// /// Make sure to drive the future to completion. @@ -75,14 +83,25 @@ impl Default for Sender { impl Sender { /// Add a new watcher to this sender object. - pub fn new_watcher(&mut self) -> Watcher { + pub fn new_watcher(&mut self, hash: H) -> Watcher { let (tx, receiver) = mpsc::unbounded(); self.receivers.push(tx); Watcher { receiver, + hash, } } + /// Transaction became ready. + pub fn ready(&mut self) { + self.send(Status::Ready) + } + + /// Transaction was moved to future. + pub fn future(&mut self) { + self.send(Status::Future) + } + /// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid. pub fn usurped(&mut self, hash: H) { self.send(Status::Usurped(hash)) @@ -94,6 +113,13 @@ impl Sender { self.finalised = true; } + /// Extrinsic has been marked as invalid by the block builder. + pub fn invalid(&mut self) { + self.send(Status::Invalid); + // we mark as finalised as there are no more notifications + self.finalised = true; + } + /// Transaction has been dropped from the pool because of the limit. pub fn dropped(&mut self) { self.send(Status::Dropped);