More generic extrinsic pool (#579)

This commit is contained in:
Arkadiy Paronyan
2018-08-20 14:54:03 +02:00
committed by Gav Wood
parent 3f366cc738
commit 3514ae9807
21 changed files with 845 additions and 355 deletions
+4 -1
View File
@@ -2376,7 +2376,6 @@ dependencies = [
"regex 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"slog 2.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-client 0.1.0",
"substrate-extrinsic-pool 0.1.0",
"substrate-network 0.1.0",
"substrate-network-libp2p 0.1.0",
"substrate-runtime-primitives 0.1.0",
@@ -2494,6 +2493,10 @@ dependencies = [
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-codec 0.1.0",
"substrate-keyring 0.1.0",
"substrate-runtime-primitives 0.1.0",
"substrate-test-client 0.1.0",
"transaction-pool 1.12.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
+58 -22
View File
@@ -48,41 +48,77 @@ extern crate log;
pub mod error;
use std::sync::Arc;
use demo_primitives::Hash;
use demo_runtime::{Block, BlockId, UncheckedExtrinsic, GenesisConfig,
use demo_primitives::{AccountId, Hash};
use demo_runtime::{Block, BlockId, GenesisConfig,
ConsensusConfig, CouncilConfig, DemocracyConfig, SessionConfig, StakingConfig,
TimestampConfig};
use futures::{Future, Sink, Stream};
use tokio::runtime::Runtime;
use demo_executor::NativeExecutor;
use extrinsic_pool::{Pool as ExtrinsicPool, ExtrinsicFor, VerifiedFor, scoring, Readiness};
struct DummyPool;
impl extrinsic_pool::api::ExtrinsicPool<UncheckedExtrinsic, BlockId, Hash> for DummyPool {
type Error = extrinsic_pool::txpool::Error;
type InPool = ();
#[derive(Debug, Clone)]
struct VerifiedExtrinsic {
sender: AccountId,
hash: Hash,
}
fn submit(&self, _block: BlockId, _: Vec<UncheckedExtrinsic>)
-> Result<Vec<Hash>, Self::Error>
{
Err("unimplemented".into())
impl extrinsic_pool::VerifiedTransaction for VerifiedExtrinsic {
type Hash = Hash;
type Sender = AccountId;
fn hash(&self) -> &Self::Hash {
&self.hash
}
fn submit_and_watch(&self, _block: BlockId, _: UncheckedExtrinsic)
-> Result<extrinsic_pool::watcher::Watcher<Hash>, Self::Error>
{
Err("unimplemented".into())
fn sender(&self) -> &Self::Sender {
&self.sender
}
fn light_status(&self) -> extrinsic_pool::txpool::LightStatus {
unreachable!()
fn mem_usage(&self) -> usize {
0
}
}
struct Pool;
impl extrinsic_pool::ChainApi for Pool {
type Block = Block;
type Hash = Hash;
type Sender = AccountId;
type VEx = VerifiedExtrinsic;
type Ready = ();
type Error = extrinsic_pool::Error;
type Score = u64;
type Event = ();
fn verify_transaction(&self, _at: &BlockId, _xt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error> {
unimplemented!()
}
fn import_notification_stream(&self) -> extrinsic_pool::api::EventStream {
unreachable!()
fn ready(&self) -> Self::Ready { }
fn is_ready(&self, _at: &BlockId, _ready: &mut Self::Ready, _xt: &VerifiedFor<Self>) -> Readiness {
unimplemented!()
}
fn all(&self) -> Self::InPool {
unreachable!()
fn compare(_old: &VerifiedFor<Self>, _other: &VerifiedFor<Self>) -> ::std::cmp::Ordering {
unimplemented!()
}
fn choose(_old: &VerifiedFor<Self>, _new: &VerifiedFor<Self>) -> scoring::Choice {
unimplemented!()
}
fn update_scores(
_xts: &[extrinsic_pool::Transaction<VerifiedFor<Self>>],
_scores: &mut [Self::Score],
_change: scoring::Change<()>
) {
unimplemented!()
}
fn should_replace(_old: &VerifiedFor<Self>, _new: &VerifiedFor<Self>) -> scoring::Choice {
unimplemented!()
}
}
@@ -180,8 +216,8 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
let handler = || {
let state = rpc::apis::state::State::new(client.clone(), runtime.executor());
let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor());
let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor());
rpc::rpc_handler::<Block, _, _, _, _, _>(state, chain, author, DummySystem)
let author = rpc::apis::author::Author::new(client.clone(), Arc::new(ExtrinsicPool::new(Default::default(), Pool)), runtime.executor());
rpc::rpc_handler::<Block, Hash, _, _, _, _, _>(state, chain, author, DummySystem)
};
let http_address = "127.0.0.1:9933".parse().unwrap();
let ws_address = "127.0.0.1:9944".parse().unwrap();
-1
View File
@@ -24,7 +24,6 @@ fdlimit = "0.1"
exit-future = "0.1"
sysinfo = "0.5.7"
substrate-client = { path = "../../substrate/client" }
substrate-extrinsic-pool = { path = "../../substrate/extrinsic-pool" }
substrate-network = { path = "../../substrate/network" }
substrate-network-libp2p = { path = "../../substrate/network-libp2p" }
substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" }
-1
View File
@@ -26,7 +26,6 @@ use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use network::{SyncState, SyncProvider};
use client::BlockchainEvents;
use runtime_primitives::traits::{Header, As};
use substrate_extrinsic_pool::api::ExtrinsicPool;
const TIMER_INTERVAL_MS: u64 = 5000;
-1
View File
@@ -36,7 +36,6 @@ extern crate substrate_client as client;
extern crate substrate_network as network;
extern crate substrate_network_libp2p as network_libp2p;
extern crate substrate_runtime_primitives as runtime_primitives;
extern crate substrate_extrinsic_pool;
extern crate substrate_service as service;
#[macro_use]
extern crate slog; // needed until we can reexport `slog_info` from `substrate_telemetry`
@@ -11,3 +11,9 @@ futures = "0.1"
log = "0.3"
parking_lot = "0.4"
transaction-pool = "1.12"
substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" }
[dev-dependencies]
substrate-test-client = { path = "../../substrate/test-client" }
substrate-keyring = { path = "../../substrate/keyring" }
substrate-codec = { path = "../../substrate/codec" }
@@ -1,66 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! External API for extrinsic pool.
use std::fmt::Debug;
use serde::{Serialize, de::DeserializeOwned};
use txpool;
use futures::sync::mpsc;
use watcher::Watcher;
/// Extrinsic pool error.
pub trait Error: ::std::error::Error + Send + Sized {
/// Try to extract original `txpool::Error`
///
/// This implementation is optional and used only to
/// provide more descriptive error messages for end users
/// of RPC API.
fn into_pool_error(self) -> Result<txpool::Error, Self> { Err(self) }
}
impl Error for txpool::Error {
fn into_pool_error(self) -> Result<txpool::Error, Self> { Ok(self) }
}
/// Modification notification event stream type;
pub type EventStream = mpsc::UnboundedReceiver<()>;
/// Extrinsic pool.
pub trait ExtrinsicPool<Ex, BlockId, Hash>: Send + Sync + 'static {
/// Error type
type Error: Error;
/// Pooled extrinsics
type InPool: Debug + Serialize + DeserializeOwned + Send + Sync + 'static;
/// Submit a collection of extrinsics to the pool.
fn submit(&self, block: BlockId, xt: Vec<Ex>) -> Result<Vec<Hash>, Self::Error>;
/// Submit an extrinsic to the pool and start watching it's progress.
fn submit_and_watch(&self, block: BlockId, xt: Ex) -> Result<Watcher<Hash>, Self::Error>;
/// Returns light status of the pool.
fn light_status(&self) -> txpool::LightStatus;
/// Return an event stream of transactions imported to the pool.
fn import_notification_stream(&self) -> EventStream;
/// Return all extrinsics in the pool aggregated by the sender.
fn all(&self) -> Self::InPool;
}
@@ -0,0 +1,33 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! External Error trait for extrinsic pool.
use txpool;
/// Extrinsic pool error.
pub trait IntoPoolError: ::std::error::Error + Send + Sized {
/// Try to extract original `txpool::Error`
///
/// This implementation is optional and used only to
/// provide more descriptive error messages for end users
/// of RPC API.
fn into_pool_error(self) -> Result<txpool::Error, Self> { Err(self) }
}
impl IntoPoolError for txpool::Error {
fn into_pool_error(self) -> Result<txpool::Error, Self> { Ok(self) }
}
+14 -7
View File
@@ -15,25 +15,32 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
#![warn(missing_docs)]
#![warn(unused_extern_crates)]
//! Generic extrinsic pool.
extern crate futures;
extern crate parking_lot;
extern crate serde;
extern crate substrate_runtime_primitives as runtime_primitives;
#[macro_use]
extern crate log;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate transaction_pool as txpool;
#[cfg(test)] extern crate substrate_test_client as test_client;
#[cfg(test)] extern crate substrate_keyring as keyring;
#[cfg(test)] extern crate substrate_codec as codec;
pub extern crate transaction_pool as txpool;
pub mod api;
pub mod watcher;
mod error;
mod listener;
mod pool;
pub use self::listener::Listener;
pub use self::pool::Pool;
pub use listener::Listener;
pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics};
pub use txpool::scoring;
pub use txpool::{Error, ErrorKind};
pub use error::IntoPoolError;
pub use txpool::{Options, Status, LightStatus, VerifiedTransaction, Readiness, Transaction};
+435 -63
View File
@@ -1,65 +1,183 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// This file is part of Substrate.
// Polkadot is free software: you can redistribute it and/or modify
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{
collections::HashMap,
fmt,
marker::PhantomData,
sync::Arc,
};
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{ collections::HashMap, fmt, sync::Arc, collections::BTreeMap};
use futures::sync::mpsc;
use parking_lot::{RwLock, Mutex};
use txpool;
use parking_lot::{Mutex, RwLock};
use serde::{Serialize, de::DeserializeOwned};
use txpool::{self, Scoring, Readiness};
use listener::Listener;
use watcher::Watcher;
use error::IntoPoolError;
use runtime_primitives::{generic::BlockId, traits::Block as BlockT};
/// Modification notification event stream type;
pub type EventStream = mpsc::UnboundedReceiver<()>;
/// Extrinsic hash type for a pool.
pub type ExHash<A> = <A as ChainApi>::Hash;
/// Extrinsic type for a pool.
pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as BlockT>::Extrinsic;
/// Verified extrinsic data for `ChainApi`.
pub type VerifiedFor<A> = Verified<ExtrinsicFor<A>, <A as ChainApi>::VEx>;
/// A collection of all extrinsics.
pub type AllExtrinsics<A> = BTreeMap<<<A as ChainApi>::VEx as txpool::VerifiedTransaction>::Sender, Vec<ExtrinsicFor<A>>>;
/// Verified extrinsic struct. Wraps original extrinsic and verification info.
#[derive(Debug)]
pub struct Verified<Ex: ::std::fmt::Debug, VEx: txpool::VerifiedTransaction> {
/// Original extrinsic.
pub original: Ex,
/// Verification data.
pub verified: VEx,
}
impl<Ex, VEx> txpool::VerifiedTransaction for Verified<Ex, VEx>
where
Ex: ::std::fmt::Debug,
VEx: txpool::VerifiedTransaction,
{
type Hash = <VEx as txpool::VerifiedTransaction>::Hash;
type Sender = <VEx as txpool::VerifiedTransaction>::Sender;
fn hash(&self) -> &Self::Hash {
self.verified.hash()
}
fn sender(&self) -> &Self::Sender {
self.verified.sender()
}
fn mem_usage(&self) -> usize {
// TODO: add `original` mem usage.
self.verified.mem_usage()
}
}
/// Concrete extrinsic validation and query logic.
pub trait ChainApi: Send + Sync {
/// Block type.
type Block: BlockT;
/// Extrinsic hash type.
type Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Serialize + DeserializeOwned + ::std::str::FromStr + Send + Sync + Default + 'static;
/// Extrinsic sender type.
type Sender: ::std::hash::Hash + fmt::Debug + Serialize + DeserializeOwned + Eq + Clone + Send + Sync + Ord + Default;
/// Unchecked extrinsic type.
/// Verified extrinsic type.
type VEx: txpool::VerifiedTransaction<Hash=Self::Hash, Sender=Self::Sender> + Send + Sync + Clone;
/// Readiness evaluator
type Ready;
/// Error type.
type Error: From<txpool::Error> + IntoPoolError;
/// Score type.
type Score: ::std::cmp::Ord + Clone + Default + fmt::Debug + Send + Send + Sync;
/// Custom scoring update event type.
type Event: ::std::fmt::Debug;
/// Verify extrinsic at given block.
fn verify_transaction(&self, at: &BlockId<Self::Block>, uxt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error>;
/// Create new readiness evaluator.
fn ready(&self) -> Self::Ready;
/// Check readiness for verified extrinsic at given block.
fn is_ready(&self, at: &BlockId<Self::Block>, context: &mut Self::Ready, xt: &VerifiedFor<Self>) -> Readiness;
/// Decides on ordering of `T`s from a particular sender.
fn compare(old: &VerifiedFor<Self>, other: &VerifiedFor<Self>) -> ::std::cmp::Ordering;
/// Decides how to deal with two transactions from a sender that seem to occupy the same slot in the queue.
fn choose(old: &VerifiedFor<Self>, new: &VerifiedFor<Self>) -> txpool::scoring::Choice;
/// Updates the transaction scores given a list of transactions and a change to previous scoring.
/// NOTE: you can safely assume that both slices have the same length.
/// (i.e. score at index `i` represents transaction at the same index)
fn update_scores(xts: &[txpool::Transaction<VerifiedFor<Self>>], scores: &mut [Self::Score], change: txpool::scoring::Change<Self::Event>);
/// Decides if `new` should push out `old` transaction from the pool.
///
/// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits.
fn should_replace(old: &VerifiedFor<Self>, new: &VerifiedFor<Self>) -> txpool::scoring::Choice;
}
pub struct Ready<'a, 'b, B: 'a + ChainApi> {
api: &'a B,
at: &'b BlockId<B::Block>,
context: B::Ready,
}
impl<'a, 'b, B: ChainApi> txpool::Ready<VerifiedFor<B>> for Ready<'a, 'b, B> {
fn is_ready(&mut self, xt: &VerifiedFor<B>) -> Readiness {
self.api.is_ready(self.at, &mut self.context, xt)
}
}
pub struct ScoringAdapter<T>(::std::marker::PhantomData<T>);
impl<T> ::std::fmt::Debug for ScoringAdapter<T> {
fn fmt(&self, _f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
Ok(())
}
}
impl<T: ChainApi> Scoring<VerifiedFor<T>> for ScoringAdapter<T> {
type Score = <T as ChainApi>::Score;
type Event = <T as ChainApi>::Event;
fn compare(&self, old: &VerifiedFor<T>, other: &VerifiedFor<T>) -> ::std::cmp::Ordering {
T::compare(old, other)
}
fn choose(&self, old: &VerifiedFor<T>, new: &VerifiedFor<T>) -> txpool::scoring::Choice {
T::choose(old, new)
}
fn update_scores(&self, xts: &[txpool::Transaction<VerifiedFor<T>>], scores: &mut [Self::Score], change: txpool::scoring::Change<Self::Event>) {
T::update_scores(xts, scores, change)
}
fn should_replace(&self, old: &VerifiedFor<T>, new: &VerifiedFor<T>) -> txpool::scoring::Choice {
T::should_replace(old, new)
}
}
/// Extrinsics pool.
pub struct Pool<Hash, VEx, S, E> where
Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex,
S: txpool::Scoring<VEx>,
VEx: txpool::VerifiedTransaction<Hash=Hash>,
{
_error: Mutex<PhantomData<E>>,
pub struct Pool<B: ChainApi> {
api: B,
pool: RwLock<txpool::Pool<
VEx,
S,
Listener<Hash>,
VerifiedFor<B>,
ScoringAdapter<B>,
Listener<B::Hash>,
>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
}
impl<Hash, VEx, S, E> Pool<Hash, VEx, S, E> where
Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default,
S: txpool::Scoring<VEx>,
VEx: txpool::VerifiedTransaction<Hash=Hash>,
E: From<txpool::Error>,
{
impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool.
pub fn new(options: txpool::Options, scoring: S) -> Self {
pub fn new(options: txpool::Options, api: B) -> Self {
Pool {
_error: Default::default(),
pool: RwLock::new(txpool::Pool::new(Listener::default(), scoring, options)),
pool: RwLock::new(txpool::Pool::new(Listener::default(), ScoringAdapter::<B>(Default::default()), options)),
import_notification_sinks: Default::default(),
api,
}
}
/// Imports a pre-verified extrinsic to the pool.
pub fn import(&self, xt: VEx) -> Result<Arc<VEx>, E> {
pub fn import(&self, xt: VerifiedFor<B>) -> Result<Arc<VerifiedFor<B>>, B::Error> {
let result = self.pool.write().import(xt)?;
self.import_notification_sinks.lock()
@@ -69,62 +187,82 @@ impl<Hash, VEx, S, E> Pool<Hash, VEx, S, E> where
}
/// Return an event stream of transactions imported to the pool.
pub fn import_notification_stream(&self) -> mpsc::UnboundedReceiver<()> {
pub fn import_notification_stream(&self) -> EventStream {
let (sink, stream) = mpsc::unbounded();
self.import_notification_sinks.lock().push(sink);
stream
}
/// Invoked when extrinsics are broadcasted.
pub fn on_broadcasted(&self, propagated: HashMap<Hash, Vec<String>>) {
pub fn on_broadcasted(&self, propagated: HashMap<B::Hash, Vec<String>>) {
for (hash, peers) in propagated.into_iter() {
self.pool.write().listener_mut().broadcasted(&hash, peers);
}
}
/// Imports a bunch of unverified extrinsics to the pool
pub fn submit<V, Ex, T>(&self, verifier: V, xts: T) -> Result<Vec<Arc<VEx>>, E> where
V: txpool::Verifier<Ex, VerifiedTransaction=VEx>,
E: From<V::Error>,
T: IntoIterator<Item=Ex>
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Arc<VerifiedFor<B>>>, B::Error> where
T: IntoIterator<Item=ExtrinsicFor<B>>
{
xts
.into_iter()
.map(|xt| verifier.verify_transaction(xt))
.map(|xt| {
Ok(self.pool.write().import(xt?)?)
.map(|xt| (self.api.verify_transaction(at, &xt), xt))
.map(|(v, xt)| {
let xt = Verified { original: xt, verified: v? };
Ok(self.pool.write().import(xt)?)
})
.collect()
}
/// Imports one unverified extrinsic to the pool
pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Arc<VerifiedFor<B>>, B::Error> {
let v = self.api.verify_transaction(at, &xt)?;
let xt = Verified { original: xt, verified: v };
Ok(self.pool.write().import(xt)?)
}
/// Import a single extrinsic and starts to watch their progress in the pool.
pub fn submit_and_watch<V, Ex>(&self, verifier: V, xt: Ex) -> Result<Watcher<Hash>, E> where
V: txpool::Verifier<Ex, VerifiedTransaction=VEx>,
E: From<V::Error>,
{
let xt = self.submit(verifier, vec![xt])?.pop().expect("One extrinsic passed; one result returned; qed");
pub fn submit_and_watch(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Watcher<B::Hash>, B::Error> {
let xt = self.submit_at(at, Some(xt))?.pop().expect("One extrinsic passed; one result returned; qed");
Ok(self.pool.write().listener_mut().create_watcher(xt))
}
/// Remove from the pool.
pub fn remove(&self, hashes: &[Hash], is_valid: bool) -> Vec<Option<Arc<VEx>>> {
pub fn remove(&self, hashes: &[B::Hash], is_valid: bool) -> Vec<Option<Arc<VerifiedFor<B>>>> {
let mut pool = self.pool.write();
let mut results = Vec::with_capacity(hashes.len());
for hash in hashes {
results.push(pool.remove(hash, !is_valid));
results.push(pool.remove(hash, is_valid));
}
results
}
/// Cull transactions from the queue.
pub fn cull<R>(&self, senders: Option<&[<VEx as txpool::VerifiedTransaction>::Sender]>, ready: R) -> usize where
R: txpool::Ready<VEx>,
pub fn cull_from(
&self,
at: &BlockId<B::Block>,
senders: Option<&[<B::VEx as txpool::VerifiedTransaction>::Sender]>,
) -> usize
{
let ready = Ready { api: &self.api, context: self.api.ready(), at };
self.pool.write().cull(senders, ready)
}
/// Cull old transactions from the queue.
pub fn cull(&self, at: &BlockId<B::Block>) -> Result<usize, B::Error> {
Ok(self.cull_from(at, None))
}
/// Cull transactions from the queue and then compute the pending set.
pub fn cull_and_get_pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> Result<T, B::Error> where
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T,
{
self.cull_from(at, None);
Ok(self.pending(at, f))
}
/// Get the full status of the queue (including readiness)
pub fn status<R: txpool::Ready<VEx>>(&self, ready: R) -> txpool::Status {
pub fn status<R: txpool::Ready<VerifiedFor<B>>>(&self, ready: R) -> txpool::Status {
self.pool.read().status(ready)
}
@@ -134,34 +272,268 @@ impl<Hash, VEx, S, E> Pool<Hash, VEx, S, E> where
}
/// Removes all transactions from given sender
pub fn remove_sender(&self, sender: VEx::Sender) -> Vec<Arc<VEx>> {
pub fn remove_sender(&self, sender: <B::VEx as txpool::VerifiedTransaction>::Sender) -> Vec<Arc<VerifiedFor<B>>> {
let mut pool = self.pool.write();
let pending = pool.pending_from_sender(|_: &VEx| txpool::Readiness::Ready, &sender).collect();
let pending = pool.pending_from_sender(|_: &VerifiedFor<B>| txpool::Readiness::Ready, &sender).collect();
// remove all transactions from this sender
pool.cull(Some(&[sender]), |_: &VEx| txpool::Readiness::Stale);
pool.cull(Some(&[sender]), |_: &VerifiedFor<B>| txpool::Readiness::Stale);
pending
}
/// Retrieve the pending set. Be careful to not leak the pool `ReadGuard` to prevent deadlocks.
pub fn pending<R, F, T>(&self, ready: R, f: F) -> T where
R: txpool::Ready<VEx>,
F: FnOnce(txpool::PendingIterator<VEx, R, S, Listener<Hash>>) -> T,
pub fn pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> T where
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T,
{
let ready = Ready { api: &self.api, context: self.api.ready(), at };
f(self.pool.read().pending(ready))
}
/// Retry to import all verified transactions from given sender.
pub fn retry_verification(&self, at: &BlockId<B::Block>, sender: <B::VEx as txpool::VerifiedTransaction>::Sender) -> Result<(), B::Error> {
let to_reverify = self.remove_sender(sender);
self.submit_at(at, to_reverify.into_iter().map(|ex| Arc::try_unwrap(ex).expect("Removed items have no references").original))?;
Ok(())
}
/// Retrieve all transactions in the pool. The transactions might be unordered.
pub fn all<F, T>(&self, f: F) -> T where
F: FnOnce(txpool::UnorderedIterator<VEx, AlwaysReady, S>) -> T,
{
f(self.pool.read().unordered_pending(AlwaysReady))
/// Reverify transaction that has been reported incorrect.
///
/// Returns `Ok(None)` in case the hash is missing, `Err(e)` in case of verification error and new transaction
/// reference otherwise.
///
/// TODO [ToDr] That method is currently unused, should be used together with BlockBuilder
/// when we detect that particular transaction has failed.
/// In such case we will attempt to remove or re-verify it.
pub fn reverify_transaction(&self, at: &BlockId<B::Block>, hash: B::Hash) -> Result<Option<Arc<VerifiedFor<B>>>, B::Error> {
let result = self.remove(&[hash], false).pop().expect("One hash passed; one result received; qed");
if let Some(ex) = result {
self.submit_one(at, Arc::try_unwrap(ex).expect("Removed items have no references").original).map(Some)
} else {
Ok(None)
}
}
/// Retrieve all transactions in the pool grouped by sender.
pub fn all(&self) -> AllExtrinsics<B> {
use txpool::VerifiedTransaction;
let pool = self.pool.read();
let all = pool.unordered_pending(AlwaysReady);
all.fold(Default::default(), |mut map: AllExtrinsics<B>, tx| {
// Map with `null` key is not serializable, so we fallback to default accountId.
map.entry(tx.verified.sender().clone())
.or_insert_with(Vec::new)
// use bytes type to make it serialize nicer.
.push(tx.original.clone());
map
})
}
}
/// A Readiness implementation that returns `Ready` for all transactions.
/// A Readiness implementation that returns `Ready` for all transactions.
pub struct AlwaysReady;
impl<VEx> txpool::Ready<VEx> for AlwaysReady {
fn is_ready(&mut self, _tx: &VEx) -> txpool::Readiness {
txpool::Readiness::Ready
}
}
#[cfg(test)]
mod tests {
use txpool;
use super::{VerifiedFor, ExtrinsicFor};
use std::collections::HashMap;
use std::cmp::Ordering;
use {Pool, ChainApi, scoring, Readiness};
use keyring::Keyring::{self, *};
use codec::Encode;
use test_client::runtime::{AccountId, Block, Hash, Index, Extrinsic, Transfer};
use runtime_primitives::{generic, traits::{Hash as HashT, BlindCheckable, BlakeTwo256}};
use VerifiedTransaction as VerifiedExtrinsic;
type BlockId = generic::BlockId<Block>;
#[derive(Clone, Debug)]
pub struct VerifiedTransaction {
hash: Hash,
sender: AccountId,
nonce: u64,
}
impl txpool::VerifiedTransaction for VerifiedTransaction {
type Hash = Hash;
type Sender = AccountId;
fn hash(&self) -> &Self::Hash {
&self.hash
}
fn sender(&self) -> &Self::Sender {
&self.sender
}
fn mem_usage(&self) -> usize {
256
}
}
struct TestApi;
impl TestApi {
fn default() -> Self {
TestApi
}
}
impl ChainApi for TestApi {
type Block = Block;
type Hash = Hash;
type Sender = AccountId;
type Error = txpool::Error;
type VEx = VerifiedTransaction;
type Ready = HashMap<AccountId, u64>;
type Score = u64;
type Event = ();
fn verify_transaction(&self, _at: &BlockId, uxt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error> {
let hash = BlakeTwo256::hash(&uxt.encode());
let xt = uxt.clone().check()?;
Ok(VerifiedTransaction {
hash,
sender: xt.transfer.from,
nonce: xt.transfer.nonce,
})
}
fn is_ready(&self, at: &BlockId, nonce_cache: &mut Self::Ready, xt: &VerifiedFor<Self>) -> Readiness {
let sender = xt.verified.sender;
let next_index = nonce_cache.entry(sender)
.or_insert_with(|| index(at, sender));
let result = match xt.original.transfer.nonce.cmp(&next_index) {
Ordering::Greater => Readiness::Future,
Ordering::Equal => Readiness::Ready,
Ordering::Less => Readiness::Stale,
};
// remember to increment `next_index`
*next_index = next_index.saturating_add(1);
result
}
fn ready(&self) -> Self::Ready {
HashMap::default()
}
fn compare(old: &VerifiedFor<Self>, other: &VerifiedFor<Self>) -> Ordering {
old.original.transfer.nonce.cmp(&other.original.transfer.nonce)
}
fn choose(old: &VerifiedFor<Self>, new: &VerifiedFor<Self>) -> scoring::Choice {
assert!(new.verified.sender == old.verified.sender, "Scoring::choose called with transactions from different senders");
if old.original.transfer.nonce == new.original.transfer.nonce {
return scoring::Choice::RejectNew;
}
scoring::Choice::InsertNew
}
fn update_scores(
xts: &[txpool::Transaction<VerifiedFor<Self>>],
scores: &mut [Self::Score],
_change: scoring::Change<()>
) {
for i in 0..xts.len() {
scores[i] = xts[i].original.transfer.amount;
}
}
fn should_replace(_old: &VerifiedFor<Self>, _new: &VerifiedFor<Self>) -> scoring::Choice {
scoring::Choice::InsertNew
}
}
fn index(at: &BlockId, _account: AccountId) -> u64 {
(_account[0] as u64) + number_of(at)
}
fn number_of(at: &BlockId) -> u64 {
match at {
generic::BlockId::Number(n) => *n as u64,
_ => 0,
}
}
fn uxt(who: Keyring, nonce: Index) -> Extrinsic {
let transfer = Transfer {
from: who.to_raw_public().into(),
to: AccountId::default(),
nonce,
amount: 1,
};
let signature = transfer.using_encoded(|e| who.sign(e));
Extrinsic {
transfer,
signature: signature.into(),
}
}
fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::default())
}
#[test]
fn submission_should_work() {
let pool = pool();
assert_eq!(209, index(&BlockId::number(0), Alice.to_raw_public().into()));
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209)]);
}
#[test]
fn multiple_submission_should_work() {
let pool = pool();
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]);
}
#[test]
fn early_nonce_should_be_culled() {
let pool = pool();
pool.submit_one(&BlockId::number(0), uxt(Alice, 208)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![]);
}
#[test]
fn late_nonce_should_be_queued() {
let pool = pool();
pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![]);
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]);
}
#[test]
fn retrying_verification_might_not_change_anything() {
let pool = pool();
pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]);
pool.retry_verification(&BlockId::number(1), Alice.to_raw_public().into()).unwrap();
let pending: Vec<_> = pool.cull_and_get_pending(&BlockId::number(0), |p| p.map(|a| (*a.sender(), a.original.transfer.nonce)).collect()).unwrap();
assert_eq!(pending, vec![(Alice.to_raw_public().into(), 209), (Alice.to_raw_public().into(), 210)]);
}
}
+21 -21
View File
@@ -29,7 +29,7 @@ use message::{self, Message};
use message::generic::Message as GenericMessage;
use specialization::Specialization;
use sync::{ChainSync, Status as SyncStatus, SyncState};
use service::{Roles, TransactionPool};
use service::{Roles, TransactionPool, ExHashT};
use import_queue::ImportQueue;
use config::ProtocolConfig;
use chain::Client;
@@ -48,16 +48,16 @@ pub (crate) const CURRENT_PACKET_COUNT: u8 = 1;
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: Specialization<B>> {
pub struct Protocol<B: BlockT, S: Specialization<B>, H: ExHashT> {
config: ProtocolConfig,
on_demand: Option<Arc<OnDemandService<B>>>,
genesis_hash: B::Hash,
sync: Arc<RwLock<ChainSync<B>>>,
specialization: RwLock<S>,
context_data: ContextData<B>,
context_data: ContextData<B, H>,
// Connected peers pending Status message.
handshaking_peers: RwLock<HashMap<NodeIndex, time::Instant>>,
transaction_pool: Arc<TransactionPool<B>>,
transaction_pool: Arc<TransactionPool<H, B>>,
}
/// Syncing status and statistics
#[derive(Clone)]
@@ -71,7 +71,7 @@ pub struct ProtocolStatus<B: BlockT> {
}
/// Peer information
struct Peer<B: BlockT> {
struct Peer<B: BlockT, H: ExHashT> {
/// Protocol version
protocol_version: u32,
/// Roles
@@ -85,7 +85,7 @@ struct Peer<B: BlockT> {
/// Request timestamp
request_timestamp: Option<time::Instant>,
/// Holds a set of transactions known to this peer.
known_extrinsics: HashSet<B::Hash>,
known_extrinsics: HashSet<H>,
/// Holds a set of blocks known to this peer.
known_blocks: HashSet<B::Hash>,
/// Request counter,
@@ -121,13 +121,13 @@ pub trait Context<B: BlockT> {
}
/// Protocol context.
pub(crate) struct ProtocolContext<'a, B: 'a + BlockT> {
pub(crate) struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
io: &'a mut SyncIo,
context_data: &'a ContextData<B>,
context_data: &'a ContextData<B, H>,
}
impl<'a, B: BlockT + 'a> ProtocolContext<'a, B> {
pub(crate) fn new(context_data: &'a ContextData<B>, io: &'a mut SyncIo) -> Self {
impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> {
pub(crate) fn new(context_data: &'a ContextData<B, H>, io: &'a mut SyncIo) -> Self {
ProtocolContext {
io,
context_data,
@@ -157,7 +157,7 @@ impl<'a, B: BlockT + 'a> ProtocolContext<'a, B> {
}
}
impl<'a, B: BlockT + 'a> Context<B> for ProtocolContext<'a, B> {
impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, H> {
fn send_message(&mut self, who: NodeIndex, message: Message<B>) {
ProtocolContext::send_message(self, who, message);
}
@@ -176,20 +176,20 @@ impl<'a, B: BlockT + 'a> Context<B> for ProtocolContext<'a, B> {
}
/// Data necessary to create a context.
pub(crate) struct ContextData<B: BlockT> {
pub(crate) struct ContextData<B: BlockT, H: ExHashT> {
// All connected peers
peers: RwLock<HashMap<NodeIndex, Peer<B>>>,
peers: RwLock<HashMap<NodeIndex, Peer<B, H>>>,
chain: Arc<Client<B>>,
}
impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Create a new instance.
pub fn new(
config: ProtocolConfig,
chain: Arc<Client<B>>,
import_queue: Arc<ImportQueue<B>>,
on_demand: Option<Arc<OnDemandService<B>>>,
transaction_pool: Arc<TransactionPool<B>>,
transaction_pool: Arc<TransactionPool<H, B>>,
specialization: S,
) -> error::Result<Self> {
let info = chain.info()?;
@@ -210,7 +210,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
Ok(protocol)
}
pub(crate) fn context_data(&self) -> &ContextData<B> {
pub(crate) fn context_data(&self) -> &ContextData<B, H> {
&self.context_data
}
@@ -276,7 +276,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
}
pub fn send_message(&self, io: &mut SyncIo, who: NodeIndex, message: Message<B>) {
send_message::<B>(&self.context_data.peers, io, who, message)
send_message::<B, H>(&self.context_data.peers, io, who, message)
}
/// Called when a new peer is connected
@@ -490,7 +490,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
.iter()
.cloned()
.filter(|&(hash, _)| peer.known_extrinsics.insert(hash))
.filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))
.unzip();
if !to_send.is_empty() {
@@ -616,11 +616,11 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
Default::default()
},
};
self.send_message(io, who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
self.send_message(io, who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
id: request.id, proof,
}));
}
fn on_remote_read_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteReadResponse) {
fn on_remote_read_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteReadResponse) {
trace!(target: "sync", "Remote read response {} from {}", response.id, who);
self.on_demand.as_ref().map(|s| s.on_remote_read_response(io, who, response));
}
@@ -633,7 +633,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
}
}
fn send_message<B: BlockT>(peers: &RwLock<HashMap<NodeIndex, Peer<B>>>, io: &mut SyncIo, who: NodeIndex, mut message: Message<B>) {
fn send_message<B: BlockT, H: ExHashT>(peers: &RwLock<HashMap<NodeIndex, Peer<B, H>>>, io: &mut SyncIo, who: NodeIndex, mut message: Message<B>) {
match &mut message {
&mut GenericMessage::BlockRequest(ref mut r) => {
let mut peers = peers.write();
+21 -19
View File
@@ -81,14 +81,17 @@ pub trait SyncProvider<B: BlockT>: Send + Sync {
fn node_id(&self) -> Option<String>;
}
pub trait ExHashT: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {}
impl<T> ExHashT for T where T: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {}
/// Transaction pool interface
pub trait TransactionPool<B: BlockT>: Send + Sync {
pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
/// Get transactions from the pool that are ready to be propagated.
fn transactions(&self) -> Vec<(B::Hash, B::Extrinsic)>;
fn transactions(&self) -> Vec<(H, B::Extrinsic)>;
/// Import a transaction into the pool.
fn import(&self, transaction: &B::Extrinsic) -> Option<B::Hash>;
fn import(&self, transaction: &B::Extrinsic) -> Option<H>;
/// Notify the pool about transactions broadcast.
fn on_broadcasted(&self, propagations: HashMap<B::Hash, Vec<String>>);
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>);
}
/// ConsensusService
@@ -109,9 +112,9 @@ pub trait ExecuteInContext<B: BlockT>: Send + Sync {
fn execute_in_context<F: Fn(&mut Context<B>)>(&self, closure: F);
}
/// devp2p Protocol handler
struct ProtocolHandler<B: BlockT, S: Specialization<B>> {
protocol: Protocol<B, S>,
/// Network protocol handler
struct ProtocolHandler<B: BlockT, S: Specialization<B>, H: ExHashT> {
protocol: Protocol<B, S, H>,
}
/// Peer connection information
@@ -132,7 +135,7 @@ pub struct PeerInfo<B: BlockT> {
}
/// Service initialization parameters.
pub struct Params<B: BlockT, S> {
pub struct Params<B: BlockT, S, H: ExHashT> {
/// Configuration.
pub config: ProtocolConfig,
/// Network layer configuration.
@@ -142,24 +145,24 @@ pub struct Params<B: BlockT, S> {
/// On-demand service reference.
pub on_demand: Option<Arc<OnDemandService<B>>>,
/// Transaction pool.
pub transaction_pool: Arc<TransactionPool<B>>,
pub transaction_pool: Arc<TransactionPool<H, B>>,
/// Protocol specialization.
pub specialization: S,
}
/// Polkadot network service. Handles network IO and manages connectivity.
pub struct Service<B: BlockT + 'static, S: Specialization<B>> {
pub struct Service<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> {
/// Network service
network: NetworkService,
/// Devp2p protocol handler
handler: Arc<ProtocolHandler<B, S>>,
handler: Arc<ProtocolHandler<B, S, H>>,
/// Devp2p protocol ID.
protocol_id: ProtocolId,
}
impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
/// Creates and register protocol with the network service
pub fn new(params: Params<B, S>, protocol_id: ProtocolId) -> Result<Arc<Service<B, S>>, Error> {
pub fn new(params: Params<B, S, H>, protocol_id: ProtocolId) -> Result<Arc<Service<B, S, H>>, Error> {
let chain = params.chain.clone();
let import_queue = Arc::new(AsyncImportQueue::new());
let handler = Arc::new(ProtocolHandler {
@@ -228,13 +231,12 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
}
}
impl<B: BlockT + 'static, S: Specialization<B>> Drop for Service<B, S> {
impl<B: BlockT + 'static, S: Specialization<B>, H:ExHashT> Drop for Service<B, S, H> {
fn drop(&mut self) {
self.handler.protocol.stop();
}
}
impl<B: BlockT + 'static, S: Specialization<B>> ExecuteInContext<B> for Service<B, S> {
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> ExecuteInContext<B> for Service<B, S, H> {
fn execute_in_context<F: Fn(&mut ::protocol::Context<B>)>(&self, closure: F) {
self.network.with_context(self.protocol_id, |context| {
closure(&mut ProtocolContext::new(self.handler.protocol.context_data(), &mut NetSyncIo::new(context)))
@@ -242,7 +244,7 @@ impl<B: BlockT + 'static, S: Specialization<B>> ExecuteInContext<B> for Service<
}
}
impl<B: BlockT + 'static, S: Specialization<B>> SyncProvider<B> for Service<B, S> {
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> SyncProvider<B> for Service<B, S, H> {
/// Get sync status
fn status(&self) -> ProtocolStatus<B> {
self.handler.protocol.status()
@@ -276,7 +278,7 @@ impl<B: BlockT + 'static, S: Specialization<B>> SyncProvider<B> for Service<B, S
}
}
impl<B: BlockT + 'static, S: Specialization<B>> NetworkProtocolHandler for ProtocolHandler<B, S> {
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> NetworkProtocolHandler for ProtocolHandler<B, S, H> {
fn initialize(&self, io: &NetworkContext) {
io.register_timer(TICK_TOKEN, TICK_TIMEOUT)
.expect("Error registering sync timer");
@@ -319,7 +321,7 @@ pub trait ManageNetwork: Send + Sync {
}
impl<B: BlockT + 'static, S: Specialization<B>> ManageNetwork for Service<B, S> {
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> ManageNetwork for Service<B, S, H> {
fn accept_unreserved_peers(&self) {
self.network.set_non_reserved_mode(NonReservedPeerMode::Accept);
}
+4 -4
View File
@@ -116,7 +116,7 @@ pub struct TestPacket {
pub struct Peer {
client: Arc<client::Client<test_client::Backend, test_client::Executor, Block>>,
pub sync: Protocol<Block, DummySpecialization>,
pub sync: Protocol<Block, DummySpecialization, Hash>,
pub queue: RwLock<VecDeque<TestPacket>>,
}
@@ -173,8 +173,8 @@ impl Peer {
fn flush(&self) {
}
fn generate_blocks<F>(&self, count: usize, mut edit_block: F)
where F: FnMut(&mut BlockBuilder<test_client::Backend, test_client::Executor, Block, KeccakHasher, RlpCodec>)
fn generate_blocks<F>(&self, count: usize, mut edit_block: F)
where F: FnMut(&mut BlockBuilder<test_client::Backend, test_client::Executor, Block, KeccakHasher, RlpCodec>)
{
for _ in 0 .. count {
let mut builder = self.client.new_block().unwrap();
@@ -207,7 +207,7 @@ impl Peer {
pub struct EmptyTransactionPool;
impl TransactionPool<Block> for EmptyTransactionPool {
impl TransactionPool<Hash, Block> for EmptyTransactionPool {
fn transactions(&self) -> Vec<(Hash, Extrinsic)> {
Vec::new()
}
+3 -2
View File
@@ -39,17 +39,18 @@ pub type HttpServer = http::Server;
pub type WsServer = ws::Server;
/// Construct rpc `IoHandler`
pub fn rpc_handler<Block, PendingExtrinsics, S, C, A, Y>(
pub fn rpc_handler<Block: BlockT, ExHash, PendingExtrinsics, S, C, A, Y>(
state: S,
chain: C,
author: A,
system: Y,
) -> RpcHandler where
Block: BlockT + 'static,
ExHash: Send + Sync + 'static + substrate_runtime_primitives::Serialize + substrate_runtime_primitives::DeserializeOwned,
PendingExtrinsics: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
S: apis::state::StateApi<Block::Hash, Metadata=Metadata>,
C: apis::chain::ChainApi<Block::Hash, Block::Header, Block::Extrinsic, Metadata=Metadata>,
A: apis::author::AuthorApi<Block::Hash, Block::Extrinsic, PendingExtrinsics, Metadata=Metadata>,
A: apis::author::AuthorApi<ExHash, Block::Extrinsic, PendingExtrinsics, Metadata=Metadata>,
Y: apis::system::SystemApi,
{
let mut io = pubsub::PubSubHandler::default();
+2 -3
View File
@@ -17,15 +17,14 @@
//! Authoring RPC module errors.
use client;
use extrinsic_pool::txpool;
use extrinsic_pool;
use rpc;
use errors;
error_chain! {
links {
Pool(txpool::Error, txpool::ErrorKind) #[doc = "Pool error"];
Pool(extrinsic_pool::Error, extrinsic_pool::ErrorKind) #[doc = "Pool error"];
Client(client::error::Error, client::error::ErrorKind) #[doc = "Client error"];
}
errors {
+32 -27
View File
@@ -19,10 +19,16 @@
use std::sync::Arc;
use client::{self, Client};
use codec::Codec;
use codec::Decode;
use extrinsic_pool::{
api::{Error, ExtrinsicPool},
Pool,
IntoPoolError,
ChainApi as PoolChainApi,
watcher::Status,
VerifiedTransaction,
AllExtrinsics,
ExHash,
ExtrinsicFor,
};
use jsonrpc_macros::pubsub;
use jsonrpc_pubsub::SubscriptionId;
@@ -54,7 +60,7 @@ build_rpc_trait! {
/// Returns all pending extrinsics, potentially grouped by sender.
#[rpc(name = "author_pendingExtrinsics")]
fn pending_extrinsics(&self) -> Result<PendingExtrinsics>;
#[pubsub(name = "author_extrinsicUpdate")] {
/// Submit an extrinsic to watch.
#[rpc(name = "author_submitAndWatchExtrinsic")]
@@ -64,22 +70,27 @@ build_rpc_trait! {
#[rpc(name = "author_unwatchExtrinsic")]
fn unwatch_extrinsic(&self, SubscriptionId) -> Result<bool>;
}
}
}
/// Authoring API
pub struct Author<B, E, Block: traits::Block, P> {
pub struct Author<B, E, P> where
P: PoolChainApi + Sync + Send + 'static,
{
/// Substrate client
client: Arc<Client<B, E, Block>>,
client: Arc<Client<B, E, <P as PoolChainApi>::Block>>,
/// Extrinsic pool
pool: Arc<P>,
pool: Arc<Pool<P>>,
/// Subscriptions manager
subscriptions: Subscriptions,
}
impl<B, E, Block: traits::Block, P> Author<B, E, Block, P> {
impl<B, E, P> Author<B, E, P> where
P: PoolChainApi + Sync + Send + 'static,
{
/// Create new instance of Authoring API.
pub fn new(client: Arc<Client<B, E, Block>>, pool: Arc<P>, executor: TaskExecutor) -> Self {
pub fn new(client: Arc<Client<B, E, <P as PoolChainApi>::Block>>, pool: Arc<Pool<P>>, executor: TaskExecutor) -> Self {
Author {
client,
pool,
@@ -88,45 +99,40 @@ impl<B, E, Block: traits::Block, P> Author<B, E, Block, P> {
}
}
impl<B, E, Block, P, Ex, Hash, InPool> AuthorApi<Hash, Ex, InPool> for Author<B, E, Block, P> where
B: client::backend::Backend<Block, KeccakHasher, RlpCodec> + Send + Sync + 'static,
E: client::CallExecutor<Block, KeccakHasher, RlpCodec> + Send + Sync + 'static,
Block: traits::Block + 'static,
Hash: traits::MaybeSerializeDebug + Send + Sync + 'static,
InPool: traits::MaybeSerializeDebug + Send + Sync + 'static,
P: ExtrinsicPool<Ex, generic::BlockId<Block>, Hash, InPool=InPool>,
impl<B, E, P> AuthorApi<ExHash<P>, ExtrinsicFor<P>, AllExtrinsics<P>> for Author<B, E, P> where
B: client::backend::Backend<<P as PoolChainApi>::Block, KeccakHasher, RlpCodec> + Send + Sync + 'static,
E: client::CallExecutor<<P as PoolChainApi>::Block, KeccakHasher, RlpCodec> + Send + Sync + 'static,
P: PoolChainApi + Sync + Send + 'static,
P::Error: 'static,
Ex: Codec,
{
type Metadata = ::metadata::Metadata;
fn submit_extrinsic(&self, xt: Bytes) -> Result<Hash> {
let dxt = Ex::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?;
fn submit_extrinsic(&self, xt: Bytes) -> Result<ExHash<P>> {
let dxt = Decode::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?;
self.submit_rich_extrinsic(dxt)
}
fn submit_rich_extrinsic(&self, xt: Ex) -> Result<Hash> {
fn submit_rich_extrinsic(&self, xt: <<P as PoolChainApi>::Block as traits::Block>::Extrinsic) -> Result<ExHash<P>> {
let best_block_hash = self.client.info()?.chain.best_hash;
self.pool
.submit(generic::BlockId::hash(best_block_hash), vec![xt])
.map(|mut res| res.pop().expect("One extrinsic passed; one result back; qed"))
.submit_one(&generic::BlockId::hash(best_block_hash), xt)
.map_err(|e| e.into_pool_error()
.map(Into::into)
.unwrap_or_else(|e| error::ErrorKind::Verification(Box::new(e)).into())
)
.map(|ex| ex.hash().clone())
}
fn pending_extrinsics(&self) -> Result<InPool> {
fn pending_extrinsics(&self) -> Result<AllExtrinsics<P>> {
Ok(self.pool.all())
}
fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber<Status<Hash>>, xt: Bytes) {
fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber<Status<ExHash<P>>>, xt: Bytes) {
let submit = || -> Result<_> {
let best_block_hash = self.client.info()?.chain.best_hash;
let dxt = Ex::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?;
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?;
self.pool
.submit_and_watch(generic::BlockId::hash(best_block_hash), dxt)
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
.map_err(|e| e.into_pool_error()
.map(Into::into)
.unwrap_or_else(|e| error::ErrorKind::Verification(Box::new(e)).into())
@@ -154,4 +160,3 @@ impl<B, E, Block, P, Ex, Hash, InPool> AuthorApi<Hash, Ex, InPool> for Author<B,
Ok(self.subscriptions.cancel(id))
}
}
+83 -70
View File
@@ -16,74 +16,87 @@
use super::*;
use std::{fmt, sync::Arc, result::Result};
use std::{sync::Arc, result::Result};
use codec::Encode;
use extrinsic_pool::{api, txpool, watcher::{self, Watcher}};
use parking_lot::Mutex;
use extrinsic_pool::{VerifiedTransaction, scoring, Transaction, ChainApi, Error as PoolError,
Readiness, ExtrinsicFor, VerifiedFor};
use test_client::runtime::{Block, Extrinsic, Transfer};
use test_client;
use tokio::runtime;
use runtime_primitives::generic::BlockId;
type Extrinsic = u64;
type Hash = u64;
#[derive(Default)]
struct DummyTxPool {
submitted: Mutex<Vec<Extrinsic>>,
sender: Mutex<Option<watcher::Sender<u64>>>,
#[derive(Clone, Debug)]
pub struct Verified
{
sender: u64,
hash: u64,
}
#[derive(Debug)]
struct Error;
impl api::Error for Error {}
impl ::std::error::Error for Error {
fn description(&self) -> &str { "Error" }
impl VerifiedTransaction for Verified {
type Hash = u64;
type Sender = u64;
fn hash(&self) -> &Self::Hash { &self.hash }
fn sender(&self) -> &Self::Sender { &self.sender }
fn mem_usage(&self) -> usize { 256 }
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(self, fmt)
struct TestApi;
impl ChainApi for TestApi {
type Block = Block;
type Hash = u64;
type Sender = u64;
type Error = PoolError;
type VEx = Verified;
type Score = u64;
type Event = ();
type Ready = ();
fn verify_transaction(&self, _at: &BlockId<Block>, uxt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error> {
Ok(Verified {
sender: uxt.transfer.from[31] as u64,
hash: uxt.transfer.nonce,
})
}
}
impl<BlockHash> api::ExtrinsicPool<Extrinsic, BlockHash, u64> for DummyTxPool {
type Error = Error;
type InPool = Vec<u8>;
fn is_ready(&self, _at: &BlockId<Block>, _c: &mut Self::Ready, _xt: &VerifiedFor<Self>) -> Readiness {
Readiness::Ready
}
fn ready(&self) -> Self::Ready { }
/// Submit extrinsic for inclusion in block.
fn submit(&self, _block: BlockHash, xt: Vec<Extrinsic>) -> Result<Vec<Hash>, Self::Error> {
let mut submitted = self.submitted.lock();
if submitted.len() < 1 {
let hashes = xt.iter().map(|_xt| 1).collect();
submitted.extend(xt);
Ok(hashes)
} else {
Err(Error)
fn compare(old: &VerifiedFor<Self>, other: &VerifiedFor<Self>) -> ::std::cmp::Ordering {
old.verified.hash().cmp(&other.verified.hash())
}
fn choose(_old: &VerifiedFor<Self>, _new: &VerifiedFor<Self>) -> scoring::Choice {
scoring::Choice::ReplaceOld
}
fn update_scores(xts: &[Transaction<VerifiedFor<Self>>], scores: &mut [Self::Score], _change: scoring::Change<()>) {
for i in 0..xts.len() {
scores[i] = xts[i].verified.sender
}
}
fn submit_and_watch(&self, _block: BlockHash, xt: Extrinsic) -> Result<Watcher<u64>, Self::Error> {
let mut submitted = self.submitted.lock();
if submitted.len() < 1 {
submitted.push(xt);
let mut sender = watcher::Sender::default();
let watcher = sender.new_watcher();
*self.sender.lock() = Some(sender);
Ok(watcher)
} else {
Err(Error)
fn should_replace(_old: &VerifiedFor<Self>, _new: &VerifiedFor<Self>) -> scoring::Choice {
scoring::Choice::ReplaceOld
}
}
type DummyTxPool = Pool<TestApi>;
fn uxt(sender: u64, hash: u64) -> Extrinsic {
Extrinsic {
signature: Default::default(),
transfer: Transfer {
amount: Default::default(),
nonce: hash,
from: From::from(sender),
to: Default::default(),
}
}
fn light_status(&self) -> txpool::LightStatus {
unreachable!()
}
fn import_notification_stream(&self) -> api::EventStream {
unreachable!()
}
fn all(&self) -> Self::InPool {
vec![1, 2, 3, 4, 5]
}
}
#[test]
@@ -91,16 +104,16 @@ fn submit_transaction_should_not_cause_error() {
let runtime = runtime::Runtime::new().unwrap();
let p = Author {
client: Arc::new(test_client::new()),
pool: Arc::new(DummyTxPool::default()),
pool: Arc::new(DummyTxPool::new(Default::default(), TestApi)),
subscriptions: Subscriptions::new(runtime.executor()),
};
assert_matches!(
AuthorApi::submit_extrinsic(&p, u64::encode(&5).into()),
AuthorApi::submit_extrinsic(&p, uxt(5, 1).encode().into()),
Ok(1)
);
assert!(
AuthorApi::submit_extrinsic(&p, u64::encode(&5).into()).is_err()
AuthorApi::submit_extrinsic(&p, uxt(5, 1).encode().into()).is_err()
);
}
@@ -109,16 +122,16 @@ fn submit_rich_transaction_should_not_cause_error() {
let runtime = runtime::Runtime::new().unwrap();
let p = Author {
client: Arc::new(test_client::new()),
pool: Arc::new(DummyTxPool::default()),
pool: Arc::new(DummyTxPool::new(Default::default(), TestApi)),
subscriptions: Subscriptions::new(runtime.executor()),
};
assert_matches!(
AuthorApi::submit_rich_extrinsic(&p, 5),
Ok(1)
AuthorApi::submit_rich_extrinsic(&p, uxt(5, 0)),
Ok(0)
);
assert!(
AuthorApi::submit_rich_extrinsic(&p, 5).is_err()
AuthorApi::submit_rich_extrinsic(&p, uxt(5, 0)).is_err()
);
}
@@ -126,7 +139,7 @@ fn submit_rich_transaction_should_not_cause_error() {
fn should_watch_extrinsic() {
//given
let mut runtime = runtime::Runtime::new().unwrap();
let pool = Arc::new(DummyTxPool::default());
let pool = Arc::new(DummyTxPool::new(Default::default(), TestApi));
let p = Author {
client: Arc::new(test_client::new()),
pool: pool.clone(),
@@ -135,31 +148,31 @@ fn should_watch_extrinsic() {
let (subscriber, id_rx, data) = ::jsonrpc_macros::pubsub::Subscriber::new_test("test");
// when
p.watch_extrinsic(Default::default(), subscriber, u64::encode(&5).into());
p.watch_extrinsic(Default::default(), subscriber, uxt(5, 5).encode().into());
// then
assert_eq!(runtime.block_on(id_rx), Ok(Ok(0.into())));
// check notifications
pool.sender.lock().as_mut().unwrap().usurped(5);
AuthorApi::submit_rich_extrinsic(&p, uxt(5, 1)).unwrap();
assert_eq!(
runtime.block_on(data.into_future()).unwrap().0,
Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":5},"subscription":0}}"#.into())
Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":1},"subscription":0}}"#.into())
);
}
#[test]
fn should_return_pending_extrinsics() {
let runtime = runtime::Runtime::new().unwrap();
let pool = Arc::new(DummyTxPool::new(Default::default(), TestApi));
let p = Author {
client: Arc::new(test_client::new()),
pool: Arc::new(DummyTxPool::default()),
pool: pool.clone(),
subscriptions: Subscriptions::new(runtime.executor()),
};
assert_matches!(
let ex = uxt(5, 1);
AuthorApi::submit_rich_extrinsic(&p, ex.clone()).unwrap();
assert_matches!(
p.pending_extrinsics(),
Ok(ref expected) if expected == &[1u8, 2, 3, 4, 5]
Ok(ref expected) if expected.get(&5) == Some(&vec![ex])
);
}
@@ -62,6 +62,9 @@ pub mod bft;
use traits::{Verify, Lazy};
#[cfg(feature = "std")]
pub use serde::{Serialize, de::DeserializeOwned};
/// A set of key value pairs for storage.
#[cfg(feature = "std")]
pub type StorageMap = HashMap<Vec<u8>, Vec<u8>>;
+30 -32
View File
@@ -16,6 +16,7 @@
//! Polkadot service components.
use std::fmt;
use std::sync::Arc;
use std::marker::PhantomData;
use serde::{Serialize, de::DeserializeOwned};
@@ -25,8 +26,8 @@ use client::{self, Client};
use error;
use network::{self, OnDemand};
use substrate_executor::{NativeExecutor, NativeExecutionDispatch};
use extrinsic_pool::{txpool::Options as ExtrinsicPoolOptions, api::ExtrinsicPool as ExtrinsicPoolApi};
use runtime_primitives::{traits::Block as BlockT, traits::Header as HeaderT, generic::BlockId, BuildStorage};
use extrinsic_pool::{self, Options as ExtrinsicPoolOptions, Pool as ExtrinsicPool};
use runtime_primitives::{traits::Block as BlockT, traits::Header as HeaderT, BuildStorage};
use config::Configuration;
use primitives::{KeccakHasher, RlpCodec, H256};
@@ -35,7 +36,8 @@ use primitives::{KeccakHasher, RlpCodec, H256};
/// Network service type for a factory.
pub type NetworkService<F> = network::Service<
<F as ServiceFactory>::Block,
<F as ServiceFactory>::NetworkProtocol
<F as ServiceFactory>::NetworkProtocol,
<F as ServiceFactory>::ExtrinsicHash,
>;
/// Code executor type for a factory.
@@ -53,8 +55,7 @@ pub type FullExecutor<F> = client::LocalCallExecutor<
/// Light client backend type for a factory.
pub type LightBackend<F> = client::light::backend::Backend<
client_db::light::LightStorage<<F as ServiceFactory>::Block>,
network::OnDemand<<F as ServiceFactory>::Block,
NetworkService<F>>
network::OnDemand<<F as ServiceFactory>::Block, NetworkService<F>>,
>;
/// Light client executor type for a factory.
@@ -97,25 +98,33 @@ pub type ComponentClient<C> = Client<
/// Block type for `Components`
pub type ComponentBlock<C> = <<C as Components>::Factory as ServiceFactory>::Block;
/// Extrinsic hash type for `Components`
pub type ComponentExHash<C> = <<C as Components>::ExtrinsicPoolApi as extrinsic_pool::ChainApi>::Hash;
/// Extrinsic type.
pub type ComponentExtrinsic<C> = <ComponentBlock<C> as BlockT>::Extrinsic;
/// Extrinsic pool API type for `Components`.
pub type PoolApi<C> = <<C as Components>::ExtrinsicPool as ExtrinsicPool<ComponentBlock<C>>>::Api;
pub type PoolApi<C> = <C as Components>::ExtrinsicPoolApi;
/// A set of traits for the runtime genesis config.
pub trait RuntimeGenesis: Serialize + DeserializeOwned + BuildStorage {}
impl<T: Serialize + DeserializeOwned + BuildStorage> RuntimeGenesis for T {}
/// A collection of types and methods to build a service on top of the substrate service.
pub trait ServiceFactory {
pub trait ServiceFactory: 'static {
/// Block type.
type Block: BlockT;
/// Extrinsic hash type.
type ExtrinsicHash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Serialize + DeserializeOwned + ::std::str::FromStr + Send + Sync + Default + 'static;
/// Network protocol extensions.
type NetworkProtocol: network::specialization::Specialization<Self::Block>;
/// Chain runtime.
type RuntimeDispatch: NativeExecutionDispatch + Send + Sync + 'static;
/// Extrinsic pool type for the full client.
type FullExtrinsicPool: ExtrinsicPool<Self::Block>;
/// Extrinsic pool type for the light client.
type LightExtrinsicPool: ExtrinsicPool<Self::Block>;
/// Extrinsic pool backend type for the full client.
type FullExtrinsicPoolApi: extrinsic_pool::ChainApi<Hash=Self::ExtrinsicHash, Block=Self::Block> + Send + 'static;
/// Extrinsic pool backend type for the light client.
type LightExtrinsicPoolApi: extrinsic_pool::ChainApi<Hash=Self::ExtrinsicHash, Block=Self::Block> + 'static;
/// Genesis configuration for the runtime.
type Genesis: RuntimeGenesis;
/// Other configuration for service members.
@@ -127,29 +136,18 @@ pub trait ServiceFactory {
//TODO: replace these with a constructor trait. that ExtrinsicPool implements.
/// Extrinsic pool constructor for the full client.
fn build_full_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc<FullClient<Self>>)
-> Result<Self::FullExtrinsicPool, error::Error>;
-> Result<ExtrinsicPool<Self::FullExtrinsicPoolApi>, error::Error>;
/// Extrinsic pool constructor for the light client.
fn build_light_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc<LightClient<Self>>)
-> Result<Self::LightExtrinsicPool, error::Error>;
-> Result<ExtrinsicPool<Self::LightExtrinsicPoolApi>, error::Error>;
/// Build network protocol.
fn build_network_protocol(config: &FactoryFullConfiguration<Self>)
-> Result<Self::NetworkProtocol, error::Error>;
}
// TODO: move this to substrate-extrinsic-pool
/// Extrinsic pool bridge.
pub trait ExtrinsicPool<Block: BlockT>: network::TransactionPool<Block> + Send + Sync + 'static {
type Api: ExtrinsicPoolApi<Block::Extrinsic, BlockId<Block>, Block::Hash>;
/// Update the pool after a new block has been imported.
fn prune_imported(&self, hash: &Block::Hash);
/// Returns underlying API.
fn api(&self) -> Arc<Self::Api>;
}
}
/// A collection of types and function to generalise over full / light client type.
pub trait Components {
pub trait Components: 'static {
/// Associated service factory.
type Factory: ServiceFactory;
/// Client backend.
@@ -157,7 +155,7 @@ pub trait Components {
/// Client executor.
type Executor: 'static + client::CallExecutor<FactoryBlock<Self::Factory>, KeccakHasher, RlpCodec> + Send + Sync;
/// Extrinsic pool type.
type ExtrinsicPool: ExtrinsicPool<FactoryBlock<Self::Factory>>;
type ExtrinsicPoolApi: 'static + extrinsic_pool::ChainApi<Hash=<Self::Factory as ServiceFactory>::ExtrinsicHash, Block=FactoryBlock<Self::Factory>>;
/// Create client.
fn build_client(
@@ -171,7 +169,7 @@ pub trait Components {
/// Create extrinsic pool.
fn build_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc<ComponentClient<Self>>)
-> Result<Self::ExtrinsicPool, error::Error>;
-> Result<ExtrinsicPool<Self::ExtrinsicPoolApi>, error::Error>;
}
/// A struct that implement `Components` for the full client.
@@ -183,7 +181,7 @@ impl<Factory: ServiceFactory> Components for FullComponents<Factory> {
type Factory = Factory;
type Executor = FullExecutor<Factory>;
type Backend = FullBackend<Factory>;
type ExtrinsicPool = <Factory as ServiceFactory>::FullExtrinsicPool;
type ExtrinsicPoolApi = <Factory as ServiceFactory>::FullExtrinsicPoolApi;
fn build_client(
config: &FactoryFullConfiguration<Factory>,
@@ -203,7 +201,7 @@ impl<Factory: ServiceFactory> Components for FullComponents<Factory> {
}
fn build_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc<ComponentClient<Self>>)
-> Result<Self::ExtrinsicPool, error::Error>
-> Result<ExtrinsicPool<Self::ExtrinsicPoolApi>, error::Error>
{
Factory::build_full_extrinsic_pool(config, client)
}
@@ -221,7 +219,7 @@ impl<Factory: ServiceFactory> Components for LightComponents<Factory>
type Factory = Factory;
type Executor = LightExecutor<Factory>;
type Backend = LightBackend<Factory>;
type ExtrinsicPool = <Factory as ServiceFactory>::LightExtrinsicPool;
type ExtrinsicPoolApi = <Factory as ServiceFactory>::LightExtrinsicPoolApi;
fn build_client(
config: &FactoryFullConfiguration<Factory>,
@@ -248,7 +246,7 @@ impl<Factory: ServiceFactory> Components for LightComponents<Factory>
}
fn build_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc<ComponentClient<Self>>)
-> Result<Self::ExtrinsicPool, error::Error>
-> Result<ExtrinsicPool<Self::ExtrinsicPoolApi>, error::Error>
{
Factory::build_light_extrinsic_pool(config, client)
}
+1 -1
View File
@@ -38,7 +38,7 @@ pub struct Configuration<C, G: Serialize + DeserializeOwned + BuildStorage> {
/// Node roles.
pub roles: Roles,
/// Extrinsic pool configuration.
pub extrinsic_pool: extrinsic_pool::txpool::Options,
pub extrinsic_pool: extrinsic_pool::Options,
/// Network configuration.
pub network: NetworkConfiguration,
/// Path to key files.
+95 -14
View File
@@ -57,34 +57,37 @@ pub mod chain_ops;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::collections::HashMap;
use std::fmt::Write;
use futures::prelude::*;
use keystore::Store as Keystore;
use client::BlockchainEvents;
use runtime_primitives::traits::{Header, As};
use runtime_primitives::generic::BlockId;
use exit_future::Signal;
use tokio::runtime::TaskExecutor;
use substrate_executor::NativeExecutor;
use codec::{Encode, Decode};
pub use self::error::{ErrorKind, Error};
pub use config::{Configuration, Roles, PruningMode};
pub use chain_spec::ChainSpec;
pub use extrinsic_pool::txpool::{Options as ExtrinsicPoolOptions};
pub use extrinsic_pool::api::{ExtrinsicPool as ExtrinsicPoolApi};
pub use extrinsic_pool::{Pool as ExtrinsicPool, Options as ExtrinsicPoolOptions, ChainApi, VerifiedTransaction, IntoPoolError};
pub use client::ExecutionStrategy;
pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend,
LightExecutor, ExtrinsicPool, Components, PoolApi, ComponentClient,
LightExecutor, Components, PoolApi, ComponentClient,
ComponentBlock, FullClient, LightClient, FullComponents, LightComponents,
CodeExecutor, NetworkService, FactoryChainSpec, FactoryBlock,
FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis,
FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis,
ComponentExHash, ComponentExtrinsic,
};
/// Substrate service.
pub struct Service<Components: components::Components> {
client: Arc<ComponentClient<Components>>,
network: Option<Arc<components::NetworkService<Components::Factory>>>,
extrinsic_pool: Arc<Components::ExtrinsicPool>,
extrinsic_pool: Arc<ExtrinsicPool<Components::ExtrinsicPoolApi>>,
keystore: Keystore,
exit: ::exit_future::Exit,
signal: Option<Signal>,
@@ -149,7 +152,11 @@ impl<Components> Service<Components>
let extrinsic_pool = Arc::new(
Components::build_extrinsic_pool(config.extrinsic_pool, client.clone())?
);
let extrinsic_pool_adapter = extrinsic_pool.clone();
let extrinsic_pool_adapter = ExtrinsicPoolAdapter::<Components> {
imports_external_transactions: !config.roles == Roles::LIGHT,
pool: extrinsic_pool.clone(),
client: client.clone(),
};
let network_params = network::Params {
config: network::ProtocolConfig {
@@ -159,7 +166,7 @@ impl<Components> Service<Components>
chain: client.clone(),
on_demand: on_demand.clone()
.map(|d| d as Arc<network::OnDemandService<ComponentBlock<Components>>>),
transaction_pool: extrinsic_pool_adapter,
transaction_pool: Arc::new(extrinsic_pool_adapter),
specialization: network_protocol,
};
@@ -174,7 +181,8 @@ impl<Components> Service<Components>
let events = client.import_notification_stream()
.for_each(move |notification| {
network.on_block_imported(notification.hash, &notification.header);
txpool.prune_imported(&notification.hash);
txpool.cull(&BlockId::hash(notification.hash))
.map_err(|e| warn!("Error removing extrinsics: {:?}", e))?;
Ok(())
})
.select(exit.clone())
@@ -185,7 +193,7 @@ impl<Components> Service<Components>
{
// extrinsic notifications
let network = network.clone();
let events = extrinsic_pool.api().import_notification_stream()
let events = extrinsic_pool.import_notification_stream()
// TODO [ToDr] Consider throttling?
.for_each(move |_| {
network.trigger_repropagate();
@@ -209,9 +217,8 @@ impl<Components> Service<Components>
let client = client.clone();
let chain = rpc::apis::chain::Chain::new(client.clone(), task_executor.clone());
let state = rpc::apis::state::State::new(client.clone(), task_executor.clone());
let author = rpc::apis::author::Author::new(client.clone(), extrinsic_pool.api(), task_executor.clone());
rpc::rpc_handler::<ComponentBlock<Components>, _, _, _, _, _>(
let author = rpc::apis::author::Author::new(client.clone(), extrinsic_pool.clone(), task_executor.clone());
rpc::rpc_handler::<ComponentBlock<Components>, ComponentExHash<Components>, _, _, _, _, _>(
state,
chain,
author,
@@ -278,8 +285,8 @@ impl<Components> Service<Components>
}
/// Get shared extrinsic pool instance.
pub fn extrinsic_pool(&self) -> Arc<PoolApi<Components>> {
self.extrinsic_pool.api()
pub fn extrinsic_pool(&self) -> Arc<ExtrinsicPool<Components::ExtrinsicPoolApi>> {
self.extrinsic_pool.clone()
}
/// Get shared keystore.
@@ -343,3 +350,77 @@ impl substrate_rpc::system::SystemApi for RpcConfig {
Ok(self.chain_name.clone())
}
}
/// Transaction pool adapter.
pub struct ExtrinsicPoolAdapter<C: Components> {
imports_external_transactions: bool,
pool: Arc<ExtrinsicPool<C::ExtrinsicPoolApi>>,
client: Arc<ComponentClient<C>>,
}
impl<C: Components> ExtrinsicPoolAdapter<C> {
fn best_block_id(&self) -> Option<BlockId<ComponentBlock<C>>> {
self.client.info()
.map(|info| BlockId::hash(info.chain.best_hash))
.map_err(|e| {
debug!("Error getting best block: {:?}", e);
})
.ok()
}
}
impl<C: Components> network::TransactionPool<ComponentExHash<C>, ComponentBlock<C>> for ExtrinsicPoolAdapter<C> {
fn transactions(&self) -> Vec<(ComponentExHash<C>, ComponentExtrinsic<C>)> {
let best_block_id = match self.best_block_id() {
Some(id) => id,
None => return vec![],
};
self.pool.cull_and_get_pending(&best_block_id, |pending| pending
.map(|t| {
let hash = t.hash().clone();
let ex: ComponentExtrinsic<C> = t.original.clone();
(hash, ex)
})
.collect()
).unwrap_or_else(|e| {
warn!("Error retrieving pending set: {}", e);
vec![]
})
}
fn import(&self, transaction: &ComponentExtrinsic<C>) -> Option<ComponentExHash<C>> {
if !self.imports_external_transactions {
return None;
}
let encoded = transaction.encode();
if let Some(uxt) = Decode::decode(&mut &encoded[..]) {
let best_block_id = self.best_block_id()?;
match self.pool.submit_one(&best_block_id, uxt) {
Ok(xt) => Some(*xt.hash()),
Err(e) => match e.into_pool_error() {
Ok(e) => match e.kind() {
extrinsic_pool::ErrorKind::AlreadyImported(hash) =>
Some(::std::str::FromStr::from_str(&hash).map_err(|_| {})
.expect("Hash string is always valid")),
_ => {
debug!("Error adding transaction to the pool: {:?}", e);
None
},
},
Err(e) => {
debug!("Error converting pool error: {:?}", e);
None
}
}
}
} else {
debug!("Error decoding transaction");
None
}
}
fn on_broadcasted(&self, propagations: HashMap<ComponentExHash<C>, Vec<String>>) {
self.pool.on_broadcasted(propagations)
}
}