From 5d4dafb8c09bcaade8bafca4ea76db504b5a2f27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 26 Jun 2018 11:45:57 +0200 Subject: [PATCH] Handle AccountIndices in transaction pool (#225) * Merge remote-tracking branch 'origin/master' into gav-xts-dont-panic * Update wasm. * consensus, session and staking all panic-safe. * Democracy doesn't panic in apply. * Fix tests. * Extra helper macro, council depanicked. * Fix one test. * Fix up all council tests. No panics! * Council voting depanicked. * Dispatch returns result. * session & staking tests updated * Fix democracy tests. * Fix council tests. * Fix up polkadot parachains in runtime * Fix borked merge * More Slicable support Support general `Option` and array types. * Basic storage types. * Existential deposit for contract creation * Basic implemnetation along with removals * Fix tests. * externalities builder fix. * Tests. * Fix up the runtime. * Fix tests. * Add generic `Address` type. * Initial function integration of Address into Extrinsic. * Fix build * All tests compile. * Fix (some) tests. * Fix signing. * Push error. * transfer can accept Address * Make Address generic over AccountIndex * Fix test * Make Council use Address for dispatch. * Fix build * Bend over backwards to support braindead derive. * Repot some files. * Fix tests. * Fix grumbles * Remove Default bound * Fix build for new nightly. * Make `apply_extrinsic` never panic, return useful Result. * More merge hell * Doesn't build, but might do soon * Serde woes * get substrate-runtime-staking compiling * Polkadot builds again! * Fix all build. * Fix tests & binaries. * Reserve some extra initial byte values of address for future format changes * Make semantic of `ReservedBalance` clear. * Fix panic handler. * Integrate other balance transformations into the new model Fix up staking tests. * Fix runtime tests. * Fix panic build. * Tests for demonstrating interaction between balance types. * Repot some runtime code * Fix checkedblock in non-std builds * Get rid of `DoLookup` phantom. * Attempt to make transaction_pool work with lookups. * Remove vscode settings * New attempt at making transaction pool work. * It builds again! * --all builds * Fix tests. * New build. * Test account nonce reset. * polkadot transaction pool tests/framework. * Address grumbles. * Pool support non-verified transactions. * Revert bad `map_or` * Rebuild binaries, workaround. * Avoid casting to usize early. * Make verification use provided block_id. * Fix tests. * Alter tests to use retry. * Fix tests & add call to re-verify. * Semi-refactor. * Integrate new queue with the rest of the code. * Fix tests. * Add reverify_transaction method. * Use result. --- polkadot/cli/Cargo.toml | 6 +- polkadot/cli/src/lib.rs | 11 +- polkadot/consensus/src/lib.rs | 31 +- polkadot/consensus/src/service.rs | 3 +- polkadot/service/src/components.rs | 61 +-- polkadot/service/src/lib.rs | 29 +- polkadot/transaction-pool/src/error.rs | 7 +- polkadot/transaction-pool/src/lib.rs | 536 +++++++++++++++---------- 8 files changed, 402 insertions(+), 282 deletions(-) diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml index 48dd25ebc8..cd0040c439 100644 --- a/polkadot/cli/Cargo.toml +++ b/polkadot/cli/Cargo.toml @@ -27,12 +27,12 @@ parking_lot = "0.4" serde_json = "1.0" serde = "1.0" substrate-client = { path = "../../substrate/client" } -substrate-state-machine = { path = "../../substrate/state-machine" } -substrate-rpc = { path = "../../substrate/rpc" } -substrate-rpc-servers = { path = "../../substrate/rpc-servers" } substrate-network = { path = "../../substrate/network" } substrate-primitives = { path = "../../substrate/primitives" } +substrate-rpc = { path = "../../substrate/rpc" } +substrate-rpc-servers = { path = "../../substrate/rpc-servers" } substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" } +substrate-state-machine = { path = "../../substrate/state-machine" } substrate-telemetry = { path = "../../substrate/telemetry" } polkadot-primitives = { path = "../primitives" } polkadot-runtime = { path = "../runtime" } diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index ba68e7dfce..ac4f266b94 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -34,13 +34,13 @@ extern crate parking_lot; extern crate serde; extern crate serde_json; -extern crate substrate_primitives; -extern crate substrate_state_machine as state_machine; extern crate substrate_client as client; extern crate substrate_network as network; +extern crate substrate_primitives; extern crate substrate_rpc; extern crate substrate_rpc_servers as rpc; extern crate substrate_runtime_primitives as runtime_primitives; +extern crate substrate_state_machine as state_machine; extern crate polkadot_primitives; extern crate polkadot_runtime; extern crate polkadot_service as service; @@ -192,12 +192,12 @@ pub fn run(args: I) -> error::Result<()> where let (mut genesis_storage, boot_nodes) = PresetConfig::from_spec(chain_spec) .map(PresetConfig::deconstruct) - .unwrap_or_else(|f| (Box::new(move || + .unwrap_or_else(|f| (Box::new(move || read_storage_json(&f) .map(|s| { info!("{} storage items read from {}", s.len(), f); s }) .unwrap_or_else(|| panic!("Bad genesis state file: {}", f)) ), vec![])); - + if matches.is_present("build-genesis") { info!("Building genesis"); for (i, (k, v)) in genesis_storage().iter().enumerate() { @@ -285,10 +285,11 @@ fn run_until_exit(mut core: reactor::Core, service: service::Service, matc let handler = || { let chain = rpc::apis::chain::Chain::new(service.client(), core.remote()); + let author = rpc::apis::author::Author::new(service.client(), service.transaction_pool()); rpc::rpc_handler::( service.client(), chain, - service.transaction_pool(), + author, sys_conf.clone(), ) }; diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index 6146e6d87d..5dd4c1d484 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -74,7 +74,7 @@ use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header, Timestamp}; use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt}; use polkadot_runtime::BareExtrinsic; use primitives::AuthorityId; -use transaction_pool::{Ready, TransactionPool}; +use transaction_pool::{TransactionPool}; use tokio_core::reactor::{Handle, Timeout, Interval}; use futures::prelude::*; @@ -226,7 +226,7 @@ pub struct ProposerFactory { /// The client instance. pub client: Arc, /// The transaction pool. - pub transaction_pool: Arc, + pub transaction_pool: Arc>, /// The backing network handle. pub network: N, /// Parachain collators. @@ -239,7 +239,8 @@ pub struct ProposerFactory { impl bft::ProposerFactory for ProposerFactory where - C: PolkadotApi, + C: PolkadotApi + Send + Sync, + C::CheckedBlockId: Sync, N: Network, P: Collators, { @@ -319,12 +320,13 @@ pub struct Proposer { random_seed: Hash, router: R, table: Arc, - transaction_pool: Arc, + transaction_pool: Arc>, } impl bft::Proposer for Proposer where - C: PolkadotApi, + C: PolkadotApi + Send + Sync, + C::CheckedBlockId: Sync, R: TableRouter, P: Collators, { @@ -501,8 +503,7 @@ impl bft::Proposer for Proposer let local_id = self.local_key.public().0.into(); let mut next_index = { - let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client); - let cur_index = self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending| pending + let cur_index = self.transaction_pool.cull_and_get_pending(BlockId::hash(self.parent_hash), |pending| pending .filter(|tx| tx.sender().map(|s| s == local_id).unwrap_or(false)) .last() .map(|tx| Ok(tx.index())) @@ -510,7 +511,11 @@ impl bft::Proposer for Proposer ); match cur_index { - Ok(cur_index) => cur_index + 1, + Ok(Ok(cur_index)) => cur_index + 1, + Ok(Err(e)) => { + warn!(target: "consensus", "Error computing next transaction index: {}", e); + return; + } Err(e) => { warn!(target: "consensus", "Error computing next transaction index: {}", e); return; @@ -549,7 +554,7 @@ impl bft::Proposer for Proposer }; let uxt = UncheckedExtrinsic::new(extrinsic, signature); - self.transaction_pool.import_unchecked_extrinsic(uxt) + self.transaction_pool.import_unchecked_extrinsic(BlockId::hash(self.parent_hash), uxt) .expect("locally signed extrinsic is valid; qed"); } } @@ -618,7 +623,7 @@ pub struct CreateProposal { parent_number: BlockNumber, parent_id: C::CheckedBlockId, client: Arc, - transaction_pool: Arc, + transaction_pool: Arc>, collation: CollationFetch, router: R, table: Arc, @@ -640,9 +645,8 @@ impl CreateProposal let mut block_builder = self.client.build_block(&self.parent_id, timestamp, candidates)?; { - let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client); let mut unqueue_invalid = Vec::new(); - self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending_iterator| { + let result = self.transaction_pool.cull_and_get_pending(BlockId::hash(self.parent_hash), |pending_iterator| { let mut pending_size = 0; for pending in pending_iterator { // skip and cull transactions which are too large. @@ -664,6 +668,9 @@ impl CreateProposal } } }); + if let Err(e) = result { + warn!("Unable to get the pending set: {:?}", e); + } self.transaction_pool.remove(&unqueue_invalid, false); } diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index f1ee7f49fe..7e7f5e9146 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -235,13 +235,14 @@ impl Service { client: Arc, api: Arc, network: Arc>, - transaction_pool: Arc, + transaction_pool: Arc>, parachain_empty_duration: Duration, key: ed25519::Pair, ) -> Service where A: LocalPolkadotApi + Send + Sync + 'static, C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + Send + Sync + 'static, + A::CheckedBlockId: Sync, { let (signal, exit) = ::exit_future::signal(); let thread = thread::spawn(move || { diff --git a/polkadot/service/src/components.rs b/polkadot/service/src/components.rs index e8274f4dfc..7211fa660b 100644 --- a/polkadot/service/src/components.rs +++ b/polkadot/service/src/components.rs @@ -55,11 +55,11 @@ pub trait Components { fn build_api(&self, client: Arc>) -> Arc; /// Create network transaction pool adapter. - fn build_network_tx_pool(&self, client: Arc>, api: Arc, tx_pool: Arc) + fn build_network_tx_pool(&self, client: Arc>, tx_pool: Arc>) -> Arc>; /// Create consensus service. - fn build_consensus(&self, client: Arc>, network: Arc>, tx_pool: Arc, keystore: &Keystore) + fn build_consensus(&self, client: Arc>, network: Arc>, tx_pool: Arc>, keystore: &Keystore) -> Result, error::Error>; } @@ -83,17 +83,16 @@ impl Components for FullComponents { client } - fn build_network_tx_pool(&self, client: Arc>, api: Arc, pool: Arc) + fn build_network_tx_pool(&self, client: Arc>, pool: Arc>) -> Arc> { Arc::new(TransactionPoolAdapter { imports_external_transactions: true, pool, client, - api, }) } - fn build_consensus(&self, client: Arc>, network: Arc>, tx_pool: Arc, keystore: &Keystore) + fn build_consensus(&self, client: Arc>, network: Arc>, tx_pool: Arc>, keystore: &Keystore) -> Result, error::Error> { if !self.is_validator { return Ok(None); @@ -134,17 +133,16 @@ impl Components for LightComponents { Arc::new(polkadot_api::light::RemotePolkadotApiWrapper(client.clone())) } - fn build_network_tx_pool(&self, client: Arc>, api: Arc, pool: Arc) + fn build_network_tx_pool(&self, client: Arc>, pool: Arc>) -> Arc> { Arc::new(TransactionPoolAdapter { imports_external_transactions: false, pool, client, - api, }) } - fn build_consensus(&self, _client: Arc>, _network: Arc>, _tx_pool: Arc, _keystore: &Keystore) + fn build_consensus(&self, _client: Arc>, _network: Arc>, _tx_pool: Arc>, _keystore: &Keystore) -> Result, error::Error> { Ok(None) } @@ -153,9 +151,25 @@ impl Components for LightComponents { /// Transaction pool adapter. pub struct TransactionPoolAdapter where A: Send + Sync, E: Send + Sync { imports_external_transactions: bool, - pool: Arc, + pool: Arc>, client: Arc>, - api: Arc, +} + +impl TransactionPoolAdapter + where + A: Send + Sync, + B: client::backend::Backend + Send + Sync, + E: client::CallExecutor + Send + Sync, + client::error::Error: From<<>::State as state_machine::backend::Backend>::Error>, +{ + fn best_block_id(&self) -> Option { + self.client.info() + .map(|info| BlockId::hash(info.chain.best_hash)) + .map_err(|e| { + debug!("Error getting best block: {:?}", e); + }) + .ok() + } } impl network::TransactionPool for TransactionPoolAdapter @@ -166,28 +180,20 @@ impl network::TransactionPool for TransactionPoolAdapter Vec<(Hash, Vec)> { - let best_block = match self.client.info() { - Ok(info) => info.chain.best_hash, - Err(e) => { - debug!("Error getting best block: {:?}", e); - return Vec::new(); - } + let best_block_id = match self.best_block_id() { + Some(id) => id, + None => return vec![], }; - - let id = match self.api.check_id(BlockId::hash(best_block)) { - Ok(id) => id, - Err(_) => return Vec::new(), - }; - - let ready = transaction_pool::Ready::create(id, &*self.api); - - self.pool.cull_and_get_pending(ready, |pending| pending + self.pool.cull_and_get_pending(best_block_id, |pending| pending .map(|t| { let hash = t.hash().clone(); (hash, t.primitive_extrinsic()) }) .collect() - ) + ).unwrap_or_else(|e| { + warn!("Error retrieving pending set: {}", e); + vec![] + }) } fn import(&self, transaction: &Vec) -> Option { @@ -197,7 +203,8 @@ impl network::TransactionPool for TransactionPoolAdapter Some(*xt.hash()), Err(e) => match *e.kind() { transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()), diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 75ddf1ffbe..230a9ae587 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -75,7 +75,7 @@ pub struct Service { thread: Option>, client: Arc>, network: Arc>, - transaction_pool: Arc, + transaction_pool: Arc>, signal: Option, _consensus: Option, } @@ -127,8 +127,8 @@ impl Service info!("Best block is #{}", best_header.number); telemetry!("node.start"; "height" => best_header.number, "best" => ?best_header.hash()); - let transaction_pool = Arc::new(TransactionPool::new(config.transaction_pool)); - let transaction_pool_adapter = components.build_network_tx_pool(client.clone(), api.clone(), transaction_pool.clone()); + let transaction_pool = Arc::new(TransactionPool::new(config.transaction_pool, api.clone())); + let transaction_pool_adapter = components.build_network_tx_pool(client.clone(), transaction_pool.clone()); let network_params = network::Params { config: network::ProtocolConfig { roles: config.roles, @@ -161,7 +161,8 @@ impl Service let events = client.import_notification_stream() .for_each(move |notification| { network1.on_block_imported(notification.hash, ¬ification.header); - prune_imported(&*api, &*txpool1, notification.hash); + prune_imported(&*txpool1, notification.hash); + Ok(()) }); core.handle().spawn(events); @@ -210,22 +211,22 @@ impl Service } /// Get shared transaction pool instance. - pub fn transaction_pool(&self) -> Arc { + pub fn transaction_pool(&self) -> Arc> { self.transaction_pool.clone() } } /// Produce a task which prunes any finalized transactions from the pool. -pub fn prune_imported(api: &A, pool: &TransactionPool, hash: Hash) - where - A: PolkadotApi, +pub fn prune_imported(pool: &TransactionPool, hash: Hash) + where A: PolkadotApi, { - match api.check_id(BlockId::hash(hash)) { - Ok(id) => { - let ready = transaction_pool::Ready::create(id, api); - pool.cull(None, ready); - }, - Err(e) => warn!("Failed to check block id: {:?}", e), + let block = BlockId::hash(hash); + if let Err(e) = pool.cull(block) { + warn!("Culling error: {:?}", e); + } + + if let Err(e) = pool.retry_verification(block) { + warn!("Re-verifying error: {:?}", e); } } diff --git a/polkadot/transaction-pool/src/error.rs b/polkadot/transaction-pool/src/error.rs index 78f9d3e771..ef6cdf6b41 100644 --- a/polkadot/transaction-pool/src/error.rs +++ b/polkadot/transaction-pool/src/error.rs @@ -15,12 +15,14 @@ // along with Polkadot. If not, see . use extrinsic_pool::{self, txpool}; +use polkadot_api; use primitives::Hash; use runtime::{Address, UncheckedExtrinsic}; error_chain! { links { Pool(txpool::Error, txpool::ErrorKind); + Api(polkadot_api::Error, polkadot_api::ErrorKind); } errors { /// Unexpected extrinsic format submitted @@ -53,11 +55,6 @@ error_chain! { description("Unrecognised address in extrinsic"), display("Unrecognised address in extrinsic: {}", who), } - /// Extrinsic is not yet checked. - NotReady { - description("Indexed address is unverified"), - display("Indexed address is unverified"), - } } } diff --git a/polkadot/transaction-pool/src/lib.rs b/polkadot/transaction-pool/src/lib.rs index 843af20fb1..1fab5d481e 100644 --- a/polkadot/transaction-pool/src/lib.rs +++ b/polkadot/transaction-pool/src/lib.rs @@ -15,6 +15,7 @@ // along with Polkadot. If not, see . extern crate ed25519; +extern crate substrate_client as client; extern crate substrate_codec as codec; extern crate substrate_extrinsic_pool as extrinsic_pool; extern crate substrate_primitives as substrate_primitives; @@ -37,19 +38,17 @@ mod error; use std::{ cmp::Ordering, - collections::{hash_map::Entry, HashMap}, + collections::HashMap, ops::Deref, sync::Arc, - result }; -use parking_lot::Mutex; use codec::Slicable; -use extrinsic_pool::{Pool, txpool::{self, Readiness, scoring::{Change, Choice}}}; +use extrinsic_pool::{Pool, Listener, txpool::{self, Readiness, scoring::{Change, Choice}}}; use extrinsic_pool::api::ExtrinsicPool; use polkadot_api::PolkadotApi; -use primitives::{AccountId, AccountIndex, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic}; -use runtime::{Address, RawAddress, UncheckedExtrinsic}; +use primitives::{AccountId, BlockId, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic}; +use runtime::{Address, UncheckedExtrinsic}; use substrate_runtime_primitives::traits::{Bounded, Checkable, Hashing, BlakeTwo256}; pub use extrinsic_pool::txpool::{Options, Status, LightStatus, VerifiedTransaction as VerifiedTransactionOps}; @@ -59,65 +58,16 @@ pub use error::{Error, ErrorKind, Result}; pub type CheckedExtrinsic = ::Checked; /// A verified transaction which should be includable and non-inherent. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct VerifiedTransaction { original: UncheckedExtrinsic, - // `create()` will leave this as `Some` only if the `Address` is an `AccountId`, otherwise a - // call to `polish` is needed. - inner: Mutex>, + inner: Option, + sender: Option, hash: Hash, encoded_size: usize, } -impl Clone for VerifiedTransaction { - fn clone(&self) -> Self { - VerifiedTransaction { - original: self.original.clone(), - inner: Mutex::new(self.inner.lock().clone()), - hash: self.hash.clone(), - encoded_size: self.encoded_size.clone(), - } - } -} - impl VerifiedTransaction { - /// Attempt to verify a transaction. - fn create(original: UncheckedExtrinsic) -> Result { - if !original.is_signed() { - bail!(ErrorKind::IsInherent(original)) - } - const UNAVAILABLE_MESSAGE: &'static str = "chain state not available"; - let (encoded_size, hash) = original.using_encoded(|e| (e.len(), BlakeTwo256::hash(e))); - let lookup = |a| match a { - RawAddress::Id(i) => Ok(i), - _ => Err(UNAVAILABLE_MESSAGE), - }; - let inner = Mutex::new(match original.clone().check(lookup) { - Ok(xt) => Some(xt), - Err(e) if e == UNAVAILABLE_MESSAGE => None, - Err(e) => bail!(ErrorKind::BadSignature(e)), - }); - Ok(VerifiedTransaction { original, inner, hash, encoded_size }) - } - - /// If this transaction isn't really verified, verify it and morph it into a really verified - /// transaction. - pub fn polish(&self, lookup: F) -> Result<()> where - F: FnOnce(Address) -> result::Result + Send + Sync - { - let inner: result::Result = self.original - .clone() - .check(lookup) - .map_err(|e| ErrorKind::BadSignature(e).into()); - *self.inner.lock() = Some(inner?); - Ok(()) - } - - /// Is this transaction *really* verified? - pub fn is_really_verified(&self) -> bool { - self.inner.lock().is_some() - } - /// Access the underlying transaction. pub fn as_transaction(&self) -> &UncheckedExtrinsic { &self.original @@ -129,9 +79,9 @@ impl VerifiedTransaction { .expect("UncheckedExtrinsic shares repr with Vec; qed") } - /// Consume the verified transaciton, yielding the unchecked counterpart. - pub fn into_inner(self) -> Result { - self.inner.lock().clone().ok_or_else(|| ErrorKind::NotReady.into()) + /// Consume the verified transaction, yielding the checked counterpart. + pub fn into_inner(self) -> Option { + self.inner } /// Get the 256-bit hash of this transaction. @@ -140,8 +90,8 @@ impl VerifiedTransaction { } /// Get the account ID of the sender of this transaction. - pub fn sender(&self) -> Result { - self.inner.lock().as_ref().map(|i| i.signed.clone()).ok_or_else(|| ErrorKind::NotReady.into()) + pub fn sender(&self) -> Option { + self.sender } /// Get the account ID of the sender of this transaction. @@ -153,22 +103,27 @@ impl VerifiedTransaction { pub fn encoded_size(&self) -> usize { self.encoded_size } + + /// Returns `true` if the transaction is not yet fully verified. + pub fn is_fully_verified(&self) -> bool { + self.inner.is_some() + } } impl txpool::VerifiedTransaction for VerifiedTransaction { type Hash = Hash; - type Sender = Address; + type Sender = Option; fn hash(&self) -> &Self::Hash { &self.hash } fn sender(&self) -> &Self::Sender { - self.original.sender() + &self.sender } fn mem_usage(&self) -> usize { - 1 // TODO + self.encoded_size // TODO } } @@ -184,7 +139,19 @@ impl txpool::Scoring for Scoring { old.index().cmp(&other.index()) } - fn choose(&self, _old: &VerifiedTransaction, _new: &VerifiedTransaction) -> Choice { + fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> Choice { + if old.is_fully_verified() { + assert!(new.is_fully_verified(), "Scoring::choose called with transactions from different senders"); + if old.index() == new.index() { + // TODO [ToDr] Do we allow replacement? If yes then it should be Choice::ReplaceOld + return Choice::RejectNew; + } + } + + // This will keep both transactions, even though they have the same indices. + // It's fine for not fully verified transactions, we might also allow it for + // verified transactions but it would mean that only one of the two is actually valid + // (most likely the first to be included in the block). Choice::InsertNew } @@ -195,33 +162,37 @@ impl txpool::Scoring for Scoring { _change: Change<()> ) { for i in 0..xts.len() { - // all the same score since there are no fees. - // TODO: prioritize things like misbehavior or fishermen reports - scores[i] = 1; + if !xts[i].is_fully_verified() { + scores[i] = 0; + } else { + // all the same score since there are no fees. + // TODO: prioritize things like misbehavior or fishermen reports + scores[i] = 1; + } } } - fn should_replace(&self, _old: &VerifiedTransaction, _new: &VerifiedTransaction) -> bool { - false // no fees to determine which is better. + + fn should_replace(&self, old: &VerifiedTransaction, _new: &VerifiedTransaction) -> bool { + // Always replace not fully verified transactions. + !old.is_fully_verified() } } /// Readiness evaluator for polkadot transactions. -pub struct Ready<'a, T: 'a + PolkadotApi> { - at_block: T::CheckedBlockId, - api: &'a T, - known_nonces: HashMap, - known_indexes: HashMap, +pub struct Ready<'a, A: 'a + PolkadotApi> { + at_block: A::CheckedBlockId, + api: &'a A, + known_nonces: HashMap, } -impl<'a, T: 'a + PolkadotApi> Ready<'a, T> { +impl<'a, A: 'a + PolkadotApi> Ready<'a, A> { /// Create a new readiness evaluator at the given block. Requires that /// the ID has already been checked for local corresponding and available state. - pub fn create(at: T::CheckedBlockId, api: &'a T) -> Self { + fn create(at: A::CheckedBlockId, api: &'a A) -> Self { Ready { at_block: at, api, known_nonces: HashMap::new(), - known_indexes: HashMap::new(), } } } @@ -232,136 +203,214 @@ impl<'a, T: 'a + PolkadotApi> Clone for Ready<'a, T> { at_block: self.at_block.clone(), api: self.api, known_nonces: self.known_nonces.clone(), - known_indexes: self.known_indexes.clone(), } } } -impl<'a, T: 'a + PolkadotApi> txpool::Ready for Ready<'a, T> +impl<'a, A: 'a + PolkadotApi> txpool::Ready for Ready<'a, A> { fn is_ready(&mut self, xt: &VerifiedTransaction) -> Readiness { - if !xt.is_really_verified() { - let id = match xt.original.extrinsic.signed.clone() { - RawAddress::Id(id) => id.clone(), // should never happen, since we're not verified. - RawAddress::Index(i) => match self.known_indexes.entry(i) { - Entry::Occupied(e) => e.get().clone(), - Entry::Vacant(e) => { - let (api, at_block) = (&self.api, &self.at_block); - if let Some(id) = api.lookup(at_block, RawAddress::Index(i)) - .ok() - .and_then(|o| o) - { - e.insert(id.clone()); - id - } else { - // Invalid index. - // return stale in order to get the pool to throw it away. - return Readiness::Stale - } - } - }, - }; - if VerifiedTransaction::polish(xt, move |_| Ok(id)).is_err() { - // Invalid signature. - // return stale in order to get the pool to throw it away. - return Readiness::Stale - } - } + let sender = match xt.sender() { + Some(sender) => sender, + None => return Readiness::Future + }; - // guaranteed to be properly verified at this point. - - let sender = xt.sender().expect("only way to get here is `is_really_verified` or successful `polish`; either guarantees `is_really_verified`; `sender` is `Ok` if `is_really_verified`; qed"); trace!(target: "transaction-pool", "Checking readiness of {} (from {})", xt.hash, Hash::from(sender)); - let is_index_sender = match xt.original.extrinsic.signed { RawAddress::Index(_) => false, _ => true }; - // TODO: find a way to handle index error properly -- will need changes to // transaction-pool trait. let (api, at_block) = (&self.api, &self.at_block); - let get_nonce = || api.index(at_block, sender).ok().unwrap_or_else(Bounded::max_value); - let (next_nonce, was_index_sender) = self.known_nonces.entry(sender).or_insert_with(|| (get_nonce(), is_index_sender)); + let next_index = self.known_nonces.entry(sender) + .or_insert_with(|| api.index(at_block, sender).ok().unwrap_or_else(Bounded::max_value)); - trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_nonce, xt.original.extrinsic.index); + trace!(target: "transaction-pool", "Next index for sender is {}; xt index is {}", next_index, xt.original.extrinsic.index); - if *was_index_sender == is_index_sender || get_nonce() == *next_nonce { - match xt.original.extrinsic.index.cmp(&next_nonce) { - Ordering::Greater => Readiness::Future, - Ordering::Less => Readiness::Stale, - Ordering::Equal => { - // remember to increment `next_nonce` - // TODO: this won't work perfectly since accounts can now be killed, returning the nonce - // to zero. - *next_nonce = next_nonce.saturating_add(1); - Readiness::Ready - } - } - } else { - // ignore for now. - Readiness::Future + let result = match xt.original.extrinsic.index.cmp(&next_index) { + // TODO: this won't work perfectly since accounts can now be killed, returning the nonce + // to zero. + // We should detect if the index was reset and mark all transactions as `Stale` for cull to work correctly. + // Otherwise those transactions will keep occupying the queue. + // Perhaps we could mark as stale if `index - state_index` > X? + Ordering::Greater => Readiness::Future, + Ordering::Equal => Readiness::Ready, + // TODO [ToDr] Should mark transactions referrencing too old blockhash as `Stale` as well. + Ordering::Less => Readiness::Stale, + }; + + // remember to increment `next_index` + *next_index = next_index.saturating_add(1); + + result + } +} + +pub struct Verifier<'a, A: 'a, B> { + api: &'a A, + at_block: B, +} + +impl<'a, A> Verifier<'a, A, A::CheckedBlockId> where + A: 'a + PolkadotApi, +{ + const NO_ACCOUNT: &'static str = "Account not found."; + + fn lookup(&self, address: Address) -> ::std::result::Result { + // TODO [ToDr] Consider introducing a cache for this. + match self.api.lookup(&self.at_block, address.clone()) { + Ok(Some(address)) => Ok(address), + Ok(None) => Err(Self::NO_ACCOUNT.into()), + Err(e) => { + error!("Error looking up address: {:?}: {:?}", address, e); + Err("API error.") + }, } } } -pub struct Verifier; - -impl txpool::Verifier for Verifier { +impl<'a, A> txpool::Verifier for Verifier<'a, A, A::CheckedBlockId> where + A: 'a + PolkadotApi, +{ type VerifiedTransaction = VerifiedTransaction; type Error = Error; fn verify_transaction(&self, uxt: UncheckedExtrinsic) -> Result { info!("Extrinsic Submitted: {:?}", uxt); - VerifiedTransaction::create(uxt) + + if !uxt.is_signed() { + bail!(ErrorKind::IsInherent(uxt)) + } + + let (encoded_size, hash) = uxt.using_encoded(|e| (e.len(), BlakeTwo256::hash(e))); + let inner = match uxt.clone().check(|a| self.lookup(a)) { + Ok(xt) => Some(xt), + // keep the transaction around in the future pool and attempt to promote it later. + Err(Self::NO_ACCOUNT) => None, + Err(e) => bail!(e), + }; + let sender = inner.as_ref().map(|x| x.signed.clone()); + + Ok(VerifiedTransaction { + original: uxt, + inner, + sender, + hash, + encoded_size + }) } } /// The polkadot transaction pool. /// /// Wraps a `extrinsic_pool::Pool`. -pub struct TransactionPool { - inner: Pool, +pub struct TransactionPool { + inner: Pool, + api: Arc, } -impl TransactionPool { +impl TransactionPool where + A: PolkadotApi, +{ /// Create a new transaction pool. - pub fn new(options: Options) -> Self { + pub fn new(options: Options, api: Arc) -> Self { TransactionPool { - inner: Pool::new(options, Verifier, Scoring), + inner: Pool::new(options, Scoring), + api, } } - // TODO: remove. This is pointless - just use `submit()` directly. - pub fn import_unchecked_extrinsic(&self, uxt: UncheckedExtrinsic) -> Result> { - self.inner.submit(vec![uxt]).map(|mut v| v.swap_remove(0)) + /// Attempt to directly import `UncheckedExtrinsic` without going through serialization. + pub fn import_unchecked_extrinsic(&self, block: BlockId, uxt: UncheckedExtrinsic) -> Result> { + let verifier = Verifier { + api: &*self.api, + at_block: self.api.check_id(block)?, + }; + self.inner.submit(verifier, vec![uxt]).map(|mut v| v.swap_remove(0)) + } + + /// Retry to import all semi-verified transactions (unknown account indices) + pub fn retry_verification(&self, block: BlockId) -> Result<()> { + let to_reverify = self.inner.remove_sender(None); + let verifier = Verifier { + api: &*self.api, + at_block: self.api.check_id(block)?, + }; + + self.inner.submit(verifier, to_reverify.into_iter().map(|tx| tx.original.clone()))?; + Ok(()) + } + + /// Reverify transaction that has been reported incorrect. + /// + /// Returns `Ok(None)` in case the hash is missing, `Err(e)` in case of verification error and new transaction + /// reference otherwise. + /// + /// TODO [ToDr] That method is currently unused, should be used together with BlockBuilder + /// when we detect that particular transaction has failed. + /// In such case we will attempt to remove or re-verify it. + pub fn reverify_transaction(&self, block: BlockId, hash: Hash) -> Result>> { + let result = self.inner.remove(&[hash], false).pop().expect("One hash passed; one result received; qed"); + if let Some(tx) = result { + self.import_unchecked_extrinsic(block, tx.original.clone()).map(Some) + } else { + Ok(None) + } + } + + /// Cull old transactions from the queue. + pub fn cull(&self, block: BlockId) -> Result { + let id = self.api.check_id(block)?; + let ready = Ready::create(id, &*self.api); + Ok(self.inner.cull(None, ready)) + } + + /// Cull transactions from the queue and then compute the pending set. + pub fn cull_and_get_pending(&self, block: BlockId, f: F) -> Result where + F: FnOnce(txpool::PendingIterator, Scoring, Listener>) -> T, + { + let id = self.api.check_id(block)?; + let ready = Ready::create(id, &*self.api); + self.inner.cull(None, ready.clone()); + Ok(self.inner.pending(ready, f)) + } + + /// Remove a set of transactions idenitified by hashes. + pub fn remove(&self, hashes: &[Hash], is_valid: bool) -> Vec>> { + self.inner.remove(hashes, is_valid) } } -impl Deref for TransactionPool { - type Target = Pool; +impl Deref for TransactionPool { + type Target = Pool; fn deref(&self) -> &Self::Target { &self.inner } } -impl ExtrinsicPool for TransactionPool { +impl ExtrinsicPool for TransactionPool where + A: Send + Sync + 'static, + A: PolkadotApi, +{ type Error = Error; - fn submit(&self, xts: Vec) -> Result> { + fn submit(&self, block: BlockId, xts: Vec) -> Result> { // TODO: more general transaction pool, which can handle more kinds of vec-encoded transactions, // even when runtime is out of date. xts.into_iter() .map(|xt| xt.encode()) - .map(|encoded| UncheckedExtrinsic::decode(&mut &encoded[..])) - .map(|maybe_decoded| maybe_decoded.ok_or_else(|| ErrorKind::InvalidExtrinsicFormat.into())) - .map(|x| x.and_then(|x| self.import_unchecked_extrinsic(x))) - .map(|x| x.map(|x| x.hash().clone())) + .map(|encoded| { + let decoded = UncheckedExtrinsic::decode(&mut &encoded[..]).ok_or(ErrorKind::InvalidExtrinsicFormat)?; + let tx = self.import_unchecked_extrinsic(block, decoded)?; + Ok(*tx.hash()) + }) .collect() } } #[cfg(test)] mod tests { - use super::{TransactionPool, Ready}; + use std::sync::{atomic::{self, AtomicBool}, Arc}; + use super::TransactionPool; use substrate_keyring::Keyring::{self, *}; use codec::Slicable; use polkadot_api::{PolkadotApi, BlockBuilder, CheckedBlockId, Result}; @@ -390,8 +439,23 @@ mod tests { } } - #[derive(Clone)] - struct TestPolkadotApi; + #[derive(Default, Clone)] + struct TestPolkadotApi { + no_lookup: Arc, + } + + impl TestPolkadotApi { + fn without_lookup() -> Self { + TestPolkadotApi { + no_lookup: Arc::new(AtomicBool::new(true)), + } + } + + pub fn enable_lookup(&self) { + self.no_lookup.store(false, atomic::Ordering::SeqCst); + } + } + impl PolkadotApi for TestPolkadotApi { type CheckedBlockId = TestCheckedBlockId; type BlockBuilder = TestBlockBuilder; @@ -415,6 +479,7 @@ mod tests { fn lookup(&self, _at: &TestCheckedBlockId, _address: RawAddress) -> Result> { match _address { RawAddress::Id(i) => Ok(Some(i)), + RawAddress::Index(_) if self.no_lookup.load(atomic::Ordering::SeqCst) => Ok(None), RawAddress::Index(i) => Ok(match (i < 8, i + (number_of(_at) as u64) % 8) { (false, _) => None, (_, 0) => Some(Alice.to_raw_public().into()), @@ -456,130 +521,168 @@ mod tests { }, MaybeUnsigned(sig.into())).using_encoded(|e| UncheckedExtrinsic::decode(&mut &e[..])).unwrap() } + fn pool(api: &TestPolkadotApi) -> TransactionPool { + TransactionPool::new(Default::default(), Arc::new(api.clone())) + } + #[test] fn id_submission_should_work() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, true)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209)]); } #[test] fn index_submission_should_work() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, false)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209)]); } #[test] fn multiple_id_submission_should_work() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, true)]).unwrap(); - pool.submit(vec![uxt(Alice, 210, true)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap(); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]); } #[test] fn multiple_index_submission_should_work() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, false)]).unwrap(); - pool.submit(vec![uxt(Alice, 210, false)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap(); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, false)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]); } #[test] fn id_based_early_nonce_should_be_culled() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 208, true)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 208, true)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![]); } #[test] fn index_based_early_nonce_should_be_culled() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 208, false)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 208, false)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![]); } #[test] fn id_based_late_nonce_should_be_queued() { - let pool = TransactionPool::new(Default::default()); - let ready = || Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); + let api = TestPolkadotApi::default(); + let pool = pool(&api); - pool.submit(vec![uxt(Alice, 210, true)]).unwrap(); - let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap(); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![]); - pool.submit(vec![uxt(Alice, 209, true)]).unwrap(); - let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap(); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]); } #[test] fn index_based_late_nonce_should_be_queued() { - let pool = TransactionPool::new(Default::default()); - let ready = || Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); + let api = TestPolkadotApi::default(); + let pool = pool(&api); - pool.submit(vec![uxt(Alice, 210, false)]).unwrap(); - let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, false)).unwrap(); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![]); - pool.submit(vec![uxt(Alice, 209, false)]).unwrap(); - let pending: Vec<_> = pool.cull_and_get_pending(ready(), |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap(); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![(Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210)]); } #[test] fn index_then_id_submission_should_make_progress() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, false)]).unwrap(); - pool.submit(vec![uxt(Alice, 210, true)]).unwrap(); + let api = TestPolkadotApi::without_lookup(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap(); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); + assert_eq!(pending, vec![]); + + api.enable_lookup(); + pool.retry_verification(BlockId::number(0)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![ - (Some(Alice.to_raw_public().into()), 209) + (Some(Alice.to_raw_public().into()), 209), + (Some(Alice.to_raw_public().into()), 210) ]); } #[test] - fn id_then_index_submission_should_make_progress() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, true)]).unwrap(); - pool.submit(vec![uxt(Alice, 210, false)]).unwrap(); + fn retrying_verification_might_not_change_anything() { + let api = TestPolkadotApi::without_lookup(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, false)).unwrap(); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, true)).unwrap(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); + assert_eq!(pending, vec![]); + + pool.retry_verification(BlockId::number(1)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); + assert_eq!(pending, vec![]); + } + + #[test] + fn id_then_index_submission_should_make_progress() { + let api = TestPolkadotApi::without_lookup(); + let pool = pool(&api); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 209, true)).unwrap(); + pool.import_unchecked_extrinsic(BlockId::number(0), uxt(Alice, 210, false)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![ (Some(Alice.to_raw_public().into()), 209) ]); + + // when + api.enable_lookup(); + pool.retry_verification(BlockId::number(0)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(0), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); + assert_eq!(pending, vec![ + (Some(Alice.to_raw_public().into()), 209), + (Some(Alice.to_raw_public().into()), 210) + ]); } #[test] fn index_change_should_result_in_second_tx_culled_or_future() { - let pool = TransactionPool::new(Default::default()); - pool.submit(vec![uxt(Alice, 209, false)]).unwrap(); - pool.submit(vec![uxt(Alice, 210, false)]).unwrap(); + let api = TestPolkadotApi::default(); + let pool = pool(&api); + let block = BlockId::number(0); + pool.import_unchecked_extrinsic(block, uxt(Alice, 209, false)).unwrap(); + let hash = *pool.import_unchecked_extrinsic(block, uxt(Alice, 210, false)).unwrap().hash(); - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(0)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let pending: Vec<_> = pool.cull_and_get_pending(block, |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![ (Some(Alice.to_raw_public().into()), 209), (Some(Alice.to_raw_public().into()), 210) @@ -593,11 +696,14 @@ mod tests { // after this, a re-evaluation of the second's readiness should result in it being thrown // out (or maybe placed in future queue). -/* - // TODO: uncomment once the new queue design is in. - let ready = Ready::create(TestPolkadotApi.check_id(BlockId::number(1)).unwrap(), &TestPolkadotApi); - let pending: Vec<_> = pool.cull_and_get_pending(ready, |p| p.map(|a| (a.sender().ok(), a.index())).collect()); + let err = pool.reverify_transaction(BlockId::number(1), hash).unwrap_err(); + match *err.kind() { + ::error::ErrorKind::Msg(ref m) if m == "bad signature in extrinsic" => {}, + ref e => assert!(false, "The transaction should be rejected with BadSignature error, got: {:?}", e), + } + + let pending: Vec<_> = pool.cull_and_get_pending(BlockId::number(1), |p| p.map(|a| (a.sender(), a.index())).collect()).unwrap(); assert_eq!(pending, vec![]); -*/ + } }