prune finalized transactions from the pool (#127)

This commit is contained in:
Robert Habermeier
2018-04-15 12:53:53 +02:00
committed by Gav Wood
parent 2bafd5c0be
commit 39f78aa7c4
+24 -5
View File
@@ -53,8 +53,8 @@ use futures::prelude::*;
use parking_lot::Mutex;
use tokio_core::reactor::Core;
use codec::Slicable;
use runtime_io::with_externalities;
use primitives::block::{Id as BlockId, TransactionHash};
use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash};
use primitives::hashing;
use transaction_pool::TransactionPool;
use substrate_executor::NativeExecutor;
use polkadot_executor::Executor as LocalDispatch;
@@ -71,7 +71,6 @@ pub use config::{Configuration, Role, ChainSpec};
type Client = client::Client<InMemory, NativeExecutor<LocalDispatch>>;
/// Polkadot service.
pub struct Service {
thread: Option<thread::JoinHandle<()>>,
@@ -87,7 +86,7 @@ struct TransactionPoolAdapter {
}
impl network::TransactionPool for TransactionPoolAdapter {
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)> {
let best_block = match self.client.info() {
Ok(info) => info.chain.best_hash,
Err(e) => {
@@ -104,7 +103,7 @@ impl network::TransactionPool for TransactionPoolAdapter {
}).collect()
}
fn import(&self, transaction: &[u8]) -> Option<TransactionHash> {
fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash> {
if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) {
match self.pool.lock().import(tx) {
Ok(t) => Some(t.hash()[..].into()),
@@ -299,11 +298,14 @@ impl Service {
let thread_client = client.clone();
let thread_network = network.clone();
let thread_txpool = transaction_pool.clone();
let thread = thread::spawn(move || {
thread_network.start_network();
let mut core = Core::new().expect("tokio::Core could not be created");
let events = thread_client.import_notification_stream().for_each(|notification| {
thread_network.on_block_imported(notification.hash, &notification.header);
prune_imported(&*thread_client, &*thread_txpool, notification.hash);
Ok(())
});
if let Err(e) = core.run(events) {
@@ -336,6 +338,23 @@ impl Service {
}
}
fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) {
for extrinsic in extrinsics {
let hash: _ = hashing::blake2_256(&extrinsic.encode()).into();
pool.remove(&hash, true);
}
}
/// Produce a task which prunes any finalized transactions from the pool.
pub fn prune_imported(client: &Client, pool: &Mutex<TransactionPool>, hash: HeaderHash) {
let id = BlockId::Hash(hash);
match client.body(&id) {
Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]),
Ok(None) => warn!("Missing imported block {:?}", hash),
Err(e) => warn!("Failed to fetch block: {:?}", e),
}
}
impl Drop for Service {
fn drop(&mut self) {
self.client.stop_notifications();