From 3514ae98073f24ad75f3c4b7070d9e31b0b63591 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Mon, 20 Aug 2018 14:54:03 +0200 Subject: [PATCH] More generic extrinsic pool (#579) --- substrate/Cargo.lock | 5 +- substrate/demo/cli/src/lib.rs | 80 ++- substrate/substrate/cli/Cargo.toml | 1 - substrate/substrate/cli/src/informant.rs | 1 - substrate/substrate/cli/src/lib.rs | 1 - substrate/substrate/extrinsic-pool/Cargo.toml | 6 + substrate/substrate/extrinsic-pool/src/api.rs | 66 --- .../substrate/extrinsic-pool/src/error.rs | 33 ++ substrate/substrate/extrinsic-pool/src/lib.rs | 21 +- .../substrate/extrinsic-pool/src/pool.rs | 498 +++++++++++++++--- substrate/substrate/network/src/protocol.rs | 42 +- substrate/substrate/network/src/service.rs | 40 +- substrate/substrate/network/src/test/mod.rs | 8 +- substrate/substrate/rpc-servers/src/lib.rs | 5 +- substrate/substrate/rpc/src/author/error.rs | 5 +- substrate/substrate/rpc/src/author/mod.rs | 59 ++- substrate/substrate/rpc/src/author/tests.rs | 153 +++--- .../substrate/runtime/primitives/src/lib.rs | 3 + substrate/substrate/service/src/components.rs | 62 ++- substrate/substrate/service/src/config.rs | 2 +- substrate/substrate/service/src/lib.rs | 109 +++- 21 files changed, 845 insertions(+), 355 deletions(-) delete mode 100644 substrate/substrate/extrinsic-pool/src/api.rs create mode 100644 substrate/substrate/extrinsic-pool/src/error.rs diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 505d8eb229..c49662d977 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2376,7 +2376,6 @@ dependencies = [ "regex 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-client 0.1.0", - "substrate-extrinsic-pool 0.1.0", "substrate-network 0.1.0", "substrate-network-libp2p 0.1.0", "substrate-runtime-primitives 0.1.0", @@ -2494,6 +2493,10 @@ dependencies = [ "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", + "substrate-codec 0.1.0", + "substrate-keyring 0.1.0", + "substrate-runtime-primitives 0.1.0", + "substrate-test-client 0.1.0", "transaction-pool 1.12.3 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/substrate/demo/cli/src/lib.rs b/substrate/demo/cli/src/lib.rs index 27a317e45f..a8d2fea6c1 100644 --- a/substrate/demo/cli/src/lib.rs +++ b/substrate/demo/cli/src/lib.rs @@ -48,41 +48,77 @@ extern crate log; pub mod error; use std::sync::Arc; -use demo_primitives::Hash; -use demo_runtime::{Block, BlockId, UncheckedExtrinsic, GenesisConfig, +use demo_primitives::{AccountId, Hash}; +use demo_runtime::{Block, BlockId, GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig, SessionConfig, StakingConfig, TimestampConfig}; use futures::{Future, Sink, Stream}; use tokio::runtime::Runtime; use demo_executor::NativeExecutor; +use extrinsic_pool::{Pool as ExtrinsicPool, ExtrinsicFor, VerifiedFor, scoring, Readiness}; -struct DummyPool; -impl extrinsic_pool::api::ExtrinsicPool for DummyPool { - type Error = extrinsic_pool::txpool::Error; - type InPool = (); +#[derive(Debug, Clone)] +struct VerifiedExtrinsic { + sender: AccountId, + hash: Hash, +} - fn submit(&self, _block: BlockId, _: Vec) - -> Result, Self::Error> - { - Err("unimplemented".into()) +impl extrinsic_pool::VerifiedTransaction for VerifiedExtrinsic { + type Hash = Hash; + type Sender = AccountId; + + fn hash(&self) -> &Self::Hash { + &self.hash } - fn submit_and_watch(&self, _block: BlockId, _: UncheckedExtrinsic) - -> Result, Self::Error> - { - Err("unimplemented".into()) + fn sender(&self) -> &Self::Sender { + &self.sender } - fn light_status(&self) -> extrinsic_pool::txpool::LightStatus { - unreachable!() + fn mem_usage(&self) -> usize { + 0 + } +} + +struct Pool; +impl extrinsic_pool::ChainApi for Pool { + type Block = Block; + type Hash = Hash; + type Sender = AccountId; + type VEx = VerifiedExtrinsic; + type Ready = (); + type Error = extrinsic_pool::Error; + type Score = u64; + type Event = (); + + fn verify_transaction(&self, _at: &BlockId, _xt: &ExtrinsicFor) -> Result { + unimplemented!() } - fn import_notification_stream(&self) -> extrinsic_pool::api::EventStream { - unreachable!() + fn ready(&self) -> Self::Ready { } + + fn is_ready(&self, _at: &BlockId, _ready: &mut Self::Ready, _xt: &VerifiedFor) -> Readiness { + unimplemented!() } - fn all(&self) -> Self::InPool { - unreachable!() + fn compare(_old: &VerifiedFor, _other: &VerifiedFor) -> ::std::cmp::Ordering { + unimplemented!() + } + + fn choose(_old: &VerifiedFor, _new: &VerifiedFor) -> scoring::Choice { + unimplemented!() + } + + fn update_scores( + _xts: &[extrinsic_pool::Transaction>], + _scores: &mut [Self::Score], + _change: scoring::Change<()> + ) { + unimplemented!() + } + + fn should_replace(_old: &VerifiedFor, _new: &VerifiedFor) -> scoring::Choice { + unimplemented!() } } @@ -180,8 +216,8 @@ pub fn run(args: I) -> error::Result<()> where let handler = || { let state = rpc::apis::state::State::new(client.clone(), runtime.executor()); let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor()); - let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor()); - rpc::rpc_handler::(state, chain, author, DummySystem) + let author = rpc::apis::author::Author::new(client.clone(), Arc::new(ExtrinsicPool::new(Default::default(), Pool)), runtime.executor()); + rpc::rpc_handler::(state, chain, author, DummySystem) }; let http_address = "127.0.0.1:9933".parse().unwrap(); let ws_address = "127.0.0.1:9944".parse().unwrap(); diff --git a/substrate/substrate/cli/Cargo.toml b/substrate/substrate/cli/Cargo.toml index 176d59bbe7..7ae824c3dc 100644 --- a/substrate/substrate/cli/Cargo.toml +++ b/substrate/substrate/cli/Cargo.toml @@ -24,7 +24,6 @@ fdlimit = "0.1" exit-future = "0.1" sysinfo = "0.5.7" substrate-client = { path = "../../substrate/client" } -substrate-extrinsic-pool = { path = "../../substrate/extrinsic-pool" } substrate-network = { path = "../../substrate/network" } substrate-network-libp2p = { path = "../../substrate/network-libp2p" } substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" } diff --git a/substrate/substrate/cli/src/informant.rs b/substrate/substrate/cli/src/informant.rs index 2cc61a8e83..eacc95c50d 100644 --- a/substrate/substrate/cli/src/informant.rs +++ b/substrate/substrate/cli/src/informant.rs @@ -26,7 +26,6 @@ use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use network::{SyncState, SyncProvider}; use client::BlockchainEvents; use runtime_primitives::traits::{Header, As}; -use substrate_extrinsic_pool::api::ExtrinsicPool; const TIMER_INTERVAL_MS: u64 = 5000; diff --git a/substrate/substrate/cli/src/lib.rs b/substrate/substrate/cli/src/lib.rs index f2dee51e45..d5fe981b57 100644 --- a/substrate/substrate/cli/src/lib.rs +++ b/substrate/substrate/cli/src/lib.rs @@ -36,7 +36,6 @@ extern crate substrate_client as client; extern crate substrate_network as network; extern crate substrate_network_libp2p as network_libp2p; extern crate substrate_runtime_primitives as runtime_primitives; -extern crate substrate_extrinsic_pool; extern crate substrate_service as service; #[macro_use] extern crate slog; // needed until we can reexport `slog_info` from `substrate_telemetry` diff --git a/substrate/substrate/extrinsic-pool/Cargo.toml b/substrate/substrate/extrinsic-pool/Cargo.toml index b991bd697e..b31c49c8f6 100644 --- a/substrate/substrate/extrinsic-pool/Cargo.toml +++ b/substrate/substrate/extrinsic-pool/Cargo.toml @@ -11,3 +11,9 @@ futures = "0.1" log = "0.3" parking_lot = "0.4" transaction-pool = "1.12" +substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" } + +[dev-dependencies] +substrate-test-client = { path = "../../substrate/test-client" } +substrate-keyring = { path = "../../substrate/keyring" } +substrate-codec = { path = "../../substrate/codec" } diff --git a/substrate/substrate/extrinsic-pool/src/api.rs b/substrate/substrate/extrinsic-pool/src/api.rs deleted file mode 100644 index 148ab49e4f..0000000000 --- a/substrate/substrate/extrinsic-pool/src/api.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! External API for extrinsic pool. - -use std::fmt::Debug; - -use serde::{Serialize, de::DeserializeOwned}; -use txpool; -use futures::sync::mpsc; - -use watcher::Watcher; - -/// Extrinsic pool error. -pub trait Error: ::std::error::Error + Send + Sized { - /// Try to extract original `txpool::Error` - /// - /// This implementation is optional and used only to - /// provide more descriptive error messages for end users - /// of RPC API. - fn into_pool_error(self) -> Result { Err(self) } -} - -impl Error for txpool::Error { - fn into_pool_error(self) -> Result { Ok(self) } -} - -/// Modification notification event stream type; -pub type EventStream = mpsc::UnboundedReceiver<()>; - -/// Extrinsic pool. -pub trait ExtrinsicPool: Send + Sync + 'static { - /// Error type - type Error: Error; - - /// Pooled extrinsics - type InPool: Debug + Serialize + DeserializeOwned + Send + Sync + 'static; - - /// Submit a collection of extrinsics to the pool. - fn submit(&self, block: BlockId, xt: Vec) -> Result, Self::Error>; - - /// Submit an extrinsic to the pool and start watching it's progress. - fn submit_and_watch(&self, block: BlockId, xt: Ex) -> Result, Self::Error>; - - /// Returns light status of the pool. - fn light_status(&self) -> txpool::LightStatus; - - /// Return an event stream of transactions imported to the pool. - fn import_notification_stream(&self) -> EventStream; - - /// Return all extrinsics in the pool aggregated by the sender. - fn all(&self) -> Self::InPool; -} diff --git a/substrate/substrate/extrinsic-pool/src/error.rs b/substrate/substrate/extrinsic-pool/src/error.rs new file mode 100644 index 0000000000..506182ba18 --- /dev/null +++ b/substrate/substrate/extrinsic-pool/src/error.rs @@ -0,0 +1,33 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! External Error trait for extrinsic pool. + +use txpool; + +/// Extrinsic pool error. +pub trait IntoPoolError: ::std::error::Error + Send + Sized { + /// Try to extract original `txpool::Error` + /// + /// This implementation is optional and used only to + /// provide more descriptive error messages for end users + /// of RPC API. + fn into_pool_error(self) -> Result { Err(self) } +} + +impl IntoPoolError for txpool::Error { + fn into_pool_error(self) -> Result { Ok(self) } +} diff --git a/substrate/substrate/extrinsic-pool/src/lib.rs b/substrate/substrate/extrinsic-pool/src/lib.rs index ae6ac60f0f..7aee7130e3 100644 --- a/substrate/substrate/extrinsic-pool/src/lib.rs +++ b/substrate/substrate/extrinsic-pool/src/lib.rs @@ -15,25 +15,32 @@ // along with Polkadot. If not, see . #![warn(missing_docs)] +#![warn(unused_extern_crates)] //! Generic extrinsic pool. extern crate futures; extern crate parking_lot; -extern crate serde; +extern crate substrate_runtime_primitives as runtime_primitives; #[macro_use] extern crate log; +extern crate serde; #[macro_use] extern crate serde_derive; +extern crate transaction_pool as txpool; +#[cfg(test)] extern crate substrate_test_client as test_client; +#[cfg(test)] extern crate substrate_keyring as keyring; +#[cfg(test)] extern crate substrate_codec as codec; -pub extern crate transaction_pool as txpool; - -pub mod api; pub mod watcher; - +mod error; mod listener; mod pool; -pub use self::listener::Listener; -pub use self::pool::Pool; +pub use listener::Listener; +pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics}; +pub use txpool::scoring; +pub use txpool::{Error, ErrorKind}; +pub use error::IntoPoolError; +pub use txpool::{Options, Status, LightStatus, VerifiedTransaction, Readiness, Transaction}; diff --git a/substrate/substrate/extrinsic-pool/src/pool.rs b/substrate/substrate/extrinsic-pool/src/pool.rs index e874086f8d..5f70123eb9 100644 --- a/substrate/substrate/extrinsic-pool/src/pool.rs +++ b/substrate/substrate/extrinsic-pool/src/pool.rs @@ -1,65 +1,183 @@ // Copyright 2018 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. +// This file is part of Substrate. -// Polkadot is free software: you can redistribute it and/or modify +// Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. -// Polkadot is distributed in the hope that it will be useful, +// Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -use std::{ - collections::HashMap, - fmt, - marker::PhantomData, - sync::Arc, -}; +// along with Substrate. If not, see . +use std::{ collections::HashMap, fmt, sync::Arc, collections::BTreeMap}; use futures::sync::mpsc; -use parking_lot::{RwLock, Mutex}; -use txpool; +use parking_lot::{Mutex, RwLock}; +use serde::{Serialize, de::DeserializeOwned}; +use txpool::{self, Scoring, Readiness}; use listener::Listener; use watcher::Watcher; +use error::IntoPoolError; + +use runtime_primitives::{generic::BlockId, traits::Block as BlockT}; + +/// Modification notification event stream type; +pub type EventStream = mpsc::UnboundedReceiver<()>; + +/// Extrinsic hash type for a pool. +pub type ExHash = ::Hash; +/// Extrinsic type for a pool. +pub type ExtrinsicFor = <::Block as BlockT>::Extrinsic; +/// Verified extrinsic data for `ChainApi`. +pub type VerifiedFor = Verified, ::VEx>; +/// A collection of all extrinsics. +pub type AllExtrinsics = BTreeMap<<::VEx as txpool::VerifiedTransaction>::Sender, Vec>>; + +/// Verified extrinsic struct. Wraps original extrinsic and verification info. +#[derive(Debug)] +pub struct Verified { + /// Original extrinsic. + pub original: Ex, + /// Verification data. + pub verified: VEx, +} + +impl txpool::VerifiedTransaction for Verified +where + Ex: ::std::fmt::Debug, + VEx: txpool::VerifiedTransaction, +{ + type Hash = ::Hash; + type Sender = ::Sender; + + fn hash(&self) -> &Self::Hash { + self.verified.hash() + } + + fn sender(&self) -> &Self::Sender { + self.verified.sender() + } + + fn mem_usage(&self) -> usize { + // TODO: add `original` mem usage. + self.verified.mem_usage() + } +} + +/// Concrete extrinsic validation and query logic. +pub trait ChainApi: Send + Sync { + /// Block type. + type Block: BlockT; + /// Extrinsic hash type. + type Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Serialize + DeserializeOwned + ::std::str::FromStr + Send + Sync + Default + 'static; + /// Extrinsic sender type. + type Sender: ::std::hash::Hash + fmt::Debug + Serialize + DeserializeOwned + Eq + Clone + Send + Sync + Ord + Default; + /// Unchecked extrinsic type. + /// Verified extrinsic type. + type VEx: txpool::VerifiedTransaction + Send + Sync + Clone; + /// Readiness evaluator + type Ready; + /// Error type. + type Error: From + IntoPoolError; + /// Score type. + type Score: ::std::cmp::Ord + Clone + Default + fmt::Debug + Send + Send + Sync; + /// Custom scoring update event type. + type Event: ::std::fmt::Debug; + /// Verify extrinsic at given block. + fn verify_transaction(&self, at: &BlockId, uxt: &ExtrinsicFor) -> Result; + + /// Create new readiness evaluator. + fn ready(&self) -> Self::Ready; + + /// Check readiness for verified extrinsic at given block. + fn is_ready(&self, at: &BlockId, context: &mut Self::Ready, xt: &VerifiedFor) -> Readiness; + + /// Decides on ordering of `T`s from a particular sender. + fn compare(old: &VerifiedFor, other: &VerifiedFor) -> ::std::cmp::Ordering; + + /// Decides how to deal with two transactions from a sender that seem to occupy the same slot in the queue. + fn choose(old: &VerifiedFor, new: &VerifiedFor) -> txpool::scoring::Choice; + + /// Updates the transaction scores given a list of transactions and a change to previous scoring. + /// NOTE: you can safely assume that both slices have the same length. + /// (i.e. score at index `i` represents transaction at the same index) + fn update_scores(xts: &[txpool::Transaction>], scores: &mut [Self::Score], change: txpool::scoring::Change); + + /// Decides if `new` should push out `old` transaction from the pool. + /// + /// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits. + fn should_replace(old: &VerifiedFor, new: &VerifiedFor) -> txpool::scoring::Choice; +} + +pub struct Ready<'a, 'b, B: 'a + ChainApi> { + api: &'a B, + at: &'b BlockId, + context: B::Ready, +} + +impl<'a, 'b, B: ChainApi> txpool::Ready> for Ready<'a, 'b, B> { + fn is_ready(&mut self, xt: &VerifiedFor) -> Readiness { + self.api.is_ready(self.at, &mut self.context, xt) + } +} + +pub struct ScoringAdapter(::std::marker::PhantomData); + +impl ::std::fmt::Debug for ScoringAdapter { + fn fmt(&self, _f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + Ok(()) + } +} + +impl Scoring> for ScoringAdapter { + type Score = ::Score; + type Event = ::Event; + + fn compare(&self, old: &VerifiedFor, other: &VerifiedFor) -> ::std::cmp::Ordering { + T::compare(old, other) + } + + fn choose(&self, old: &VerifiedFor, new: &VerifiedFor) -> txpool::scoring::Choice { + T::choose(old, new) + } + + fn update_scores(&self, xts: &[txpool::Transaction>], scores: &mut [Self::Score], change: txpool::scoring::Change) { + T::update_scores(xts, scores, change) + } + + fn should_replace(&self, old: &VerifiedFor, new: &VerifiedFor) -> txpool::scoring::Choice { + T::should_replace(old, new) + } +} /// Extrinsics pool. -pub struct Pool where - Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex, - S: txpool::Scoring, - VEx: txpool::VerifiedTransaction, -{ - _error: Mutex>, +pub struct Pool { + api: B, pool: RwLock, + VerifiedFor, + ScoringAdapter, + Listener, >>, import_notification_sinks: Mutex>>, } -impl Pool where - Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default, - S: txpool::Scoring, - VEx: txpool::VerifiedTransaction, - E: From, -{ +impl Pool { /// Create a new transaction pool. - pub fn new(options: txpool::Options, scoring: S) -> Self { + pub fn new(options: txpool::Options, api: B) -> Self { Pool { - _error: Default::default(), - pool: RwLock::new(txpool::Pool::new(Listener::default(), scoring, options)), + pool: RwLock::new(txpool::Pool::new(Listener::default(), ScoringAdapter::(Default::default()), options)), import_notification_sinks: Default::default(), + api, } } /// Imports a pre-verified extrinsic to the pool. - pub fn import(&self, xt: VEx) -> Result, E> { + pub fn import(&self, xt: VerifiedFor) -> Result>, B::Error> { let result = self.pool.write().import(xt)?; self.import_notification_sinks.lock() @@ -69,62 +187,82 @@ impl Pool where } /// Return an event stream of transactions imported to the pool. - pub fn import_notification_stream(&self) -> mpsc::UnboundedReceiver<()> { + pub fn import_notification_stream(&self) -> EventStream { let (sink, stream) = mpsc::unbounded(); self.import_notification_sinks.lock().push(sink); stream } /// Invoked when extrinsics are broadcasted. - pub fn on_broadcasted(&self, propagated: HashMap>) { + pub fn on_broadcasted(&self, propagated: HashMap>) { for (hash, peers) in propagated.into_iter() { self.pool.write().listener_mut().broadcasted(&hash, peers); } } /// Imports a bunch of unverified extrinsics to the pool - pub fn submit(&self, verifier: V, xts: T) -> Result>, E> where - V: txpool::Verifier, - E: From, - T: IntoIterator + pub fn submit_at(&self, at: &BlockId, xts: T) -> Result>>, B::Error> where + T: IntoIterator> { xts .into_iter() - .map(|xt| verifier.verify_transaction(xt)) - .map(|xt| { - Ok(self.pool.write().import(xt?)?) + .map(|xt| (self.api.verify_transaction(at, &xt), xt)) + .map(|(v, xt)| { + let xt = Verified { original: xt, verified: v? }; + Ok(self.pool.write().import(xt)?) }) .collect() } + /// Imports one unverified extrinsic to the pool + pub fn submit_one(&self, at: &BlockId, xt: ExtrinsicFor) -> Result>, B::Error> { + let v = self.api.verify_transaction(at, &xt)?; + let xt = Verified { original: xt, verified: v }; + Ok(self.pool.write().import(xt)?) + } + /// Import a single extrinsic and starts to watch their progress in the pool. - pub fn submit_and_watch(&self, verifier: V, xt: Ex) -> Result, E> where - V: txpool::Verifier, - E: From, - { - let xt = self.submit(verifier, vec![xt])?.pop().expect("One extrinsic passed; one result returned; qed"); + pub fn submit_and_watch(&self, at: &BlockId, xt: ExtrinsicFor) -> Result, B::Error> { + let xt = self.submit_at(at, Some(xt))?.pop().expect("One extrinsic passed; one result returned; qed"); Ok(self.pool.write().listener_mut().create_watcher(xt)) } /// Remove from the pool. - pub fn remove(&self, hashes: &[Hash], is_valid: bool) -> Vec>> { + pub fn remove(&self, hashes: &[B::Hash], is_valid: bool) -> Vec>>> { let mut pool = self.pool.write(); let mut results = Vec::with_capacity(hashes.len()); for hash in hashes { - results.push(pool.remove(hash, !is_valid)); + results.push(pool.remove(hash, is_valid)); } results } /// Cull transactions from the queue. - pub fn cull(&self, senders: Option<&[::Sender]>, ready: R) -> usize where - R: txpool::Ready, + pub fn cull_from( + &self, + at: &BlockId, + senders: Option<&[::Sender]>, + ) -> usize { + let ready = Ready { api: &self.api, context: self.api.ready(), at }; self.pool.write().cull(senders, ready) } + /// Cull old transactions from the queue. + pub fn cull(&self, at: &BlockId) -> Result { + Ok(self.cull_from(at, None)) + } + + /// Cull transactions from the queue and then compute the pending set. + pub fn cull_and_get_pending(&self, at: &BlockId, f: F) -> Result where + F: FnOnce(txpool::PendingIterator, Ready, ScoringAdapter, Listener>) -> T, + { + self.cull_from(at, None); + Ok(self.pending(at, f)) + } + /// Get the full status of the queue (including readiness) - pub fn status>(&self, ready: R) -> txpool::Status { + pub fn status>>(&self, ready: R) -> txpool::Status { self.pool.read().status(ready) } @@ -134,34 +272,268 @@ impl Pool where } /// Removes all transactions from given sender - pub fn remove_sender(&self, sender: VEx::Sender) -> Vec> { + pub fn remove_sender(&self, sender: ::Sender) -> Vec>> { let mut pool = self.pool.write(); - let pending = pool.pending_from_sender(|_: &VEx| txpool::Readiness::Ready, &sender).collect(); + let pending = pool.pending_from_sender(|_: &VerifiedFor| txpool::Readiness::Ready, &sender).collect(); // remove all transactions from this sender - pool.cull(Some(&[sender]), |_: &VEx| txpool::Readiness::Stale); + pool.cull(Some(&[sender]), |_: &VerifiedFor| txpool::Readiness::Stale); pending } /// Retrieve the pending set. Be careful to not leak the pool `ReadGuard` to prevent deadlocks. - pub fn pending(&self, ready: R, f: F) -> T where - R: txpool::Ready, - F: FnOnce(txpool::PendingIterator>) -> T, + pub fn pending(&self, at: &BlockId, f: F) -> T where + F: FnOnce(txpool::PendingIterator, Ready, ScoringAdapter, Listener>) -> T, { + let ready = Ready { api: &self.api, context: self.api.ready(), at }; f(self.pool.read().pending(ready)) + } + + /// Retry to import all verified transactions from given sender. + pub fn retry_verification(&self, at: &BlockId, sender: ::Sender) -> Result<(), B::Error> { + let to_reverify = self.remove_sender(sender); + self.submit_at(at, to_reverify.into_iter().map(|ex| Arc::try_unwrap(ex).expect("Removed items have no references").original))?; + Ok(()) } - /// Retrieve all transactions in the pool. The transactions might be unordered. - pub fn all(&self, f: F) -> T where - F: FnOnce(txpool::UnorderedIterator) -> T, - { - f(self.pool.read().unordered_pending(AlwaysReady)) + /// Reverify transaction that has been reported incorrect. + /// + /// Returns `Ok(None)` in case the hash is missing, `Err(e)` in case of verification error and new transaction + /// reference otherwise. + /// + /// TODO [ToDr] That method is currently unused, should be used together with BlockBuilder + /// when we detect that particular transaction has failed. + /// In such case we will attempt to remove or re-verify it. + pub fn reverify_transaction(&self, at: &BlockId, hash: B::Hash) -> Result>>, B::Error> { + let result = self.remove(&[hash], false).pop().expect("One hash passed; one result received; qed"); + if let Some(ex) = result { + self.submit_one(at, Arc::try_unwrap(ex).expect("Removed items have no references").original).map(Some) + } else { + Ok(None) + } + } + + /// Retrieve all transactions in the pool grouped by sender. + pub fn all(&self) -> AllExtrinsics { + use txpool::VerifiedTransaction; + let pool = self.pool.read(); + let all = pool.unordered_pending(AlwaysReady); + all.fold(Default::default(), |mut map: AllExtrinsics, tx| { + // Map with `null` key is not serializable, so we fallback to default accountId. + map.entry(tx.verified.sender().clone()) + .or_insert_with(Vec::new) + // use bytes type to make it serialize nicer. + .push(tx.original.clone()); + map + }) } } -/// A Readiness implementation that returns `Ready` for all transactions. + /// A Readiness implementation that returns `Ready` for all transactions. pub struct AlwaysReady; impl txpool::Ready for AlwaysReady { fn is_ready(&mut self, _tx: &VEx) -> txpool::Readiness { txpool::Readiness::Ready } } + +#[cfg(test)] +mod tests { + use txpool; + use super::{VerifiedFor, ExtrinsicFor}; + use std::collections::HashMap; + use std::cmp::Ordering; + use {Pool, ChainApi, scoring, Readiness}; + use keyring::Keyring::{self, *}; + use codec::Encode; + use test_client::runtime::{AccountId, Block, Hash, Index, Extrinsic, Transfer}; + use runtime_primitives::{generic, traits::{Hash as HashT, BlindCheckable, BlakeTwo256}}; + use VerifiedTransaction as VerifiedExtrinsic; + + type BlockId = generic::BlockId; + + #[derive(Clone, Debug)] + pub struct VerifiedTransaction { + hash: Hash, + sender: AccountId, + nonce: u64, + } + + impl txpool::VerifiedTransaction for VerifiedTransaction { + type Hash = Hash; + type Sender = AccountId; + + fn hash(&self) -> &Self::Hash { + &self.hash + } + + fn sender(&self) -> &Self::Sender { + &self.sender + } + + fn mem_usage(&self) -> usize { + 256 + } + } + + struct TestApi; + + impl TestApi { + fn default() -> Self { + TestApi + } + } + + impl ChainApi for TestApi { + type Block = Block; + type Hash = Hash; + type Sender = AccountId; + type Error = txpool::Error; + type VEx = VerifiedTransaction; + type Ready = HashMap; + type Score = u64; + type Event = (); + + fn verify_transaction(&self, _at: &BlockId, uxt: &ExtrinsicFor) -> Result { + let hash = BlakeTwo256::hash(&uxt.encode()); + let xt = uxt.clone().check()?; + Ok(VerifiedTransaction { + hash, + sender: xt.transfer.from, + nonce: xt.transfer.nonce, + }) + } + + fn is_ready(&self, at: &BlockId, nonce_cache: &mut Self::Ready, xt: &VerifiedFor) -> Readiness { + let sender = xt.verified.sender; + let next_index = nonce_cache.entry(sender) + .or_insert_with(|| index(at, sender)); + + let result = match xt.original.transfer.nonce.cmp(&next_index) { + Ordering::Greater => Readiness::Future, + Ordering::Equal => Readiness::Ready, + Ordering::Less => Readiness::Stale, + }; + + // remember to increment `next_index` + *next_index = next_index.saturating_add(1); + + result + } + + fn ready(&self) -> Self::Ready { + HashMap::default() + } + + fn compare(old: &VerifiedFor, other: &VerifiedFor) -> Ordering { + old.original.transfer.nonce.cmp(&other.original.transfer.nonce) + } + + fn choose(old: &VerifiedFor, new: &VerifiedFor) -> scoring::Choice { + assert!(new.verified.sender == old.verified.sender, "Scoring::choose called with transactions from different senders"); + if old.original.transfer.nonce == new.original.transfer.nonce { + return scoring::Choice::RejectNew; + } + scoring::Choice::InsertNew + } + + fn update_scores( + xts: &[txpool::Transaction>], + scores: &mut [Self::Score], + _change: scoring::Change<()> + ) { + for i in 0..xts.len() { + scores[i] = xts[i].original.transfer.amount; + } + } + + fn should_replace(_old: &VerifiedFor, _new: &VerifiedFor) -> scoring::Choice { + scoring::Choice::InsertNew + } + } + + fn index(at: &BlockId, _account: AccountId) -> u64 { + (_account[0] as u64) + number_of(at) + } + + fn number_of(at: &BlockId) -> u64 { + match at { + generic::BlockId::Number(n) => *n as u64, + _ => 0, + } + } + + fn uxt(who: Keyring, nonce: Index) -> Extrinsic { + let transfer = Transfer { + from: who.to_raw_public().into(), + to: AccountId::default(), + nonce, + amount: 1, + }; + let signature = transfer.using_encoded(|e| who.sign(e)); + Extrinsic { + transfer, + signature: signature.into(), + } + } + + fn pool() -> Pool { + Pool::new(Default::default(), TestApi::default()) + } + + #[test] + fn submission_should_work() { + let pool = pool(); + assert_eq!(209, index(&BlockId::number(0), Alice.to_raw_public().into())); + pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); + assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209)]); + } + + #[test] + fn multiple_submission_should_work() { + let pool = pool(); + pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); + assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]); + } + + #[test] + fn early_nonce_should_be_culled() { + let pool = pool(); + pool.submit_one(&BlockId::number(0), uxt(Alice, 208)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); + assert_eq!(pending, vec![]); + } + + #[test] + fn late_nonce_should_be_queued() { + let pool = pool(); + + pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); + assert_eq!(pending, vec![]); + + pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); + assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]); + } + + #[test] + fn retrying_verification_might_not_change_anything() { + let pool = pool(); + pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap(); + pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); + assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]); + + pool.retry_verification(&BlockId::number(1), Alice.to_raw_public().into()).unwrap(); + + let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap(); + assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]); + } +} diff --git a/substrate/substrate/network/src/protocol.rs b/substrate/substrate/network/src/protocol.rs index 10b08a4366..bf85395eb2 100644 --- a/substrate/substrate/network/src/protocol.rs +++ b/substrate/substrate/network/src/protocol.rs @@ -29,7 +29,7 @@ use message::{self, Message}; use message::generic::Message as GenericMessage; use specialization::Specialization; use sync::{ChainSync, Status as SyncStatus, SyncState}; -use service::{Roles, TransactionPool}; +use service::{Roles, TransactionPool, ExHashT}; use import_queue::ImportQueue; use config::ProtocolConfig; use chain::Client; @@ -48,16 +48,16 @@ pub (crate) const CURRENT_PACKET_COUNT: u8 = 1; const MAX_BLOCK_DATA_RESPONSE: u32 = 128; // Lock must always be taken in order declared here. -pub struct Protocol> { +pub struct Protocol, H: ExHashT> { config: ProtocolConfig, on_demand: Option>>, genesis_hash: B::Hash, sync: Arc>>, specialization: RwLock, - context_data: ContextData, + context_data: ContextData, // Connected peers pending Status message. handshaking_peers: RwLock>, - transaction_pool: Arc>, + transaction_pool: Arc>, } /// Syncing status and statistics #[derive(Clone)] @@ -71,7 +71,7 @@ pub struct ProtocolStatus { } /// Peer information -struct Peer { +struct Peer { /// Protocol version protocol_version: u32, /// Roles @@ -85,7 +85,7 @@ struct Peer { /// Request timestamp request_timestamp: Option, /// Holds a set of transactions known to this peer. - known_extrinsics: HashSet, + known_extrinsics: HashSet, /// Holds a set of blocks known to this peer. known_blocks: HashSet, /// Request counter, @@ -121,13 +121,13 @@ pub trait Context { } /// Protocol context. -pub(crate) struct ProtocolContext<'a, B: 'a + BlockT> { +pub(crate) struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { io: &'a mut SyncIo, - context_data: &'a ContextData, + context_data: &'a ContextData, } -impl<'a, B: BlockT + 'a> ProtocolContext<'a, B> { - pub(crate) fn new(context_data: &'a ContextData, io: &'a mut SyncIo) -> Self { +impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { + pub(crate) fn new(context_data: &'a ContextData, io: &'a mut SyncIo) -> Self { ProtocolContext { io, context_data, @@ -157,7 +157,7 @@ impl<'a, B: BlockT + 'a> ProtocolContext<'a, B> { } } -impl<'a, B: BlockT + 'a> Context for ProtocolContext<'a, B> { +impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, H> { fn send_message(&mut self, who: NodeIndex, message: Message) { ProtocolContext::send_message(self, who, message); } @@ -176,20 +176,20 @@ impl<'a, B: BlockT + 'a> Context for ProtocolContext<'a, B> { } /// Data necessary to create a context. -pub(crate) struct ContextData { +pub(crate) struct ContextData { // All connected peers - peers: RwLock>>, + peers: RwLock>>, chain: Arc>, } -impl> Protocol { +impl, H: ExHashT> Protocol { /// Create a new instance. pub fn new( config: ProtocolConfig, chain: Arc>, import_queue: Arc>, on_demand: Option>>, - transaction_pool: Arc>, + transaction_pool: Arc>, specialization: S, ) -> error::Result { let info = chain.info()?; @@ -210,7 +210,7 @@ impl> Protocol { Ok(protocol) } - pub(crate) fn context_data(&self) -> &ContextData { + pub(crate) fn context_data(&self) -> &ContextData { &self.context_data } @@ -276,7 +276,7 @@ impl> Protocol { } pub fn send_message(&self, io: &mut SyncIo, who: NodeIndex, message: Message) { - send_message::(&self.context_data.peers, io, who, message) + send_message::(&self.context_data.peers, io, who, message) } /// Called when a new peer is connected @@ -490,7 +490,7 @@ impl> Protocol { let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics .iter() .cloned() - .filter(|&(hash, _)| peer.known_extrinsics.insert(hash)) + .filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone())) .unzip(); if !to_send.is_empty() { @@ -616,11 +616,11 @@ impl> Protocol { Default::default() }, }; - self.send_message(io, who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse { + self.send_message(io, who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse { id: request.id, proof, })); } - fn on_remote_read_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteReadResponse) { + fn on_remote_read_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteReadResponse) { trace!(target: "sync", "Remote read response {} from {}", response.id, who); self.on_demand.as_ref().map(|s| s.on_remote_read_response(io, who, response)); } @@ -633,7 +633,7 @@ impl> Protocol { } } -fn send_message(peers: &RwLock>>, io: &mut SyncIo, who: NodeIndex, mut message: Message) { +fn send_message(peers: &RwLock>>, io: &mut SyncIo, who: NodeIndex, mut message: Message) { match &mut message { &mut GenericMessage::BlockRequest(ref mut r) => { let mut peers = peers.write(); diff --git a/substrate/substrate/network/src/service.rs b/substrate/substrate/network/src/service.rs index dfcff2a62c..43e026c36a 100644 --- a/substrate/substrate/network/src/service.rs +++ b/substrate/substrate/network/src/service.rs @@ -81,14 +81,17 @@ pub trait SyncProvider: Send + Sync { fn node_id(&self) -> Option; } +pub trait ExHashT: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {} +impl ExHashT for T where T: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {} + /// Transaction pool interface -pub trait TransactionPool: Send + Sync { +pub trait TransactionPool: Send + Sync { /// Get transactions from the pool that are ready to be propagated. - fn transactions(&self) -> Vec<(B::Hash, B::Extrinsic)>; + fn transactions(&self) -> Vec<(H, B::Extrinsic)>; /// Import a transaction into the pool. - fn import(&self, transaction: &B::Extrinsic) -> Option; + fn import(&self, transaction: &B::Extrinsic) -> Option; /// Notify the pool about transactions broadcast. - fn on_broadcasted(&self, propagations: HashMap>); + fn on_broadcasted(&self, propagations: HashMap>); } /// ConsensusService @@ -109,9 +112,9 @@ pub trait ExecuteInContext: Send + Sync { fn execute_in_context)>(&self, closure: F); } -/// devp2p Protocol handler -struct ProtocolHandler> { - protocol: Protocol, +/// Network protocol handler +struct ProtocolHandler, H: ExHashT> { + protocol: Protocol, } /// Peer connection information @@ -132,7 +135,7 @@ pub struct PeerInfo { } /// Service initialization parameters. -pub struct Params { +pub struct Params { /// Configuration. pub config: ProtocolConfig, /// Network layer configuration. @@ -142,24 +145,24 @@ pub struct Params { /// On-demand service reference. pub on_demand: Option>>, /// Transaction pool. - pub transaction_pool: Arc>, + pub transaction_pool: Arc>, /// Protocol specialization. pub specialization: S, } /// Polkadot network service. Handles network IO and manages connectivity. -pub struct Service> { +pub struct Service, H: ExHashT> { /// Network service network: NetworkService, /// Devp2p protocol handler - handler: Arc>, + handler: Arc>, /// Devp2p protocol ID. protocol_id: ProtocolId, } -impl> Service { +impl, H: ExHashT> Service { /// Creates and register protocol with the network service - pub fn new(params: Params, protocol_id: ProtocolId) -> Result>, Error> { + pub fn new(params: Params, protocol_id: ProtocolId) -> Result>, Error> { let chain = params.chain.clone(); let import_queue = Arc::new(AsyncImportQueue::new()); let handler = Arc::new(ProtocolHandler { @@ -228,13 +231,12 @@ impl> Service { } } -impl> Drop for Service { +impl, H:ExHashT> Drop for Service { fn drop(&mut self) { self.handler.protocol.stop(); } } - -impl> ExecuteInContext for Service { +impl, H: ExHashT> ExecuteInContext for Service { fn execute_in_context)>(&self, closure: F) { self.network.with_context(self.protocol_id, |context| { closure(&mut ProtocolContext::new(self.handler.protocol.context_data(), &mut NetSyncIo::new(context))) @@ -242,7 +244,7 @@ impl> ExecuteInContext for Service< } } -impl> SyncProvider for Service { +impl, H: ExHashT> SyncProvider for Service { /// Get sync status fn status(&self) -> ProtocolStatus { self.handler.protocol.status() @@ -276,7 +278,7 @@ impl> SyncProvider for Service> NetworkProtocolHandler for ProtocolHandler { +impl, H: ExHashT> NetworkProtocolHandler for ProtocolHandler { fn initialize(&self, io: &NetworkContext) { io.register_timer(TICK_TOKEN, TICK_TIMEOUT) .expect("Error registering sync timer"); @@ -319,7 +321,7 @@ pub trait ManageNetwork: Send + Sync { } -impl> ManageNetwork for Service { +impl, H: ExHashT> ManageNetwork for Service { fn accept_unreserved_peers(&self) { self.network.set_non_reserved_mode(NonReservedPeerMode::Accept); } diff --git a/substrate/substrate/network/src/test/mod.rs b/substrate/substrate/network/src/test/mod.rs index 9d98a66d94..35d0e2dc73 100644 --- a/substrate/substrate/network/src/test/mod.rs +++ b/substrate/substrate/network/src/test/mod.rs @@ -116,7 +116,7 @@ pub struct TestPacket { pub struct Peer { client: Arc>, - pub sync: Protocol, + pub sync: Protocol, pub queue: RwLock>, } @@ -173,8 +173,8 @@ impl Peer { fn flush(&self) { } - fn generate_blocks(&self, count: usize, mut edit_block: F) - where F: FnMut(&mut BlockBuilder) + fn generate_blocks(&self, count: usize, mut edit_block: F) + where F: FnMut(&mut BlockBuilder) { for _ in 0 .. count { let mut builder = self.client.new_block().unwrap(); @@ -207,7 +207,7 @@ impl Peer { pub struct EmptyTransactionPool; -impl TransactionPool for EmptyTransactionPool { +impl TransactionPool for EmptyTransactionPool { fn transactions(&self) -> Vec<(Hash, Extrinsic)> { Vec::new() } diff --git a/substrate/substrate/rpc-servers/src/lib.rs b/substrate/substrate/rpc-servers/src/lib.rs index e42996ff6c..8fcdea851f 100644 --- a/substrate/substrate/rpc-servers/src/lib.rs +++ b/substrate/substrate/rpc-servers/src/lib.rs @@ -39,17 +39,18 @@ pub type HttpServer = http::Server; pub type WsServer = ws::Server; /// Construct rpc `IoHandler` -pub fn rpc_handler( +pub fn rpc_handler( state: S, chain: C, author: A, system: Y, ) -> RpcHandler where Block: BlockT + 'static, + ExHash: Send + Sync + 'static + substrate_runtime_primitives::Serialize + substrate_runtime_primitives::DeserializeOwned, PendingExtrinsics: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, S: apis::state::StateApi, C: apis::chain::ChainApi, - A: apis::author::AuthorApi, + A: apis::author::AuthorApi, Y: apis::system::SystemApi, { let mut io = pubsub::PubSubHandler::default(); diff --git a/substrate/substrate/rpc/src/author/error.rs b/substrate/substrate/rpc/src/author/error.rs index ce82ec2925..e831770056 100644 --- a/substrate/substrate/rpc/src/author/error.rs +++ b/substrate/substrate/rpc/src/author/error.rs @@ -17,15 +17,14 @@ //! Authoring RPC module errors. use client; -use extrinsic_pool::txpool; +use extrinsic_pool; use rpc; use errors; - error_chain! { links { - Pool(txpool::Error, txpool::ErrorKind) #[doc = "Pool error"]; + Pool(extrinsic_pool::Error, extrinsic_pool::ErrorKind) #[doc = "Pool error"]; Client(client::error::Error, client::error::ErrorKind) #[doc = "Client error"]; } errors { diff --git a/substrate/substrate/rpc/src/author/mod.rs b/substrate/substrate/rpc/src/author/mod.rs index d63bf95a63..08a46ad34b 100644 --- a/substrate/substrate/rpc/src/author/mod.rs +++ b/substrate/substrate/rpc/src/author/mod.rs @@ -19,10 +19,16 @@ use std::sync::Arc; use client::{self, Client}; -use codec::Codec; +use codec::Decode; use extrinsic_pool::{ - api::{Error, ExtrinsicPool}, + Pool, + IntoPoolError, + ChainApi as PoolChainApi, watcher::Status, + VerifiedTransaction, + AllExtrinsics, + ExHash, + ExtrinsicFor, }; use jsonrpc_macros::pubsub; use jsonrpc_pubsub::SubscriptionId; @@ -54,7 +60,7 @@ build_rpc_trait! { /// Returns all pending extrinsics, potentially grouped by sender. #[rpc(name = "author_pendingExtrinsics")] fn pending_extrinsics(&self) -> Result; - + #[pubsub(name = "author_extrinsicUpdate")] { /// Submit an extrinsic to watch. #[rpc(name = "author_submitAndWatchExtrinsic")] @@ -64,22 +70,27 @@ build_rpc_trait! { #[rpc(name = "author_unwatchExtrinsic")] fn unwatch_extrinsic(&self, SubscriptionId) -> Result; } + } } /// Authoring API -pub struct Author { +pub struct Author where + P: PoolChainApi + Sync + Send + 'static, +{ /// Substrate client - client: Arc>, + client: Arc::Block>>, /// Extrinsic pool - pool: Arc

, + pool: Arc>, /// Subscriptions manager subscriptions: Subscriptions, } -impl Author { +impl Author where + P: PoolChainApi + Sync + Send + 'static, +{ /// Create new instance of Authoring API. - pub fn new(client: Arc>, pool: Arc

, executor: TaskExecutor) -> Self { + pub fn new(client: Arc::Block>>, pool: Arc>, executor: TaskExecutor) -> Self { Author { client, pool, @@ -88,45 +99,40 @@ impl Author { } } -impl AuthorApi for Author where - B: client::backend::Backend + Send + Sync + 'static, - E: client::CallExecutor + Send + Sync + 'static, - Block: traits::Block + 'static, - Hash: traits::MaybeSerializeDebug + Send + Sync + 'static, - InPool: traits::MaybeSerializeDebug + Send + Sync + 'static, - P: ExtrinsicPool, Hash, InPool=InPool>, +impl AuthorApi, ExtrinsicFor

, AllExtrinsics

> for Author where + B: client::backend::Backend<

::Block, KeccakHasher, RlpCodec> + Send + Sync + 'static, + E: client::CallExecutor<

::Block, KeccakHasher, RlpCodec> + Send + Sync + 'static, + P: PoolChainApi + Sync + Send + 'static, P::Error: 'static, - Ex: Codec, { type Metadata = ::metadata::Metadata; - fn submit_extrinsic(&self, xt: Bytes) -> Result { - let dxt = Ex::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?; + fn submit_extrinsic(&self, xt: Bytes) -> Result> { + let dxt = Decode::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?; self.submit_rich_extrinsic(dxt) } - fn submit_rich_extrinsic(&self, xt: Ex) -> Result { + fn submit_rich_extrinsic(&self, xt: <

::Block as traits::Block>::Extrinsic) -> Result> { let best_block_hash = self.client.info()?.chain.best_hash; self.pool - .submit(generic::BlockId::hash(best_block_hash), vec![xt]) - .map(|mut res| res.pop().expect("One extrinsic passed; one result back; qed")) + .submit_one(&generic::BlockId::hash(best_block_hash), xt) .map_err(|e| e.into_pool_error() .map(Into::into) .unwrap_or_else(|e| error::ErrorKind::Verification(Box::new(e)).into()) ) + .map(|ex| ex.hash().clone()) } - fn pending_extrinsics(&self) -> Result { + fn pending_extrinsics(&self) -> Result> { Ok(self.pool.all()) } - fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber>, xt: Bytes) { - + fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber>>, xt: Bytes) { let submit = || -> Result<_> { let best_block_hash = self.client.info()?.chain.best_hash; - let dxt = Ex::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?; + let dxt = <

::Block as traits::Block>::Extrinsic::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?; self.pool - .submit_and_watch(generic::BlockId::hash(best_block_hash), dxt) + .submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt) .map_err(|e| e.into_pool_error() .map(Into::into) .unwrap_or_else(|e| error::ErrorKind::Verification(Box::new(e)).into()) @@ -154,4 +160,3 @@ impl AuthorApi for Author>, - sender: Mutex>>, +#[derive(Clone, Debug)] +pub struct Verified +{ + sender: u64, + hash: u64, } -#[derive(Debug)] -struct Error; -impl api::Error for Error {} -impl ::std::error::Error for Error { - fn description(&self) -> &str { "Error" } +impl VerifiedTransaction for Verified { + type Hash = u64; + type Sender = u64; + + fn hash(&self) -> &Self::Hash { &self.hash } + fn sender(&self) -> &Self::Sender { &self.sender } + fn mem_usage(&self) -> usize { 256 } } -impl fmt::Display for Error { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(self, fmt) + +struct TestApi; + +impl ChainApi for TestApi { + type Block = Block; + type Hash = u64; + type Sender = u64; + type Error = PoolError; + type VEx = Verified; + type Score = u64; + type Event = (); + type Ready = (); + + fn verify_transaction(&self, _at: &BlockId, uxt: &ExtrinsicFor) -> Result { + Ok(Verified { + sender: uxt.transfer.from[31] as u64, + hash: uxt.transfer.nonce, + }) } -} -impl api::ExtrinsicPool for DummyTxPool { - type Error = Error; - type InPool = Vec; + fn is_ready(&self, _at: &BlockId, _c: &mut Self::Ready, _xt: &VerifiedFor) -> Readiness { + Readiness::Ready + } + + fn ready(&self) -> Self::Ready { } - /// Submit extrinsic for inclusion in block. - fn submit(&self, _block: BlockHash, xt: Vec) -> Result, Self::Error> { - let mut submitted = self.submitted.lock(); - if submitted.len() < 1 { - let hashes = xt.iter().map(|_xt| 1).collect(); - submitted.extend(xt); - Ok(hashes) - } else { - Err(Error) + fn compare(old: &VerifiedFor, other: &VerifiedFor) -> ::std::cmp::Ordering { + old.verified.hash().cmp(&other.verified.hash()) + } + + fn choose(_old: &VerifiedFor, _new: &VerifiedFor) -> scoring::Choice { + scoring::Choice::ReplaceOld + } + + fn update_scores(xts: &[Transaction>], scores: &mut [Self::Score], _change: scoring::Change<()>) { + for i in 0..xts.len() { + scores[i] = xts[i].verified.sender } } - fn submit_and_watch(&self, _block: BlockHash, xt: Extrinsic) -> Result, Self::Error> { - let mut submitted = self.submitted.lock(); - if submitted.len() < 1 { - submitted.push(xt); - let mut sender = watcher::Sender::default(); - let watcher = sender.new_watcher(); - *self.sender.lock() = Some(sender); - Ok(watcher) - } else { - Err(Error) + fn should_replace(_old: &VerifiedFor, _new: &VerifiedFor) -> scoring::Choice { + scoring::Choice::ReplaceOld + } +} + +type DummyTxPool = Pool; + +fn uxt(sender: u64, hash: u64) -> Extrinsic { + Extrinsic { + signature: Default::default(), + transfer: Transfer { + amount: Default::default(), + nonce: hash, + from: From::from(sender), + to: Default::default(), } } - - fn light_status(&self) -> txpool::LightStatus { - unreachable!() - } - - fn import_notification_stream(&self) -> api::EventStream { - unreachable!() - } - - fn all(&self) -> Self::InPool { - vec![1, 2, 3, 4, 5] - } } #[test] @@ -91,16 +104,16 @@ fn submit_transaction_should_not_cause_error() { let runtime = runtime::Runtime::new().unwrap(); let p = Author { client: Arc::new(test_client::new()), - pool: Arc::new(DummyTxPool::default()), + pool: Arc::new(DummyTxPool::new(Default::default(), TestApi)), subscriptions: Subscriptions::new(runtime.executor()), }; assert_matches!( - AuthorApi::submit_extrinsic(&p, u64::encode(&5).into()), + AuthorApi::submit_extrinsic(&p, uxt(5, 1).encode().into()), Ok(1) ); assert!( - AuthorApi::submit_extrinsic(&p, u64::encode(&5).into()).is_err() + AuthorApi::submit_extrinsic(&p, uxt(5, 1).encode().into()).is_err() ); } @@ -109,16 +122,16 @@ fn submit_rich_transaction_should_not_cause_error() { let runtime = runtime::Runtime::new().unwrap(); let p = Author { client: Arc::new(test_client::new()), - pool: Arc::new(DummyTxPool::default()), + pool: Arc::new(DummyTxPool::new(Default::default(), TestApi)), subscriptions: Subscriptions::new(runtime.executor()), }; assert_matches!( - AuthorApi::submit_rich_extrinsic(&p, 5), - Ok(1) + AuthorApi::submit_rich_extrinsic(&p, uxt(5, 0)), + Ok(0) ); assert!( - AuthorApi::submit_rich_extrinsic(&p, 5).is_err() + AuthorApi::submit_rich_extrinsic(&p, uxt(5, 0)).is_err() ); } @@ -126,7 +139,7 @@ fn submit_rich_transaction_should_not_cause_error() { fn should_watch_extrinsic() { //given let mut runtime = runtime::Runtime::new().unwrap(); - let pool = Arc::new(DummyTxPool::default()); + let pool = Arc::new(DummyTxPool::new(Default::default(), TestApi)); let p = Author { client: Arc::new(test_client::new()), pool: pool.clone(), @@ -135,31 +148,31 @@ fn should_watch_extrinsic() { let (subscriber, id_rx, data) = ::jsonrpc_macros::pubsub::Subscriber::new_test("test"); // when - p.watch_extrinsic(Default::default(), subscriber, u64::encode(&5).into()); + p.watch_extrinsic(Default::default(), subscriber, uxt(5, 5).encode().into()); // then assert_eq!(runtime.block_on(id_rx), Ok(Ok(0.into()))); - // check notifications - pool.sender.lock().as_mut().unwrap().usurped(5); - + AuthorApi::submit_rich_extrinsic(&p, uxt(5, 1)).unwrap(); assert_eq!( runtime.block_on(data.into_future()).unwrap().0, - Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":5},"subscription":0}}"#.into()) + Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":1},"subscription":0}}"#.into()) ); } #[test] fn should_return_pending_extrinsics() { let runtime = runtime::Runtime::new().unwrap(); + let pool = Arc::new(DummyTxPool::new(Default::default(), TestApi)); let p = Author { client: Arc::new(test_client::new()), - pool: Arc::new(DummyTxPool::default()), + pool: pool.clone(), subscriptions: Subscriptions::new(runtime.executor()), }; - - assert_matches!( + let ex = uxt(5, 1); + AuthorApi::submit_rich_extrinsic(&p, ex.clone()).unwrap(); + assert_matches!( p.pending_extrinsics(), - Ok(ref expected) if expected == &[1u8, 2, 3, 4, 5] + Ok(ref expected) if expected.get(&5) == Some(&vec![ex]) ); } diff --git a/substrate/substrate/runtime/primitives/src/lib.rs b/substrate/substrate/runtime/primitives/src/lib.rs index a658118bca..a1ff760f76 100644 --- a/substrate/substrate/runtime/primitives/src/lib.rs +++ b/substrate/substrate/runtime/primitives/src/lib.rs @@ -62,6 +62,9 @@ pub mod bft; use traits::{Verify, Lazy}; +#[cfg(feature = "std")] +pub use serde::{Serialize, de::DeserializeOwned}; + /// A set of key value pairs for storage. #[cfg(feature = "std")] pub type StorageMap = HashMap, Vec>; diff --git a/substrate/substrate/service/src/components.rs b/substrate/substrate/service/src/components.rs index d653168fbb..bd7cebb7a5 100644 --- a/substrate/substrate/service/src/components.rs +++ b/substrate/substrate/service/src/components.rs @@ -16,6 +16,7 @@ //! Polkadot service components. +use std::fmt; use std::sync::Arc; use std::marker::PhantomData; use serde::{Serialize, de::DeserializeOwned}; @@ -25,8 +26,8 @@ use client::{self, Client}; use error; use network::{self, OnDemand}; use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; -use extrinsic_pool::{txpool::Options as ExtrinsicPoolOptions, api::ExtrinsicPool as ExtrinsicPoolApi}; -use runtime_primitives::{traits::Block as BlockT, traits::Header as HeaderT, generic::BlockId, BuildStorage}; +use extrinsic_pool::{self, Options as ExtrinsicPoolOptions, Pool as ExtrinsicPool}; +use runtime_primitives::{traits::Block as BlockT, traits::Header as HeaderT, BuildStorage}; use config::Configuration; use primitives::{KeccakHasher, RlpCodec, H256}; @@ -35,7 +36,8 @@ use primitives::{KeccakHasher, RlpCodec, H256}; /// Network service type for a factory. pub type NetworkService = network::Service< ::Block, - ::NetworkProtocol + ::NetworkProtocol, + ::ExtrinsicHash, >; /// Code executor type for a factory. @@ -53,8 +55,7 @@ pub type FullExecutor = client::LocalCallExecutor< /// Light client backend type for a factory. pub type LightBackend = client::light::backend::Backend< client_db::light::LightStorage<::Block>, - network::OnDemand<::Block, - NetworkService> + network::OnDemand<::Block, NetworkService>, >; /// Light client executor type for a factory. @@ -97,25 +98,33 @@ pub type ComponentClient = Client< /// Block type for `Components` pub type ComponentBlock = <::Factory as ServiceFactory>::Block; +/// Extrinsic hash type for `Components` +pub type ComponentExHash = <::ExtrinsicPoolApi as extrinsic_pool::ChainApi>::Hash; + +/// Extrinsic type. +pub type ComponentExtrinsic = as BlockT>::Extrinsic; + /// Extrinsic pool API type for `Components`. -pub type PoolApi = <::ExtrinsicPool as ExtrinsicPool>>::Api; +pub type PoolApi = ::ExtrinsicPoolApi; /// A set of traits for the runtime genesis config. pub trait RuntimeGenesis: Serialize + DeserializeOwned + BuildStorage {} impl RuntimeGenesis for T {} /// A collection of types and methods to build a service on top of the substrate service. -pub trait ServiceFactory { +pub trait ServiceFactory: 'static { /// Block type. type Block: BlockT; + /// Extrinsic hash type. + type ExtrinsicHash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Serialize + DeserializeOwned + ::std::str::FromStr + Send + Sync + Default + 'static; /// Network protocol extensions. type NetworkProtocol: network::specialization::Specialization; /// Chain runtime. type RuntimeDispatch: NativeExecutionDispatch + Send + Sync + 'static; - /// Extrinsic pool type for the full client. - type FullExtrinsicPool: ExtrinsicPool; - /// Extrinsic pool type for the light client. - type LightExtrinsicPool: ExtrinsicPool; + /// Extrinsic pool backend type for the full client. + type FullExtrinsicPoolApi: extrinsic_pool::ChainApi + Send + 'static; + /// Extrinsic pool backend type for the light client. + type LightExtrinsicPoolApi: extrinsic_pool::ChainApi + 'static; /// Genesis configuration for the runtime. type Genesis: RuntimeGenesis; /// Other configuration for service members. @@ -127,29 +136,18 @@ pub trait ServiceFactory { //TODO: replace these with a constructor trait. that ExtrinsicPool implements. /// Extrinsic pool constructor for the full client. fn build_full_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc>) - -> Result; + -> Result, error::Error>; /// Extrinsic pool constructor for the light client. fn build_light_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc>) - -> Result; + -> Result, error::Error>; /// Build network protocol. fn build_network_protocol(config: &FactoryFullConfiguration) -> Result; -} - -// TODO: move this to substrate-extrinsic-pool -/// Extrinsic pool bridge. -pub trait ExtrinsicPool: network::TransactionPool + Send + Sync + 'static { - type Api: ExtrinsicPoolApi, Block::Hash>; - - /// Update the pool after a new block has been imported. - fn prune_imported(&self, hash: &Block::Hash); - /// Returns underlying API. - fn api(&self) -> Arc; -} +} /// A collection of types and function to generalise over full / light client type. -pub trait Components { +pub trait Components: 'static { /// Associated service factory. type Factory: ServiceFactory; /// Client backend. @@ -157,7 +155,7 @@ pub trait Components { /// Client executor. type Executor: 'static + client::CallExecutor, KeccakHasher, RlpCodec> + Send + Sync; /// Extrinsic pool type. - type ExtrinsicPool: ExtrinsicPool>; + type ExtrinsicPoolApi: 'static + extrinsic_pool::ChainApi::ExtrinsicHash, Block=FactoryBlock>; /// Create client. fn build_client( @@ -171,7 +169,7 @@ pub trait Components { /// Create extrinsic pool. fn build_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc>) - -> Result; + -> Result, error::Error>; } /// A struct that implement `Components` for the full client. @@ -183,7 +181,7 @@ impl Components for FullComponents { type Factory = Factory; type Executor = FullExecutor; type Backend = FullBackend; - type ExtrinsicPool = ::FullExtrinsicPool; + type ExtrinsicPoolApi = ::FullExtrinsicPoolApi; fn build_client( config: &FactoryFullConfiguration, @@ -203,7 +201,7 @@ impl Components for FullComponents { } fn build_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc>) - -> Result + -> Result, error::Error> { Factory::build_full_extrinsic_pool(config, client) } @@ -221,7 +219,7 @@ impl Components for LightComponents type Factory = Factory; type Executor = LightExecutor; type Backend = LightBackend; - type ExtrinsicPool = ::LightExtrinsicPool; + type ExtrinsicPoolApi = ::LightExtrinsicPoolApi; fn build_client( config: &FactoryFullConfiguration, @@ -248,7 +246,7 @@ impl Components for LightComponents } fn build_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc>) - -> Result + -> Result, error::Error> { Factory::build_light_extrinsic_pool(config, client) } diff --git a/substrate/substrate/service/src/config.rs b/substrate/substrate/service/src/config.rs index be3bcd5d2d..d6f0b074c0 100644 --- a/substrate/substrate/service/src/config.rs +++ b/substrate/substrate/service/src/config.rs @@ -38,7 +38,7 @@ pub struct Configuration { /// Node roles. pub roles: Roles, /// Extrinsic pool configuration. - pub extrinsic_pool: extrinsic_pool::txpool::Options, + pub extrinsic_pool: extrinsic_pool::Options, /// Network configuration. pub network: NetworkConfiguration, /// Path to key files. diff --git a/substrate/substrate/service/src/lib.rs b/substrate/substrate/service/src/lib.rs index a265ec41e1..2b8158cd27 100644 --- a/substrate/substrate/service/src/lib.rs +++ b/substrate/substrate/service/src/lib.rs @@ -57,34 +57,37 @@ pub mod chain_ops; use std::io; use std::net::SocketAddr; use std::sync::Arc; +use std::collections::HashMap; use std::fmt::Write; use futures::prelude::*; use keystore::Store as Keystore; use client::BlockchainEvents; use runtime_primitives::traits::{Header, As}; +use runtime_primitives::generic::BlockId; use exit_future::Signal; use tokio::runtime::TaskExecutor; use substrate_executor::NativeExecutor; +use codec::{Encode, Decode}; pub use self::error::{ErrorKind, Error}; pub use config::{Configuration, Roles, PruningMode}; pub use chain_spec::ChainSpec; -pub use extrinsic_pool::txpool::{Options as ExtrinsicPoolOptions}; -pub use extrinsic_pool::api::{ExtrinsicPool as ExtrinsicPoolApi}; +pub use extrinsic_pool::{Pool as ExtrinsicPool, Options as ExtrinsicPoolOptions, ChainApi, VerifiedTransaction, IntoPoolError}; pub use client::ExecutionStrategy; pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend, - LightExecutor, ExtrinsicPool, Components, PoolApi, ComponentClient, + LightExecutor, Components, PoolApi, ComponentClient, ComponentBlock, FullClient, LightClient, FullComponents, LightComponents, CodeExecutor, NetworkService, FactoryChainSpec, FactoryBlock, - FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis, + FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis, + ComponentExHash, ComponentExtrinsic, }; /// Substrate service. pub struct Service { client: Arc>, network: Option>>, - extrinsic_pool: Arc, + extrinsic_pool: Arc>, keystore: Keystore, exit: ::exit_future::Exit, signal: Option, @@ -149,7 +152,11 @@ impl Service let extrinsic_pool = Arc::new( Components::build_extrinsic_pool(config.extrinsic_pool, client.clone())? ); - let extrinsic_pool_adapter = extrinsic_pool.clone(); + let extrinsic_pool_adapter = ExtrinsicPoolAdapter:: { + imports_external_transactions: !config.roles == Roles::LIGHT, + pool: extrinsic_pool.clone(), + client: client.clone(), + }; let network_params = network::Params { config: network::ProtocolConfig { @@ -159,7 +166,7 @@ impl Service chain: client.clone(), on_demand: on_demand.clone() .map(|d| d as Arc>>), - transaction_pool: extrinsic_pool_adapter, + transaction_pool: Arc::new(extrinsic_pool_adapter), specialization: network_protocol, }; @@ -174,7 +181,8 @@ impl Service let events = client.import_notification_stream() .for_each(move |notification| { network.on_block_imported(notification.hash, ¬ification.header); - txpool.prune_imported(¬ification.hash); + txpool.cull(&BlockId::hash(notification.hash)) + .map_err(|e| warn!("Error removing extrinsics: {:?}", e))?; Ok(()) }) .select(exit.clone()) @@ -185,7 +193,7 @@ impl Service { // extrinsic notifications let network = network.clone(); - let events = extrinsic_pool.api().import_notification_stream() + let events = extrinsic_pool.import_notification_stream() // TODO [ToDr] Consider throttling? .for_each(move |_| { network.trigger_repropagate(); @@ -209,9 +217,8 @@ impl Service let client = client.clone(); let chain = rpc::apis::chain::Chain::new(client.clone(), task_executor.clone()); let state = rpc::apis::state::State::new(client.clone(), task_executor.clone()); - let author = rpc::apis::author::Author::new(client.clone(), extrinsic_pool.api(), task_executor.clone()); - - rpc::rpc_handler::, _, _, _, _, _>( + let author = rpc::apis::author::Author::new(client.clone(), extrinsic_pool.clone(), task_executor.clone()); + rpc::rpc_handler::, ComponentExHash, _, _, _, _, _>( state, chain, author, @@ -278,8 +285,8 @@ impl Service } /// Get shared extrinsic pool instance. - pub fn extrinsic_pool(&self) -> Arc> { - self.extrinsic_pool.api() + pub fn extrinsic_pool(&self) -> Arc> { + self.extrinsic_pool.clone() } /// Get shared keystore. @@ -343,3 +350,77 @@ impl substrate_rpc::system::SystemApi for RpcConfig { Ok(self.chain_name.clone()) } } + +/// Transaction pool adapter. +pub struct ExtrinsicPoolAdapter { + imports_external_transactions: bool, + pool: Arc>, + client: Arc>, +} + +impl ExtrinsicPoolAdapter { + fn best_block_id(&self) -> Option>> { + self.client.info() + .map(|info| BlockId::hash(info.chain.best_hash)) + .map_err(|e| { + debug!("Error getting best block: {:?}", e); + }) + .ok() + } +} + +impl network::TransactionPool, ComponentBlock> for ExtrinsicPoolAdapter { + fn transactions(&self) -> Vec<(ComponentExHash, ComponentExtrinsic)> { + let best_block_id = match self.best_block_id() { + Some(id) => id, + None => return vec![], + }; + self.pool.cull_and_get_pending(&best_block_id, |pending| pending + .map(|t| { + let hash = t.hash().clone(); + let ex: ComponentExtrinsic = t.original.clone(); + (hash, ex) + }) + .collect() + ).unwrap_or_else(|e| { + warn!("Error retrieving pending set: {}", e); + vec![] + }) + } + + fn import(&self, transaction: &ComponentExtrinsic) -> Option> { + if !self.imports_external_transactions { + return None; + } + + let encoded = transaction.encode(); + if let Some(uxt) = Decode::decode(&mut &encoded[..]) { + let best_block_id = self.best_block_id()?; + match self.pool.submit_one(&best_block_id, uxt) { + Ok(xt) => Some(*xt.hash()), + Err(e) => match e.into_pool_error() { + Ok(e) => match e.kind() { + extrinsic_pool::ErrorKind::AlreadyImported(hash) => + Some(::std::str::FromStr::from_str(&hash).map_err(|_| {}) + .expect("Hash string is always valid")), + _ => { + debug!("Error adding transaction to the pool: {:?}", e); + None + }, + }, + Err(e) => { + debug!("Error converting pool error: {:?}", e); + None + } + } + } + } else { + debug!("Error decoding transaction"); + None + } + } + + fn on_broadcasted(&self, propagations: HashMap, Vec>) { + self.pool.on_broadcasted(propagations) + } +} \ No newline at end of file