From 671b0e0007d3738ffb72f3f0039714f369e550ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 12 Oct 2018 13:09:35 +0200 Subject: [PATCH] Tagged transaction queue integration (#893) * Make the graph generic. * Adapting pool API for the graph. * Merge pool & graph. * Restructure. * Fix test of transaction pool. * Get rid of node/transaction-pool. * Compilation fixes. * Test7 * Fix compilation of tests. * Revert runtime changes. * Add validate_transaction to test-runtime. * Fix RPC tests. * Add clearing of the old transactions. * Trigger pool events. * Use new queue API. * Fix wasm build, re-export Hasher. * No warning if validate transaction fails. * Get rid of Into and use As --- substrate/Cargo.lock | 52 +- substrate/Cargo.toml | 3 +- substrate/core/cli/src/informant.rs | 8 +- substrate/core/client/src/client.rs | 34 +- substrate/core/primitives/src/lib.rs | 1 + substrate/core/rpc/Cargo.toml | 1 + substrate/core/rpc/src/author/error.rs | 4 +- substrate/core/rpc/src/author/mod.rs | 27 +- substrate/core/rpc/src/author/tests.rs | 152 ++--- substrate/core/rpc/src/lib.rs | 3 + substrate/core/service/src/components.rs | 10 +- substrate/core/service/src/config.rs | 4 +- substrate/core/service/src/lib.rs | 35 +- substrate/core/service/test/src/lib.rs | 2 +- substrate/core/sr-api/src/lib.rs | 15 +- .../core/sr-primitives/src/generic/header.rs | 1 - .../src/generic/unchecked_mortal_extrinsic.rs | 2 +- substrate/core/sr-primitives/src/traits.rs | 2 +- .../sr-primitives/src/transaction_validity.rs | 8 +- substrate/core/test-runtime/src/lib.rs | 9 +- substrate/core/test-runtime/src/system.rs | 58 +- substrate/core/transaction-pool/Cargo.toml | 10 +- .../graph}/Cargo.toml | 6 +- .../core/transaction-pool/graph/README.adoc | 13 + .../graph/src/base_pool.rs} | 151 +++-- .../graph}/src/error.rs | 31 + .../graph}/src/future.rs | 25 +- .../graph}/src/lib.rs | 24 +- .../{ => graph}/src/listener.rs | 99 ++- .../core/transaction-pool/graph/src/pool.rs | 333 ++++++++++ .../graph}/src/ready.rs | 77 ++- .../{ => graph}/src/rotator.rs | 66 +- .../{ => graph}/src/watcher.rs | 6 +- .../transaction-pool/{ => src}/README.adoc | 0 substrate/core/transaction-pool/src/api.rs | 78 +++ substrate/core/transaction-pool/src/error.rs | 25 +- substrate/core/transaction-pool/src/lib.rs | 42 +- substrate/core/transaction-pool/src/pool.rs | 626 ------------------ substrate/core/transaction-pool/src/tests.rs | 177 +++++ substrate/node/consensus/Cargo.toml | 18 +- substrate/node/consensus/src/lib.rs | 91 +-- substrate/node/consensus/src/service.rs | 11 +- substrate/node/service/Cargo.toml | 22 +- substrate/node/service/src/lib.rs | 19 +- substrate/node/transaction-pool/Cargo.toml | 16 - substrate/node/transaction-pool/src/error.rs | 73 -- substrate/node/transaction-pool/src/lib.rs | 275 -------- substrate/srml/executive/src/lib.rs | 13 +- 48 files changed, 1234 insertions(+), 1524 deletions(-) rename substrate/core/{transaction-graph => transaction-pool/graph}/Cargo.toml (58%) create mode 100644 substrate/core/transaction-pool/graph/README.adoc rename substrate/core/{transaction-graph/src/pool.rs => transaction-pool/graph/src/base_pool.rs} (85%) rename substrate/core/{transaction-graph => transaction-pool/graph}/src/error.rs (57%) rename substrate/core/{transaction-graph => transaction-pool/graph}/src/future.rs (88%) rename substrate/core/{transaction-graph => transaction-pool/graph}/src/lib.rs (72%) rename substrate/core/transaction-pool/{ => graph}/src/listener.rs (50%) create mode 100644 substrate/core/transaction-pool/graph/src/pool.rs rename substrate/core/{transaction-graph => transaction-pool/graph}/src/ready.rs (90%) rename substrate/core/transaction-pool/{ => graph}/src/rotator.rs (82%) rename substrate/core/transaction-pool/{ => graph}/src/watcher.rs (96%) rename substrate/core/transaction-pool/{ => src}/README.adoc (100%) create mode 100644 substrate/core/transaction-pool/src/api.rs delete mode 100644 substrate/core/transaction-pool/src/pool.rs create mode 100644 substrate/core/transaction-pool/src/tests.rs delete mode 100644 substrate/node/transaction-pool/Cargo.toml delete mode 100644 substrate/node/transaction-pool/src/error.rs delete mode 100644 substrate/node/transaction-pool/src/lib.rs diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index a18260532e..a399308d55 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -1598,7 +1598,6 @@ dependencies = [ "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "node-primitives 0.1.0", "node-runtime 0.1.0", - "node-transaction-pool 0.1.0", "parity-codec 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1608,6 +1607,7 @@ dependencies = [ "substrate-client 0.1.0", "substrate-keyring 0.1.0", "substrate-primitives 0.1.0", + "substrate-transaction-pool 0.1.0", "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1715,7 +1715,6 @@ dependencies = [ "node-network 0.1.0", "node-primitives 0.1.0", "node-runtime 0.1.0", - "node-transaction-pool 0.1.0", "parity-codec 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1731,23 +1730,8 @@ dependencies = [ "substrate-service-test 0.3.0", "substrate-telemetry 0.3.0", "substrate-test-client 0.1.0", - "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "node-transaction-pool" -version = "0.1.0" -dependencies = [ - "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "node-primitives 0.1.0", - "node-runtime 0.1.0", - "parity-codec 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", - "sr-primitives 0.1.0", - "substrate-client 0.1.0", - "substrate-keyring 0.1.0", - "substrate-primitives 0.1.0", "substrate-transaction-pool 0.1.0", + "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3107,6 +3091,7 @@ version = "0.1.0" dependencies = [ "assert_matches 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", "jsonrpc-macros 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", "jsonrpc-pubsub 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", @@ -3275,7 +3260,11 @@ name = "substrate-transaction-graph" version = "0.1.0" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0", ] @@ -3288,12 +3277,12 @@ dependencies = [ "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0", + "substrate-client 0.1.0", "substrate-keyring 0.1.0", + "substrate-primitives 0.1.0", "substrate-test-client 0.1.0", - "transaction-pool 1.13.3 (registry+https://github.com/rust-lang/crates.io-index)", + "substrate-transaction-graph 0.1.0", ] [[package]] @@ -3686,30 +3675,11 @@ dependencies = [ "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "trace-time" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "traitobject" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "transaction-pool" -version = "1.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", - "trace-time 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "trie-bench" version = "0.9.0" @@ -4391,9 +4361,7 @@ dependencies = [ "checksum tokio-tls 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "772f4b04e560117fe3b0a53e490c16ddc8ba6ec437015d91fa385564996ed913" "checksum tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "137bda266504893ac4774e0ec4c2108f7ccdbcb7ac8dced6305fe9e4e0b5041a" "checksum tokio-uds 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "424c1ed15a0132251813ccea50640b224c809d6ceafb88154c1a8775873a0e89" -"checksum trace-time 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5aea07da6582e957c6e737eeb63a5af79e648eeeaaaba8fd9a417f1124bafa41" "checksum traitobject 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "efd1f82c56340fdf16f2a953d7bda4f8fdffba13d93b00844c25572110b26079" -"checksum transaction-pool 1.13.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e5866e5126b14358f1d7af4bf51a0be677a363799b90e655edcec8254edef1d2" "checksum trie-bench 0.9.0 (git+https://github.com/paritytech/trie)" = "" "checksum trie-db 0.9.0 (git+https://github.com/paritytech/trie)" = "" "checksum trie-root 0.9.0 (git+https://github.com/paritytech/trie)" = "" diff --git a/substrate/Cargo.toml b/substrate/Cargo.toml index 937e7f8065..a0c4730ce9 100644 --- a/substrate/Cargo.toml +++ b/substrate/Cargo.toml @@ -36,8 +36,8 @@ members = [ "core/sr-sandbox", "core/sr-std", "core/sr-version", - "core/transaction-graph", "core/transaction-pool", + "core/transaction-pool/graph", "srml/support", "srml/balances", "srml/consensus", @@ -69,7 +69,6 @@ members = [ "node/primitives", "node/runtime", "node/service", - "node/transaction-pool", "subkey", ] exclude = [ diff --git a/substrate/core/cli/src/informant.rs b/substrate/core/cli/src/informant.rs index 99e3ddfaff..85d82132b4 100644 --- a/substrate/core/cli/src/informant.rs +++ b/substrate/core/cli/src/informant.rs @@ -58,7 +58,7 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe (SyncState::Downloading, Some(n)) => (format!("Syncing{}", speed()), format!(", target=#{}", n)), }; last_number = Some(best_number); - let txpool_status = txpool.light_status(); + let txpool_status = txpool.status(); info!( target: "substrate", "{}{} ({} peers), best: #{} ({})", @@ -81,7 +81,7 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe "peers" => num_peers, "height" => best_number, "best" => ?hash, - "txcount" => txpool_status.transaction_count, + "txcount" => txpool_status.ready, "cpu" => cpu_usage, "memory" => memory ); @@ -100,8 +100,8 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe let txpool = service.transaction_pool(); let display_txpool_import = txpool.import_notification_stream().for_each(move |_| { - let status = txpool.light_status(); - telemetry!("txpool.import"; "mem_usage" => status.mem_usage, "count" => status.transaction_count, "sender" => status.senders); + let status = txpool.status(); + telemetry!("txpool.import"; "ready" => status.ready, "future" => status.future); Ok(()) }); diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 75e5442a2a..22024642cf 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -21,7 +21,11 @@ use error::{Error, ErrorKind}; use futures::sync::mpsc; use parking_lot::{Mutex, RwLock}; use primitives::AuthorityId; -use runtime_primitives::{bft::Justification, generic::{BlockId, SignedBlock, Block as RuntimeBlock}}; +use runtime_primitives::{ + bft::Justification, + generic::{BlockId, SignedBlock, Block as RuntimeBlock}, + transaction_validity::{TransactionValidity, TransactionTag}, +}; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero, As, NumberFor, CurrentHeight, BlockNumberToHash}; use runtime_primitives::{ApplyResult, BuildStorage}; use runtime_api as api; @@ -158,6 +162,8 @@ pub struct BlockImportNotification { pub header: Block::Header, /// Is this the new best block. pub is_new_best: bool, + /// Tags provided by transactions imported in that block. + pub tags: Vec, } /// Summary of a finalized block. @@ -539,6 +545,30 @@ impl Client where result } + // TODO [ToDr] Optimize and re-use tags from the pool. + fn transaction_tags(&self, at: Block::Hash, body: &Option>) -> error::Result> { + let id = BlockId::Hash(at); + Ok(match body { + None => vec![], + Some(ref extrinsics) => { + let mut tags = vec![]; + for tx in extrinsics { + let tx = api::TaggedTransactionQueue::validate_transaction(self, &id, &tx)?; + match tx { + TransactionValidity::Valid(_, _, mut provides, ..) => { + tags.append(&mut provides); + }, + // silently ignore invalid extrinsics, + // cause they might just be inherent + _ => {} + } + + } + tags + }, + }) + } + fn execute_and_import_block( &self, origin: BlockOrigin, @@ -574,6 +604,7 @@ impl Client where self.apply_finality(parent_hash, last_best, make_notifications)?; } + let tags = self.transaction_tags(parent_hash, &body)?; let mut transaction = self.backend.begin_operation(BlockId::Hash(parent_hash))?; let (storage_update, changes_update, storage_changes) = match transaction.state()? { Some(transaction_state) => { @@ -659,6 +690,7 @@ impl Client where origin, header, is_new_best, + tags, }; self.import_notification_sinks.lock() diff --git a/substrate/core/primitives/src/lib.rs b/substrate/core/primitives/src/lib.rs index 0ba1693845..affa1c5c00 100644 --- a/substrate/core/primitives/src/lib.rs +++ b/substrate/core/primitives/src/lib.rs @@ -114,6 +114,7 @@ pub use self::uint::U256; pub use authority_id::AuthorityId; pub use changes_trie::ChangesTrieConfiguration; +pub use hash_db::Hasher; // Switch back to Blake after PoC-3 is out // pub use self::hasher::blake::BlakeHasher; pub use self::hasher::blake2::Blake2Hasher; diff --git a/substrate/core/rpc/Cargo.toml b/substrate/core/rpc/Cargo.toml index e1ac0a34aa..a846ae3605 100644 --- a/substrate/core/rpc/Cargo.toml +++ b/substrate/core/rpc/Cargo.toml @@ -23,3 +23,4 @@ tokio = "0.1.7" assert_matches = "1.1" substrate-test-client = { path = "../test-client" } rustc-hex = "2.0" +hex-literal = "0.1" diff --git a/substrate/core/rpc/src/author/error.rs b/substrate/core/rpc/src/author/error.rs index 02cfcc591a..32f4809646 100644 --- a/substrate/core/rpc/src/author/error.rs +++ b/substrate/core/rpc/src/author/error.rs @@ -17,14 +17,14 @@ //! Authoring RPC module errors. use client; -use transaction_pool; +use transaction_pool::txpool; use rpc; use errors; error_chain! { links { - Pool(transaction_pool::Error, transaction_pool::ErrorKind) #[doc = "Pool error"]; + Pool(txpool::error::Error, txpool::error::ErrorKind) #[doc = "Pool error"]; Client(client::error::Error, client::error::ErrorKind) #[doc = "Client error"]; } errors { diff --git a/substrate/core/rpc/src/author/mod.rs b/substrate/core/rpc/src/author/mod.rs index 73eea6f104..cd84f3947b 100644 --- a/substrate/core/rpc/src/author/mod.rs +++ b/substrate/core/rpc/src/author/mod.rs @@ -21,15 +21,15 @@ use std::sync::Arc; use client::{self, Client}; use codec::Decode; use transaction_pool::{ - Pool, - IntoPoolError, - ChainApi as PoolChainApi, - watcher::Status, - VerifiedTransaction, - AllExtrinsics, - ExHash, - ExtrinsicFor, - HashOf, + txpool::{ + ChainApi as PoolChainApi, + BlockHash, + ExHash, + ExtrinsicFor, + IntoPoolError, + Pool, + watcher::Status, + }, }; use jsonrpc_macros::pubsub; use jsonrpc_pubsub::SubscriptionId; @@ -103,7 +103,7 @@ impl Author where } } -impl AuthorApi, HashOf, ExtrinsicFor

, AllExtrinsics

> for Author where +impl AuthorApi, BlockHash

, ExtrinsicFor

, Vec>> for Author where B: client::backend::Backend<

::Block, Blake2Hasher> + Send + Sync + 'static, E: client::CallExecutor<

::Block, Blake2Hasher> + Send + Sync + 'static, P: PoolChainApi + Sync + Send + 'static, @@ -124,14 +124,13 @@ impl AuthorApi, HashOf, ExtrinsicFor

, AllExtrins .map(Into::into) .unwrap_or_else(|e| error::ErrorKind::Verification(Box::new(e)).into()) ) - .map(|ex| ex.hash().clone()) } - fn pending_extrinsics(&self) -> Result> { - Ok(self.pool.all()) + fn pending_extrinsics(&self) -> Result>> { + Ok(self.pool.all(usize::max_value())) } - fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber, HashOf>>, xt: Bytes) { + fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber, BlockHash

>>, xt: Bytes) { let submit = || -> Result<_> { let best_block_hash = self.client.info()?.chain.best_hash; let dxt = <

::Block as traits::Block>::Extrinsic::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?; diff --git a/substrate/core/rpc/src/author/tests.rs b/substrate/core/rpc/src/author/tests.rs index c69366832e..2b79804014 100644 --- a/substrate/core/rpc/src/author/tests.rs +++ b/substrate/core/rpc/src/author/tests.rs @@ -16,126 +16,66 @@ use super::*; -use std::{sync::Arc, result::Result}; +use std::sync::Arc; use codec::Encode; -use transaction_pool::{VerifiedTransaction, scoring, Transaction, ChainApi, Error as PoolError, - Readiness, ExtrinsicFor, VerifiedFor}; -use test_client::runtime::{Block, Extrinsic, Transfer}; +use transaction_pool::{ + txpool::Pool, + ChainApi, +}; +use primitives::H256; +use test_client::keyring::Keyring; +use test_client::runtime::{Extrinsic, Transfer}; use test_client; use tokio::runtime; -use runtime_primitives::{traits, generic::BlockId}; -#[derive(Clone, Debug)] -pub struct Verified -{ - sender: u64, - hash: u64, -} - -impl VerifiedTransaction for Verified { - type Hash = u64; - type Sender = u64; - - fn hash(&self) -> &Self::Hash { &self.hash } - fn sender(&self) -> &Self::Sender { &self.sender } - fn mem_usage(&self) -> usize { 256 } -} - -struct TestApi; - -impl ChainApi for TestApi { - type Block = Block; - type Hash = u64; - type Sender = u64; - type Error = PoolError; - type VEx = Verified; - type Score = u64; - type Event = (); - type Ready = (); - - fn latest_hash(&self) -> ::Hash { - 1.into() - } - - fn verify_transaction(&self, _at: &BlockId, uxt: &ExtrinsicFor) -> Result { - Ok(Verified { - sender: uxt.transfer.from[31] as u64, - hash: uxt.transfer.nonce, - }) - } - - fn is_ready(&self, _at: &BlockId, _c: &mut Self::Ready, _xt: &VerifiedFor) -> Readiness { - Readiness::Ready - } - - fn ready(&self) -> Self::Ready { } - - fn compare(old: &VerifiedFor, other: &VerifiedFor) -> ::std::cmp::Ordering { - old.verified.hash().cmp(&other.verified.hash()) - } - - fn choose(_old: &VerifiedFor, _new: &VerifiedFor) -> scoring::Choice { - scoring::Choice::ReplaceOld - } - - fn update_scores(xts: &[Transaction>], scores: &mut [Self::Score], _change: scoring::Change<()>) { - for i in 0..xts.len() { - scores[i] = xts[i].verified.sender - } - } - - fn should_replace(_old: &VerifiedFor, _new: &VerifiedFor) -> scoring::Choice { - scoring::Choice::ReplaceOld - } -} - -type DummyTxPool = Pool; - -fn uxt(sender: u64, hash: u64) -> Extrinsic { - Extrinsic { - signature: Default::default(), - transfer: Transfer { - amount: Default::default(), - nonce: hash, - from: From::from(sender), - to: Default::default(), - } - } +fn uxt(sender: Keyring, nonce: u64) -> Extrinsic { + let tx = Transfer { + amount: Default::default(), + nonce, + from: sender.to_raw_public().into(), + to: Default::default(), + }; + let signature = Keyring::from_raw_public(tx.from.0).unwrap().sign(&tx.encode()).into(); + Extrinsic { transfer: tx, signature } } #[test] fn submit_transaction_should_not_cause_error() { let runtime = runtime::Runtime::new().unwrap(); + let client = Arc::new(test_client::new()); let p = Author { - client: Arc::new(test_client::new()), - pool: Arc::new(DummyTxPool::new(Default::default(), TestApi)), + client: client.clone(), + pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client))), subscriptions: Subscriptions::new(runtime.executor()), }; + let h: H256 = hex!("e10ad66bce51ef3e2a1167934ce3740d2d8c703810f9b314e89f2e783f75e826").into(); assert_matches!( - AuthorApi::submit_extrinsic(&p, uxt(5, 1).encode().into()), - Ok(1) + AuthorApi::submit_extrinsic(&p, uxt(Keyring::Alice, 1).encode().into()), + Ok(h2) if h == h2 ); assert!( - AuthorApi::submit_extrinsic(&p, uxt(5, 1).encode().into()).is_err() + AuthorApi::submit_extrinsic(&p, uxt(Keyring::Alice, 1).encode().into()).is_err() ); } #[test] fn submit_rich_transaction_should_not_cause_error() { let runtime = runtime::Runtime::new().unwrap(); + let client = Arc::new(test_client::new()); let p = Author { - client: Arc::new(test_client::new()), - pool: Arc::new(DummyTxPool::new(Default::default(), TestApi)), + client: client.clone(), + pool: Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))), subscriptions: Subscriptions::new(runtime.executor()), }; + let h: H256 = hex!("fccc48291473c53746cd267cf848449edd7711ee6511fba96919d5f9f4859e4f").into(); assert_matches!( - AuthorApi::submit_rich_extrinsic(&p, uxt(5, 0)), - Ok(0) + AuthorApi::submit_rich_extrinsic(&p, uxt(Keyring::Alice, 0)), + Ok(h2) if h == h2 ); assert!( - AuthorApi::submit_rich_extrinsic(&p, uxt(5, 0)).is_err() + AuthorApi::submit_rich_extrinsic(&p, uxt(Keyring::Alice, 0)).is_err() ); } @@ -143,40 +83,52 @@ fn submit_rich_transaction_should_not_cause_error() { fn should_watch_extrinsic() { //given let mut runtime = runtime::Runtime::new().unwrap(); - let pool = Arc::new(DummyTxPool::new(Default::default(), TestApi)); + let client = Arc::new(test_client::new()); + let pool = Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))); let p = Author { - client: Arc::new(test_client::new()), + client, pool: pool.clone(), subscriptions: Subscriptions::new(runtime.executor()), }; let (subscriber, id_rx, data) = ::jsonrpc_macros::pubsub::Subscriber::new_test("test"); // when - p.watch_extrinsic(Default::default(), subscriber, uxt(5, 5).encode().into()); + p.watch_extrinsic(Default::default(), subscriber, uxt(Keyring::Alice, 0).encode().into()); // then assert_eq!(runtime.block_on(id_rx), Ok(Ok(1.into()))); // check notifications - AuthorApi::submit_rich_extrinsic(&p, uxt(5, 1)).unwrap(); + let replacement = { + let tx = Transfer { + amount: 5, + nonce: 0, + from: Keyring::Alice.to_raw_public().into(), + to: Default::default(), + }; + let signature = Keyring::from_raw_public(tx.from.0).unwrap().sign(&tx.encode()).into(); + Extrinsic { transfer: tx, signature } + }; + AuthorApi::submit_rich_extrinsic(&p, replacement).unwrap(); assert_eq!( runtime.block_on(data.into_future()).unwrap().0, - Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":1},"subscription":1}}"#.into()) + Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":"0xed454dcee51431679c2559403187a56567fded1fc50b6ae3aada87c1d412df5c"},"subscription":1}}"#.into()) ); } #[test] fn should_return_pending_extrinsics() { let runtime = runtime::Runtime::new().unwrap(); - let pool = Arc::new(DummyTxPool::new(Default::default(), TestApi)); + let client = Arc::new(test_client::new()); + let pool = Arc::new(Pool::new(Default::default(), ChainApi::new(client.clone()))); let p = Author { - client: Arc::new(test_client::new()), + client, pool: pool.clone(), subscriptions: Subscriptions::new(runtime.executor()), }; - let ex = uxt(5, 1); + let ex = uxt(Keyring::Alice, 0); AuthorApi::submit_rich_extrinsic(&p, ex.clone()).unwrap(); assert_matches!( p.pending_extrinsics(), - Ok(ref expected) if expected.get(&5) == Some(&vec![ex]) + Ok(ref expected) if expected == &vec![ex] ); } diff --git a/substrate/core/rpc/src/lib.rs b/substrate/core/rpc/src/lib.rs index c1d2fed445..9e4bd96978 100644 --- a/substrate/core/rpc/src/lib.rs +++ b/substrate/core/rpc/src/lib.rs @@ -42,6 +42,9 @@ extern crate log; #[macro_use] extern crate assert_matches; #[cfg(test)] +#[macro_use] +extern crate hex_literal; +#[cfg(test)] extern crate substrate_test_client as test_client; #[cfg(test)] extern crate rustc_hex; diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index 31d7ddd1eb..531a6848db 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -28,7 +28,7 @@ use client::{self, Client}; use {error, Service}; use network::{self, OnDemand}; use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; -use transaction_pool::{self, Options as TransactionPoolOptions, Pool as TransactionPool}; +use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as TransactionPool}; use runtime_primitives::{traits::Block as BlockT, traits::Header as HeaderT, BuildStorage}; use config::Configuration; use primitives::{Blake2Hasher}; @@ -105,7 +105,7 @@ pub type ComponentClient = Client< pub type ComponentBlock = <::Factory as ServiceFactory>::Block; /// Extrinsic hash type for `Components` -pub type ComponentExHash = <::TransactionPoolApi as transaction_pool::ChainApi>::Hash; +pub type ComponentExHash = <::TransactionPoolApi as txpool::ChainApi>::Hash; /// Extrinsic type. pub type ComponentExtrinsic = as BlockT>::Extrinsic; @@ -128,9 +128,9 @@ pub trait ServiceFactory: 'static + Sized { /// Chain runtime. type RuntimeDispatch: NativeExecutionDispatch + Send + Sync + 'static; /// Extrinsic pool backend type for the full client. - type FullTransactionPoolApi: transaction_pool::ChainApi + Send + 'static; + type FullTransactionPoolApi: txpool::ChainApi + Send + 'static; /// Extrinsic pool backend type for the light client. - type LightTransactionPoolApi: transaction_pool::ChainApi + 'static; + type LightTransactionPoolApi: txpool::ChainApi + 'static; /// Genesis configuration for the runtime. type Genesis: RuntimeGenesis; /// Other configuration for service members. @@ -169,7 +169,7 @@ pub trait Components: 'static { /// Client executor. type Executor: 'static + client::CallExecutor, Blake2Hasher> + Send + Sync; /// Extrinsic pool type. - type TransactionPoolApi: 'static + transaction_pool::ChainApi< + type TransactionPoolApi: 'static + txpool::ChainApi< Hash = ::ExtrinsicHash, Block = FactoryBlock >; diff --git a/substrate/core/service/src/config.rs b/substrate/core/service/src/config.rs index 1423387229..00866d9563 100644 --- a/substrate/core/service/src/config.rs +++ b/substrate/core/service/src/config.rs @@ -39,7 +39,7 @@ pub struct Configuration { /// Node roles. pub roles: Roles, /// Extrinsic pool configuration. - pub transaction_pool: transaction_pool::Options, + pub transaction_pool: transaction_pool::txpool::Options, /// Network configuration. pub network: NetworkConfiguration, /// Path to key files. @@ -113,7 +113,7 @@ pub fn platform() -> String { let env_dash = if env.is_empty() { "" } else { "-" }; format!("{}-{}{}{}", Target::arch(), Target::os(), env_dash, env) } - + /// Returns full version string, using supplied version and commit. pub fn full_version_from_strs(impl_version: &str, impl_commit: &str) -> String { let commit_dash = if impl_commit.is_empty() { "" } else { "-" }; diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 91b39bcae0..0528116694 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -75,7 +75,7 @@ use codec::{Encode, Decode}; pub use self::error::{ErrorKind, Error}; pub use config::{Configuration, Roles, PruningMode}; pub use chain_spec::ChainSpec; -pub use transaction_pool::{Pool as TransactionPool, Options as TransactionPoolOptions, ChainApi, VerifiedTransaction, IntoPoolError}; +pub use transaction_pool::txpool::{self, Pool as TransactionPool, Options as TransactionPoolOptions, ChainApi, IntoPoolError}; pub use client::ExecutionStrategy; pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend, @@ -116,6 +116,8 @@ pub fn new_client(config: FactoryFullConfig impl Service where Components: components::Components, + txpool::ExHash: serde::de::DeserializeOwned + serde::Serialize, + txpool::ExtrinsicFor: serde::de::DeserializeOwned + serde::Serialize, { /// Creates a new service. pub fn new( @@ -196,7 +198,7 @@ impl Service if let Some(network) = network.upgrade() { network.on_block_imported(notification.hash, ¬ification.header); } - txpool.cull(&BlockId::hash(notification.hash)) + txpool.prune_tags(&BlockId::hash(notification.hash), notification.tags) .map_err(|e| warn!("Error removing extrinsics: {:?}", e))?; Ok(()) }) @@ -288,7 +290,11 @@ impl Service _telemetry: telemetry, }) } +} +impl Service where + Components: components::Components, +{ /// Get shared client instance. pub fn client(&self) -> Arc> { self.client.clone() @@ -386,21 +392,13 @@ impl TransactionPoolAdapter { impl network::TransactionPool, ComponentBlock> for TransactionPoolAdapter { fn transactions(&self) -> Vec<(ComponentExHash, ComponentExtrinsic)> { - let best_block_id = match self.best_block_id() { - Some(id) => id, - None => return vec![], - }; - self.pool.cull_and_get_pending(&best_block_id, |pending| pending + self.pool.ready(|pending| pending .map(|t| { - let hash = t.hash().clone(); - let ex: ComponentExtrinsic = t.original.clone(); + let hash = t.hash.clone(); + let ex: ComponentExtrinsic = t.data.raw.clone(); (hash, ex) }) - .collect() - ).unwrap_or_else(|e| { - warn!("Error retrieving pending set: {}", e); - vec![] - }) + .collect()) } fn import(&self, transaction: &ComponentExtrinsic) -> Option> { @@ -412,13 +410,12 @@ impl network::TransactionPool, ComponentBlock< let encoded = transaction.encode(); if let Some(uxt) = Decode::decode(&mut &encoded[..]) { let best_block_id = self.best_block_id()?; + let hash = self.pool.hash_of(&uxt); match self.pool.submit_one(&best_block_id, uxt) { - Ok(xt) => Some(*xt.hash()), + Ok(hash) => Some(hash), Err(e) => match e.into_pool_error() { Ok(e) => match e.kind() { - transaction_pool::ErrorKind::AlreadyImported(hash) => - Some(::std::str::FromStr::from_str(&hash[2..]).map_err(|_| {}) - .expect("Hash string is always valid")), + txpool::error::ErrorKind::AlreadyImported => Some(hash), _ => { debug!("Error adding transaction to the pool: {:?}", e); None @@ -427,7 +424,7 @@ impl network::TransactionPool, ComponentBlock< Err(e) => { debug!("Error converting pool error: {:?}", e); None - } + }, } } } else { diff --git a/substrate/core/service/test/src/lib.rs b/substrate/core/service/test/src/lib.rs index 929ca480dc..2a61f5dd60 100644 --- a/substrate/core/service/test/src/lib.rs +++ b/substrate/core/service/test/src/lib.rs @@ -247,7 +247,7 @@ where let best_block = BlockId::number(first_service.client().info().unwrap().chain.best_number); first_service.transaction_pool().submit_one(&best_block, extrinsic_factory(&first_service)).unwrap(); network.run_until_all_full(|_index, service| - service.transaction_pool().all().len() == 1 + service.transaction_pool().all(usize::max_value()).len() == 1 ); } diff --git a/substrate/core/sr-api/src/lib.rs b/substrate/core/sr-api/src/lib.rs index e66d88ef3d..838fcb5832 100644 --- a/substrate/core/sr-api/src/lib.rs +++ b/substrate/core/sr-api/src/lib.rs @@ -25,7 +25,7 @@ pub extern crate parity_codec as codec; extern crate sr_version as runtime_version; #[doc(hidden)] -pub use primitives::{traits::Block as BlockT, generic::BlockId, ApplyResult}; +pub use primitives::{traits::Block as BlockT, generic::BlockId, transaction_validity::TransactionValidity, ApplyResult}; use runtime_version::RuntimeVersion; use rstd::vec::Vec; #[doc(hidden)] @@ -485,7 +485,7 @@ decl_apis! { /// #[macro_use] /// extern crate sr_api as runtime_api; /// -/// use runtime_api::runtime::{Core, OldTxQueue}; +/// use runtime_api::runtime::{Core, TaggedTransactionQueue}; /// /// impl_apis! { /// impl Core for Runtime { @@ -498,13 +498,10 @@ decl_apis! { /// } /// } /// -/// impl OldTxQueue for Runtime { -/// fn account_nonce(account: AccountId) -> Index { -/// 0 -/// } -/// fn lookup_address(address: Address) -> Option { -/// None -/// } +/// impl TaggedTransactionQueue for Runtime { +/// fn validate_transaction(tx: ::Extrinsic) -> TransactionValidity { +/// unimplemented!() +/// } /// } /// } /// diff --git a/substrate/core/sr-primitives/src/generic/header.rs b/substrate/core/sr-primitives/src/generic/header.rs index aa262a542b..6e5736908b 100644 --- a/substrate/core/sr-primitives/src/generic/header.rs +++ b/substrate/core/sr-primitives/src/generic/header.rs @@ -81,7 +81,6 @@ impl<'a, Number: 'a, Hash: 'a + HashT, DigestItem: 'a> Deserialize<'a> for Heade } } -// TODO [ToDr] Issue with bounds impl Decode for Header where Number: Decode, Hash: HashT, diff --git a/substrate/core/sr-primitives/src/generic/unchecked_mortal_extrinsic.rs b/substrate/core/sr-primitives/src/generic/unchecked_mortal_extrinsic.rs index 21fa47e498..abaa9e7a93 100644 --- a/substrate/core/sr-primitives/src/generic/unchecked_mortal_extrinsic.rs +++ b/substrate/core/sr-primitives/src/generic/unchecked_mortal_extrinsic.rs @@ -200,7 +200,7 @@ mod tests { const DUMMY_FUNCTION: u64 = 0; const DUMMY_ACCOUNTID: u64 = 0; - + type Ex = UncheckedMortalExtrinsic; type CEx = CheckedExtrinsic; diff --git a/substrate/core/sr-primitives/src/traits.rs b/substrate/core/sr-primitives/src/traits.rs index 28efbbbf4d..8f4cdf2106 100644 --- a/substrate/core/sr-primitives/src/traits.rs +++ b/substrate/core/sr-primitives/src/traits.rs @@ -78,7 +78,7 @@ pub trait BlockNumberToHash { type BlockNumber: Zero; /// The type of the hash. - type Hash; + type Hash: Encode; /// Get the hash for a given block number, or `None` if unknown. fn block_number_to_hash(&self, n: Self::BlockNumber) -> Option; diff --git a/substrate/core/sr-primitives/src/transaction_validity.rs b/substrate/core/sr-primitives/src/transaction_validity.rs index b68375943c..fc125e24a8 100644 --- a/substrate/core/sr-primitives/src/transaction_validity.rs +++ b/substrate/core/sr-primitives/src/transaction_validity.rs @@ -30,8 +30,14 @@ pub type TransactionTag = Vec; /// Information on a transaction's validity and, if valid, on how it relates to other transactions. #[derive(Clone, PartialEq, Eq, Encode, Decode)] +#[cfg_attr(feature = "std", derive(Debug))] pub enum TransactionValidity { Invalid, - Valid(TransactionPriority, Vec, Vec, TransactionLongevity), + Valid( + /* priority: */TransactionPriority, + /* requires: */Vec, + /* provides: */Vec, + /* longevity: */TransactionLongevity + ), Unknown, } diff --git a/substrate/core/test-runtime/src/lib.rs b/substrate/core/test-runtime/src/lib.rs index 1bba48c032..1957e12ea0 100644 --- a/substrate/core/test-runtime/src/lib.rs +++ b/substrate/core/test-runtime/src/lib.rs @@ -55,7 +55,7 @@ use codec::{Encode, Decode}; use runtime_api::runtime::*; use runtime_primitives::traits::{BlindCheckable, BlakeTwo256, Block as BlockT}; -use runtime_primitives::{ApplyResult, Ed25519Signature}; +use runtime_primitives::{ApplyResult, Ed25519Signature, transaction_validity::TransactionValidity}; use runtime_version::RuntimeVersion; pub use primitives::hash::H256; use primitives::AuthorityId; @@ -159,6 +159,7 @@ mod test_api { } } } + use test_api::runtime::TestAPI; struct Runtime; @@ -178,6 +179,12 @@ impl_apis! { } } + impl TaggedTransactionQueue for Runtime { + fn validate_transaction(utx: ::Extrinsic) -> TransactionValidity { + system::validate_transaction(utx) + } + } + impl BlockBuilder for Runtime { fn initialise_block(header: ::Header) { system::initialise_block(header) diff --git a/substrate/core/test-runtime/src/system.rs b/substrate/core/test-runtime/src/system.rs index 5d50e97135..0fbbcdf4de 100644 --- a/substrate/core/test-runtime/src/system.rs +++ b/substrate/core/test-runtime/src/system.rs @@ -18,14 +18,14 @@ //! and depositing logs. use rstd::prelude::*; -use runtime_io::{storage_root, enumerated_trie_root, storage_changes_root}; +use runtime_io::{storage_root, enumerated_trie_root, storage_changes_root, twox_128}; use runtime_support::storage::{self, StorageValue, StorageMap}; use runtime_primitives::traits::{Hash as HashT, BlakeTwo256, Digest as DigestT}; use runtime_primitives::generic; -use runtime_primitives::{ApplyError, ApplyOutcome, ApplyResult}; +use runtime_primitives::{ApplyError, ApplyOutcome, ApplyResult, transaction_validity::TransactionValidity}; use codec::{KeyedVec, Encode}; use super::{AccountId, BlockNumber, Extrinsic, H256 as Hash, Block, Header, Digest}; -use primitives::Blake2Hasher; +use primitives::{Blake2Hasher}; use primitives::storage::well_known_keys; const NONCE_OF: &[u8] = b"nonce:"; @@ -99,6 +99,47 @@ pub fn execute_block(block: Block) { assert!(digest == header.digest, "Header digest items must match that calculated."); } +/// Execute a transaction outside of the block execution function. +/// This doesn't attempt to validate anything regarding the block. +pub fn validate_transaction(utx: Extrinsic) -> TransactionValidity { + let tx = match check_signature(&utx) { + Ok(tx) => tx, + Err(_) => return TransactionValidity::Invalid, + }; + + let nonce_key = tx.from.to_keyed_vec(NONCE_OF); + let expected_nonce: u64 = storage::get_or(&nonce_key, 0); + if tx.nonce < expected_nonce { + return TransactionValidity::Invalid; + } + if tx.nonce > expected_nonce + 64 { + return TransactionValidity::Unknown; + } + + let hash = |from: &AccountId, nonce: u64| { + twox_128(&nonce.to_keyed_vec(&*from)).to_vec() + }; + let requires = if tx.nonce != expected_nonce && tx.nonce > 0 { + let mut deps = Vec::new(); + deps.push(hash(&tx.from, tx.nonce - 1)); + deps + } else { Vec::new() }; + + let provides = { + let mut p = Vec::new(); + p.push(hash(&tx.from, tx.nonce)); + p + }; + + TransactionValidity::Valid( + /* priority: */tx.amount, + requires, + provides, + /* longevity: */64 + ) +} + + /// Execute a transaction outside of the block execution function. /// This doesn't attempt to validate anything regarding the block. pub fn execute_transaction(utx: Extrinsic) -> ApplyResult { @@ -135,16 +176,21 @@ pub fn finalise_block() -> Header { } } -fn execute_transaction_backend(utx: &Extrinsic) -> ApplyResult { +#[inline(always)] +fn check_signature(utx: &Extrinsic) -> Result<::Transfer, ApplyError> { use runtime_primitives::traits::BlindCheckable; - // check signature let utx = match utx.clone().check() { Ok(tx) => tx, Err(_) => return Err(ApplyError::BadSignature), }; - let tx: ::Transfer = utx.transfer; + Ok(utx.transfer) +} + +fn execute_transaction_backend(utx: &Extrinsic) -> ApplyResult { + // check signature + let tx = check_signature(utx)?; // check nonce let nonce_key = tx.from.to_keyed_vec(NONCE_OF); diff --git a/substrate/core/transaction-pool/Cargo.toml b/substrate/core/transaction-pool/Cargo.toml index 5ce0255fd9..85ecd32cd2 100644 --- a/substrate/core/transaction-pool/Cargo.toml +++ b/substrate/core/transaction-pool/Cargo.toml @@ -4,16 +4,16 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -serde = "1.0" -serde_derive = "1.0" error-chain = "0.12" futures = "0.1" log = "0.4" +parity-codec = "2.0" parking_lot = "0.4" -transaction-pool = "1.13.3" -sr-primitives = { path = "../../core/sr-primitives" } +sr-primitives = { path = "../sr-primitives" } +substrate-client = { path = "../client" } +substrate-primitives = { path = "../primitives" } +substrate-transaction-graph = { path = "./graph" } [dev-dependencies] substrate-test-client = { path = "../../core/test-client" } substrate-keyring = { path = "../../core/keyring" } -parity-codec = "2.0" diff --git a/substrate/core/transaction-graph/Cargo.toml b/substrate/core/transaction-pool/graph/Cargo.toml similarity index 58% rename from substrate/core/transaction-graph/Cargo.toml rename to substrate/core/transaction-pool/graph/Cargo.toml index dbf3def19d..d537be2bb4 100644 --- a/substrate/core/transaction-graph/Cargo.toml +++ b/substrate/core/transaction-pool/graph/Cargo.toml @@ -5,5 +5,9 @@ authors = ["Parity Technologies "] [dependencies] error-chain = "0.12" +futures = "0.1" log = "0.4" -sr-primitives = { path = "../sr-primitives" } +parking_lot = "0.4" +serde = "1.0" +serde_derive = "1.0" +sr-primitives = { path = "../../sr-primitives" } diff --git a/substrate/core/transaction-pool/graph/README.adoc b/substrate/core/transaction-pool/graph/README.adoc new file mode 100644 index 0000000000..6746537217 --- /dev/null +++ b/substrate/core/transaction-pool/graph/README.adoc @@ -0,0 +1,13 @@ + += transaction-graph + +.Summary +[source, toml] +---- +include::Cargo.toml[lines=2..5] +---- + +.Description +---- +include::src/lib.rs[tag=description] +---- diff --git a/substrate/core/transaction-graph/src/pool.rs b/substrate/core/transaction-pool/graph/src/base_pool.rs similarity index 85% rename from substrate/core/transaction-graph/src/pool.rs rename to substrate/core/transaction-pool/graph/src/base_pool.rs index 5dcb55e8f0..a416efa531 100644 --- a/substrate/core/transaction-graph/src/pool.rs +++ b/substrate/core/transaction-pool/graph/src/base_pool.rs @@ -14,6 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +//! A basic version of the dependency graph. +//! +//! For a more full-featured pool, have a look at the `pool` module. + use std::{ hash, sync::Arc, @@ -30,11 +34,12 @@ use error; use future::{FutureTransactions, WaitingTransaction}; use ready::ReadyTransactions; +/// Block number type. pub type BlockNumber = u64; /// Successful import result. #[derive(Debug, PartialEq, Eq)] -pub enum Imported { +pub enum Imported { /// Transaction was successfuly imported to Ready queue. Ready { /// Hash of transaction that was successfuly imported. @@ -44,7 +49,7 @@ pub enum Imported { /// Transactions that failed to be promoted from the Future queue and are now discarded. failed: Vec, /// Transactions removed from the Ready pool (replaced). - removed: Vec>>, + removed: Vec>>, }, /// Transaction was successfuly imported to Future queue. Future { @@ -53,23 +58,34 @@ pub enum Imported { } } +impl Imported { + /// Returns the hash of imported transaction. + pub fn hash(&self) -> &Hash { + use self::Imported::*; + match *self { + Ready { ref hash, .. } => hash, + Future { ref hash, .. } => hash, + } + } +} + /// Status of pruning the queue. #[derive(Debug)] -pub struct PruneStatus { +pub struct PruneStatus { /// A list of imports that satisfying the tag triggered. - pub promoted: Vec>, + pub promoted: Vec>, /// A list of transactions that failed to be promoted and now are discarded. pub failed: Vec, /// A list of transactions that got pruned from the ready queue. - pub pruned: Vec>>, + pub pruned: Vec>>, } /// Immutable transaction #[cfg_attr(test, derive(Clone))] -#[derive(Debug, PartialEq, Eq)] -pub struct Transaction { +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Transaction { /// Raw extrinsic representing that transaction. - pub ex: Vec, + pub data: Extrinsic, /// Transaction hash (unique) pub hash: Hash, /// Transaction priority (higher = better) @@ -92,13 +108,22 @@ pub struct Transaction { /// as-is for the second time will fail or produce unwanted results. /// Most likely it is required to revalidate them and recompute set of /// required tags. -#[derive(Default, Debug)] -pub struct Pool { - future: FutureTransactions, - ready: ReadyTransactions, +#[derive(Debug)] +pub struct BasePool { + future: FutureTransactions, + ready: ReadyTransactions, } -impl Pool { +impl Default for BasePool { + fn default() -> Self { + BasePool { + future: Default::default(), + ready: Default::default(), + } + } +} + +impl BasePool { /// Imports transaction to the pool. /// /// The pool consists of two parts: Future and Ready. @@ -109,8 +134,8 @@ impl Pool { pub fn import( &mut self, block_number: BlockNumber, - tx: Transaction, - ) -> error::Result> { + tx: Transaction, + ) -> error::Result> { if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) { bail!(error::ErrorKind::AlreadyImported) } @@ -132,7 +157,7 @@ impl Pool { /// Imports transaction to ready queue. /// /// NOTE the transaction has to have all requirements satisfied. - fn import_to_ready(&mut self, block_number: BlockNumber, tx: WaitingTransaction) -> error::Result> { + fn import_to_ready(&mut self, block_number: BlockNumber, tx: WaitingTransaction) -> error::Result> { let hash = tx.transaction.hash.clone(); let mut promoted = vec![]; let mut failed = vec![]; @@ -195,7 +220,7 @@ impl Pool { } /// Returns an iterator over ready transactions in the pool. - pub fn ready<'a>(&'a self) -> impl Iterator>> + 'a { + pub fn ready<'a, 'b: 'a>(&'b self) -> impl Iterator>> + 'a { self.ready.get() } @@ -207,7 +232,7 @@ impl Pool { /// they were part of a chain, you may attempt to re-import them later. /// NOTE If you want to remove ready transactions that were already used /// and you don't want them to be stored in the pool use `prune_tags` method. - pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec>> { + pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec>> { let mut removed = self.ready.remove_invalid(hashes); removed.extend(self.future.remove(hashes).into_iter().map(Arc::new)); removed @@ -219,7 +244,7 @@ impl Pool { /// but unlike `remove_invalid`, dependent transactions are not touched. /// Additional transactions from future queue might be promoted to ready if you satisfy tags /// that the pool didn't previously know about. - pub fn prune_tags(&mut self, block_number: BlockNumber, tags: impl IntoIterator) -> PruneStatus { + pub fn prune_tags(&mut self, block_number: BlockNumber, tags: impl IntoIterator) -> PruneStatus { let mut to_import = vec![]; let mut pruned = vec![]; @@ -249,6 +274,22 @@ impl Pool { promoted, } } + + /// Get pool status. + pub fn status(&self) -> Status { + Status { + ready: self.ready.len(), + future: self.future.len(), + } + } +} + +/// Pool status +pub struct Status { + /// Number of transactions in the ready queue. + pub ready: usize, + /// Number of transactions in the future queue. + pub future: usize, } #[cfg(test)] @@ -257,8 +298,8 @@ mod tests { type Hash = u64; - fn pool() -> Pool { - Pool::default() + fn pool() -> BasePool> { + BasePool::default() } #[test] @@ -268,7 +309,7 @@ mod tests { // when pool.import(1, Transaction { - ex: vec![1u8], + data: vec![1u8], hash: 1u64, priority: 5u64, longevity: 64u64, @@ -288,7 +329,7 @@ mod tests { // when pool.import(1, Transaction { - ex: vec![1u8], + data: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -296,7 +337,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: vec![1u8], + data: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -317,7 +358,7 @@ mod tests { // when pool.import(1, Transaction { - ex: vec![1u8], + data: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -327,7 +368,7 @@ mod tests { assert_eq!(pool.ready().count(), 0); assert_eq!(pool.ready.len(), 0); pool.import(1, Transaction { - ex: vec![2u8], + data: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -347,7 +388,7 @@ mod tests { // when pool.import(1, Transaction { - ex: vec![1u8], + data: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -355,7 +396,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: vec![3u8], + data: vec![3u8], hash: 3, priority: 5u64, longevity: 64u64, @@ -363,7 +404,7 @@ mod tests { provides: vec![], }).unwrap(); pool.import(1, Transaction { - ex: vec![2u8], + data: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -371,7 +412,7 @@ mod tests { provides: vec![vec![3], vec![2]], }).unwrap(); pool.import(1, Transaction { - ex: vec![4u8], + data: vec![4u8], hash: 4, priority: 1_000u64, longevity: 64u64, @@ -382,7 +423,7 @@ mod tests { assert_eq!(pool.ready.len(), 0); let res = pool.import(1, Transaction { - ex: vec![5u8], + data: vec![5u8], hash: 5, priority: 5u64, longevity: 64u64, @@ -391,7 +432,7 @@ mod tests { }).unwrap(); // then - let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); + let mut it = pool.ready().into_iter().map(|tx| tx.data[0]); assert_eq!(it.next(), Some(5)); assert_eq!(it.next(), Some(1)); @@ -412,7 +453,7 @@ mod tests { // given let mut pool = pool(); pool.import(1, Transaction { - ex: vec![1u8], + data: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -420,7 +461,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: vec![3u8], + data: vec![3u8], hash: 3, priority: 5u64, longevity: 64u64, @@ -432,7 +473,7 @@ mod tests { // when pool.import(1, Transaction { - ex: vec![2u8], + data: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -442,7 +483,7 @@ mod tests { // then { - let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); + let mut it = pool.ready().into_iter().map(|tx| tx.data[0]); assert_eq!(it.next(), None); } // all transactions occupy the Future queue - it's fine @@ -450,14 +491,14 @@ mod tests { // let's close the cycle with one additional transaction let res = pool.import(1, Transaction { - ex: vec![4u8], + data: vec![4u8], hash: 4, priority: 50u64, longevity: 64u64, requires: vec![], provides: vec![vec![0]], }).unwrap(); - let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); + let mut it = pool.ready().into_iter().map(|tx| tx.data[0]); assert_eq!(it.next(), Some(4)); assert_eq!(it.next(), Some(1)); assert_eq!(it.next(), Some(3)); @@ -477,7 +518,7 @@ mod tests { // given let mut pool = pool(); pool.import(1, Transaction { - ex: vec![1u8], + data: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -485,7 +526,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: vec![3u8], + data: vec![3u8], hash: 3, priority: 5u64, longevity: 64u64, @@ -497,7 +538,7 @@ mod tests { // when pool.import(1, Transaction { - ex: vec![2u8], + data: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -507,7 +548,7 @@ mod tests { // then { - let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); + let mut it = pool.ready().into_iter().map(|tx| tx.data[0]); assert_eq!(it.next(), None); } // all transactions occupy the Future queue - it's fine @@ -515,14 +556,14 @@ mod tests { // let's close the cycle with one additional transaction let err = pool.import(1, Transaction { - ex: vec![4u8], + data: vec![4u8], hash: 4, priority: 1u64, // lower priority than Tx(2) longevity: 64u64, requires: vec![], provides: vec![vec![0]], }).unwrap_err(); - let mut it = pool.ready().into_iter().map(|tx| tx.ex[0]); + let mut it = pool.ready().into_iter().map(|tx| tx.data[0]); assert_eq!(it.next(), None); assert_eq!(pool.ready.len(), 0); assert_eq!(pool.future.len(), 0); @@ -537,7 +578,7 @@ mod tests { // given let mut pool = pool(); pool.import(1, Transaction { - ex: vec![5u8], + data: vec![5u8], hash: 5, priority: 5u64, longevity: 64u64, @@ -545,7 +586,7 @@ mod tests { provides: vec![vec![0], vec![4]], }).unwrap(); pool.import(1, Transaction { - ex: vec![1u8], + data: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -553,7 +594,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: vec![3u8], + data: vec![3u8], hash: 3, priority: 5u64, longevity: 64u64, @@ -561,7 +602,7 @@ mod tests { provides: vec![], }).unwrap(); pool.import(1, Transaction { - ex: vec![2u8], + data: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -569,7 +610,7 @@ mod tests { provides: vec![vec![3], vec![2]], }).unwrap(); pool.import(1, Transaction { - ex: vec![4u8], + data: vec![4u8], hash: 4, priority: 1_000u64, longevity: 64u64, @@ -578,7 +619,7 @@ mod tests { }).unwrap(); // future pool.import(1, Transaction { - ex: vec![6u8], + data: vec![6u8], hash: 6, priority: 1_000u64, longevity: 64u64, @@ -603,7 +644,7 @@ mod tests { let mut pool = pool(); // future (waiting for 0) pool.import(1, Transaction { - ex: vec![5u8], + data: vec![5u8], hash: 5, priority: 5u64, longevity: 64u64, @@ -612,7 +653,7 @@ mod tests { }).unwrap(); // ready pool.import(1, Transaction { - ex: vec![1u8], + data: vec![1u8], hash: 1, priority: 5u64, longevity: 64u64, @@ -620,7 +661,7 @@ mod tests { provides: vec![vec![1]], }).unwrap(); pool.import(1, Transaction { - ex: vec![2u8], + data: vec![2u8], hash: 2, priority: 5u64, longevity: 64u64, @@ -628,7 +669,7 @@ mod tests { provides: vec![vec![3]], }).unwrap(); pool.import(1, Transaction { - ex: vec![3u8], + data: vec![3u8], hash: 3, priority: 5u64, longevity: 64u64, @@ -636,7 +677,7 @@ mod tests { provides: vec![vec![2]], }).unwrap(); pool.import(1, Transaction { - ex: vec![4u8], + data: vec![4u8], hash: 4, priority: 1_000u64, longevity: 64u64, diff --git a/substrate/core/transaction-graph/src/error.rs b/substrate/core/transaction-pool/graph/src/error.rs similarity index 57% rename from substrate/core/transaction-graph/src/error.rs rename to substrate/core/transaction-pool/graph/src/error.rs index bf25accac1..308d575a2c 100644 --- a/substrate/core/transaction-graph/src/error.rs +++ b/substrate/core/transaction-pool/graph/src/error.rs @@ -14,10 +14,27 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +//! Transaction pool errors. + use sr_primitives::transaction_validity::TransactionPriority as Priority; error_chain! { errors { + /// Transaction is not verifiable yet, but might be in the future. + UnknownTransactionValidity { + description("Runtime cannot determine validity of the transaction yet."), + display("Unkown Transaction Validity"), + } + /// Transaction is invalid + InvalidTransaction { + description("Runtime check for the transaction failed."), + display("Invalid Transaction"), + } + /// The transaction is temporarily baned + TemporarilyBanned { + description("Transaction is temporarily banned from importing to the pool."), + display("Temporarily Banned"), + } /// The transaction is already in the pool. AlreadyImported { description("Transaction is already in the pool."), @@ -35,3 +52,17 @@ error_chain! { } } } + +/// Transaction pool error conversion. +pub trait IntoPoolError: ::std::error::Error + Send + Sized { + /// Try to extract original `Error` + /// + /// This implementation is optional and used only to + /// provide more descriptive error messages for end users + /// of RPC API. + fn into_pool_error(self) -> ::std::result::Result { Err(self) } +} + +impl IntoPoolError for Error { + fn into_pool_error(self) -> ::std::result::Result { Ok(self) } +} diff --git a/substrate/core/transaction-graph/src/future.rs b/substrate/core/transaction-pool/graph/src/future.rs similarity index 88% rename from substrate/core/transaction-graph/src/future.rs rename to substrate/core/transaction-pool/graph/src/future.rs index e69358daa7..80743c9c89 100644 --- a/substrate/core/transaction-graph/src/future.rs +++ b/substrate/core/transaction-pool/graph/src/future.rs @@ -23,23 +23,23 @@ use sr_primitives::transaction_validity::{ TransactionTag as Tag, }; -use pool::Transaction; +use base_pool::Transaction; /// Transaction with partially satisfied dependencies. #[derive(Debug)] -pub struct WaitingTransaction { +pub struct WaitingTransaction { /// Transaction details. - pub transaction: Transaction, + pub transaction: Transaction, /// Tags that are required and have not been satisfied yet by other transactions in the pool. pub missing_tags: HashSet, } -impl WaitingTransaction { +impl WaitingTransaction { /// Creates a new `WaitingTransaction`. /// /// Computes the set of missing tags based on the requirements and tags that /// are provided by all transactions in the ready queue. - pub fn new(transaction: Transaction, provided: &HashMap) -> Self { + pub fn new(transaction: Transaction, provided: &HashMap) -> Self { let missing_tags = transaction.requires .iter() .filter(|tag| !provided.contains_key(&**tag)) @@ -68,14 +68,14 @@ impl WaitingTransaction { /// Contains transactions that are still awaiting for some other transactions that /// could provide a tag that they require. #[derive(Debug)] -pub struct FutureTransactions { +pub struct FutureTransactions { /// tags that are not yet provided by any transaction and we await for them wanted_tags: HashMap>, /// Transactions waiting for a particular other transaction - waiting: HashMap>, + waiting: HashMap>, } -impl Default for FutureTransactions { +impl Default for FutureTransactions { fn default() -> Self { FutureTransactions { wanted_tags: Default::default(), @@ -91,14 +91,14 @@ every hash from `wanted_tags` is always present in `waiting`; qed #"; -impl FutureTransactions { +impl FutureTransactions { /// Import transaction to Future queue. /// /// Only transactions that don't have all their tags satisfied should occupy /// the Future queue. /// As soon as required tags are provided by some other transactions that are ready /// we should remove the transactions from here and move them to the Ready queue. - pub fn import(&mut self, tx: WaitingTransaction) { + pub fn import(&mut self, tx: WaitingTransaction) { assert!(!tx.is_ready(), "Transaction is ready."); assert!(!self.waiting.contains_key(&tx.transaction.hash), "Transaction is already imported."); @@ -121,7 +121,7 @@ impl FutureTransactions { /// /// Returns (and removes) transactions that became ready after their last tag got /// satisfied and now we can remove them from Future and move to Ready queue. - pub fn satisfy_tags>(&mut self, tags: impl IntoIterator) -> Vec> { + pub fn satisfy_tags>(&mut self, tags: impl IntoIterator) -> Vec> { let mut became_ready = vec![]; for tag in tags { @@ -148,7 +148,7 @@ impl FutureTransactions { /// Removes transactions for given list of hashes. /// /// Returns a list of actually removed transactions. - pub fn remove(&mut self, hashes: &[Hash]) -> Vec> { + pub fn remove(&mut self, hashes: &[Hash]) -> Vec> { let mut removed = vec![]; for hash in hashes { if let Some(waiting_tx) = self.waiting.remove(hash) { @@ -170,7 +170,6 @@ impl FutureTransactions { } /// Returns number of transactions in the Future queue. - #[cfg(test)] pub fn len(&self) -> usize { self.waiting.len() } diff --git a/substrate/core/transaction-graph/src/lib.rs b/substrate/core/transaction-pool/graph/src/lib.rs similarity index 72% rename from substrate/core/transaction-graph/src/lib.rs rename to substrate/core/transaction-pool/graph/src/lib.rs index a33c42a693..1cdd78cd28 100644 --- a/substrate/core/transaction-graph/src/lib.rs +++ b/substrate/core/transaction-pool/graph/src/lib.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +// tag::description[] //! Generic Transaction Pool //! //! The pool is based on dependency graph between transactions @@ -23,23 +24,30 @@ //! //! TODO [ToDr] //! - [ ] Longevity handling (remove obsolete transactions periodically) -//! - [ ] Banning / Future-rotation (once rejected (as invalid) should not be accepted for some time) //! - [ ] Multi-threading (getting ready transactions should not block the pool) +// end::description[] #![warn(missing_docs)] #![warn(unused_extern_crates)] +extern crate futures; +extern crate parking_lot; extern crate sr_primitives; -#[macro_use] -extern crate error_chain; +#[macro_use] extern crate error_chain; +#[macro_use] extern crate log; +#[macro_use] extern crate serde_derive; -#[macro_use] -extern crate log; - -mod error; mod future; +mod listener; mod pool; mod ready; +mod rotator; -pub use self::pool::{Transaction, Pool}; +pub mod base_pool; +pub mod error; +pub mod watcher; + +pub use self::error::IntoPoolError; +pub use self::base_pool::{Transaction, Status}; +pub use self::pool::{Pool, Options, ChainApi, EventStream, ExtrinsicFor, BlockHash, ExHash, NumberFor, TransactionFor}; diff --git a/substrate/core/transaction-pool/src/listener.rs b/substrate/core/transaction-pool/graph/src/listener.rs similarity index 50% rename from substrate/core/transaction-pool/src/listener.rs rename to substrate/core/transaction-pool/graph/src/listener.rs index c86337db6a..14f540cbc4 100644 --- a/substrate/core/transaction-pool/src/listener.rs +++ b/substrate/core/transaction-pool/graph/src/listener.rs @@ -1,3 +1,4 @@ + // Copyright 2018 Parity Technologies (UK) Ltd. // This file is part of Substrate. @@ -15,53 +16,27 @@ // along with Substrate. If not, see . use std::{ - sync::Arc, - fmt, collections::HashMap, + hash, }; -use txpool; use watcher; - -/// Returns the hash of the latest block. -pub trait LatestHash { - type Hash: Clone; - - /// Hash of the latest block. - fn latest_hash(&self) -> Self::Hash; -} +use sr_primitives::traits; /// Extrinsic pool default listener. -pub struct Listener { - watchers: HashMap>, - chain: C, +pub struct Listener { + watchers: HashMap> } -impl Listener where - H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default, - C: LatestHash, -{ - /// Creates a new listener with given latest hash provider. - pub fn new(chain: C) -> Self { +impl Default for Listener { + fn default() -> Self { Listener { watchers: Default::default(), - chain, } } +} - /// Creates a new watcher for given verified extrinsic. - /// - /// The watcher can be used to subscribe to lifecycle events of that extrinsic. - pub fn create_watcher>(&mut self, xt: Arc) -> watcher::Watcher { - let sender = self.watchers.entry(*xt.hash()).or_insert_with(watcher::Sender::default); - sender.new_watcher() - } - - /// Notify the listeners about extrinsic broadcast. - pub fn broadcasted(&mut self, hash: &H, peers: Vec) { - self.fire(hash, |watcher| watcher.broadcast(peers)); - } - - fn fire(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender) { +impl Listener { + fn fire(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender) { let clean = if let Some(h) = self.watchers.get_mut(hash) { fun(h); h.is_done() @@ -73,41 +48,51 @@ impl Listener where self.watchers.remove(hash); } } -} -impl txpool::Listener for Listener where - H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default, - T: txpool::VerifiedTransaction, - C: LatestHash, -{ - fn added(&mut self, tx: &Arc, old: Option<&Arc>) { + /// Creates a new watcher for given verified extrinsic. + /// + /// The watcher can be used to subscribe to lifecycle events of that extrinsic. + pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher { + let sender = self.watchers.entry(hash).or_insert_with(watcher::Sender::default); + sender.new_watcher() + } + + /// Notify the listeners about extrinsic broadcast. + pub fn broadcasted(&mut self, hash: &H, peers: Vec) { + self.fire(hash, |watcher| watcher.broadcast(peers)); + } + + /// New transaction was added to the ready pool or promoted from the future pool. + pub fn ready(&mut self, tx: &H, old: Option<&H>) { if let Some(old) = old { - let hash = tx.hash(); - self.fire(old.hash(), |watcher| watcher.usurped(*hash)); + self.fire(old, |watcher| watcher.usurped(tx.clone())); } } - fn dropped(&mut self, tx: &Arc, by: Option<&T>) { - self.fire(tx.hash(), |watcher| match by { - Some(t) => watcher.usurped(*t.hash()), + /// New transaction was added to the future pool. + pub fn future(&mut self, _tx: &H) { + } + + /// Transaction was dropped from the pool because of the limit. + pub fn dropped(&mut self, tx: &H, by: Option<&H>) { + self.fire(tx, |watcher| match by { + Some(t) => watcher.usurped(t.clone()), None => watcher.dropped(), }) } - fn rejected(&mut self, tx: &Arc, reason: &txpool::ErrorKind) { - warn!(target: "transaction-pool", "Extrinsic rejected ({}): {:?}", reason, tx); + /// Transaction was rejected from the pool. + pub fn rejected(&mut self, tx: &H, is_invalid: bool) { + warn!(target: "transaction-pool", "Extrinsic rejected ({}): {:?}", is_invalid, tx); } - fn invalid(&mut self, tx: &Arc) { + /// Transaction was removed as invalid. + pub fn invalid(&mut self, tx: &H) { warn!(target: "transaction-pool", "Extrinsic invalid: {:?}", tx); } - fn canceled(&mut self, tx: &Arc) { - debug!(target: "transaction-pool", "Extrinsic canceled: {:?}", tx); - } - - fn culled(&mut self, tx: &Arc) { - let header_hash = self.chain.latest_hash(); - self.fire(tx.hash(), |watcher| watcher.finalised(header_hash)) + /// Transaction was pruned from the pool. + pub fn pruned(&mut self, header_hash: H2, tx: &H) { + self.fire(tx, |watcher| watcher.finalised(header_hash)) } } diff --git a/substrate/core/transaction-pool/graph/src/pool.rs b/substrate/core/transaction-pool/graph/src/pool.rs new file mode 100644 index 0000000000..6e6975a23f --- /dev/null +++ b/substrate/core/transaction-pool/graph/src/pool.rs @@ -0,0 +1,333 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::{ + collections::HashMap, + hash, + sync::Arc, + time, +}; + +use base_pool as base; +use error; +use listener::Listener; +use rotator::PoolRotator; +use watcher::Watcher; + +use futures::sync::mpsc; +use parking_lot::{Mutex, RwLock}; +use sr_primitives::{ + generic::BlockId, + traits::{self, As}, + transaction_validity::{TransactionValidity, TransactionTag as Tag}, +}; + +/// Modification notification event stream type; +pub type EventStream = mpsc::UnboundedReceiver<()>; + +/// Extrinsic hash type for a pool. +pub type ExHash = ::Hash; +/// Block hash type for a pool. +pub type BlockHash = <::Block as traits::Block>::Hash; +/// Extrinsic type for a pool. +pub type ExtrinsicFor = <::Block as traits::Block>::Extrinsic; +/// Block number type for the ChainApi +pub type NumberFor = traits::NumberFor<::Block>; +/// A type of transaction stored in the pool +pub type TransactionFor = Arc, TxData>>>; + +/// Concrete extrinsic validation and query logic. +pub trait ChainApi: Send + Sync { + /// Block type. + type Block: traits::Block; + /// Hash type + type Hash: hash::Hash + Eq + traits::Member; + /// Error type. + type Error: From + error::IntoPoolError; + + /// Verify extrinsic at given block. + fn validate_transaction(&self, at: &BlockId, uxt: &ExtrinsicFor) -> Result; + + /// Returns a block number given the block id. + fn block_id_to_number(&self, at: &BlockId) -> Result>, Self::Error>; + + /// Returns a block hash given the block id. + fn block_id_to_hash(&self, at: &BlockId) -> Result>, Self::Error>; + + /// Hash the extrinsic. + fn hash(&self, uxt: &ExtrinsicFor) -> Self::Hash; +} + +/// Maximum time the transaction will be kept in the pool. +/// +/// Transactions that don't get included within the limit are removed from the pool. +const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5); + +/// Additional transaction data +#[derive(Debug, Serialize, Deserialize)] +pub struct TxData { + /// Raw data stored by the user. + pub raw: Ex, + /// Transaction validity deadline. + /// TODO [ToDr] Should we use longevity instead? + #[serde(skip)] + pub valid_till: Option, +} + +/// Pool configuration options. +#[derive(Debug, Clone, Default)] +pub struct Options; + +/// Extrinsics pool. +pub struct Pool { + api: B, + listener: RwLock, BlockHash>>, + pool: RwLock, + TxData>, + >>, + import_notification_sinks: Mutex>>, + rotator: PoolRotator>, +} + +impl Pool { + + /// Imports a bunch of unverified extrinsics to the pool + pub fn submit_at(&self, at: &BlockId, xts: T) -> Result, B::Error>>, B::Error> where + T: IntoIterator> + { + let block_number = self.api.block_id_to_number(at)? + .ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?; + + Ok(xts + .into_iter() + .map(|xt| -> Result<_, B::Error> { + let hash = self.api.hash(&xt); + if self.rotator.is_banned(&hash) { + return Err(error::ErrorKind::TemporarilyBanned.into())?; + } + + match self.api.validate_transaction(at, &xt)? { + TransactionValidity::Valid(priority, requires, provides, longevity)=> { + Ok(base::Transaction { + data: TxData { + raw: xt, + valid_till: Some(time::Instant::now() + POOL_TIME), + }, + hash, + priority, + requires, + provides, + longevity, + }) + }, + TransactionValidity::Invalid => { + bail!(error::Error::from(error::ErrorKind::InvalidTransaction)) + }, + TransactionValidity::Unknown => { + self.listener.write().rejected(&hash, false); + bail!(error::Error::from(error::ErrorKind::UnknownTransactionValidity)) + }, + } + }) + .map(|tx| { + let imported = self.pool.write().import(block_number.as_(), tx?)?; + + self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok()); + + let mut listener = self.listener.write(); + fire_events(&mut *listener, &imported); + Ok(imported.hash().clone()) + }) + .collect()) + } + + /// Imports one unverified extrinsic to the pool + pub fn submit_one(&self, at: &BlockId, xt: ExtrinsicFor) -> Result, B::Error> { + Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?) + } + + /// Import a single extrinsic and starts to watch their progress in the pool. + pub fn submit_and_watch(&self, at: &BlockId, xt: ExtrinsicFor) -> Result, BlockHash>, B::Error> { + let xt = self.submit_one(at, xt)?; + Ok(self.listener.write().create_watcher(xt)) + } + + /// Prunes ready transactions that provide given list of tags. + pub fn prune_tags(&self, at: &BlockId, tags: impl IntoIterator) -> Result<(), B::Error> { + let block_number = self.api.block_id_to_number(at)? + .ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?; + + let status = self.pool.write().prune_tags(block_number.as_(), tags); + { + let mut listener = self.listener.write(); + for promoted in &status.promoted { + fire_events(&mut *listener, promoted); + } + for f in &status.failed { + listener.dropped(f, None); + } + } + // try to re-submit pruned transactions since some of them might be still valid. + let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::>(); + let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.raw.clone()))?; + // Fire mined event for transactions that became invalid. + let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) { + Err(Ok(err)) => match err.kind() { + error::ErrorKind::InvalidTransaction => Some(hashes[idx].clone()), + _ => None, + }, + _ => None, + }); + { + let header_hash = self.api.block_id_to_hash(at)? + .ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?; + let mut listener = self.listener.write(); + for h in hashes { + listener.pruned(header_hash, &h) + } + } + // clear old transactions + self.clear_stale(at)?; + Ok(()) + } + + /// Removes stale transactions from the pool. + /// + /// Stale transactions are transaction beyond their longevity period. + /// Note this function does not remove transactions that are already included in the chain. + /// See `prune_tags` ifyou want this. + pub fn clear_stale(&self, _at: &BlockId) -> Result<(), B::Error> { + let now = time::Instant::now(); + let to_remove = self.ready(|pending| pending + .filter(|tx| self.rotator.ban_if_stale(&now, &tx)) + .map(|tx| tx.hash.clone()) + .collect::>() + ); + // removing old transactions + self.remove_invalid(&to_remove); + // clear banned transactions timeouts + self.rotator.clear_timeouts(&now); + + Ok(()) + } +} + +impl Pool { + /// Create a new transaction pool. + /// TODO [ToDr] Options + pub fn new(_options: Options, api: B) -> Self { + Pool { + api, + listener: Default::default(), + pool: Default::default(), + import_notification_sinks: Default::default(), + rotator: Default::default(), + } + } + + /// Return an event stream of transactions imported to the pool. + pub fn import_notification_stream(&self) -> EventStream { + let (sink, stream) = mpsc::unbounded(); + self.import_notification_sinks.lock().push(sink); + stream + } + + /// Invoked when extrinsics are broadcasted. + pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { + let mut listener = self.listener.write(); + for (hash, peers) in propagated.into_iter() { + listener.broadcasted(&hash, peers); + } + } + + /// Remove from the pool. + pub fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { + // temporarily ban invalid transactions + debug!(target: "txpool", "Banning invalid transactions: {:?}", hashes); + self.rotator.ban(&time::Instant::now(), hashes); + + let invalid = self.pool.write().remove_invalid(hashes); + + let mut listener = self.listener.write(); + for tx in &invalid { + listener.invalid(&tx.hash); + } + + invalid + } + + /// Get ready transactions ordered by priority + pub fn ready(&self, f: F) -> X where + F: FnOnce(&mut Iterator>) -> X, + { + let pool = self.pool.read(); + let mut ready = pool.ready(); + f(&mut ready) + } + + /// Returns all transactions in the pool. + /// + /// Be careful with large limit values, as querying the entire pool might be time consuming. + pub fn all(&self, limit: usize) -> Vec> { + self.ready(|it| it.take(limit).map(|ex| ex.data.raw.clone()).collect()) + } + + /// Returns pool status. + pub fn status(&self) -> base::Status { + self.pool.read().status() + } + + /// Returns transaction hash + pub fn hash_of(&self, xt: &ExtrinsicFor) -> ExHash { + self.api.hash(xt) + } +} + +fn fire_events( + listener: &mut Listener, + imported: &base::Imported, +) where + H: hash::Hash + Eq + traits::Member, + H2: Clone, +{ + match *imported { + base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => { + listener.ready(hash, None); + for f in failed { + listener.rejected(f, true); + } + for r in removed { + listener.dropped(&r.hash, Some(hash)); + } + for p in promoted { + listener.ready(p, None); + } + }, + base::Imported::Future { ref hash } => { + listener.future(hash) + }, + } +} + +#[cfg(test)] +mod tests { + #[test] + #[ignore] + fn should_have_some_basic_tests() { + assert_eq!(true, false); + } +} diff --git a/substrate/core/transaction-graph/src/ready.rs b/substrate/core/transaction-pool/graph/src/ready.rs similarity index 90% rename from substrate/core/transaction-graph/src/ready.rs rename to substrate/core/transaction-pool/graph/src/ready.rs index fd239479e2..0da9d29930 100644 --- a/substrate/core/transaction-graph/src/ready.rs +++ b/substrate/core/transaction-pool/graph/src/ready.rs @@ -28,16 +28,26 @@ use sr_primitives::transaction_validity::{ use error; use future::WaitingTransaction; -use pool::{BlockNumber, Transaction}; +use base_pool::{BlockNumber, Transaction}; -#[derive(Debug, Clone)] -pub struct TransactionRef { - pub transaction: Arc>, +#[derive(Debug)] +pub struct TransactionRef { + pub transaction: Arc>, pub valid_till: BlockNumber, pub insertion_id: u64, } -impl Ord for TransactionRef { +impl Clone for TransactionRef { + fn clone(&self) -> Self { + TransactionRef { + transaction: self.transaction.clone(), + valid_till: self.valid_till, + insertion_id: self.insertion_id, + } + } +} + +impl Ord for TransactionRef { fn cmp(&self, other: &Self) -> cmp::Ordering { self.transaction.priority.cmp(&other.transaction.priority) .then(other.valid_till.cmp(&self.valid_till)) @@ -45,23 +55,23 @@ impl Ord for TransactionRef { } } -impl PartialOrd for TransactionRef { +impl PartialOrd for TransactionRef { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl PartialEq for TransactionRef { +impl PartialEq for TransactionRef { fn eq(&self, other: &Self) -> bool { self.cmp(other) == cmp::Ordering::Equal } } -impl Eq for TransactionRef {} +impl Eq for TransactionRef {} #[derive(Debug)] -struct ReadyTx { +struct ReadyTx { /// A reference to a transaction - pub transaction: TransactionRef, + pub transaction: TransactionRef, /// A list of transactions that get unlocked by this one pub unlocks: Vec, /// How many required tags are provided inherently @@ -79,19 +89,19 @@ qed "#; #[derive(Debug)] -pub struct ReadyTransactions { +pub struct ReadyTransactions { /// Insertion id insertion_id: u64, /// tags that are provided by Ready transactions provided_tags: HashMap, /// Transactions that are ready (i.e. don't have any requirements external to the pool) - ready: HashMap>, + ready: HashMap>, // ^^ TODO [ToDr] Consider wrapping this into `Arc>` and allow multiple concurrent iterators /// Best transactions that are ready to be included to the block without any other previous transaction. - best: BTreeSet>, + best: BTreeSet>, } -impl Default for ReadyTransactions { +impl Default for ReadyTransactions { fn default() -> Self { ReadyTransactions { insertion_id: Default::default(), @@ -102,7 +112,7 @@ impl Default for ReadyTransactions { } } -impl ReadyTransactions { +impl ReadyTransactions { /// Borrows a map of tags that are provided by transactions in this queue. pub fn provided_tags(&self) -> &HashMap { &self.provided_tags @@ -119,7 +129,7 @@ impl ReadyTransactions { /// - transactions that are valid for a shorter time go first /// 4. Lastly we sort by the time in the queue /// - transactions that are longer in the queue go first - pub fn get<'a>(&'a self) -> impl Iterator>> + 'a { + pub fn get<'a>(&'a self) -> impl Iterator>> + 'a { BestIterator { all: &self.ready, best: self.best.clone(), @@ -134,8 +144,8 @@ impl ReadyTransactions { pub fn import( &mut self, block_number: BlockNumber, - tx: WaitingTransaction, - ) -> error::Result>>> { + tx: WaitingTransaction, + ) -> error::Result>>> { assert!(tx.is_ready(), "Only ready transactions can be imported."); assert!(!self.ready.contains_key(&tx.transaction.hash), "Transaction is already imported."); @@ -194,7 +204,7 @@ impl ReadyTransactions { /// NOTE removing a transaction will also cause a removal of all transactions that depend on that one /// (i.e. the entire subgraph that this transaction is a start of will be removed). /// All removed transactions are returned. - pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec>> { + pub fn remove_invalid(&mut self, hashes: &[Hash]) -> Vec>> { let mut removed = vec![]; let mut to_remove = hashes.iter().cloned().collect::>(); @@ -236,7 +246,7 @@ impl ReadyTransactions { /// All transactions that lead to a transaction, which provides this tag /// are going to be removed from the queue, but no other transactions are touched - /// i.e. all other subgraphs starting from given tag are still considered valid & ready. - pub fn prune_tags(&mut self, tag: Tag) -> Vec>> { + pub fn prune_tags(&mut self, tag: Tag) -> Vec>> { let mut removed = vec![]; let mut to_remove = vec![tag]; @@ -308,7 +318,7 @@ impl ReadyTransactions { /// We remove/replace old transactions in case they have lower priority. /// /// In case replacement is succesful returns a list of removed transactions. - fn replace_previous(&mut self, tx: &Transaction) -> error::Result>>> { + fn replace_previous(&mut self, tx: &Transaction) -> error::Result>>> { let mut to_remove = { // check if we are replacing a transaction let replace_hashes = tx.provides @@ -364,23 +374,22 @@ impl ReadyTransactions { } /// Returns number of transactions in this queue. - #[cfg(test)] pub fn len(&self) -> usize { self.ready.len() } } -pub struct BestIterator<'a, Hash: 'a> { - all: &'a HashMap>, - awaiting: HashMap)>, - best: BTreeSet>, +pub struct BestIterator<'a, Hash: 'a, Ex: 'a> { + all: &'a HashMap>, + awaiting: HashMap)>, + best: BTreeSet>, } -impl<'a, Hash: 'a + hash::Hash + Member> BestIterator<'a, Hash> { +impl<'a, Hash: 'a + hash::Hash + Member, Ex: 'a> BestIterator<'a, Hash, Ex> { /// Depending on number of satisfied requirements insert given ref /// either to awaiting set or to best set. - fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef) { + fn best_or_awaiting(&mut self, satisfied: usize, tx_ref: TransactionRef) { if satisfied == tx_ref.transaction.requires.len() { // If we have satisfied all deps insert to best self.best.insert(tx_ref); @@ -392,8 +401,8 @@ impl<'a, Hash: 'a + hash::Hash + Member> BestIterator<'a, Hash> { } } -impl<'a, Hash: 'a + hash::Hash + Member> Iterator for BestIterator<'a, Hash> { - type Item = Arc>; +impl<'a, Hash: 'a + hash::Hash + Member, Ex: 'a> Iterator for BestIterator<'a, Hash, Ex> { + type Item = Arc>; fn next(&mut self) -> Option { let best = self.best.iter().next_back()?.clone(); @@ -432,9 +441,9 @@ fn remove_item(vec: &mut Vec, item: &T) { mod tests { use super::*; - fn tx(id: u8) -> Transaction { + fn tx(id: u8) -> Transaction> { Transaction { - ex: vec![id], + data: vec![id], hash: id as u64, priority: 1, longevity: 2, @@ -494,7 +503,7 @@ mod tests { tx4.provides = vec![]; let block_number = 1; let tx5 = Transaction { - ex: vec![5], + data: vec![5], hash: 5, priority: 1, longevity: u64::max_value(), // use the max_value() here for testing. @@ -517,7 +526,7 @@ mod tests { // then assert_eq!(ready.best.len(), 1); - let mut it = ready.get().map(|tx| tx.ex[0]); + let mut it = ready.get().map(|tx| tx.data[0]); assert_eq!(it.next(), Some(1)); assert_eq!(it.next(), Some(2)); diff --git a/substrate/core/transaction-pool/src/rotator.rs b/substrate/core/transaction-pool/graph/src/rotator.rs similarity index 82% rename from substrate/core/transaction-pool/src/rotator.rs rename to substrate/core/transaction-pool/graph/src/rotator.rs index 93acce9e91..b8fdd6863b 100644 --- a/substrate/core/transaction-pool/src/rotator.rs +++ b/substrate/core/transaction-pool/graph/src/rotator.rs @@ -21,13 +21,13 @@ use std::{ collections::HashMap, - fmt, hash, time::{Duration, Instant}, }; use parking_lot::RwLock; -use txpool::VerifiedTransaction; -use Verified; + +use base_pool::Transaction; +use pool::TxData; /// Expected size of the banned extrinsics cache. const EXPECTED_SIZE: usize = 2048; @@ -75,18 +75,19 @@ impl PoolRotator { } } + /// Bans extrinsic if it's stale. /// /// Returns `true` if extrinsic is stale and got banned. - pub fn ban_if_stale(&self, now: &Instant, xt: &Verified) -> bool where - VEx: VerifiedTransaction, - Hash: fmt::Debug + fmt::LowerHex, - { - if &xt.valid_till > now { - return false; + pub fn ban_if_stale(&self, now: &Instant, xt: &Transaction>) -> bool { + match xt.data.valid_till { + Some(ref valid_till) if valid_till > now => { + return false; + } + _ => {}, } - self.ban(now, &[xt.verified.hash().clone()]); + self.ban(now, &[xt.hash.clone()]); true } @@ -101,8 +102,9 @@ impl PoolRotator { #[cfg(test)] mod tests { use super::*; - use pool::tests::VerifiedTransaction; - use test_client::runtime::Hash; + + type Hash = u64; + type Ex = (); fn rotator() -> PoolRotator { PoolRotator { @@ -111,16 +113,18 @@ mod tests { } } - fn tx() -> (Hash, Verified) { - let hash = 5.into(); - let tx = Verified { - original: 5, - verified: VerifiedTransaction { - hash, - sender: Default::default(), - nonce: Default::default(), + fn tx() -> (Hash, Transaction>) { + let hash = 5u64; + let tx = Transaction { + data: TxData { + raw: (), + valid_till: Some(Instant::now()), }, - valid_till: Instant::now(), + hash: hash.clone(), + priority: 5, + longevity: 3, + requires: vec![], + provides: vec![], }; (hash, tx) @@ -175,16 +179,18 @@ mod tests { #[test] fn should_garbage_collect() { // given - fn tx_with(i: u64, time: Instant) -> Verified { - let hash = i.into(); - Verified { - original: i, - verified: VerifiedTransaction { - hash, - sender: Default::default(), - nonce: Default::default(), + fn tx_with(i: u64, time: Instant) -> Transaction> { + let hash = i; + Transaction { + data: TxData { + raw: (), + valid_till: Some(time), }, - valid_till: time, + hash, + priority: 5, + longevity: 3, + requires: vec![], + provides: vec![], } } diff --git a/substrate/core/transaction-pool/src/watcher.rs b/substrate/core/transaction-pool/graph/src/watcher.rs similarity index 96% rename from substrate/core/transaction-pool/src/watcher.rs rename to substrate/core/transaction-pool/graph/src/watcher.rs index fd6758fdba..bfbd384482 100644 --- a/substrate/core/transaction-pool/src/watcher.rs +++ b/substrate/core/transaction-pool/graph/src/watcher.rs @@ -25,6 +25,10 @@ use futures::{ #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum Status { + /// Extrinsic is part of the future queue. + Future, + /// Extrinsic is part of the ready queue. + Ready, /// Extrinsic has been finalised in block with given hash. Finalised(H2), /// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid. @@ -64,7 +68,7 @@ impl Default for Sender { fn default() -> Self { Sender { receivers: Default::default(), - finalised: Default::default(), + finalised: false, } } } diff --git a/substrate/core/transaction-pool/README.adoc b/substrate/core/transaction-pool/src/README.adoc similarity index 100% rename from substrate/core/transaction-pool/README.adoc rename to substrate/core/transaction-pool/src/README.adoc diff --git a/substrate/core/transaction-pool/src/api.rs b/substrate/core/transaction-pool/src/api.rs new file mode 100644 index 0000000000..5c0b8fc6e1 --- /dev/null +++ b/substrate/core/transaction-pool/src/api.rs @@ -0,0 +1,78 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Chain api required for the transaction pool. + +use std::{ + sync::Arc, +}; +use client::{self, runtime_api::TaggedTransactionQueue}; +use parity_codec::Encode; +use txpool; +use substrate_primitives::{ + H256, + Blake2Hasher, + Hasher, +}; +use sr_primitives::{ + generic::BlockId, + traits, + transaction_validity::TransactionValidity, +}; + +use error; + +/// The transaction pool logic +pub struct ChainApi { + client: Arc>, +} + +impl ChainApi { + /// Create new transaction pool logic. + pub fn new(client: Arc>) -> Self { + ChainApi { + client, + } + } +} + +impl txpool::ChainApi for ChainApi where + Block: traits::Block, + B: client::backend::Backend + Send + Sync + 'static, + E: client::CallExecutor + Send + Sync + Clone + 'static, +{ + type Block = Block; + type Hash = H256; + type Error = error::Error; + + fn validate_transaction(&self, at: &BlockId, uxt: &txpool::ExtrinsicFor) -> error::Result { + Ok(self.client.validate_transaction(at, uxt)?) + } + + // TODO [toDr] Use proper lbock number type + fn block_id_to_number(&self, at: &BlockId) -> error::Result>> { + Ok(self.client.block_number_from_id(at)?) + } + + fn block_id_to_hash(&self, at: &BlockId) -> error::Result>> { + Ok(self.client.block_hash_from_id(at)?) + } + + fn hash(&self, ex: &txpool::ExtrinsicFor) -> Self::Hash { + Blake2Hasher::hash(&ex.encode()) + } +} + diff --git a/substrate/core/transaction-pool/src/error.rs b/substrate/core/transaction-pool/src/error.rs index 047041d18d..009c80a315 100644 --- a/substrate/core/transaction-pool/src/error.rs +++ b/substrate/core/transaction-pool/src/error.rs @@ -14,20 +14,23 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -//! External Error trait for extrinsic pool. +//! Transaction pool error. +use client; use txpool; -/// Extrinsic pool error. -pub trait IntoPoolError: ::std::error::Error + Send + Sized { - /// Try to extract original `txpool::Error` - /// - /// This implementation is optional and used only to - /// provide more descriptive error messages for end users - /// of RPC API. - fn into_pool_error(self) -> Result { Err(self) } +error_chain! { + links { + Client(client::error::Error, client::error::ErrorKind) #[doc = "Client error"]; + Pool(txpool::error::Error, txpool::error::ErrorKind) #[doc = "Pool error"]; + } } -impl IntoPoolError for txpool::Error { - fn into_pool_error(self) -> Result { Ok(self) } +impl txpool::IntoPoolError for Error { + fn into_pool_error(self) -> ::std::result::Result { + match self { + Error(ErrorKind::Pool(e), c) => Ok(txpool::error::Error(e, c)), + e => Err(e), + } + } } diff --git a/substrate/core/transaction-pool/src/lib.rs b/substrate/core/transaction-pool/src/lib.rs index 1e79f5efd2..5d31ac8fb7 100644 --- a/substrate/core/transaction-pool/src/lib.rs +++ b/substrate/core/transaction-pool/src/lib.rs @@ -15,35 +15,31 @@ // along with Substrate. If not, see . // tag::description[] -//! Generic extrinsic pool. +//! Substrate transaction pool. // end::description[] #![warn(missing_docs)] #![warn(unused_extern_crates)] -extern crate futures; -extern crate parking_lot; -extern crate sr_primitives as runtime_primitives; +extern crate parity_codec; +extern crate sr_primitives; +extern crate substrate_client as client; +extern crate substrate_primitives; + +pub extern crate substrate_transaction_graph as txpool; #[macro_use] -extern crate log; -extern crate serde; -#[macro_use] -extern crate serde_derive; -extern crate transaction_pool as txpool; -#[cfg(test)] extern crate substrate_test_client as test_client; -#[cfg(test)] extern crate substrate_keyring as keyring; -#[cfg(test)] extern crate parity_codec as codec; +extern crate error_chain; -pub mod watcher; -mod error; -mod listener; -mod pool; -mod rotator; +#[cfg(test)] +extern crate substrate_test_client as test_client; +#[cfg(test)] +extern crate substrate_keyring as keyring; -pub use listener::Listener; -pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics, HashOf}; -pub use txpool::scoring; -pub use txpool::{Error, ErrorKind}; -pub use error::IntoPoolError; -pub use txpool::{Options, Status, LightStatus, VerifiedTransaction, Readiness, Transaction}; +mod api; +#[cfg(test)] +mod tests; + +pub mod error; + +pub use api::ChainApi; diff --git a/substrate/core/transaction-pool/src/pool.rs b/substrate/core/transaction-pool/src/pool.rs deleted file mode 100644 index d6c8504085..0000000000 --- a/substrate/core/transaction-pool/src/pool.rs +++ /dev/null @@ -1,626 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -use std::{ - collections::{BTreeMap, HashMap}, - fmt, - sync::Arc, - time, -}; -use futures::sync::mpsc; -use parking_lot::{Mutex, RwLock}; -use serde::{Serialize, de::DeserializeOwned}; -use txpool::{self, Scoring, Readiness}; - -use error::IntoPoolError; -use listener::{self, Listener}; -use rotator::PoolRotator; -use watcher::Watcher; - -use runtime_primitives::{generic::BlockId, traits}; - -/// Modification notification event stream type; -pub type EventStream = mpsc::UnboundedReceiver<()>; - -/// Extrinsic hash type for a pool. -pub type ExHash = ::Hash; -/// Extrinsic type for a pool. -pub type ExtrinsicFor = <::Block as traits::Block>::Extrinsic; -/// Verified extrinsic data for `ChainApi`. -pub type VerifiedFor = Verified, ::VEx>; -/// A collection of all extrinsics. -pub type AllExtrinsics = BTreeMap<<::VEx as txpool::VerifiedTransaction>::Sender, Vec>>; - -/// Verified extrinsic struct. Wraps original extrinsic and verification info. -#[derive(Debug)] -pub struct Verified { - /// Original extrinsic. - pub original: Ex, - /// Verification data. - pub verified: VEx, - /// Pool deadline, after it's reached we remove the extrinsic from the pool. - pub valid_till: time::Instant, -} - -impl txpool::VerifiedTransaction for Verified -where - Ex: fmt::Debug, - VEx: txpool::VerifiedTransaction, -{ - type Hash = ::Hash; - type Sender = ::Sender; - - fn hash(&self) -> &Self::Hash { - self.verified.hash() - } - - fn sender(&self) -> &Self::Sender { - self.verified.sender() - } - - fn mem_usage(&self) -> usize { - // TODO: add `original` mem usage. - self.verified.mem_usage() - } -} - -/// Concrete extrinsic validation and query logic. -pub trait ChainApi: Send + Sync { - /// Block type. - type Block: traits::Block; - /// Extrinsic hash type. - type Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Serialize + DeserializeOwned + ::std::str::FromStr + Send + Sync + Default + 'static; - /// Extrinsic sender type. - type Sender: ::std::hash::Hash + fmt::Debug + Serialize + DeserializeOwned + Eq + Clone + Send + Sync + Ord + Default; - /// Unchecked extrinsic type. - /// Verified extrinsic type. - type VEx: txpool::VerifiedTransaction + Send + Sync + Clone; - /// Readiness evaluator - type Ready; - /// Error type. - type Error: From + IntoPoolError; - /// Score type. - type Score: ::std::cmp::Ord + Clone + Default + fmt::Debug + Send + Send + Sync + fmt::LowerHex; - /// Custom scoring update event type. - type Event: ::std::fmt::Debug; - - /// Verify extrinsic at given block. - fn verify_transaction(&self, at: &BlockId, uxt: &ExtrinsicFor) -> Result; - - /// Create new readiness evaluator. - fn ready(&self) -> Self::Ready; - - /// Check readiness for verified extrinsic at given block. - fn is_ready(&self, at: &BlockId, context: &mut Self::Ready, xt: &VerifiedFor) -> Readiness; - - /// Decides on ordering of `T`s from a particular sender. - fn compare(old: &VerifiedFor, other: &VerifiedFor) -> ::std::cmp::Ordering; - - /// Decides how to deal with two transactions from a sender that seem to occupy the same slot in the queue. - fn choose(old: &VerifiedFor, new: &VerifiedFor) -> txpool::scoring::Choice; - - /// Updates the transaction scores given a list of transactions and a change to previous scoring. - /// NOTE: you can safely assume that both slices have the same length. - /// (i.e. score at index `i` represents transaction at the same index) - fn update_scores(xts: &[txpool::Transaction>], scores: &mut [Self::Score], change: txpool::scoring::Change); - - /// Decides if `new` should push out `old` transaction from the pool. - /// - /// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits. - fn should_replace(old: &VerifiedFor, new: &VerifiedFor) -> txpool::scoring::Choice; - - /// Returns hash of the latest block in chain. - fn latest_hash(&self) -> HashOf; -} - -/// Returns block's hash type. -pub type HashOf = ::Hash; - -impl listener::LatestHash for Arc { - type Hash = HashOf; - - fn latest_hash(&self) -> HashOf { - ChainApi::latest_hash(&**self) - } -} - -pub struct Ready<'a, 'b, B: 'a + ChainApi> { - api: &'a B, - at: &'b BlockId, - context: B::Ready, - rotator: &'a PoolRotator, - now: time::Instant, -} - -impl<'a, 'b, B: ChainApi> txpool::Ready> for Ready<'a, 'b, B> { - fn is_ready(&mut self, xt: &VerifiedFor) -> Readiness { - if self.rotator.ban_if_stale(&self.now, xt) { - debug!(target: "transaction-pool", "[{:?}] Banning as stale.", txpool::VerifiedTransaction::hash(xt)); - return Readiness::Stale; - } - - self.api.is_ready(self.at, &mut self.context, xt) - } -} - -pub struct ScoringAdapter(::std::marker::PhantomData); - -impl ::std::fmt::Debug for ScoringAdapter { - fn fmt(&self, _f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - Ok(()) - } -} - -impl Scoring> for ScoringAdapter { - type Score = ::Score; - type Event = ::Event; - - fn compare(&self, old: &VerifiedFor, other: &VerifiedFor) -> ::std::cmp::Ordering { - T::compare(old, other) - } - - fn choose(&self, old: &VerifiedFor, new: &VerifiedFor) -> txpool::scoring::Choice { - T::choose(old, new) - } - - fn update_scores(&self, xts: &[txpool::Transaction>], scores: &mut [Self::Score], change: txpool::scoring::Change) { - T::update_scores(xts, scores, change) - } - - fn should_replace(&self, old: &VerifiedFor, new: &VerifiedFor) -> txpool::scoring::Choice { - T::should_replace(old, new) - } -} - -/// Maximum time the transaction will be kept in the pool. -/// -/// Transactions that don't get included within the limit are removed from the pool. -const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5); - -/// Extrinsics pool. -pub struct Pool { - api: Arc, - pool: RwLock, - ScoringAdapter, - Listener>, - >>, - import_notification_sinks: Mutex>>, - rotator: PoolRotator, -} - -impl Pool { - /// Create a new transaction pool. - pub fn new(options: txpool::Options, api: B) -> Self { - let api = Arc::new(api); - Pool { - pool: RwLock::new(txpool::Pool::new(Listener::new(api.clone()), ScoringAdapter::(Default::default()), options)), - import_notification_sinks: Default::default(), - api, - rotator: Default::default(), - } - } - - /// Imports a pre-verified extrinsic to the pool. - pub fn import(&self, xt: VerifiedFor) -> Result>, B::Error> { - let result = self.pool.write().import(xt)?; - - self.import_notification_sinks.lock() - .retain(|sink| sink.unbounded_send(()).is_ok()); - - Ok(result) - } - - /// Return an event stream of transactions imported to the pool. - pub fn import_notification_stream(&self) -> EventStream { - let (sink, stream) = mpsc::unbounded(); - self.import_notification_sinks.lock().push(sink); - stream - } - - /// Invoked when extrinsics are broadcasted. - pub fn on_broadcasted(&self, propagated: HashMap>) { - for (hash, peers) in propagated.into_iter() { - self.pool.write().listener_mut().broadcasted(&hash, peers); - } - } - - /// Imports a bunch of unverified extrinsics to the pool - pub fn submit_at(&self, at: &BlockId, xts: T) -> Result>>, B::Error> where - T: IntoIterator> - { - xts - .into_iter() - .map(|xt| { - match self.api.verify_transaction(at, &xt) { - Ok(ref verified) if self.rotator.is_banned(txpool::VerifiedTransaction::hash(verified)) => { - return (Err(txpool::Error::from("Temporarily Banned".to_owned()).into()), xt) - }, - result => (result, xt), - } - }) - .map(|(v, xt)| { - let xt = Verified { - original: xt, - verified: v?, - valid_till: time::Instant::now() + POOL_TIME, - }; - Ok(self.pool.write().import(xt)?) - }) - .collect() - } - - /// Imports one unverified extrinsic to the pool - pub fn submit_one(&self, at: &BlockId, xt: ExtrinsicFor) -> Result>, B::Error> { - Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")) - } - - /// Import a single extrinsic and starts to watch their progress in the pool. - pub fn submit_and_watch(&self, at: &BlockId, xt: ExtrinsicFor) -> Result>, B::Error> { - let xt = self.submit_at(at, Some(xt))?.pop().expect("One extrinsic passed; one result returned; qed"); - Ok(self.pool.write().listener_mut().create_watcher(xt)) - } - - /// Remove from the pool. - pub fn remove(&self, hashes: &[B::Hash], is_valid: bool) -> Vec>>> { - let mut pool = self.pool.write(); - let mut results = Vec::with_capacity(hashes.len()); - - // temporarily ban invalid transactions - if !is_valid { - debug!(target: "transaction-pool", "Banning invalid transactions: {:?}", hashes); - self.rotator.ban(&time::Instant::now(), hashes); - } - - for hash in hashes { - results.push(pool.remove(hash, is_valid)); - } - - results - } - - /// Cull transactions from the queue. - pub fn cull_from( - &self, - at: &BlockId, - senders: Option<&[::Sender]>, - ) -> usize - { - self.rotator.clear_timeouts(&time::Instant::now()); - let ready = self.ready(at); - self.pool.write().cull(senders, ready) - } - - /// Cull old transactions from the queue. - pub fn cull(&self, at: &BlockId) -> Result { - Ok(self.cull_from(at, None)) - } - - /// Cull transactions from the queue and then compute the pending set. - pub fn cull_and_get_pending(&self, at: &BlockId, f: F) -> Result where - F: FnOnce(txpool::PendingIterator, Ready, ScoringAdapter, Listener>>) -> T, - { - self.cull_from(at, None); - Ok(self.pending(at, f)) - } - - /// Get the full status of the queue (including readiness) - pub fn status>>(&self, ready: R) -> txpool::Status { - self.pool.read().status(ready) - } - - /// Returns light status of the pool. - pub fn light_status(&self) -> txpool::LightStatus { - self.pool.read().light_status() - } - - /// Removes all transactions from given sender - pub fn remove_sender(&self, sender: ::Sender) -> Vec>> { - let mut pool = self.pool.write(); - let pending = pool.pending_from_sender(|_: &VerifiedFor| txpool::Readiness::Ready, &sender).collect(); - // remove all transactions from this sender - pool.cull(Some(&[sender]), |_: &VerifiedFor| txpool::Readiness::Stale); - pending - } - - /// Retrieve the pending set. Be careful to not leak the pool `ReadGuard` to prevent deadlocks. - pub fn pending(&self, at: &BlockId, f: F) -> T where - F: FnOnce(txpool::PendingIterator, Ready, ScoringAdapter, Listener>>) -> T, - { - let ready = self.ready(at); - f(self.pool.read().pending(ready)) - } - - /// Retry to import all verified transactions from given sender. - pub fn retry_verification(&self, at: &BlockId, sender: ::Sender) -> Result<(), B::Error> { - let to_reverify = self.remove_sender(sender); - self.submit_at(at, to_reverify.into_iter().map(|ex| Arc::try_unwrap(ex).expect("Removed items have no references").original))?; - Ok(()) - } - - /// Reverify transaction that has been reported incorrect. - /// - /// Returns `Ok(None)` in case the hash is missing, `Err(e)` in case of verification error and new transaction - /// reference otherwise. - /// - /// TODO [ToDr] That method is currently unused, should be used together with BlockBuilder - /// when we detect that particular transaction has failed. - /// In such case we will attempt to remove or re-verify it. - pub fn reverify_transaction(&self, at: &BlockId, hash: B::Hash) -> Result>>, B::Error> { - let result = self.remove(&[hash], false).pop().expect("One hash passed; one result received; qed"); - if let Some(ex) = result { - self.submit_one(at, Arc::try_unwrap(ex).expect("Removed items have no references").original).map(Some) - } else { - Ok(None) - } - } - - /// Retrieve all transactions in the pool grouped by sender. - pub fn all(&self) -> AllExtrinsics { - use txpool::VerifiedTransaction; - let pool = self.pool.read(); - let all = pool.unordered_pending(AlwaysReady); - all.fold(Default::default(), |mut map: AllExtrinsics, tx| { - // Map with `null` key is not serializable, so we fallback to default accountId. - map.entry(tx.verified.sender().clone()) - .or_insert_with(Vec::new) - // use bytes type to make it serialize nicer. - .push(tx.original.clone()); - map - }) - } - - fn ready<'a, 'b>(&'a self, at: &'b BlockId) -> Ready<'a, 'b, B> { - Ready { - api: &self.api, - rotator: &self.rotator, - context: self.api.ready(), - at, - now: time::Instant::now(), - } - } -} - - /// A Readiness implementation that returns `Ready` for all transactions. -pub struct AlwaysReady; -impl txpool::Ready for AlwaysReady { - fn is_ready(&mut self, _tx: &VEx) -> txpool::Readiness { - txpool::Readiness::Ready - } -} - -#[cfg(test)] -pub mod tests { - use txpool; - use super::{VerifiedFor, ExtrinsicFor, HashOf}; - use std::collections::HashMap; - use std::cmp::Ordering; - use {Pool, ChainApi, scoring, Readiness}; - use keyring::Keyring::{self, *}; - use codec::Encode; - use test_client::runtime::{AccountId, Block, Hash, Index, Extrinsic, Transfer}; - use runtime_primitives::{generic, traits::{Hash as HashT, BlindCheckable, BlakeTwo256}}; - use VerifiedTransaction as VerifiedExtrinsic; - - type BlockId = generic::BlockId; - - #[derive(Clone, Debug)] - pub struct VerifiedTransaction { - pub hash: Hash, - pub sender: AccountId, - pub nonce: u64, - } - - impl txpool::VerifiedTransaction for VerifiedTransaction { - type Hash = Hash; - type Sender = AccountId; - - fn hash(&self) -> &Self::Hash { - &self.hash - } - - fn sender(&self) -> &Self::Sender { - &self.sender - } - - fn mem_usage(&self) -> usize { - 256 - } - } - - struct TestApi; - - impl TestApi { - fn default() -> Self { - TestApi - } - } - - impl ChainApi for TestApi { - type Block = Block; - type Hash = Hash; - type Sender = AccountId; - type Error = txpool::Error; - type VEx = VerifiedTransaction; - type Ready = HashMap; - type Score = u64; - type Event = (); - - fn latest_hash(&self) -> HashOf { - 1.into() - } - - fn verify_transaction(&self, _at: &BlockId, uxt: &ExtrinsicFor) -> Result { - let hash = BlakeTwo256::hash(&uxt.encode()); - let xt = uxt.clone().check()?; - Ok(VerifiedTransaction { - hash, - sender: xt.transfer.from, - nonce: xt.transfer.nonce, - }) - } - - fn is_ready(&self, at: &BlockId, nonce_cache: &mut Self::Ready, xt: &VerifiedFor) -> Readiness { - let sender = xt.verified.sender; - let next_index = nonce_cache.entry(sender) - .or_insert_with(|| index(at, sender)); - - let result = match xt.original.transfer.nonce.cmp(&next_index) { - Ordering::Greater => Readiness::Future, - Ordering::Equal => Readiness::Ready, - Ordering::Less => Readiness::Stale, - }; - - // remember to increment `next_index` - *next_index = next_index.saturating_add(1); - - result - } - - fn ready(&self) -> Self::Ready { - HashMap::default() - } - - fn compare(old: &VerifiedFor, other: &VerifiedFor) -> Ordering { - old.original.transfer.nonce.cmp(&other.original.transfer.nonce) - } - - fn choose(old: &VerifiedFor, new: &VerifiedFor) -> scoring::Choice { - assert!(new.verified.sender == old.verified.sender, "Scoring::choose called with transactions from different senders"); - if old.original.transfer.nonce == new.original.transfer.nonce { - return scoring::Choice::RejectNew; - } - scoring::Choice::InsertNew - } - - fn update_scores( - xts: &[txpool::Transaction>], - scores: &mut [Self::Score], - _change: scoring::Change<()> - ) { - for i in 0..xts.len() { - scores[i] = xts[i].original.transfer.amount; - } - } - - fn should_replace(_old: &VerifiedFor, _new: &VerifiedFor) -> scoring::Choice { - scoring::Choice::InsertNew - } - } - - fn index(at: &BlockId, _account: AccountId) -> u64 { - (_account[0] as u64) + number_of(at) - } - - fn number_of(at: &BlockId) -> u64 { - match at { - generic::BlockId::Number(n) => *n as u64, - _ => 0, - } - } - - fn uxt(who: Keyring, nonce: Index) -> Extrinsic { - let transfer = Transfer { - from: who.to_raw_public().into(), - to: AccountId::default(), - nonce, - amount: 1, - }; - let signature = transfer.using_encoded(|e| who.sign(e)); - Extrinsic { - transfer, - signature: signature.into(), - } - } - - fn pool() -> Pool { - Pool::new(Default::default(), TestApi::default()) - } - - #[test] - fn submission_should_work() { - let pool = pool(); - assert_eq!(209, index(&BlockId::number(0), Alice.to_raw_public().into())); - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); - - let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); - assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209)]); - } - - #[test] - fn multiple_submission_should_work() { - let pool = pool(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); - - let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); - assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]); - } - - #[test] - fn early_nonce_should_be_culled() { - let pool = pool(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 208)).unwrap(); - - let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); - assert_eq!(pending, vec![]); - } - - #[test] - fn late_nonce_should_be_queued() { - let pool = pool(); - - pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); - let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); - assert_eq!(pending, vec![]); - - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); - let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); - assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]); - } - - #[test] - fn retrying_verification_might_not_change_anything() { - let pool = pool(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); - pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); - - let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); - assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]); - - pool.retry_verification(&BlockId::number(1), Alice.to_raw_public().into()).unwrap(); - - let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); - assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]); - } - - #[test] - fn should_ban_invalid_transactions() { - let pool = pool(); - let uxt = uxt(Alice, 209); - let hash = *pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap().hash(); - pool.remove(&[hash], true); - pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap(); - - // when - pool.remove(&[hash], false); - let pending: Vec = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| *a.sender()).collect()).unwrap(); - assert_eq!(pending, vec![]); - - // then - pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap_err(); - } -} diff --git a/substrate/core/transaction-pool/src/tests.rs b/substrate/core/transaction-pool/src/tests.rs new file mode 100644 index 0000000000..e1ff23af9b --- /dev/null +++ b/substrate/core/transaction-pool/src/tests.rs @@ -0,0 +1,177 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + + +use super::*; + +use keyring::Keyring::{self, *}; +use parity_codec::Encode; +use txpool::{self, Pool}; +use test_client::runtime::{AccountId, Block, Hash, Index, Extrinsic, Transfer}; +use sr_primitives::{ + generic::{self, BlockId}, + traits::{Hash as HashT, BlakeTwo256}, + transaction_validity::TransactionValidity, +}; + +struct TestApi; + +impl TestApi { + fn default() -> Self { + TestApi + } +} + +impl txpool::ChainApi for TestApi { + type Block = Block; + type Hash = Hash; + type Error = error::Error; + + fn validate_transaction(&self, at: &BlockId, uxt: &txpool::ExtrinsicFor) -> error::Result { + let expected = index(at); + let requires = if expected == uxt.transfer.nonce { + vec![] + } else { + vec![vec![uxt.transfer.nonce as u8 - 1]] + }; + let provides = vec![vec![uxt.transfer.nonce as u8]]; + + Ok(TransactionValidity::Valid( + /* priority: */1, + requires, + provides, + /* longevity: */64 + )) + } + + fn block_id_to_number(&self, at: &BlockId) -> error::Result>> { + Ok(Some(number_of(at))) + } + + fn block_id_to_hash(&self, at: &BlockId) -> error::Result>> { + Ok(match at { + generic::BlockId::Hash(x) => Some(x.clone()), + _ => Some(Default::default()), + }) + } + + fn hash(&self, ex: &txpool::ExtrinsicFor) -> Self::Hash { + BlakeTwo256::hash(&ex.encode()) + } + +} + +fn index(at: &BlockId) -> u64 { + 209 + number_of(at) +} + +fn number_of(at: &BlockId) -> u64 { + match at { + generic::BlockId::Number(n) => *n as u64, + _ => 0, + } +} + +fn uxt(who: Keyring, nonce: Index) -> Extrinsic { + let transfer = Transfer { + from: who.to_raw_public().into(), + to: AccountId::default(), + nonce, + amount: 1, + }; + let signature = transfer.using_encoded(|e| who.sign(e)); + Extrinsic { + transfer, + signature: signature.into(), + } +} + +fn pool() -> Pool { + Pool::new(Default::default(), TestApi::default()) +} + +#[test] +fn submission_should_work() { + let pool = pool(); + assert_eq!(209, index(&BlockId::number(0))); + pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + + let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect()); + assert_eq!(pending, vec![209]); +} + +#[test] +fn multiple_submission_should_work() { + let pool = pool(); + pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + + let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect()); + assert_eq!(pending, vec![209, 210]); +} + +#[test] +fn early_nonce_should_be_culled() { + let pool = pool(); + pool.submit_one(&BlockId::number(0), uxt(Alice, 208)).unwrap(); + + let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect()); + assert_eq!(pending, Vec::::new()); +} + +#[test] +fn late_nonce_should_be_queued() { + let pool = pool(); + + pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect()); + assert_eq!(pending, Vec::::new()); + + pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect()); + assert_eq!(pending, vec![209, 210]); +} + +#[test] +fn prune_tags_should_work() { + let pool = pool(); + pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + + let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect()); + assert_eq!(pending, vec![209, 210]); + + pool.prune_tags(&BlockId::number(1), vec![vec![209]]).unwrap(); + + let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect()); + assert_eq!(pending, vec![210]); +} + +#[test] +fn should_ban_invalid_transactions() { + let pool = pool(); + let uxt = uxt(Alice, 209); + let hash = pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap(); + pool.remove_invalid(&[hash]); + pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap_err(); + + // when + let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect()); + assert_eq!(pending, Vec::::new()); + + // then + pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap_err(); +} diff --git a/substrate/node/consensus/Cargo.toml b/substrate/node/consensus/Cargo.toml index c63992dc7f..6607b5ca3c 100644 --- a/substrate/node/consensus/Cargo.toml +++ b/substrate/node/consensus/Cargo.toml @@ -4,22 +4,22 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -futures = "0.1.17" -parking_lot = "0.4" -tokio = "0.1.7" error-chain = "0.12" -log = "0.4" exit-future = "0.1" -rhododendron = "0.3" +futures = "0.1.17" +log = "0.4" node-primitives = { path = "../primitives" } node-runtime = { path = "../runtime" } -node-transaction-pool = { path = "../transaction-pool" } -substrate-bft = { path = "../../core/bft" } parity-codec = "2.0" -substrate-primitives = { path = "../../core/primitives" } -substrate-client = { path = "../../core/client" } +parking_lot = "0.4" +rhododendron = "0.3" sr-primitives = { path = "../../core/sr-primitives" } srml-system = { path = "../../srml/system" } +substrate-bft = { path = "../../core/bft" } +substrate-client = { path = "../../core/client" } +substrate-primitives = { path = "../../core/primitives" } +substrate-transaction-pool = { path = "../../core/transaction-pool" } +tokio = "0.1.7" [dev-dependencies] substrate-keyring = { path = "../../core/keyring" } diff --git a/substrate/node/consensus/src/lib.rs b/substrate/node/consensus/src/lib.rs index 3a49b3db42..b8bb32e3a5 100644 --- a/substrate/node/consensus/src/lib.rs +++ b/substrate/node/consensus/src/lib.rs @@ -16,25 +16,25 @@ //! This service uses BFT consensus provided by the substrate. -extern crate parking_lot; -extern crate node_transaction_pool as transaction_pool; extern crate node_runtime; extern crate node_primitives; -extern crate substrate_bft as bft; extern crate parity_codec as codec; -extern crate substrate_primitives as primitives; extern crate sr_primitives as runtime_primitives; extern crate srml_system; +extern crate substrate_bft as bft; extern crate substrate_client as client; +extern crate substrate_primitives as primitives; +extern crate substrate_transaction_pool as transaction_pool; extern crate exit_future; -extern crate tokio; +extern crate futures; +extern crate parking_lot; extern crate rhododendron; +extern crate tokio; #[macro_use] extern crate error_chain; -extern crate futures; #[macro_use] extern crate log; @@ -51,10 +51,10 @@ use codec::{Decode, Encode}; use node_primitives::{AccountId, Timestamp, SessionKey, InherentData}; use node_runtime::Runtime; use primitives::{AuthorityId, ed25519, Blake2Hasher}; -use runtime_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, As}; +use runtime_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, As, BlockNumberToHash}; use runtime_primitives::generic::{BlockId, Era}; use srml_system::Trait as SystemT; -use transaction_pool::{TransactionPool, Client as TPClient}; +use transaction_pool::txpool::{self, Pool as TransactionPool}; use tokio::runtime::TaskExecutor; use tokio::timer::Delay; @@ -95,7 +95,7 @@ pub trait AuthoringApi: /// The block used for this API type. type Block: BlockT; /// The error used by this API type. - type Error; + type Error: std::error::Error; /// Build a block on top of the given, with inherent extrinsics pre-pushed. fn build_block) -> ()>( @@ -166,13 +166,14 @@ pub trait Network { } /// Proposer factory. -pub struct ProposerFactory where - C: AuthoringApi + TPClient, +pub struct ProposerFactory where + C: AuthoringApi, + A: txpool::ChainApi, { /// The client instance. pub client: Arc, /// The transaction pool. - pub transaction_pool: Arc>, + pub transaction_pool: Arc>, /// The backing network handle. pub network: N, /// handle to remote task executor @@ -183,14 +184,15 @@ pub struct ProposerFactory where pub force_delay: Timestamp, } -impl bft::Environment<::Block> for ProposerFactory where +impl bft::Environment<::Block> for ProposerFactory where N: Network::Block>, - C: AuthoringApi + TPClient::Block>, + C: AuthoringApi + BlockNumberToHash, + A: txpool::ChainApi::Block>, <::Block as BlockT>::Hash: Into<::Hash> + PartialEq + Into, Error: From<::Error> { - type Proposer = Proposer; + type Proposer = Proposer; type Input = N::Input; type Output = N::Output; type Error = Error; @@ -240,7 +242,7 @@ impl bft::Environment<::Block> for ProposerFactory { +pub struct Proposer { client: Arc, start: Instant, local_key: Arc, @@ -248,13 +250,13 @@ pub struct Proposer { parent_id: BlockId<::Block>, parent_number: <<::Block as BlockT>::Header as HeaderT>::Number, random_seed: <::Block as BlockT>::Hash, - transaction_pool: Arc>, + transaction_pool: Arc>, offline: SharedOfflineTracker, validators: Vec, minimum_timestamp: u64, } -impl Proposer { +impl Proposer { fn primary_index(&self, round_number: usize, len: usize) -> usize { use primitives::uint::U256; @@ -265,8 +267,9 @@ impl Proposer { } } -impl bft::Proposer<::Block> for Proposer where - C: AuthoringApi + TPClient::Block>, +impl bft::Proposer<::Block> for Proposer where + C: AuthoringApi + BlockNumberToHash, + A: txpool::ChainApi::Block>, <::Block as BlockT>::Hash: Into<::Hash> + PartialEq + Into, error::Error: From<::Error> @@ -307,27 +310,26 @@ impl bft::Proposer<::Block> for Proposer where inherent_data, |block_builder| { let mut unqueue_invalid = Vec::new(); - let result = self.transaction_pool.cull_and_get_pending(&BlockId::hash(self.parent_hash), |pending_iterator| { + self.transaction_pool.ready(|pending_iterator| { let mut pending_size = 0; for pending in pending_iterator { - if pending_size + pending.verified.encoded_size() >= MAX_TRANSACTIONS_SIZE { break } + // TODO [ToDr] Probably get rid of it, and validate in runtime. + let encoded_size = pending.data.raw.encode().len(); + if pending_size + encoded_size >= MAX_TRANSACTIONS_SIZE { break } - match block_builder.push_extrinsic(pending.original.clone()) { + match block_builder.push_extrinsic(pending.data.raw.clone()) { Ok(()) => { - pending_size += pending.verified.encoded_size(); + pending_size += encoded_size; } Err(e) => { trace!(target: "transaction-pool", "Invalid transaction: {}", e); - unqueue_invalid.push(pending.verified.hash().clone()); + unqueue_invalid.push(pending.hash.clone()); } } } }); - if let Err(e) = result { - warn!("Unable to get the pending set: {:?}", e); - } - self.transaction_pool.remove(&unqueue_invalid, false); + self.transaction_pool.remove_invalid(&unqueue_invalid); })?; info!("Proposing block [number: {}; hash: {}; parent_hash: {}; extrinsics: [{}]]", @@ -440,24 +442,22 @@ impl bft::Proposer<::Block> for Proposer where use runtime_primitives::bft::{MisbehaviorKind, MisbehaviorReport}; use node_runtime::{Call, UncheckedExtrinsic, ConsensusCall}; - let local_id = self.local_key.public().0.into(); let mut next_index = { - let cur_index = self.transaction_pool.cull_and_get_pending(&BlockId::hash(self.parent_hash), |pending| pending - .filter(|tx| tx.verified.sender == local_id) - .last() - .map(|tx| Ok(tx.verified.index())) - .unwrap_or_else(|| self.client.account_nonce(&self.parent_id, &local_id)) - .map_err(Error::from) - ); + let local_id = self.local_key.public().0; + // let cur_index = self.transaction_pool.cull_and_get_pending(&BlockId::hash(self.parent_hash), |pending| pending + // .filter(|tx| tx.verified.sender == local_id) + // .last() + // .map(|tx| Ok(tx.verified.index())) + // .unwrap_or_else(|| self.client.account_nonce(&self.parent_id, local_id)) + // .map_err(Error::from) + // ); + // TODO [ToDr] Use pool data + let cur_index: Result = self.client.account_nonce(&self.parent_id, &local_id).map_err(Error::from); match cur_index { - Ok(Ok(cur_index)) => cur_index + 1, - Ok(Err(e)) => { - warn!(target: "consensus", "Error computing next transaction index: {}", e); - return; - } + Ok(cur_index) => cur_index + 1, Err(e) => { - warn!(target: "consensus", "Error computing next transaction index: {}", e); + warn!(target: "consensus", "Error computing next transaction index: {:?}", e); return; } } @@ -488,8 +488,9 @@ impl bft::Proposer<::Block> for Proposer where }; let uxt: <::Block as BlockT>::Extrinsic = Decode::decode(&mut extrinsic.encode().as_slice()).expect("Encoded extrinsic is valid"); let hash = BlockId::<::Block>::hash(self.parent_hash); - self.transaction_pool.submit_one(&hash, uxt) - .expect("locally signed extrinsic is valid; qed"); + if let Err(e) = self.transaction_pool.submit_one(&hash, uxt) { + warn!("Error importing misbehavior report: {:?}", e); + } } } diff --git a/substrate/node/consensus/src/service.rs b/substrate/node/consensus/src/service.rs index f8198161df..3b9a8f9192 100644 --- a/substrate/node/consensus/src/service.rs +++ b/substrate/node/consensus/src/service.rs @@ -26,9 +26,9 @@ use bft::{self, BftService}; use client::{BlockchainEvents, ChainHead, BlockBody}; use ed25519; use futures::prelude::*; -use transaction_pool::{TransactionPool, Client as TPClient}; +use transaction_pool::txpool::{Pool as TransactionPool, ChainApi as PoolChainApi}; use primitives; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, BlockNumberToHash}; use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle; use tokio::runtime::TaskExecutor as ThreadPoolHandle; @@ -72,18 +72,19 @@ pub struct Service { impl Service { /// Create and start a new instance. - pub fn new( + pub fn new( client: Arc, api: Arc, network: N, - transaction_pool: Arc>, + transaction_pool: Arc>, thread_pool: ThreadPoolHandle, key: ed25519::Pair, block_delay: u64, ) -> Service where - A: AuthoringApi + TPClient::Block> + 'static, error::Error: From<::Error>, + A: AuthoringApi + BlockNumberToHash + 'static, + P: PoolChainApi::Block> + 'static, C: BlockchainEvents<::Block> + ChainHead<::Block> + BlockBody<::Block>, diff --git a/substrate/node/service/Cargo.toml b/substrate/node/service/Cargo.toml index a9c58ebb47..513f128eaf 100644 --- a/substrate/node/service/Cargo.toml +++ b/substrate/node/service/Cargo.toml @@ -4,27 +4,27 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -parking_lot = "0.4" error-chain = "0.12" +hex-literal = "0.1" lazy_static = "1.0" log = "0.4" -slog = "^2" -tokio = "0.1.7" -hex-literal = "0.1" -parity-codec = { version = "2.0" } +node-consensus = { path = "../consensus" } +node-executor = { path = "../executor" } +node-network = { path = "../network" } node-primitives = { path = "../primitives" } node-runtime = { path = "../runtime" } -node-executor = { path = "../executor" } -node-consensus = { path = "../consensus" } -node-network = { path = "../network" } -node-transaction-pool = { path = "../transaction-pool" } +parity-codec = { version = "2.0" } +parking_lot = "0.4" +slog = "^2" sr-io = { path = "../../core/sr-io" } sr-primitives = { path = "../../core/sr-primitives" } -substrate-primitives = { path = "../../core/primitives" } -substrate-network = { path = "../../core/network" } substrate-client = { path = "../../core/client" } +substrate-network = { path = "../../core/network" } +substrate-primitives = { path = "../../core/primitives" } substrate-service = { path = "../../core/service" } substrate-telemetry = { path = "../../core/telemetry" } +substrate-transaction-pool = { path = "../../core/transaction-pool" } +tokio = "0.1.7" [dev-dependencies] substrate-service-test = { path = "../../core/service/test" } diff --git a/substrate/node/service/src/lib.rs b/substrate/node/service/src/lib.rs index d20c099c10..60cf81b6e4 100644 --- a/substrate/node/service/src/lib.rs +++ b/substrate/node/service/src/lib.rs @@ -22,12 +22,12 @@ extern crate node_primitives; extern crate node_runtime; extern crate node_executor; extern crate node_network; -extern crate node_transaction_pool as transaction_pool; extern crate node_consensus as consensus; -extern crate substrate_primitives as primitives; -extern crate substrate_network as network; extern crate substrate_client as client; +extern crate substrate_network as network; +extern crate substrate_primitives as primitives; extern crate substrate_service as service; +extern crate substrate_transaction_pool as transaction_pool; extern crate parity_codec as codec; extern crate tokio; #[cfg(test)] @@ -51,13 +51,12 @@ pub mod chain_spec; use std::sync::Arc; use codec::Decode; -use transaction_pool::TransactionPool; +use transaction_pool::txpool::{Pool as TransactionPool}; use node_primitives::{Block, Hash, Timestamp, BlockId}; use node_runtime::{GenesisConfig, BlockPeriod, StorageValue, Runtime}; use client::Client; use consensus::AuthoringApi; use node_network::{Protocol as DemoProtocol, consensus::ConsensusNetwork}; -use transaction_pool::Client as TPApi; use tokio::runtime::TaskExecutor; use service::FactoryFullConfiguration; use primitives::{Blake2Hasher, storage::StorageKey, twox_128}; @@ -75,7 +74,7 @@ pub type NetworkService = network::Service; /// Client executor. @@ -109,21 +108,21 @@ impl service::ServiceFactory for Factory { type ExtrinsicHash = Hash; type NetworkProtocol = DemoProtocol; type RuntimeDispatch = node_executor::Executor; - type FullTransactionPoolApi = transaction_pool::ChainApi>; - type LightTransactionPoolApi = transaction_pool::ChainApi>; + type FullTransactionPoolApi = transaction_pool::ChainApi, service::FullExecutor, Block>; + type LightTransactionPoolApi = transaction_pool::ChainApi, service::LightExecutor, Block>; type Genesis = GenesisConfig; type Configuration = CustomConfiguration; type FullService = Service>; type LightService = Service>; fn build_full_transaction_pool(config: TransactionPoolOptions, client: Arc>) - -> Result>, Error> + -> Result, Error> { Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(client))) } fn build_light_transaction_pool(config: TransactionPoolOptions, client: Arc>) - -> Result>, Error> + -> Result, Error> { Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(client))) } diff --git a/substrate/node/transaction-pool/Cargo.toml b/substrate/node/transaction-pool/Cargo.toml deleted file mode 100644 index afe516b815..0000000000 --- a/substrate/node/transaction-pool/Cargo.toml +++ /dev/null @@ -1,16 +0,0 @@ -[package] -name = "node-transaction-pool" -version = "0.1.0" -authors = ["Parity Technologies "] - -[dependencies] -log = "0.4" -error-chain = "0.12" -node-primitives = { path = "../primitives" } -node-runtime = { path = "../runtime" } -substrate-client = { path = "../../core/client" } -parity-codec = "2.0" -substrate-keyring = { path = "../../core/keyring" } -substrate-transaction-pool = { path = "../../core/transaction-pool" } -substrate-primitives = { path = "../../core/primitives" } -sr-primitives = { path = "../../core/sr-primitives" } diff --git a/substrate/node/transaction-pool/src/error.rs b/substrate/node/transaction-pool/src/error.rs deleted file mode 100644 index fcf6e6de2a..0000000000 --- a/substrate/node/transaction-pool/src/error.rs +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -use transaction_pool; -use primitives::Hash; -use runtime::{Address, UncheckedExtrinsic}; -use client; - -error_chain! { - links { - Client(client::error::Error, client::error::ErrorKind); - Pool(transaction_pool::Error, transaction_pool::ErrorKind); - } - errors { - /// Unexpected extrinsic format submitted - InvalidExtrinsicFormat { - description("Invalid extrinsic format."), - display("Invalid extrinsic format."), - } - /// Attempted to queue an inherent transaction. - IsInherent(xt: UncheckedExtrinsic) { - description("Inherent transactions cannot be queued."), - display("Inherent transactions cannot be queued."), - } - /// Attempted to queue a transaction with bad signature. - BadSignature(e: &'static str) { - description("Transaction had bad signature."), - display("Transaction had bad signature: {}", e), - } - /// Attempted to queue a transaction that is already in the pool. - AlreadyImported(hash: Hash) { - description("Transaction is already in the pool."), - display("Transaction {:?} is already in the pool.", hash), - } - /// Import error. - Import(err: Box<::std::error::Error + Send>) { - description("Error importing transaction"), - display("Error importing transaction: {}", err.description()), - } - /// Runtime failure. - UnrecognisedAddress(who: Address) { - description("Unrecognised address in extrinsic"), - display("Unrecognised address in extrinsic: {}", who), - } - /// Extrinsic too large - TooLarge(got: usize, max: usize) { - description("Extrinsic too large"), - display("Extrinsic is too large ({} > {})", got, max), - } - } -} - -impl transaction_pool::IntoPoolError for Error { - fn into_pool_error(self) -> ::std::result::Result { - match self { - Error(ErrorKind::Pool(e), c) => Ok(transaction_pool::Error(e, c)), - e => Err(e), - } - } -} diff --git a/substrate/node/transaction-pool/src/lib.rs b/substrate/node/transaction-pool/src/lib.rs deleted file mode 100644 index bb8eb1c529..0000000000 --- a/substrate/node/transaction-pool/src/lib.rs +++ /dev/null @@ -1,275 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -extern crate substrate_client as client; -extern crate parity_codec as codec; -extern crate substrate_transaction_pool as transaction_pool; -extern crate substrate_primitives; -extern crate sr_primitives; -extern crate node_runtime as runtime; -extern crate node_primitives as primitives; - -#[cfg(test)] -extern crate substrate_keyring; - -#[macro_use] -extern crate error_chain; - -#[macro_use] -extern crate log; - -mod error; - -use std::{ - cmp::Ordering, - collections::HashMap, - sync::Arc, -}; - -use codec::{Decode, Encode}; -use client::{Client as SubstrateClient, CallExecutor}; -use client::runtime_api::OldTxQueue; -use transaction_pool::{Readiness, scoring::{Change, Choice}, VerifiedFor, ExtrinsicFor}; -use primitives::{AccountId, Hash, Index}; -use runtime::{Address, UncheckedExtrinsic}; -use substrate_primitives::{Blake2Hasher}; -use sr_primitives::generic::BlockId; -use sr_primitives::traits::{ - Bounded, Checkable, Block as BlockT, Hash as HashT, Header as HeaderT, BlakeTwo256, Lookup, CurrentHeight, - BlockNumberToHash -}; - -pub use transaction_pool::{Options, Status, LightStatus, VerifiedTransaction as VerifiedTransactionOps}; -pub use error::{Error, ErrorKind, Result}; - -/// Maximal size of a single encoded extrinsic. -const MAX_TRANSACTION_SIZE: usize = 4 * 1024 * 1024; - -/// Local client abstraction for the transaction-pool. -pub trait Client: - Send - + Sync - + CurrentHeight::Block as BlockT>::Header as HeaderT>::Number> - + BlockNumberToHash::Block as BlockT>::Header as HeaderT>::Number, Hash=<::Block as BlockT>::Hash> - + OldTxQueue<::Block> -{ - /// The block used for this API type. - type Block: BlockT; -} - -impl Client for SubstrateClient where - B: client::backend::Backend + Send + Sync + 'static, - E: CallExecutor + Send + Sync + Clone + 'static, - Block: BlockT, -{ - type Block = Block; -} - -/// Type alias for the transaction pool. -pub type TransactionPool = transaction_pool::Pool>; - -/// A verified transaction which should be includable and non-inherent. -#[derive(Clone, Debug)] -pub struct VerifiedTransaction { - /// Transaction hash. - pub hash: Hash, - /// Transaction sender. - pub sender: AccountId, - /// Transaction index. - pub index: Index, - encoded_size: usize, -} - -impl VerifiedTransaction { - /// Get the 256-bit hash of this transaction. - pub fn hash(&self) -> &Hash { - &self.hash - } - - /// Get the account ID of the sender of this transaction. - pub fn index(&self) -> Index { - self.index - } - - /// Get encoded size of the transaction. - pub fn encoded_size(&self) -> usize { - self.encoded_size - } -} - -impl transaction_pool::VerifiedTransaction for VerifiedTransaction { - type Hash = Hash; - type Sender = AccountId; - - fn hash(&self) -> &Self::Hash { - &self.hash - } - - fn sender(&self) -> &Self::Sender { - &self.sender - } - - fn mem_usage(&self) -> usize { - self.encoded_size // TODO - } -} - -/// The transaction pool logic. -pub struct ChainApi { - api: Arc, -} - -impl ChainApi { - /// Create a new instance. - pub fn new(api: Arc) -> Self { - ChainApi { - api, - } - } -} - -/// "Chain" context (used for checking transactions) which uses data local to our node/transaction pool. -/// -/// This is due for removal when #721 lands -pub struct LocalContext<'a, A: 'a>(&'a Arc); -impl<'a, C: 'a + Client> CurrentHeight for LocalContext<'a, C> { - type BlockNumber = ::BlockNumber; - fn current_height(&self) -> Self::BlockNumber { - self.0.current_height() - } -} -impl<'a, C: 'a + Client> BlockNumberToHash for LocalContext<'a, C> { - type BlockNumber = ::BlockNumber; - type Hash = ::Hash; - fn block_number_to_hash(&self, n: Self::BlockNumber) -> Option { - self.0.block_number_to_hash(n) - } -} -impl<'a, C: 'a + Client> Lookup for LocalContext<'a, C> { - type Source = Address; - type Target = AccountId; - fn lookup(&self, a: Address) -> ::std::result::Result { - self.0.lookup_address(&BlockId::number(self.current_height()), &a).unwrap_or(None).ok_or("error with lookup") - } -} - -impl transaction_pool::ChainApi for ChainApi { - type Block = C::Block; - type Hash = Hash; - type Sender = AccountId; - type VEx = VerifiedTransaction; - type Ready = HashMap; - type Error = Error; - type Score = u64; - type Event = (); - - fn latest_hash(&self) -> ::Hash { - self.api.block_number_to_hash(self.api.current_height()).expect("Latest block number always has a hash; qed") - } - - fn verify_transaction(&self, _at: &BlockId, xt: &ExtrinsicFor) -> Result { - let encoded = xt.encode(); - let uxt = UncheckedExtrinsic::decode(&mut encoded.as_slice()).ok_or_else(|| ErrorKind::InvalidExtrinsicFormat)?; - if !uxt.is_signed() { - bail!(ErrorKind::IsInherent(uxt)) - } - - let (encoded_size, hash) = (encoded.len(), BlakeTwo256::hash(&encoded)); - if encoded_size > MAX_TRANSACTION_SIZE { - bail!(ErrorKind::TooLarge(encoded_size, MAX_TRANSACTION_SIZE)); - } - - debug!(target: "transaction-pool", "Transaction submitted: {}", ::substrate_primitives::hexdisplay::HexDisplay::from(&encoded)); - let checked = uxt.clone().check(&LocalContext(&self.api))?; - let (sender, index) = checked.signed.expect("function previously bailed unless uxt.is_signed(); qed"); - - - if encoded_size < 1024 { - debug!(target: "transaction-pool", "Transaction verified: {} => {:?}", hash, uxt); - } else { - debug!(target: "transaction-pool", "Transaction verified: {} ({} bytes is too large to display)", hash, encoded_size); - } - - Ok(VerifiedTransaction { - index, - sender, - hash, - encoded_size, - }) - } - - fn ready(&self) -> Self::Ready { - HashMap::default() - } - - fn is_ready(&self, at: &BlockId, known_nonces: &mut Self::Ready, xt: &VerifiedFor) -> Readiness { - let sender = xt.verified.sender().clone(); - trace!(target: "transaction-pool", "Checking readiness of {} (from {})", xt.verified.hash, sender); - - // TODO: find a way to handle index error properly -- will need changes to - // transaction-pool trait. - let api = &self.api; - let next_index = known_nonces.entry(sender) - .or_insert_with(|| api.account_nonce(at, &sender).ok().unwrap_or_else(Bounded::max_value)); - - trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_index, xt.verified.index); - - let result = match xt.verified.index.cmp(&next_index) { - // TODO: this won't work perfectly since accounts can now be killed, returning the nonce - // to zero. - // We should detect if the index was reset and mark all transactions as `Stale` for cull to work correctly. - // Otherwise those transactions will keep occupying the queue. - // Perhaps we could mark as stale if `index - state_index` > X? - Ordering::Greater => Readiness::Future, - Ordering::Equal => Readiness::Ready, - // TODO [ToDr] Should mark transactions referencing too old blockhash as `Stale` as well. - Ordering::Less => Readiness::Stale, - }; - - // remember to increment `next_index` - *next_index = next_index.saturating_add(1); - - result - } - - fn compare(old: &VerifiedFor, other: &VerifiedFor) -> Ordering { - old.verified.index().cmp(&other.verified.index()) - } - - fn choose(old: &VerifiedFor, new: &VerifiedFor) -> Choice { - if old.verified.index() == new.verified.index() { - return Choice::ReplaceOld; - } - Choice::InsertNew - } - - fn update_scores( - xts: &[transaction_pool::Transaction>], - scores: &mut [Self::Score], - _change: Change<()> - ) { - for i in 0..xts.len() { - // all the same score since there are no fees. - // TODO: prioritize things like misbehavior or fishermen reports - scores[i] = 1; - } - } - - fn should_replace(_old: &VerifiedFor, _new: &VerifiedFor) -> Choice { - // Don't allow new transactions if we are reaching the limit. - Choice::RejectNew - } -} diff --git a/substrate/srml/executive/src/lib.rs b/substrate/srml/executive/src/lib.rs index 0978670350..faebde9b50 100644 --- a/substrate/srml/executive/src/lib.rs +++ b/substrate/srml/executive/src/lib.rs @@ -224,11 +224,11 @@ impl< /// Check a given transaction for validity. This doesn't execute any /// side-effects; it merely checks whether the transaction would panic if it were included or not. - /// + /// /// Changes made to the storage should be discarded. pub fn validate_transaction(uxt: Block::Extrinsic) -> TransactionValidity { let encoded_len = uxt.encode().len(); - + let xt = match uxt.check(&Default::default()) { // Checks out. Carry on. Ok(xt) => xt, @@ -259,8 +259,13 @@ impl< deps.push((sender, expected_index).encode()); expected_index = expected_index + One::one(); } - - TransactionValidity::Valid(encoded_len as TransactionPriority, deps, vec![(sender, *index).encode()], TransactionLongevity::max_value()) + + TransactionValidity::Valid( + /*priority: */encoded_len as TransactionPriority, + /*requires: */deps, + /*provides: */vec![(sender, *index).encode()], + /*longevity: */TransactionLongevity::max_value(), + ) } else { return TransactionValidity::Invalid }