From 59adb4767b280f164b1ccbb5f083d45c2cc24f8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 31 May 2018 22:49:17 +0200 Subject: [PATCH] Extrinsic pool (#182) * Use latest version of txpool. * Initial version of the pool. * Fix abstraction. * Implement watchers and notifications. * Return hash from RPC. * Remove commented code. * Remove client dep. * Fix tests. --- polkadot/cli/src/lib.rs | 39 +----- polkadot/consensus/src/lib.rs | 64 ++++----- polkadot/consensus/src/service.rs | 3 +- polkadot/service/src/config.rs | 21 +-- polkadot/service/src/lib.rs | 91 ++++++------ polkadot/transaction-pool/Cargo.toml | 7 +- polkadot/transaction-pool/src/error.rs | 61 ++++++++ polkadot/transaction-pool/src/lib.rs | 184 +++++++++---------------- 8 files changed, 222 insertions(+), 248 deletions(-) create mode 100644 polkadot/transaction-pool/src/error.rs diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 90b0655cd4..f734b154cd 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -61,39 +61,11 @@ mod informant; use std::io; use std::net::SocketAddr; use std::path::{Path, PathBuf}; -use std::sync::Arc; use futures::sync::mpsc; use futures::{Sink, Future, Stream}; use tokio_core::reactor; -use parking_lot::Mutex; use service::ChainSpec; -use primitives::block::Extrinsic; - -struct RpcTransactionPool { - inner: Arc>, - network: Arc, -} - -impl substrate_rpc::author::AuthorApi for RpcTransactionPool { - fn submit_extrinsic(&self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> { - use primitives::hexdisplay::HexDisplay; - use polkadot_runtime::UncheckedExtrinsic; - use codec::Slicable; - - info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0)); - let decoded = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s)) - .ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?; - - info!("Correctly formatted: {:?}", decoded); - - self.inner.lock().import(decoded) - .map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError)?; - - self.network.trigger_repropagate(); - Ok(()) - } -} struct Configuration(service::Configuration); @@ -238,11 +210,12 @@ fn run_until_exit(mut core: reactor::Core, service: service::Service let handler = || { let chain = rpc::apis::chain::Chain::new(service.client(), core.remote()); - let pool = RpcTransactionPool { - inner: service.transaction_pool(), - network: service.network(), - }; - rpc::rpc_handler(service.client(), chain, pool, Configuration(config.clone())) + rpc::rpc_handler( + service.client(), + chain, + service.transaction_pool(), + Configuration(config.clone()), + ) }; ( start_server(http_address, |address| rpc::start_http(address, handler())), diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index f9e3b35a26..def896548e 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -78,7 +78,6 @@ use tokio_core::reactor::{Handle, Timeout, Interval}; use futures::prelude::*; use futures::future::{self, Shared}; -use parking_lot::Mutex; use collation::CollationFetch; use dynamic_inclusion::DynamicInclusion; @@ -226,7 +225,7 @@ pub struct ProposerFactory { /// The client instance. pub client: Arc, /// The transaction pool. - pub transaction_pool: Arc>, + pub transaction_pool: Arc, /// The backing network handle. pub network: N, /// Parachain collators. @@ -319,7 +318,7 @@ pub struct Proposer { random_seed: Hash, router: R, table: Arc, - transaction_pool: Arc>, + transaction_pool: Arc, } impl bft::Proposer for Proposer @@ -495,17 +494,16 @@ impl bft::Proposer for Proposer use primitives::bft::{MisbehaviorKind, MisbehaviorReport}; use polkadot_runtime::{Call, Extrinsic, UncheckedExtrinsic, ConsensusCall}; + let local_id = self.local_key.public().0; - let mut pool = self.transaction_pool.lock(); let mut next_index = { let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client); - - pool.cull(None, readiness_evaluator.clone()); - let cur_index = pool.pending(readiness_evaluator) + let cur_index = self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending| pending .filter(|tx| tx.as_ref().as_ref().signed == local_id) .last() .map(|tx| Ok(tx.as_ref().as_ref().index)) - .unwrap_or_else(|| self.client.index(&self.parent_id, local_id)); + .unwrap_or_else(|| self.client.index(&self.parent_id, local_id)) + ); match cur_index { Ok(cur_index) => cur_index + 1, @@ -541,7 +539,7 @@ impl bft::Proposer for Proposer let signature = self.local_key.sign(&extrinsic.encode()).into(); let uxt = UncheckedExtrinsic { extrinsic, signature }; - pool.import(uxt).expect("locally signed extrinsic is valid; qed"); + self.transaction_pool.import_unchecked_extrinsic(uxt).expect("locally signed extrinsic is valid; qed"); } } } @@ -609,7 +607,7 @@ pub struct CreateProposal { parent_number: BlockNumber, parent_id: C::CheckedBlockId, client: Arc, - transaction_pool: Arc>, + transaction_pool: Arc, collation: CollationFetch, router: R, table: Arc, @@ -631,37 +629,33 @@ impl CreateProposal candidates, )?; - let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client); - { - let mut pool = self.transaction_pool.lock(); + let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client); let mut unqueue_invalid = Vec::new(); - let mut pending_size = 0; - - pool.cull(None, readiness_evaluator.clone()); - for pending in pool.pending(readiness_evaluator.clone()) { - // skip and cull transactions which are too large. - if pending.encoded_size() > MAX_TRANSACTIONS_SIZE { - unqueue_invalid.push(pending.hash().clone()); - continue - } - - if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break } - - match block_builder.push_extrinsic(pending.as_transaction().clone()) { - Ok(()) => { - pending_size += pending.encoded_size(); - } - Err(e) => { - trace!(target: "transaction-pool", "Invalid transaction: {}", e); + self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending_iterator| { + let mut pending_size = 0; + for pending in pending_iterator { + // skip and cull transactions which are too large. + if pending.encoded_size() > MAX_TRANSACTIONS_SIZE { unqueue_invalid.push(pending.hash().clone()); + continue + } + + if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break } + + match block_builder.push_extrinsic(pending.as_transaction().clone()) { + Ok(()) => { + pending_size += pending.encoded_size(); + } + Err(e) => { + trace!(target: "transaction-pool", "Invalid transaction: {}", e); + unqueue_invalid.push(pending.hash().clone()); + } } } - } + }); - for tx_hash in unqueue_invalid { - pool.remove(&tx_hash, false); - } + self.transaction_pool.remove(&unqueue_invalid, false); } let polkadot_block = block_builder.bake(); diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index 2177e87412..dedab92135 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -28,7 +28,6 @@ use client::{BlockchainEvents, ChainHead}; use ed25519; use futures::prelude::*; use futures::{future, Canceled}; -use parking_lot::Mutex; use polkadot_api::LocalPolkadotApi; use polkadot_primitives::AccountId; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt}; @@ -237,7 +236,7 @@ impl Service { client: Arc, api: Arc, network: Arc, - transaction_pool: Arc>, + transaction_pool: Arc, parachain_empty_duration: Duration, key: ed25519::Pair, ) -> Service diff --git a/polkadot/service/src/config.rs b/polkadot/service/src/config.rs index c6edbe141c..e5c27ef776 100644 --- a/polkadot/service/src/config.rs +++ b/polkadot/service/src/config.rs @@ -22,7 +22,7 @@ pub use network::NetworkConfiguration; /// The chain specification (this should eventually be replaced by a more general JSON-based chain /// specification). -#[derive(Clone)] +#[derive(Clone, Copy)] pub enum ChainSpec { /// Whatever the current runtime is, with just Alice as an auth. Development, @@ -33,6 +33,7 @@ pub enum ChainSpec { } /// Service configuration. +#[derive(Clone)] pub struct Configuration { /// Node roles. pub roles: Role, @@ -63,21 +64,3 @@ impl Default for Configuration { } } } - -impl Clone for Configuration { - fn clone(&self) -> Configuration { - Configuration { - roles: self.roles.clone(), - transaction_pool: transaction_pool::Options { - max_count: self.transaction_pool.max_count.clone(), - max_mem_usage: self.transaction_pool.max_mem_usage.clone(), - max_per_sender: self.transaction_pool.max_per_sender.clone(), - }, - network: self.network.clone(), - keystore_path: self.keystore_path.clone(), - database_path: self.database_path.clone(), - keys: self.keys.clone(), - chain_spec: self.chain_spec.clone(), - } - } -} diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 436479d0a5..13dae55c4e 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -50,14 +50,14 @@ extern crate hex_literal; mod error; mod config; +use std::collections::HashMap; use std::sync::Arc; use std::thread; use futures::prelude::*; -use parking_lot::Mutex; use tokio_core::reactor::Core; use codec::Slicable; -use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash, Header}; -use primitives::{AuthorityId, hashing}; +use primitives::block::{Id as BlockId, ExtrinsicHash, HeaderHash, Header}; +use primitives::{AuthorityId}; use transaction_pool::TransactionPool; use substrate_executor::NativeExecutor; use polkadot_executor::Executor as LocalDispatch; @@ -80,13 +80,13 @@ pub struct Service { thread: Option>, client: Arc>, network: Arc, - transaction_pool: Arc>, + transaction_pool: Arc, signal: Option, _consensus: Option, } struct TransactionPoolAdapter where A: Send + Sync, E: Send + Sync { - pool: Arc>, + pool: Arc, client: Arc>, api: Arc, } @@ -108,19 +108,22 @@ impl network::TransactionPool for TransactionPoolAdapter }; let id = self.api.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed."); - let mut pool = self.pool.lock(); - pool.cull(None, transaction_pool::Ready::create(id.clone(), &*self.api)); - pool.pending(transaction_pool::Ready::create(id, &*self.api)).map(|t| { - let hash = ::primitives::Hash::from(&t.hash()[..]); - let tx = codec::Slicable::encode(t.as_transaction()); - (hash, tx) - }).collect() + let ready = transaction_pool::Ready::create(id, &*self.api); + + self.pool.cull_and_get_pending(ready, |pending| pending + .map(|t| { + let hash = ::primitives::Hash::from(&t.hash()[..]); + let tx = codec::Slicable::encode(t.as_transaction()); + (hash, tx) + }) + .collect() + ) } fn import(&self, transaction: &[u8]) -> Option { - if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) { - match self.pool.lock().import(tx) { - Ok(t) => Some(t.hash()[..].into()), + if let Some(uxt) = codec::Slicable::decode(&mut &transaction[..]) { + match self.pool.import_unchecked_extrinsic(uxt) { + Ok(xt) => Some(*xt.hash()), Err(e) => match *e.kind() { transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()), _ => { @@ -134,6 +137,10 @@ impl network::TransactionPool for TransactionPoolAdapter None } } + + fn on_broadcasted(&self, propagations: HashMap>) { + self.pool.on_broadcasted(propagations) + } } pub struct ChainConfig { @@ -341,7 +348,7 @@ impl Service C: Fn( Arc>, Arc, - Arc>, + Arc, &Keystore ) -> Result, error::Error>, A: PolkadotApi + Send + Sync + 'static, @@ -383,7 +390,7 @@ impl Service let api = api_creator(client.clone()); let best_header = client.best_block_header()?; info!("Starting Polkadot. Best block is #{}", best_header.number); - let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool))); + let transaction_pool = Arc::new(TransactionPool::new(config.transaction_pool)); let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { pool: transaction_pool.clone(), client: client.clone(), @@ -414,14 +421,28 @@ impl Service thread_barrier.wait(); let mut core = Core::new().expect("tokio::Core could not be created"); - let events = client.import_notification_stream().for_each(move |notification| { - network.on_block_imported(notification.hash, ¬ification.header); - prune_imported(&*client, &*txpool, notification.hash); - Ok(()) - }); + // block notifications + let network1 = network.clone(); + let txpool1 = txpool.clone(); + let events = client.import_notification_stream() + .for_each(move |notification| { + network1.on_block_imported(notification.hash, ¬ification.header); + prune_imported(&*api, &*txpool1, notification.hash); + Ok(()) + }); core.handle().spawn(events); + + // transaction notifications + let events = txpool.import_notification_stream() + // TODO [ToDr] Consider throttling? + .for_each(move |_| { + network.trigger_repropagate(); + Ok(()) + }); + core.handle().spawn(events); + if let Err(e) = core.run(exit) { debug!("Polkadot service event loop shutdown with {:?}", e); } @@ -457,30 +478,22 @@ impl Service } /// Get shared transaction pool instance. - pub fn transaction_pool(&self) -> Arc> { + pub fn transaction_pool(&self) -> Arc { self.transaction_pool.clone() } } -fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) { - for extrinsic in extrinsics { - let hash: _ = hashing::blake2_256(&extrinsic.encode()).into(); - pool.remove(&hash, true); - } -} - /// Produce a task which prunes any finalized transactions from the pool. -pub fn prune_imported(client: &Client, pool: &Mutex, hash: HeaderHash) +pub fn prune_imported(api: &A, pool: &TransactionPool, hash: HeaderHash) where - B: Backend + Send + Sync, - E: CallExecutor + Send + Sync, - client::error::Error: From<<::State as state_machine::backend::Backend>::Error> + A: PolkadotApi, { - let id = BlockId::Hash(hash); - match client.body(&id) { - Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]), - Ok(None) => warn!("Missing imported block {:?}", hash), - Err(e) => warn!("Failed to fetch block: {:?}", e), + match api.check_id(BlockId::Hash(hash)) { + Ok(id) => { + let ready = transaction_pool::Ready::create(id, api); + pool.cull(None, ready); + }, + Err(e) => warn!("Failed to check block id: {:?}", e), } } diff --git a/polkadot/transaction-pool/Cargo.toml b/polkadot/transaction-pool/Cargo.toml index d8cf5ebd80..140bea88e6 100644 --- a/polkadot/transaction-pool/Cargo.toml +++ b/polkadot/transaction-pool/Cargo.toml @@ -4,15 +4,14 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -log = "0.4.0" -transaction-pool = "1.12.0" +log = "0.3.0" error-chain = "0.11" polkadot-api = { path = "../api" } polkadot-primitives = { path = "../primitives" } polkadot-runtime = { path = "../runtime" } substrate-client = { path = "../../substrate/client" } -substrate-rpc = { path = "../../substrate/rpc" } +substrate-codec = { path = "../../substrate/codec" } +substrate-extrinsic-pool = { path = "../../substrate/extrinsic-pool" } substrate-primitives = { path = "../../substrate/primitives" } substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" } -substrate-codec = { path = "../../substrate/codec" } ed25519 = { path = "../../substrate/ed25519" } diff --git a/polkadot/transaction-pool/src/error.rs b/polkadot/transaction-pool/src/error.rs new file mode 100644 index 0000000000..18b79e59fe --- /dev/null +++ b/polkadot/transaction-pool/src/error.rs @@ -0,0 +1,61 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use extrinsic_pool::{self, txpool}; +use primitives::Hash; +use runtime::UncheckedExtrinsic; + +error_chain! { + links { + Pool(txpool::Error, txpool::ErrorKind); + } + errors { + /// Unexpected extrinsic format submitted + InvalidExtrinsicFormat { + description("Invalid extrinsic format."), + display("Invalid extrinsic format."), + } + /// Attempted to queue an inherent transaction. + IsInherent(xt: UncheckedExtrinsic) { + description("Inherent transactions cannot be queued."), + display("Inehrent transactions cannot be queued."), + } + /// Attempted to queue a transaction with bad signature. + BadSignature(xt: UncheckedExtrinsic) { + description("Transaction had bad signature."), + display("Transaction had bad signature."), + } + /// Attempted to queue a transaction that is already in the pool. + AlreadyImported(hash: Hash) { + description("Transaction is already in the pool."), + display("Transaction {:?} is already in the pool.", hash), + } + /// Import error. + Import(err: Box<::std::error::Error + Send>) { + description("Error importing transaction"), + display("Error importing transaction: {}", err.description()), + } + } +} + +impl extrinsic_pool::api::Error for Error { + fn into_pool_error(self) -> ::std::result::Result { + match self { + Error(ErrorKind::Pool(e), c) => Ok(txpool::Error(e, c)), + e => Err(e), + } + } +} diff --git a/polkadot/transaction-pool/src/lib.rs b/polkadot/transaction-pool/src/lib.rs index faa3d09a0b..015777a863 100644 --- a/polkadot/transaction-pool/src/lib.rs +++ b/polkadot/transaction-pool/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. +// Copyright 2018 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify @@ -16,12 +16,12 @@ extern crate ed25519; extern crate substrate_codec as codec; +extern crate substrate_extrinsic_pool as extrinsic_pool; extern crate substrate_primitives as substrate_primitives; -extern crate substrate_runtime_primitives as substrate_runtime_primitives; +extern crate substrate_runtime_primitives; extern crate polkadot_runtime as runtime; extern crate polkadot_primitives as primitives; extern crate polkadot_api; -extern crate transaction_pool; #[macro_use] extern crate error_chain; @@ -29,19 +29,27 @@ extern crate error_chain; #[macro_use] extern crate log; -use std::collections::HashMap; -use std::cmp::Ordering; -use std::sync::Arc; +mod error; +use std::{ + cmp::Ordering, + collections::HashMap, + ops::Deref, + sync::Arc, +}; + +use codec::Slicable; +use extrinsic_pool::{Pool, txpool::{self, Readiness, scoring::{Change, Choice}}}; use polkadot_api::PolkadotApi; -use primitives::{AccountId, Timestamp, Hash}; use primitives::parachain::CandidateReceipt; +use primitives::{AccountId, Timestamp, Hash}; use runtime::{Block, UncheckedExtrinsic, TimestampCall, ParachainsCall, Call}; +use substrate_primitives::block::{Extrinsic, ExtrinsicHash}; +use substrate_primitives::hexdisplay::HexDisplay; use substrate_runtime_primitives::traits::{Bounded, Checkable}; -use transaction_pool::{Transaction, Pool, Readiness}; -use transaction_pool::scoring::{Change, Choice}; -pub use transaction_pool::{Options, Status, LightStatus, NoopListener, VerifiedTransaction as VerifiedTransactionOps}; +pub use extrinsic_pool::txpool::{Options, Status, LightStatus, VerifiedTransaction as VerifiedTransactionOps}; +pub use error::{Error, ErrorKind, Result}; /// Useful functions for working with Polkadot blocks. pub struct PolkadotBlock { @@ -127,48 +135,17 @@ impl From for Block { } } -/// Iterator over pending transactions. -pub type PendingIterator<'a, C> = - transaction_pool::PendingIterator<'a, VerifiedTransaction, Ready<'a, C>, Scoring, NoopListener>; - -error_chain! { - errors { - /// Attempted to queue an inherent transaction. - IsInherent(xt: UncheckedExtrinsic) { - description("Inherent transactions cannot be queued."), - display("Inehrent transactions cannot be queued."), - } - /// Attempted to queue a transaction with bad signature. - BadSignature(xt: UncheckedExtrinsic) { - description("Transaction had bad signature."), - display("Transaction had bad signature."), - } - /// Attempted to queue a transaction that is already in the pool. - AlreadyImported(hash: Hash) { - description("Transaction is already in the pool."), - display("Transaction {:?} is already in the pool.", hash), - } - /// Import error. - Import(err: Box<::std::error::Error + Send>) { - description("Error importing transaction"), - display("Error importing transaction: {}", err.description()), - } - } -} - /// A verified transaction which should be includable and non-inherent. #[derive(Debug, Clone)] pub struct VerifiedTransaction { inner: ::Checked, - hash: Hash, - address: AccountId, - insertion_id: u64, + hash: ExtrinsicHash, encoded_size: usize, } impl VerifiedTransaction { /// Attempt to verify a transaction. - fn create(xt: UncheckedExtrinsic, insertion_id: u64) -> Result { + fn create(xt: UncheckedExtrinsic) -> Result { if !xt.is_signed() { bail!(ErrorKind::IsInherent(xt)) } @@ -176,15 +153,11 @@ impl VerifiedTransaction { let message = codec::Slicable::encode(&xt); match xt.check() { Ok(xt) => { - // TODO: make transaction-pool use generic types. let hash = substrate_primitives::hashing::blake2_256(&message); - let address = xt.signed; Ok(VerifiedTransaction { inner: xt, hash: hash.into(), encoded_size: message.len(), - address, - insertion_id, }) } Err(xt) => Err(ErrorKind::BadSignature(xt).into()), @@ -208,7 +181,7 @@ impl VerifiedTransaction { /// Get the account ID of the sender of this transaction. pub fn sender(&self) -> &AccountId { - &self.address + &self.inner.signed } /// Get encoded size of the transaction. @@ -223,16 +196,16 @@ impl AsRef< ::Checked > for VerifiedTransaction } } -impl transaction_pool::VerifiedTransaction for VerifiedTransaction { +impl txpool::VerifiedTransaction for VerifiedTransaction { type Hash = Hash; type Sender = AccountId; - fn hash(&self) -> &Hash { + fn hash(&self) -> &Self::Hash { &self.hash } - fn sender(&self) -> &AccountId { - &self.address + fn sender(&self) -> &Self::Sender { + &self.inner.signed } fn mem_usage(&self) -> usize { @@ -244,7 +217,7 @@ impl transaction_pool::VerifiedTransaction for VerifiedTransaction { #[derive(Debug)] pub struct Scoring; -impl transaction_pool::Scoring for Scoring { +impl txpool::Scoring for Scoring { type Score = u64; type Event = (); @@ -258,7 +231,7 @@ impl transaction_pool::Scoring for Scoring { fn update_scores( &self, - xts: &[Transaction], + xts: &[txpool::Transaction], scores: &mut [Self::Score], _change: Change<()> ) { @@ -276,7 +249,7 @@ impl transaction_pool::Scoring for Scoring { /// Readiness evaluator for polkadot transactions. pub struct Ready<'a, T: 'a + PolkadotApi> { at_block: T::CheckedBlockId, - api_handle: &'a T, + api: &'a T, known_indices: HashMap, } @@ -284,7 +257,7 @@ impl<'a, T: 'a + PolkadotApi> Clone for Ready<'a, T> { fn clone(&self) -> Self { Ready { at_block: self.at_block.clone(), - api_handle: self.api_handle, + api: self.api, known_indices: self.known_indices.clone(), } } @@ -293,106 +266,85 @@ impl<'a, T: 'a + PolkadotApi> Clone for Ready<'a, T> { impl<'a, T: 'a + PolkadotApi> Ready<'a, T> { /// Create a new readiness evaluator at the given block. Requires that /// the ID has already been checked for local corresponding and available state. - pub fn create(at: T::CheckedBlockId, client: &'a T) -> Self { + pub fn create(at: T::CheckedBlockId, api: &'a T) -> Self { Ready { at_block: at, - api_handle: client, + api, known_indices: HashMap::new(), } } } -impl<'a, T: 'a + PolkadotApi> transaction_pool::Ready for Ready<'a, T> { +impl<'a, T: 'a + PolkadotApi> txpool::Ready for Ready<'a, T> { fn is_ready(&mut self, xt: &VerifiedTransaction) -> Readiness { let sender = xt.inner.signed; trace!(target: "transaction-pool", "Checking readiness of {} (from {})", xt.hash, Hash::from(sender)); // TODO: find a way to handle index error properly -- will need changes to // transaction-pool trait. - let (api_handle, at_block) = (&self.api_handle, &self.at_block); + let (api, at_block) = (&self.api, &self.at_block); let next_index = self.known_indices.entry(sender) - .or_insert_with(|| api_handle.index(at_block, sender).ok().unwrap_or_else(Bounded::max_value)); + .or_insert_with(|| api.index(at_block, sender).ok().unwrap_or_else(Bounded::max_value)); trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_index, xt.inner.index); - match xt.inner.index.cmp(&next_index) { + let result = match xt.inner.index.cmp(&next_index) { Ordering::Greater => Readiness::Future, Ordering::Equal => Readiness::Ready, - Ordering::Less => Readiness::Stale, // TODO: this is not "stalled" but rather stale and can be discarded. - } + Ordering::Less => Readiness::Stale, + }; + + // remember to increment `next_index` + *next_index = next_index.saturating_add(1); + + result + } +} + +pub struct Verifier; + +impl txpool::Verifier for Verifier { + type VerifiedTransaction = VerifiedTransaction; + type Error = Error; + + fn verify_transaction(&self, xt: Extrinsic) -> Result { + info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0)); + let uxt = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s)) + .ok_or_else(|| ErrorKind::InvalidExtrinsicFormat)?; + info!("Correctly formatted: {:?}", uxt); + VerifiedTransaction::create(uxt) } } /// The polkadot transaction pool. /// -/// Wraps a `transaction-pool::Pool`. +/// Wraps a `extrinsic_pool::Pool`. pub struct TransactionPool { - inner: transaction_pool::Pool, - insertion_index: u64, // TODO: use AtomicU64 when it stabilizes + inner: Pool, } impl TransactionPool { /// Create a new transaction pool. pub fn new(options: Options) -> Self { TransactionPool { - inner: Pool::new(NoopListener, Scoring, options), - insertion_index: 0, + inner: Pool::new(options, Verifier, Scoring), } } - /// Verify and import a transaction into the pool. - pub fn import(&mut self, xt: UncheckedExtrinsic) -> Result> { - let insertion_index = self.insertion_index; - self.insertion_index += 1; - - let verified = VerifiedTransaction::create(xt, insertion_index)?; - - info!("Extrinsic verified {:?}. Importing...", verified); - - // TODO: just use a foreign link when the error type is made public. - let hash = verified.hash.clone(); - self.inner.import(verified) - .map_err(|e| - match e { - // TODO: make error types public in transaction_pool. For now just treat all errors as AlreadyImported - _ => ErrorKind::AlreadyImported(hash), - // transaction_pool::error::AlreadyImported(h) => ErrorKind::AlreadyImported(h), - // e => ErrorKind::Import(Box::new(e)), - }) - .map_err(Into::into) + pub fn import_unchecked_extrinsic(&self, uxt: UncheckedExtrinsic) -> Result> { + Ok(self.inner.import(VerifiedTransaction::create(uxt)?)?) } +} - /// Clear the pool. - pub fn clear(&mut self) { - self.inner.clear(); - } +impl Deref for TransactionPool { + type Target = Pool; - /// Remove from the pool. - pub fn remove(&mut self, hash: &Hash, is_valid: bool) -> Option> { - self.inner.remove(hash, is_valid) - } - - /// Cull transactions from the queue. - pub fn cull(&mut self, senders: Option<&[AccountId]>, ready: Ready) -> usize { - self.inner.cull(senders, ready) - } - - /// Get an iterator of pending transactions. - pub fn pending<'a, T: 'a + PolkadotApi>(&'a self, ready: Ready<'a, T>) -> PendingIterator<'a, T> { - self.inner.pending(ready) - } - - /// Get the full status of the queue (including readiness) - pub fn status(&self, ready: Ready) -> Status { - self.inner.status(ready) - } - - /// Returns light status of the pool. - pub fn light_status(&self) -> LightStatus { - self.inner.light_status() + fn deref(&self) -> &Self::Target { + &self.inner } } #[cfg(test)] mod tests { } +