Extrinsic pool (#182)

* Use latest version of txpool.

* Initial version of the pool.

* Fix abstraction.

* Implement watchers and notifications.

* Return hash from RPC.

* Remove commented code.

* Remove client dep.

* Fix tests.
This commit is contained in:
Tomasz Drwięga
2018-05-31 22:49:17 +02:00
committed by Gav Wood
parent 1b997de958
commit 59adb4767b
8 changed files with 222 additions and 248 deletions
+2 -19
View File
@@ -22,7 +22,7 @@ pub use network::NetworkConfiguration;
/// The chain specification (this should eventually be replaced by a more general JSON-based chain
/// specification).
#[derive(Clone)]
#[derive(Clone, Copy)]
pub enum ChainSpec {
/// Whatever the current runtime is, with just Alice as an auth.
Development,
@@ -33,6 +33,7 @@ pub enum ChainSpec {
}
/// Service configuration.
#[derive(Clone)]
pub struct Configuration {
/// Node roles.
pub roles: Role,
@@ -63,21 +64,3 @@ impl Default for Configuration {
}
}
}
impl Clone for Configuration {
fn clone(&self) -> Configuration {
Configuration {
roles: self.roles.clone(),
transaction_pool: transaction_pool::Options {
max_count: self.transaction_pool.max_count.clone(),
max_mem_usage: self.transaction_pool.max_mem_usage.clone(),
max_per_sender: self.transaction_pool.max_per_sender.clone(),
},
network: self.network.clone(),
keystore_path: self.keystore_path.clone(),
database_path: self.database_path.clone(),
keys: self.keys.clone(),
chain_spec: self.chain_spec.clone(),
}
}
}
+52 -39
View File
@@ -50,14 +50,14 @@ extern crate hex_literal;
mod error;
mod config;
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;
use futures::prelude::*;
use parking_lot::Mutex;
use tokio_core::reactor::Core;
use codec::Slicable;
use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash, Header};
use primitives::{AuthorityId, hashing};
use primitives::block::{Id as BlockId, ExtrinsicHash, HeaderHash, Header};
use primitives::{AuthorityId};
use transaction_pool::TransactionPool;
use substrate_executor::NativeExecutor;
use polkadot_executor::Executor as LocalDispatch;
@@ -80,13 +80,13 @@ pub struct Service<B, E> {
thread: Option<thread::JoinHandle<()>>,
client: Arc<Client<B, E>>,
network: Arc<network::Service>,
transaction_pool: Arc<Mutex<TransactionPool>>,
transaction_pool: Arc<TransactionPool>,
signal: Option<Signal>,
_consensus: Option<consensus::Service>,
}
struct TransactionPoolAdapter<B, E, A> where A: Send + Sync, E: Send + Sync {
pool: Arc<Mutex<TransactionPool>>,
pool: Arc<TransactionPool>,
client: Arc<Client<B, E>>,
api: Arc<A>,
}
@@ -108,19 +108,22 @@ impl<B, E, A> network::TransactionPool for TransactionPoolAdapter<B, E, A>
};
let id = self.api.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed.");
let mut pool = self.pool.lock();
pool.cull(None, transaction_pool::Ready::create(id.clone(), &*self.api));
pool.pending(transaction_pool::Ready::create(id, &*self.api)).map(|t| {
let hash = ::primitives::Hash::from(&t.hash()[..]);
let tx = codec::Slicable::encode(t.as_transaction());
(hash, tx)
}).collect()
let ready = transaction_pool::Ready::create(id, &*self.api);
self.pool.cull_and_get_pending(ready, |pending| pending
.map(|t| {
let hash = ::primitives::Hash::from(&t.hash()[..]);
let tx = codec::Slicable::encode(t.as_transaction());
(hash, tx)
})
.collect()
)
}
fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash> {
if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) {
match self.pool.lock().import(tx) {
Ok(t) => Some(t.hash()[..].into()),
if let Some(uxt) = codec::Slicable::decode(&mut &transaction[..]) {
match self.pool.import_unchecked_extrinsic(uxt) {
Ok(xt) => Some(*xt.hash()),
Err(e) => match *e.kind() {
transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()),
_ => {
@@ -134,6 +137,10 @@ impl<B, E, A> network::TransactionPool for TransactionPoolAdapter<B, E, A>
None
}
}
fn on_broadcasted(&self, propagations: HashMap<ExtrinsicHash, Vec<String>>) {
self.pool.on_broadcasted(propagations)
}
}
pub struct ChainConfig {
@@ -341,7 +348,7 @@ impl<B, E> Service<B, E>
C: Fn(
Arc<Client<B, E>>,
Arc<network::Service>,
Arc<Mutex<TransactionPool>>,
Arc<TransactionPool>,
&Keystore
) -> Result<Option<consensus::Service>, error::Error>,
A: PolkadotApi + Send + Sync + 'static,
@@ -383,7 +390,7 @@ impl<B, E> Service<B, E>
let api = api_creator(client.clone());
let best_header = client.best_block_header()?;
info!("Starting Polkadot. Best block is #{}", best_header.number);
let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool)));
let transaction_pool = Arc::new(TransactionPool::new(config.transaction_pool));
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
pool: transaction_pool.clone(),
client: client.clone(),
@@ -414,14 +421,28 @@ impl<B, E> Service<B, E>
thread_barrier.wait();
let mut core = Core::new().expect("tokio::Core could not be created");
let events = client.import_notification_stream().for_each(move |notification| {
network.on_block_imported(notification.hash, &notification.header);
prune_imported(&*client, &*txpool, notification.hash);
Ok(())
});
// block notifications
let network1 = network.clone();
let txpool1 = txpool.clone();
let events = client.import_notification_stream()
.for_each(move |notification| {
network1.on_block_imported(notification.hash, &notification.header);
prune_imported(&*api, &*txpool1, notification.hash);
Ok(())
});
core.handle().spawn(events);
// transaction notifications
let events = txpool.import_notification_stream()
// TODO [ToDr] Consider throttling?
.for_each(move |_| {
network.trigger_repropagate();
Ok(())
});
core.handle().spawn(events);
if let Err(e) = core.run(exit) {
debug!("Polkadot service event loop shutdown with {:?}", e);
}
@@ -457,30 +478,22 @@ impl<B, E> Service<B, E>
}
/// Get shared transaction pool instance.
pub fn transaction_pool(&self) -> Arc<Mutex<TransactionPool>> {
pub fn transaction_pool(&self) -> Arc<TransactionPool> {
self.transaction_pool.clone()
}
}
fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) {
for extrinsic in extrinsics {
let hash: _ = hashing::blake2_256(&extrinsic.encode()).into();
pool.remove(&hash, true);
}
}
/// Produce a task which prunes any finalized transactions from the pool.
pub fn prune_imported<B, E>(client: &Client<B, E>, pool: &Mutex<TransactionPool>, hash: HeaderHash)
pub fn prune_imported<A>(api: &A, pool: &TransactionPool, hash: HeaderHash)
where
B: Backend + Send + Sync,
E: CallExecutor + Send + Sync,
client::error::Error: From<<<B as Backend>::State as state_machine::backend::Backend>::Error>
A: PolkadotApi,
{
let id = BlockId::Hash(hash);
match client.body(&id) {
Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]),
Ok(None) => warn!("Missing imported block {:?}", hash),
Err(e) => warn!("Failed to fetch block: {:?}", e),
match api.check_id(BlockId::Hash(hash)) {
Ok(id) => {
let ready = transaction_pool::Ready::create(id, api);
pool.cull(None, ready);
},
Err(e) => warn!("Failed to check block id: {:?}", e),
}
}