From 1dada4f7a03b49bb3c935f46332a40d9e30e9dfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 31 May 2018 22:49:17 +0200 Subject: [PATCH] Extrinsic pool (#182) * Use latest version of txpool. * Initial version of the pool. * Fix abstraction. * Implement watchers and notifications. * Return hash from RPC. * Remove commented code. * Remove client dep. * Fix tests. --- substrate/Cargo.lock | 19 +- substrate/Cargo.toml | 14 +- substrate/demo/cli/Cargo.toml | 1 + substrate/demo/cli/src/lib.rs | 14 +- substrate/polkadot/cli/src/lib.rs | 39 +--- substrate/polkadot/consensus/src/lib.rs | 64 +++--- substrate/polkadot/consensus/src/service.rs | 3 +- substrate/polkadot/service/src/config.rs | 21 +- substrate/polkadot/service/src/lib.rs | 91 +++++---- .../polkadot/transaction-pool/Cargo.toml | 7 +- .../polkadot/transaction-pool/src/error.rs | 61 ++++++ .../polkadot/transaction-pool/src/lib.rs | 184 +++++++----------- substrate/substrate/client/src/client.rs | 5 +- substrate/substrate/extrinsic-pool/Cargo.toml | 12 ++ substrate/substrate/extrinsic-pool/src/api.rs | 64 ++++++ substrate/substrate/extrinsic-pool/src/lib.rs | 37 ++++ .../substrate/extrinsic-pool/src/listener.rs | 89 +++++++++ .../substrate/extrinsic-pool/src/pool.rs | 141 ++++++++++++++ .../substrate/extrinsic-pool/src/watcher.rs | 88 +++++++++ substrate/substrate/network/src/protocol.rs | 37 ++-- substrate/substrate/network/src/service.rs | 12 +- substrate/substrate/network/src/test/mod.rs | 2 + substrate/substrate/rpc/Cargo.toml | 1 + substrate/substrate/rpc/src/author/error.rs | 18 +- substrate/substrate/rpc/src/author/mod.rs | 20 +- substrate/substrate/rpc/src/author/tests.rs | 40 ++-- substrate/substrate/rpc/src/lib.rs | 1 + 27 files changed, 770 insertions(+), 315 deletions(-) create mode 100644 substrate/polkadot/transaction-pool/src/error.rs create mode 100644 substrate/substrate/extrinsic-pool/Cargo.toml create mode 100644 substrate/substrate/extrinsic-pool/src/api.rs create mode 100644 substrate/substrate/extrinsic-pool/src/lib.rs create mode 100644 substrate/substrate/extrinsic-pool/src/listener.rs create mode 100644 substrate/substrate/extrinsic-pool/src/pool.rs create mode 100644 substrate/substrate/extrinsic-pool/src/watcher.rs diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index ddcbc1936e..5b8d05d04e 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -229,6 +229,7 @@ dependencies = [ "substrate-client 0.1.0", "substrate-codec 0.1.0", "substrate-executor 0.1.0", + "substrate-extrinsic-pool 0.1.0", "substrate-primitives 0.1.0", "substrate-rpc 0.1.0", "substrate-rpc-servers 0.1.0", @@ -1429,16 +1430,15 @@ version = "0.1.0" dependencies = [ "ed25519 0.1.0", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-api 0.1.0", "polkadot-primitives 0.1.0", "polkadot-runtime 0.1.0", "substrate-client 0.1.0", "substrate-codec 0.1.0", + "substrate-extrinsic-pool 0.1.0", "substrate-primitives 0.1.0", - "substrate-rpc 0.1.0", "substrate-runtime-primitives 0.1.0", - "transaction-pool 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1942,6 +1942,18 @@ dependencies = [ "wasmi 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "substrate-extrinsic-pool" +version = "0.1.0" +dependencies = [ + "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "substrate-primitives 0.1.0", + "transaction-pool 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "substrate-keyring" version = "0.1.0" @@ -2025,6 +2037,7 @@ dependencies = [ "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-client 0.1.0", "substrate-executor 0.1.0", + "substrate-extrinsic-pool 0.1.0", "substrate-primitives 0.1.0", "substrate-state-machine 0.1.0", "substrate-test-client 0.1.0", diff --git a/substrate/Cargo.toml b/substrate/Cargo.toml index 61f26c7657..d58205e310 100644 --- a/substrate/Cargo.toml +++ b/substrate/Cargo.toml @@ -22,22 +22,22 @@ members = [ "polkadot/parachain", "polkadot/primitives", "polkadot/runtime", + "polkadot/service", "polkadot/statement-table", "polkadot/transaction-pool", - "polkadot/service", - "substrate/bft", "substrate/client", "substrate/client/db", "substrate/codec", "substrate/environmental", "substrate/executor", + "substrate/extrinsic-pool", "substrate/keyring", - "substrate/network", "substrate/misbehavior-check", + "substrate/network", "substrate/primitives", - "substrate/rpc-servers", "substrate/rpc", + "substrate/rpc-servers", "substrate/runtime-io", "substrate/runtime-sandbox", "substrate/runtime-std", @@ -55,10 +55,10 @@ members = [ "substrate/state-machine", "substrate/test-runtime", - "demo/runtime", - "demo/primitives", - "demo/executor", "demo/cli", + "demo/executor", + "demo/primitives", + "demo/runtime", "safe-mix", "subkey", ] diff --git a/substrate/demo/cli/Cargo.toml b/substrate/demo/cli/Cargo.toml index 645429cb31..6f488c2ad8 100644 --- a/substrate/demo/cli/Cargo.toml +++ b/substrate/demo/cli/Cargo.toml @@ -17,6 +17,7 @@ tokio-core = "0.1.12" triehash = "0.1" substrate-client = { path = "../../substrate/client" } substrate-codec = { path = "../../substrate/codec" } +substrate-extrinsic-pool = { path = "../../substrate/extrinsic-pool" } substrate-runtime-io = { path = "../../substrate/runtime-io" } substrate-state-machine = { path = "../../substrate/state-machine" } substrate-executor = { path = "../../substrate/executor" } diff --git a/substrate/demo/cli/src/lib.rs b/substrate/demo/cli/src/lib.rs index d0704438b7..5d7fbfe085 100644 --- a/substrate/demo/cli/src/lib.rs +++ b/substrate/demo/cli/src/lib.rs @@ -31,6 +31,7 @@ extern crate substrate_rpc; extern crate substrate_rpc_servers as rpc; extern crate substrate_runtime_io as runtime_io; extern crate substrate_state_machine as state_machine; +extern crate substrate_extrinsic_pool as extrinsic_pool; extern crate demo_executor; extern crate demo_primitives; extern crate demo_runtime; @@ -53,11 +54,14 @@ use demo_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfi SessionConfig, StakingConfig, BuildExternalities}; use futures::{Future, Sink, Stream}; - struct DummyPool; -impl substrate_rpc::author::AuthorApi for DummyPool { - fn submit_extrinsic(&self, _: primitives::block::Extrinsic) -> substrate_rpc::author::error::Result<()> { - Err(substrate_rpc::author::error::ErrorKind::Unimplemented.into()) +impl extrinsic_pool::api::ExtrinsicPool for DummyPool { + type Error = extrinsic_pool::txpool::Error; + + fn submit(&self, _: Vec) + -> Result, Self::Error> + { + Err("unimplemented".into()) } } @@ -156,7 +160,7 @@ pub fn run(args: I) -> error::Result<()> where let _rpc_servers = { let handler = || { let chain = rpc::apis::chain::Chain::new(client.clone(), core.remote()); - rpc::rpc_handler(client.clone(), chain, DummyPool, DummySystem) + rpc::rpc_handler(client.clone(), chain, Arc::new(DummyPool), DummySystem) }; let http_address = "127.0.0.1:9933".parse().unwrap(); let ws_address = "127.0.0.1:9944".parse().unwrap(); diff --git a/substrate/polkadot/cli/src/lib.rs b/substrate/polkadot/cli/src/lib.rs index 90b0655cd4..f734b154cd 100644 --- a/substrate/polkadot/cli/src/lib.rs +++ b/substrate/polkadot/cli/src/lib.rs @@ -61,39 +61,11 @@ mod informant; use std::io; use std::net::SocketAddr; use std::path::{Path, PathBuf}; -use std::sync::Arc; use futures::sync::mpsc; use futures::{Sink, Future, Stream}; use tokio_core::reactor; -use parking_lot::Mutex; use service::ChainSpec; -use primitives::block::Extrinsic; - -struct RpcTransactionPool { - inner: Arc>, - network: Arc, -} - -impl substrate_rpc::author::AuthorApi for RpcTransactionPool { - fn submit_extrinsic(&self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> { - use primitives::hexdisplay::HexDisplay; - use polkadot_runtime::UncheckedExtrinsic; - use codec::Slicable; - - info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0)); - let decoded = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s)) - .ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?; - - info!("Correctly formatted: {:?}", decoded); - - self.inner.lock().import(decoded) - .map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError)?; - - self.network.trigger_repropagate(); - Ok(()) - } -} struct Configuration(service::Configuration); @@ -238,11 +210,12 @@ fn run_until_exit(mut core: reactor::Core, service: service::Service let handler = || { let chain = rpc::apis::chain::Chain::new(service.client(), core.remote()); - let pool = RpcTransactionPool { - inner: service.transaction_pool(), - network: service.network(), - }; - rpc::rpc_handler(service.client(), chain, pool, Configuration(config.clone())) + rpc::rpc_handler( + service.client(), + chain, + service.transaction_pool(), + Configuration(config.clone()), + ) }; ( start_server(http_address, |address| rpc::start_http(address, handler())), diff --git a/substrate/polkadot/consensus/src/lib.rs b/substrate/polkadot/consensus/src/lib.rs index f9e3b35a26..def896548e 100644 --- a/substrate/polkadot/consensus/src/lib.rs +++ b/substrate/polkadot/consensus/src/lib.rs @@ -78,7 +78,6 @@ use tokio_core::reactor::{Handle, Timeout, Interval}; use futures::prelude::*; use futures::future::{self, Shared}; -use parking_lot::Mutex; use collation::CollationFetch; use dynamic_inclusion::DynamicInclusion; @@ -226,7 +225,7 @@ pub struct ProposerFactory { /// The client instance. pub client: Arc, /// The transaction pool. - pub transaction_pool: Arc>, + pub transaction_pool: Arc, /// The backing network handle. pub network: N, /// Parachain collators. @@ -319,7 +318,7 @@ pub struct Proposer { random_seed: Hash, router: R, table: Arc, - transaction_pool: Arc>, + transaction_pool: Arc, } impl bft::Proposer for Proposer @@ -495,17 +494,16 @@ impl bft::Proposer for Proposer use primitives::bft::{MisbehaviorKind, MisbehaviorReport}; use polkadot_runtime::{Call, Extrinsic, UncheckedExtrinsic, ConsensusCall}; + let local_id = self.local_key.public().0; - let mut pool = self.transaction_pool.lock(); let mut next_index = { let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client); - - pool.cull(None, readiness_evaluator.clone()); - let cur_index = pool.pending(readiness_evaluator) + let cur_index = self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending| pending .filter(|tx| tx.as_ref().as_ref().signed == local_id) .last() .map(|tx| Ok(tx.as_ref().as_ref().index)) - .unwrap_or_else(|| self.client.index(&self.parent_id, local_id)); + .unwrap_or_else(|| self.client.index(&self.parent_id, local_id)) + ); match cur_index { Ok(cur_index) => cur_index + 1, @@ -541,7 +539,7 @@ impl bft::Proposer for Proposer let signature = self.local_key.sign(&extrinsic.encode()).into(); let uxt = UncheckedExtrinsic { extrinsic, signature }; - pool.import(uxt).expect("locally signed extrinsic is valid; qed"); + self.transaction_pool.import_unchecked_extrinsic(uxt).expect("locally signed extrinsic is valid; qed"); } } } @@ -609,7 +607,7 @@ pub struct CreateProposal { parent_number: BlockNumber, parent_id: C::CheckedBlockId, client: Arc, - transaction_pool: Arc>, + transaction_pool: Arc, collation: CollationFetch, router: R, table: Arc, @@ -631,37 +629,33 @@ impl CreateProposal candidates, )?; - let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client); - { - let mut pool = self.transaction_pool.lock(); + let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client); let mut unqueue_invalid = Vec::new(); - let mut pending_size = 0; - - pool.cull(None, readiness_evaluator.clone()); - for pending in pool.pending(readiness_evaluator.clone()) { - // skip and cull transactions which are too large. - if pending.encoded_size() > MAX_TRANSACTIONS_SIZE { - unqueue_invalid.push(pending.hash().clone()); - continue - } - - if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break } - - match block_builder.push_extrinsic(pending.as_transaction().clone()) { - Ok(()) => { - pending_size += pending.encoded_size(); - } - Err(e) => { - trace!(target: "transaction-pool", "Invalid transaction: {}", e); + self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending_iterator| { + let mut pending_size = 0; + for pending in pending_iterator { + // skip and cull transactions which are too large. + if pending.encoded_size() > MAX_TRANSACTIONS_SIZE { unqueue_invalid.push(pending.hash().clone()); + continue + } + + if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break } + + match block_builder.push_extrinsic(pending.as_transaction().clone()) { + Ok(()) => { + pending_size += pending.encoded_size(); + } + Err(e) => { + trace!(target: "transaction-pool", "Invalid transaction: {}", e); + unqueue_invalid.push(pending.hash().clone()); + } } } - } + }); - for tx_hash in unqueue_invalid { - pool.remove(&tx_hash, false); - } + self.transaction_pool.remove(&unqueue_invalid, false); } let polkadot_block = block_builder.bake(); diff --git a/substrate/polkadot/consensus/src/service.rs b/substrate/polkadot/consensus/src/service.rs index 2177e87412..dedab92135 100644 --- a/substrate/polkadot/consensus/src/service.rs +++ b/substrate/polkadot/consensus/src/service.rs @@ -28,7 +28,6 @@ use client::{BlockchainEvents, ChainHead}; use ed25519; use futures::prelude::*; use futures::{future, Canceled}; -use parking_lot::Mutex; use polkadot_api::LocalPolkadotApi; use polkadot_primitives::AccountId; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt}; @@ -237,7 +236,7 @@ impl Service { client: Arc, api: Arc, network: Arc, - transaction_pool: Arc>, + transaction_pool: Arc, parachain_empty_duration: Duration, key: ed25519::Pair, ) -> Service diff --git a/substrate/polkadot/service/src/config.rs b/substrate/polkadot/service/src/config.rs index c6edbe141c..e5c27ef776 100644 --- a/substrate/polkadot/service/src/config.rs +++ b/substrate/polkadot/service/src/config.rs @@ -22,7 +22,7 @@ pub use network::NetworkConfiguration; /// The chain specification (this should eventually be replaced by a more general JSON-based chain /// specification). -#[derive(Clone)] +#[derive(Clone, Copy)] pub enum ChainSpec { /// Whatever the current runtime is, with just Alice as an auth. Development, @@ -33,6 +33,7 @@ pub enum ChainSpec { } /// Service configuration. +#[derive(Clone)] pub struct Configuration { /// Node roles. pub roles: Role, @@ -63,21 +64,3 @@ impl Default for Configuration { } } } - -impl Clone for Configuration { - fn clone(&self) -> Configuration { - Configuration { - roles: self.roles.clone(), - transaction_pool: transaction_pool::Options { - max_count: self.transaction_pool.max_count.clone(), - max_mem_usage: self.transaction_pool.max_mem_usage.clone(), - max_per_sender: self.transaction_pool.max_per_sender.clone(), - }, - network: self.network.clone(), - keystore_path: self.keystore_path.clone(), - database_path: self.database_path.clone(), - keys: self.keys.clone(), - chain_spec: self.chain_spec.clone(), - } - } -} diff --git a/substrate/polkadot/service/src/lib.rs b/substrate/polkadot/service/src/lib.rs index 436479d0a5..13dae55c4e 100644 --- a/substrate/polkadot/service/src/lib.rs +++ b/substrate/polkadot/service/src/lib.rs @@ -50,14 +50,14 @@ extern crate hex_literal; mod error; mod config; +use std::collections::HashMap; use std::sync::Arc; use std::thread; use futures::prelude::*; -use parking_lot::Mutex; use tokio_core::reactor::Core; use codec::Slicable; -use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash, Header}; -use primitives::{AuthorityId, hashing}; +use primitives::block::{Id as BlockId, ExtrinsicHash, HeaderHash, Header}; +use primitives::{AuthorityId}; use transaction_pool::TransactionPool; use substrate_executor::NativeExecutor; use polkadot_executor::Executor as LocalDispatch; @@ -80,13 +80,13 @@ pub struct Service { thread: Option>, client: Arc>, network: Arc, - transaction_pool: Arc>, + transaction_pool: Arc, signal: Option, _consensus: Option, } struct TransactionPoolAdapter where A: Send + Sync, E: Send + Sync { - pool: Arc>, + pool: Arc, client: Arc>, api: Arc, } @@ -108,19 +108,22 @@ impl network::TransactionPool for TransactionPoolAdapter }; let id = self.api.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed."); - let mut pool = self.pool.lock(); - pool.cull(None, transaction_pool::Ready::create(id.clone(), &*self.api)); - pool.pending(transaction_pool::Ready::create(id, &*self.api)).map(|t| { - let hash = ::primitives::Hash::from(&t.hash()[..]); - let tx = codec::Slicable::encode(t.as_transaction()); - (hash, tx) - }).collect() + let ready = transaction_pool::Ready::create(id, &*self.api); + + self.pool.cull_and_get_pending(ready, |pending| pending + .map(|t| { + let hash = ::primitives::Hash::from(&t.hash()[..]); + let tx = codec::Slicable::encode(t.as_transaction()); + (hash, tx) + }) + .collect() + ) } fn import(&self, transaction: &[u8]) -> Option { - if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) { - match self.pool.lock().import(tx) { - Ok(t) => Some(t.hash()[..].into()), + if let Some(uxt) = codec::Slicable::decode(&mut &transaction[..]) { + match self.pool.import_unchecked_extrinsic(uxt) { + Ok(xt) => Some(*xt.hash()), Err(e) => match *e.kind() { transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()), _ => { @@ -134,6 +137,10 @@ impl network::TransactionPool for TransactionPoolAdapter None } } + + fn on_broadcasted(&self, propagations: HashMap>) { + self.pool.on_broadcasted(propagations) + } } pub struct ChainConfig { @@ -341,7 +348,7 @@ impl Service C: Fn( Arc>, Arc, - Arc>, + Arc, &Keystore ) -> Result, error::Error>, A: PolkadotApi + Send + Sync + 'static, @@ -383,7 +390,7 @@ impl Service let api = api_creator(client.clone()); let best_header = client.best_block_header()?; info!("Starting Polkadot. Best block is #{}", best_header.number); - let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool))); + let transaction_pool = Arc::new(TransactionPool::new(config.transaction_pool)); let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { pool: transaction_pool.clone(), client: client.clone(), @@ -414,14 +421,28 @@ impl Service thread_barrier.wait(); let mut core = Core::new().expect("tokio::Core could not be created"); - let events = client.import_notification_stream().for_each(move |notification| { - network.on_block_imported(notification.hash, ¬ification.header); - prune_imported(&*client, &*txpool, notification.hash); - Ok(()) - }); + // block notifications + let network1 = network.clone(); + let txpool1 = txpool.clone(); + let events = client.import_notification_stream() + .for_each(move |notification| { + network1.on_block_imported(notification.hash, ¬ification.header); + prune_imported(&*api, &*txpool1, notification.hash); + Ok(()) + }); core.handle().spawn(events); + + // transaction notifications + let events = txpool.import_notification_stream() + // TODO [ToDr] Consider throttling? + .for_each(move |_| { + network.trigger_repropagate(); + Ok(()) + }); + core.handle().spawn(events); + if let Err(e) = core.run(exit) { debug!("Polkadot service event loop shutdown with {:?}", e); } @@ -457,30 +478,22 @@ impl Service } /// Get shared transaction pool instance. - pub fn transaction_pool(&self) -> Arc> { + pub fn transaction_pool(&self) -> Arc { self.transaction_pool.clone() } } -fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) { - for extrinsic in extrinsics { - let hash: _ = hashing::blake2_256(&extrinsic.encode()).into(); - pool.remove(&hash, true); - } -} - /// Produce a task which prunes any finalized transactions from the pool. -pub fn prune_imported(client: &Client, pool: &Mutex, hash: HeaderHash) +pub fn prune_imported(api: &A, pool: &TransactionPool, hash: HeaderHash) where - B: Backend + Send + Sync, - E: CallExecutor + Send + Sync, - client::error::Error: From<<::State as state_machine::backend::Backend>::Error> + A: PolkadotApi, { - let id = BlockId::Hash(hash); - match client.body(&id) { - Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]), - Ok(None) => warn!("Missing imported block {:?}", hash), - Err(e) => warn!("Failed to fetch block: {:?}", e), + match api.check_id(BlockId::Hash(hash)) { + Ok(id) => { + let ready = transaction_pool::Ready::create(id, api); + pool.cull(None, ready); + }, + Err(e) => warn!("Failed to check block id: {:?}", e), } } diff --git a/substrate/polkadot/transaction-pool/Cargo.toml b/substrate/polkadot/transaction-pool/Cargo.toml index d8cf5ebd80..140bea88e6 100644 --- a/substrate/polkadot/transaction-pool/Cargo.toml +++ b/substrate/polkadot/transaction-pool/Cargo.toml @@ -4,15 +4,14 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -log = "0.4.0" -transaction-pool = "1.12.0" +log = "0.3.0" error-chain = "0.11" polkadot-api = { path = "../api" } polkadot-primitives = { path = "../primitives" } polkadot-runtime = { path = "../runtime" } substrate-client = { path = "../../substrate/client" } -substrate-rpc = { path = "../../substrate/rpc" } +substrate-codec = { path = "../../substrate/codec" } +substrate-extrinsic-pool = { path = "../../substrate/extrinsic-pool" } substrate-primitives = { path = "../../substrate/primitives" } substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" } -substrate-codec = { path = "../../substrate/codec" } ed25519 = { path = "../../substrate/ed25519" } diff --git a/substrate/polkadot/transaction-pool/src/error.rs b/substrate/polkadot/transaction-pool/src/error.rs new file mode 100644 index 0000000000..18b79e59fe --- /dev/null +++ b/substrate/polkadot/transaction-pool/src/error.rs @@ -0,0 +1,61 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use extrinsic_pool::{self, txpool}; +use primitives::Hash; +use runtime::UncheckedExtrinsic; + +error_chain! { + links { + Pool(txpool::Error, txpool::ErrorKind); + } + errors { + /// Unexpected extrinsic format submitted + InvalidExtrinsicFormat { + description("Invalid extrinsic format."), + display("Invalid extrinsic format."), + } + /// Attempted to queue an inherent transaction. + IsInherent(xt: UncheckedExtrinsic) { + description("Inherent transactions cannot be queued."), + display("Inehrent transactions cannot be queued."), + } + /// Attempted to queue a transaction with bad signature. + BadSignature(xt: UncheckedExtrinsic) { + description("Transaction had bad signature."), + display("Transaction had bad signature."), + } + /// Attempted to queue a transaction that is already in the pool. + AlreadyImported(hash: Hash) { + description("Transaction is already in the pool."), + display("Transaction {:?} is already in the pool.", hash), + } + /// Import error. + Import(err: Box<::std::error::Error + Send>) { + description("Error importing transaction"), + display("Error importing transaction: {}", err.description()), + } + } +} + +impl extrinsic_pool::api::Error for Error { + fn into_pool_error(self) -> ::std::result::Result { + match self { + Error(ErrorKind::Pool(e), c) => Ok(txpool::Error(e, c)), + e => Err(e), + } + } +} diff --git a/substrate/polkadot/transaction-pool/src/lib.rs b/substrate/polkadot/transaction-pool/src/lib.rs index faa3d09a0b..015777a863 100644 --- a/substrate/polkadot/transaction-pool/src/lib.rs +++ b/substrate/polkadot/transaction-pool/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. +// Copyright 2018 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify @@ -16,12 +16,12 @@ extern crate ed25519; extern crate substrate_codec as codec; +extern crate substrate_extrinsic_pool as extrinsic_pool; extern crate substrate_primitives as substrate_primitives; -extern crate substrate_runtime_primitives as substrate_runtime_primitives; +extern crate substrate_runtime_primitives; extern crate polkadot_runtime as runtime; extern crate polkadot_primitives as primitives; extern crate polkadot_api; -extern crate transaction_pool; #[macro_use] extern crate error_chain; @@ -29,19 +29,27 @@ extern crate error_chain; #[macro_use] extern crate log; -use std::collections::HashMap; -use std::cmp::Ordering; -use std::sync::Arc; +mod error; +use std::{ + cmp::Ordering, + collections::HashMap, + ops::Deref, + sync::Arc, +}; + +use codec::Slicable; +use extrinsic_pool::{Pool, txpool::{self, Readiness, scoring::{Change, Choice}}}; use polkadot_api::PolkadotApi; -use primitives::{AccountId, Timestamp, Hash}; use primitives::parachain::CandidateReceipt; +use primitives::{AccountId, Timestamp, Hash}; use runtime::{Block, UncheckedExtrinsic, TimestampCall, ParachainsCall, Call}; +use substrate_primitives::block::{Extrinsic, ExtrinsicHash}; +use substrate_primitives::hexdisplay::HexDisplay; use substrate_runtime_primitives::traits::{Bounded, Checkable}; -use transaction_pool::{Transaction, Pool, Readiness}; -use transaction_pool::scoring::{Change, Choice}; -pub use transaction_pool::{Options, Status, LightStatus, NoopListener, VerifiedTransaction as VerifiedTransactionOps}; +pub use extrinsic_pool::txpool::{Options, Status, LightStatus, VerifiedTransaction as VerifiedTransactionOps}; +pub use error::{Error, ErrorKind, Result}; /// Useful functions for working with Polkadot blocks. pub struct PolkadotBlock { @@ -127,48 +135,17 @@ impl From for Block { } } -/// Iterator over pending transactions. -pub type PendingIterator<'a, C> = - transaction_pool::PendingIterator<'a, VerifiedTransaction, Ready<'a, C>, Scoring, NoopListener>; - -error_chain! { - errors { - /// Attempted to queue an inherent transaction. - IsInherent(xt: UncheckedExtrinsic) { - description("Inherent transactions cannot be queued."), - display("Inehrent transactions cannot be queued."), - } - /// Attempted to queue a transaction with bad signature. - BadSignature(xt: UncheckedExtrinsic) { - description("Transaction had bad signature."), - display("Transaction had bad signature."), - } - /// Attempted to queue a transaction that is already in the pool. - AlreadyImported(hash: Hash) { - description("Transaction is already in the pool."), - display("Transaction {:?} is already in the pool.", hash), - } - /// Import error. - Import(err: Box<::std::error::Error + Send>) { - description("Error importing transaction"), - display("Error importing transaction: {}", err.description()), - } - } -} - /// A verified transaction which should be includable and non-inherent. #[derive(Debug, Clone)] pub struct VerifiedTransaction { inner: ::Checked, - hash: Hash, - address: AccountId, - insertion_id: u64, + hash: ExtrinsicHash, encoded_size: usize, } impl VerifiedTransaction { /// Attempt to verify a transaction. - fn create(xt: UncheckedExtrinsic, insertion_id: u64) -> Result { + fn create(xt: UncheckedExtrinsic) -> Result { if !xt.is_signed() { bail!(ErrorKind::IsInherent(xt)) } @@ -176,15 +153,11 @@ impl VerifiedTransaction { let message = codec::Slicable::encode(&xt); match xt.check() { Ok(xt) => { - // TODO: make transaction-pool use generic types. let hash = substrate_primitives::hashing::blake2_256(&message); - let address = xt.signed; Ok(VerifiedTransaction { inner: xt, hash: hash.into(), encoded_size: message.len(), - address, - insertion_id, }) } Err(xt) => Err(ErrorKind::BadSignature(xt).into()), @@ -208,7 +181,7 @@ impl VerifiedTransaction { /// Get the account ID of the sender of this transaction. pub fn sender(&self) -> &AccountId { - &self.address + &self.inner.signed } /// Get encoded size of the transaction. @@ -223,16 +196,16 @@ impl AsRef< ::Checked > for VerifiedTransaction } } -impl transaction_pool::VerifiedTransaction for VerifiedTransaction { +impl txpool::VerifiedTransaction for VerifiedTransaction { type Hash = Hash; type Sender = AccountId; - fn hash(&self) -> &Hash { + fn hash(&self) -> &Self::Hash { &self.hash } - fn sender(&self) -> &AccountId { - &self.address + fn sender(&self) -> &Self::Sender { + &self.inner.signed } fn mem_usage(&self) -> usize { @@ -244,7 +217,7 @@ impl transaction_pool::VerifiedTransaction for VerifiedTransaction { #[derive(Debug)] pub struct Scoring; -impl transaction_pool::Scoring for Scoring { +impl txpool::Scoring for Scoring { type Score = u64; type Event = (); @@ -258,7 +231,7 @@ impl transaction_pool::Scoring for Scoring { fn update_scores( &self, - xts: &[Transaction], + xts: &[txpool::Transaction], scores: &mut [Self::Score], _change: Change<()> ) { @@ -276,7 +249,7 @@ impl transaction_pool::Scoring for Scoring { /// Readiness evaluator for polkadot transactions. pub struct Ready<'a, T: 'a + PolkadotApi> { at_block: T::CheckedBlockId, - api_handle: &'a T, + api: &'a T, known_indices: HashMap, } @@ -284,7 +257,7 @@ impl<'a, T: 'a + PolkadotApi> Clone for Ready<'a, T> { fn clone(&self) -> Self { Ready { at_block: self.at_block.clone(), - api_handle: self.api_handle, + api: self.api, known_indices: self.known_indices.clone(), } } @@ -293,106 +266,85 @@ impl<'a, T: 'a + PolkadotApi> Clone for Ready<'a, T> { impl<'a, T: 'a + PolkadotApi> Ready<'a, T> { /// Create a new readiness evaluator at the given block. Requires that /// the ID has already been checked for local corresponding and available state. - pub fn create(at: T::CheckedBlockId, client: &'a T) -> Self { + pub fn create(at: T::CheckedBlockId, api: &'a T) -> Self { Ready { at_block: at, - api_handle: client, + api, known_indices: HashMap::new(), } } } -impl<'a, T: 'a + PolkadotApi> transaction_pool::Ready for Ready<'a, T> { +impl<'a, T: 'a + PolkadotApi> txpool::Ready for Ready<'a, T> { fn is_ready(&mut self, xt: &VerifiedTransaction) -> Readiness { let sender = xt.inner.signed; trace!(target: "transaction-pool", "Checking readiness of {} (from {})", xt.hash, Hash::from(sender)); // TODO: find a way to handle index error properly -- will need changes to // transaction-pool trait. - let (api_handle, at_block) = (&self.api_handle, &self.at_block); + let (api, at_block) = (&self.api, &self.at_block); let next_index = self.known_indices.entry(sender) - .or_insert_with(|| api_handle.index(at_block, sender).ok().unwrap_or_else(Bounded::max_value)); + .or_insert_with(|| api.index(at_block, sender).ok().unwrap_or_else(Bounded::max_value)); trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_index, xt.inner.index); - match xt.inner.index.cmp(&next_index) { + let result = match xt.inner.index.cmp(&next_index) { Ordering::Greater => Readiness::Future, Ordering::Equal => Readiness::Ready, - Ordering::Less => Readiness::Stale, // TODO: this is not "stalled" but rather stale and can be discarded. - } + Ordering::Less => Readiness::Stale, + }; + + // remember to increment `next_index` + *next_index = next_index.saturating_add(1); + + result + } +} + +pub struct Verifier; + +impl txpool::Verifier for Verifier { + type VerifiedTransaction = VerifiedTransaction; + type Error = Error; + + fn verify_transaction(&self, xt: Extrinsic) -> Result { + info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0)); + let uxt = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s)) + .ok_or_else(|| ErrorKind::InvalidExtrinsicFormat)?; + info!("Correctly formatted: {:?}", uxt); + VerifiedTransaction::create(uxt) } } /// The polkadot transaction pool. /// -/// Wraps a `transaction-pool::Pool`. +/// Wraps a `extrinsic_pool::Pool`. pub struct TransactionPool { - inner: transaction_pool::Pool, - insertion_index: u64, // TODO: use AtomicU64 when it stabilizes + inner: Pool, } impl TransactionPool { /// Create a new transaction pool. pub fn new(options: Options) -> Self { TransactionPool { - inner: Pool::new(NoopListener, Scoring, options), - insertion_index: 0, + inner: Pool::new(options, Verifier, Scoring), } } - /// Verify and import a transaction into the pool. - pub fn import(&mut self, xt: UncheckedExtrinsic) -> Result> { - let insertion_index = self.insertion_index; - self.insertion_index += 1; - - let verified = VerifiedTransaction::create(xt, insertion_index)?; - - info!("Extrinsic verified {:?}. Importing...", verified); - - // TODO: just use a foreign link when the error type is made public. - let hash = verified.hash.clone(); - self.inner.import(verified) - .map_err(|e| - match e { - // TODO: make error types public in transaction_pool. For now just treat all errors as AlreadyImported - _ => ErrorKind::AlreadyImported(hash), - // transaction_pool::error::AlreadyImported(h) => ErrorKind::AlreadyImported(h), - // e => ErrorKind::Import(Box::new(e)), - }) - .map_err(Into::into) + pub fn import_unchecked_extrinsic(&self, uxt: UncheckedExtrinsic) -> Result> { + Ok(self.inner.import(VerifiedTransaction::create(uxt)?)?) } +} - /// Clear the pool. - pub fn clear(&mut self) { - self.inner.clear(); - } +impl Deref for TransactionPool { + type Target = Pool; - /// Remove from the pool. - pub fn remove(&mut self, hash: &Hash, is_valid: bool) -> Option> { - self.inner.remove(hash, is_valid) - } - - /// Cull transactions from the queue. - pub fn cull(&mut self, senders: Option<&[AccountId]>, ready: Ready) -> usize { - self.inner.cull(senders, ready) - } - - /// Get an iterator of pending transactions. - pub fn pending<'a, T: 'a + PolkadotApi>(&'a self, ready: Ready<'a, T>) -> PendingIterator<'a, T> { - self.inner.pending(ready) - } - - /// Get the full status of the queue (including readiness) - pub fn status(&self, ready: Ready) -> Status { - self.inner.status(ready) - } - - /// Returns light status of the pool. - pub fn light_status(&self) -> LightStatus { - self.inner.light_status() + fn deref(&self) -> &Self::Target { + &self.inner } } #[cfg(test)] mod tests { } + diff --git a/substrate/substrate/client/src/client.rs b/substrate/substrate/client/src/client.rs index af07213bb0..29b7b88f5f 100644 --- a/substrate/substrate/client/src/client.rs +++ b/substrate/substrate/client/src/client.rs @@ -231,7 +231,7 @@ impl Client where /// No changes are made. pub fn execution_proof(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result<(Vec, Vec>)> { use call_executor::state_to_execution_proof; - + let result = self.executor.call(id, method, call_data); let result = result?.return_data; let proof = self.backend.state_at(*id).map(|state| state_to_execution_proof(&state))?; @@ -344,7 +344,8 @@ impl Client where header: header, is_new_best: is_new_best, }; - self.import_notification_sinks.lock().retain(|sink| !sink.unbounded_send(notification.clone()).is_err()); + self.import_notification_sinks.lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); } Ok(ImportResult::Queued) } diff --git a/substrate/substrate/extrinsic-pool/Cargo.toml b/substrate/substrate/extrinsic-pool/Cargo.toml new file mode 100644 index 0000000000..aecd25ba8d --- /dev/null +++ b/substrate/substrate/extrinsic-pool/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "substrate-extrinsic-pool" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +error-chain = "0.11" +futures = "0.1" +log = "0.3" +parking_lot = "0.4" +substrate-primitives = { path = "../primitives" } +transaction-pool = "1.12" diff --git a/substrate/substrate/extrinsic-pool/src/api.rs b/substrate/substrate/extrinsic-pool/src/api.rs new file mode 100644 index 0000000000..a7841e3ef6 --- /dev/null +++ b/substrate/substrate/extrinsic-pool/src/api.rs @@ -0,0 +1,64 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! External API for extrinsic pool. + +use std::ops::Deref; +use txpool::{self, VerifiedTransaction}; +use primitives::{ + Hash, + block::{Extrinsic, ExtrinsicHash}, +}; + +/// Extrinsic pool error. +pub trait Error: ::std::error::Error + Send + Sized { + /// Try to extract original `txpool::Error` + /// + /// This implementation is optional and used only to + /// provide more descriptive error messages for end users + /// of RPC API. + fn into_pool_error(self) -> Result { Err(self) } +} + +impl Error for txpool::Error { + fn into_pool_error(self) -> Result { Ok(self) } +} + +/// Extrinsic pool. +pub trait ExtrinsicPool: Send + Sync + 'static { + /// Error type + type Error: Error; + + /// Submit a collection of extrinsics to the pool. + fn submit(&self, xt: Vec) -> Result, Self::Error>; +} + +// Blanket implementation for anything that `Derefs` to the pool. +impl ExtrinsicPool for T where + T: Deref> + Send + Sync + 'static, + V: txpool::Verifier, + S: txpool::Scoring, + V::VerifiedTransaction: txpool::VerifiedTransaction, + E: From, + E: From, + E: Error, +{ + type Error = E; + + fn submit(&self, xt: Vec) -> Result, Self::Error> { + self.deref().submit(xt).map(|result| result.into_iter().map(|xt| *xt.hash()).collect()) + } +} diff --git a/substrate/substrate/extrinsic-pool/src/lib.rs b/substrate/substrate/extrinsic-pool/src/lib.rs new file mode 100644 index 0000000000..35ed5e3d6e --- /dev/null +++ b/substrate/substrate/extrinsic-pool/src/lib.rs @@ -0,0 +1,37 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +#![warn(missing_docs)] + +//! Generic extrinsic pool. + +extern crate futures; +extern crate parking_lot; +extern crate substrate_primitives as primitives; + +#[macro_use] +extern crate log; + +pub extern crate transaction_pool as txpool; + +pub mod api; + +mod listener; +mod pool; +mod watcher; + +pub use self::pool::Pool; +pub use self::watcher::Watcher; diff --git a/substrate/substrate/extrinsic-pool/src/listener.rs b/substrate/substrate/extrinsic-pool/src/listener.rs new file mode 100644 index 0000000000..948ad205dd --- /dev/null +++ b/substrate/substrate/extrinsic-pool/src/listener.rs @@ -0,0 +1,89 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use std::{ + sync::Arc, + collections::HashMap, +}; +use primitives::Hash; +use txpool; + +use watcher; + +#[derive(Default)] +pub struct Listener { + watchers: HashMap +} + +impl Listener { + pub fn create_watcher>(&mut self, xt: Arc) -> watcher::Watcher { + let sender = self.watchers.entry(*xt.hash()).or_insert_with(watcher::Sender::default); + sender.new_watcher() + } + + pub fn broadcasted(&mut self, hash: &Hash, peers: Vec) { + self.fire(hash, |watcher| watcher.broadcast(peers)); + } + + fn fire(&mut self, hash: &Hash, fun: F) { + let clean = if let Some(h) = self.watchers.get_mut(hash) { + fun(h); + h.is_done() + } else { + false + }; + + if clean { + self.watchers.remove(hash); + } + } +} + +impl> txpool::Listener for Listener { + fn added(&mut self, tx: &Arc, old: Option<&Arc>) { + if let Some(old) = old { + let hash = tx.hash(); + self.fire(old.hash(), |watcher| watcher.usurped(*hash)); + } + } + + fn dropped(&mut self, tx: &Arc, by: Option<&T>) { + self.fire(tx.hash(), |watcher| match by { + Some(t) => watcher.usurped(*t.hash()), + None => watcher.dropped(), + }) + } + + fn rejected(&mut self, tx: &Arc, reason: &txpool::ErrorKind) { + warn!("Extrinsic rejected ({}): {:?}", reason, tx); + } + + fn invalid(&mut self, tx: &Arc) { + warn!("Extrinsic invalid: {:?}", tx); + } + + fn canceled(&mut self, tx: &Arc) { + warn!("Extrinsic canceled: {:?}", tx); + } + + fn mined(&mut self, tx: &Arc) { + // TODO [ToDr] latest block number? + let header_hash = 1.into(); + self.fire(tx.hash(), |watcher| watcher.finalised(header_hash)) + } +} + + diff --git a/substrate/substrate/extrinsic-pool/src/pool.rs b/substrate/substrate/extrinsic-pool/src/pool.rs new file mode 100644 index 0000000000..433a4dc2b4 --- /dev/null +++ b/substrate/substrate/extrinsic-pool/src/pool.rs @@ -0,0 +1,141 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use std::{ + collections::HashMap, + marker::PhantomData, + sync::{Arc, Weak}, +}; + +use futures::sync::mpsc; +use parking_lot::{RwLock, Mutex}; +use txpool; +use primitives::{Hash, block::Extrinsic}; + +use listener::Listener; +use watcher::Watcher; + +/// Extrinsics pool. +pub struct Pool where + V: txpool::Verifier, + S: txpool::Scoring, +{ + _error: Mutex>, + pool: RwLock>, + verifier: V, + import_notification_sinks: Mutex>>>, +} + +impl Pool where + V: txpool::Verifier, + S: txpool::Scoring, + V::VerifiedTransaction: txpool::VerifiedTransaction, + E: From, + E: From, +{ + /// Create a new transaction pool. + pub fn new(options: txpool::Options, verifier: V, scoring: S) -> Self { + Pool { + _error: Default::default(), + pool: RwLock::new(txpool::Pool::new(Listener::default(), scoring, options)), + verifier, + import_notification_sinks: Default::default(), + } + } + + /// Imports a pre-verified extrinsic to the pool. + pub fn import(&self, xt: V::VerifiedTransaction) -> Result, E> { + let result = self.pool.write().import(xt)?; + + let weak = Arc::downgrade(&result); + self.import_notification_sinks.lock() + .retain(|sink| sink.unbounded_send(weak.clone()).is_ok()); + + Ok(result) + } + + /// Return an event stream of transactions imported to the pool. + pub fn import_notification_stream(&self) -> mpsc::UnboundedReceiver> { + let (sink, stream) = mpsc::unbounded(); + self.import_notification_sinks.lock().push(sink); + stream + } + + /// Invoked when extrinsics are broadcasted. + pub fn on_broadcasted(&self, propagated: HashMap>) { + for (hash, peers) in propagated.into_iter() { + self.pool.write().listener_mut().broadcasted(&hash, peers); + } + } + + /// Imports a bunch of extrinsics to the pool + pub fn submit(&self, xts: Vec) -> Result>, E> { + xts + .into_iter() + .map(|xt| self.verifier.verify_transaction(xt)) + .map(|xt| { + Ok(self.pool.write().import(xt?)?) + }) + .collect() + } + + /// Import a single extrinsic and starts to watch their progress in the pool. + pub fn submit_and_watch(&self, xt: Extrinsic) -> Result { + let xt = self.submit(vec![xt])?.pop().expect("One extrinsic passed; one result returned; qed"); + Ok(self.pool.write().listener_mut().create_watcher(xt)) + } + + /// Remove from the pool. + pub fn remove(&self, hashes: &[Hash], is_valid: bool) -> Vec>> { + let mut pool = self.pool.write(); + let mut results = Vec::with_capacity(hashes.len()); + for hash in hashes { + results.push(pool.remove(hash, is_valid)); + } + results + } + + /// Cull transactions from the queue. + pub fn cull(&self, senders: Option<&[::Sender]>, ready: R) -> usize where + R: txpool::Ready, + { + self.pool.write().cull(senders, ready) + } + + /// Cull transactions from the queue and then compute the pending set. + pub fn cull_and_get_pending(&self, ready: R, f: F) -> T where + R: txpool::Ready + Clone, + F: FnOnce(txpool::PendingIterator) -> T, + { + let mut pool = self.pool.write(); + pool.cull(None, ready.clone()); + f(pool.pending(ready)) + } + + /// Get the full status of the queue (including readiness) + pub fn status>(&self, ready: R) -> txpool::Status { + self.pool.read().status(ready) + } + + /// Returns light status of the pool. + pub fn light_status(&self) -> txpool::LightStatus { + self.pool.read().light_status() + } +} diff --git a/substrate/substrate/extrinsic-pool/src/watcher.rs b/substrate/substrate/extrinsic-pool/src/watcher.rs new file mode 100644 index 0000000000..3b3dc4cc53 --- /dev/null +++ b/substrate/substrate/extrinsic-pool/src/watcher.rs @@ -0,0 +1,88 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use futures::sync::mpsc; +use primitives::{ + block::{HeaderHash, ExtrinsicHash} +}; + +/// Possible extrinsic status events +#[derive(Debug, Clone)] +pub enum Status { + /// Extrinsic has been finalised in block with given hash. + Finalised(HeaderHash), + /// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid. + Usurped(ExtrinsicHash), + /// The extrinsic has been broadcast to the given peers. + Broadcast(Vec), + /// Extrinsic has been dropped from the pool because of the limit. + Dropped, +} + +/// Extrinsic watcher. +/// +/// Represents a stream of status updates for particular extrinsic. +#[derive(Debug)] +pub struct Watcher { + receiver: mpsc::UnboundedReceiver, +} + +#[derive(Debug, Default)] +pub(crate) struct Sender { + receivers: Vec>, + finalised: bool, +} + +impl Sender { + /// Add a new watcher to this sender object. + pub fn new_watcher(&mut self) -> Watcher { + let (tx, receiver) = mpsc::unbounded(); + self.receivers.push(tx); + Watcher { + receiver, + } + } + + /// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid. + pub fn usurped(&mut self, hash: ExtrinsicHash) { + self.send(Status::Usurped(hash)) + } + + /// Extrinsic has been finalised in block with given hash. + pub fn finalised(&mut self, hash: HeaderHash) { + self.send(Status::Finalised(hash)); + self.finalised = true; + } + + /// Transaction has been dropped from the pool because of the limit. + pub fn dropped(&mut self) { + self.send(Status::Dropped); + } + + /// The extrinsic has been broadcast to the given peers. + pub fn broadcast(&mut self, peers: Vec) { + self.send(Status::Broadcast(peers)) + } + + /// Returns true if the are no more listeners for this extrinsic or it was finalised. + pub fn is_done(&self) -> bool { + self.finalised || self.receivers.is_empty() + } + + fn send(&mut self, status: Status) { + self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok()) + } +} diff --git a/substrate/substrate/network/src/protocol.rs b/substrate/substrate/network/src/protocol.rs index 52e9e55484..1c2ab8bb63 100644 --- a/substrate/substrate/network/src/protocol.rs +++ b/substrate/substrate/network/src/protocol.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see .? -use std::collections::{HashMap, HashSet, BTreeMap}; +use std::collections::{HashMap, HashSet}; use std::{mem, cmp}; use std::sync::Arc; use std::time; @@ -24,7 +24,7 @@ use serde_json; use primitives::block::{HeaderHash, ExtrinsicHash, Number as BlockNumber, Header, Id as BlockId}; use primitives::{Hash, blake2_256}; use runtime_support::Hashable; -use network::{PeerId, NodeId}; +use network::PeerId; use message::{self, Message}; use sync::{ChainSync, Status as SyncStatus, SyncState}; @@ -103,15 +103,6 @@ pub struct PeerInfo { pub best_number: BlockNumber, } -/// Transaction stats -#[derive(Debug)] -pub struct TransactionStats { - /// Block number where this TX was first seen. - pub first_seen: u64, - /// Peers it was propagated to. - pub propagated_to: BTreeMap, -} - impl Protocol { /// Create a new instance. pub fn new(config: ProtocolConfig, chain: Arc, on_demand: Option>, transaction_pool: Arc) -> error::Result { @@ -463,15 +454,31 @@ impl Protocol { let transactions = self.transaction_pool.transactions(); + let mut propagated_to = HashMap::new(); let mut peers = self.peers.write(); for (peer_id, ref mut peer) in peers.iter_mut() { - let to_send: Vec<_> = transactions.iter().filter_map(|&(hash, ref t)| - if peer.known_transactions.insert(hash.clone()) { Some(t.clone()) } else { None }).collect(); + let (hashes, to_send): (Vec<_>, Vec<_>) = transactions + .iter() + .cloned() + .filter(|&(hash, _)| peer.known_transactions.insert(hash)) + .unzip(); + if !to_send.is_empty() { + let node_id = io.peer_session_info(*peer_id).map(|info| match info.id { + Some(id) => format!("{}@{:x}", info.remote_address, id), + None => info.remote_address.clone(), + }); + + if let Some(id) = node_id { + for hash in hashes { + propagated_to.entry(hash).or_insert_with(Vec::new).push(id.clone()); + } + } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), peer_id); self.send_message(io, *peer_id, Message::Transactions(to_send)); } } + self.transaction_pool.on_broadcasted(propagated_to); } /// Send Status message @@ -551,10 +558,6 @@ impl Protocol { self.on_demand.as_ref().map(|s| s.on_remote_response(io, peer_id, response)); } - pub fn transactions_stats(&self) -> BTreeMap { - BTreeMap::new() - } - pub fn chain(&self) -> &Client { &*self.chain } diff --git a/substrate/substrate/network/src/service.rs b/substrate/substrate/network/src/service.rs index c664d0bd06..06b7eff216 100644 --- a/substrate/substrate/network/src/service.rs +++ b/substrate/substrate/network/src/service.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see .? +use std::collections::HashMap; use std::sync::Arc; -use std::collections::{BTreeMap}; use std::io; use std::time::Duration; use futures::sync::{oneshot, mpsc}; @@ -26,7 +26,7 @@ use primitives::block::{ExtrinsicHash, Header, HeaderHash}; use primitives::Hash; use core_io::{TimerToken}; use io::NetSyncIo; -use protocol::{Protocol, ProtocolStatus, PeerInfo as ProtocolPeerInfo, TransactionStats}; +use protocol::{Protocol, ProtocolStatus, PeerInfo as ProtocolPeerInfo}; use config::{ProtocolConfig}; use error::Error; use chain::Client; @@ -75,8 +75,6 @@ pub trait SyncProvider: Send + Sync { fn peers(&self) -> Vec; /// Get this node id if available. fn node_id(&self) -> Option; - /// Returns propagation count for pending transactions. - fn transactions_stats(&self) -> BTreeMap; } /// Transaction pool interface @@ -85,6 +83,8 @@ pub trait TransactionPool: Send + Sync { fn transactions(&self) -> Vec<(ExtrinsicHash, Vec)>; /// Import a transction into the pool. fn import(&self, transaction: &[u8]) -> Option; + /// Notify the pool about transactions broadcast. + fn on_broadcasted(&self, propagations: HashMap>); } /// ConsensusService @@ -249,10 +249,6 @@ impl SyncProvider for Service { fn node_id(&self) -> Option { self.network.external_url() } - - fn transactions_stats(&self) -> BTreeMap { - self.handler.protocol.transactions_stats() - } } /// ConsensusService diff --git a/substrate/substrate/network/src/test/mod.rs b/substrate/substrate/network/src/test/mod.rs index 3699802daf..3652d4f317 100644 --- a/substrate/substrate/network/src/test/mod.rs +++ b/substrate/substrate/network/src/test/mod.rs @@ -204,6 +204,8 @@ impl TransactionPool for EmptyTransactionPool { fn import(&self, _transaction: &[u8]) -> Option { None } + + fn on_broadcasted(&self, _: HashMap>) {} } pub struct TestNet { diff --git a/substrate/substrate/rpc/Cargo.toml b/substrate/substrate/rpc/Cargo.toml index d2e2547f4c..a11c5d5050 100644 --- a/substrate/substrate/rpc/Cargo.toml +++ b/substrate/substrate/rpc/Cargo.toml @@ -12,6 +12,7 @@ log = "0.3" parking_lot = "0.4" substrate-client = { path = "../client" } substrate-executor = { path = "../executor" } +substrate-extrinsic-pool = { path = "../extrinsic-pool" } substrate-primitives = { path = "../primitives" } substrate-state-machine = { path = "../state-machine" } tokio-core = "0.1.12" diff --git a/substrate/substrate/rpc/src/author/error.rs b/substrate/substrate/rpc/src/author/error.rs index ba37779383..5f7b9a82b2 100644 --- a/substrate/substrate/rpc/src/author/error.rs +++ b/substrate/substrate/rpc/src/author/error.rs @@ -16,12 +16,12 @@ //! Authoring RPC module errors. -use client; +use extrinsic_pool::txpool; use rpc; error_chain! { links { - Client(client::error::Error, client::error::ErrorKind) #[doc = "Client error"]; + Pool(txpool::Error, txpool::ErrorKind) #[doc = "Pool error"]; } errors { /// Not implemented yet @@ -29,15 +29,10 @@ error_chain! { description("not yet implemented"), display("Method Not Implemented"), } - /// Invalid format - InvalidFormat { - description("invalid format"), - display("Invalid format for the extrinsic data"), - } - /// Some error with the pool since the import failed. - PoolError { - description("pool import failed"), - display("Pool import failed"), + /// Verification error + Verification(e: Box<::std::error::Error + Send>) { + description("extrinsic verification error"), + display("Extrinsic verification error: {}", e.description()), } } } @@ -50,6 +45,7 @@ impl From for rpc::Error { message: "Not implemented yet".into(), data: None, }, + // TODO [ToDr] Unwrap Pool errors. _ => rpc::Error::internal_error(), } } diff --git a/substrate/substrate/rpc/src/author/mod.rs b/substrate/substrate/rpc/src/author/mod.rs index e10234282f..83062d5c08 100644 --- a/substrate/substrate/rpc/src/author/mod.rs +++ b/substrate/substrate/rpc/src/author/mod.rs @@ -16,7 +16,9 @@ //! Substrate block-author/full-node API. -use primitives::block::Extrinsic; +use std::sync::Arc; +use primitives::block::{Extrinsic, ExtrinsicHash}; +use extrinsic_pool::api::{Error, ExtrinsicPool}; pub mod error; @@ -30,6 +32,20 @@ build_rpc_trait! { pub trait AuthorApi { /// Submit extrinsic for inclusion in block. #[rpc(name = "author_submitExtrinsic")] - fn submit_extrinsic(&self, Extrinsic) -> Result<()>; + fn submit_extrinsic(&self, Extrinsic) -> Result; + } +} + +impl AuthorApi for Arc where + T: ExtrinsicPool, +{ + fn submit_extrinsic(&self, xt: Extrinsic) -> Result { + self + .submit(vec![xt]) + .map(|mut res| res.pop().expect("One extrinsic passed; one result back; qed")) + .map_err(|e| e.into_pool_error() + .map(Into::into) + .unwrap_or_else(|e| error::ErrorKind::Verification(Box::new(e)).into()) + ) } } diff --git a/substrate/substrate/rpc/src/author/tests.rs b/substrate/substrate/rpc/src/author/tests.rs index 8a2c3eb2e3..10648cc1af 100644 --- a/substrate/substrate/rpc/src/author/tests.rs +++ b/substrate/substrate/rpc/src/author/tests.rs @@ -15,37 +15,53 @@ // along with Substrate. If not, see . use super::*; -use super::error::*; -use std::sync::Arc; +use std::{fmt, sync::Arc}; +use extrinsic_pool::api; use parking_lot::Mutex; use primitives::block; #[derive(Default)] struct DummyTxPool { - submitted: Vec, + submitted: Mutex>, } -impl AuthorApi for Arc> { +#[derive(Debug)] +struct Error; +impl api::Error for Error {} +impl ::std::error::Error for Error { + fn description(&self) -> &str { "Error" } +} +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(self, fmt) + } +} + +impl api::ExtrinsicPool for DummyTxPool { + type Error = Error; + /// Submit extrinsic for inclusion in block. - fn submit_extrinsic(&self, xt: Extrinsic) -> Result<()> { - let mut s = self.lock(); - if s.submitted.len() < 1 { - s.submitted.push(xt); - Ok(()) + fn submit(&self, xt: Vec) -> ::std::result::Result, Self::Error> { + let mut submitted = self.submitted.lock(); + if submitted.len() < 1 { + let hashes = xt.iter().map(|_xt| 1.into()).collect(); + submitted.extend(xt); + Ok(hashes) } else { - Err(ErrorKind::PoolError.into()) + Err(Error) } } } #[test] fn submit_transaction_should_not_cause_error() { - let p = Arc::new(Mutex::new(DummyTxPool::default())); + let p = Arc::new(DummyTxPool::default()); + let hash: ExtrinsicHash = 1.into(); assert_matches!( AuthorApi::submit_extrinsic(&p, block::Extrinsic(vec![])), - Ok(()) + Ok(hash) ); assert!( AuthorApi::submit_extrinsic(&p, block::Extrinsic(vec![])).is_err() diff --git a/substrate/substrate/rpc/src/lib.rs b/substrate/substrate/rpc/src/lib.rs index 59f278b3a7..ec94c39c44 100644 --- a/substrate/substrate/rpc/src/lib.rs +++ b/substrate/substrate/rpc/src/lib.rs @@ -22,6 +22,7 @@ extern crate jsonrpc_core as rpc; extern crate jsonrpc_pubsub; extern crate parking_lot; extern crate substrate_client as client; +extern crate substrate_extrinsic_pool as extrinsic_pool; extern crate substrate_primitives as primitives; extern crate substrate_state_machine as state_machine; extern crate tokio_core;