From 39f78aa7c42fd06f17ec18ebfd9c37c7b4bb2811 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 15 Apr 2018 12:53:53 +0200 Subject: [PATCH] prune finalized transactions from the pool (#127) --- polkadot/service/src/lib.rs | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 1000a2e597..9c0adc23d9 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -53,8 +53,8 @@ use futures::prelude::*; use parking_lot::Mutex; use tokio_core::reactor::Core; use codec::Slicable; -use runtime_io::with_externalities; -use primitives::block::{Id as BlockId, TransactionHash}; +use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash}; +use primitives::hashing; use transaction_pool::TransactionPool; use substrate_executor::NativeExecutor; use polkadot_executor::Executor as LocalDispatch; @@ -71,7 +71,6 @@ pub use config::{Configuration, Role, ChainSpec}; type Client = client::Client>; - /// Polkadot service. pub struct Service { thread: Option>, @@ -87,7 +86,7 @@ struct TransactionPoolAdapter { } impl network::TransactionPool for TransactionPoolAdapter { - fn transactions(&self) -> Vec<(TransactionHash, Vec)> { + fn transactions(&self) -> Vec<(ExtrinsicHash, Vec)> { let best_block = match self.client.info() { Ok(info) => info.chain.best_hash, Err(e) => { @@ -104,7 +103,7 @@ impl network::TransactionPool for TransactionPoolAdapter { }).collect() } - fn import(&self, transaction: &[u8]) -> Option { + fn import(&self, transaction: &[u8]) -> Option { if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) { match self.pool.lock().import(tx) { Ok(t) => Some(t.hash()[..].into()), @@ -299,11 +298,14 @@ impl Service { let thread_client = client.clone(); let thread_network = network.clone(); + let thread_txpool = transaction_pool.clone(); let thread = thread::spawn(move || { thread_network.start_network(); let mut core = Core::new().expect("tokio::Core could not be created"); let events = thread_client.import_notification_stream().for_each(|notification| { thread_network.on_block_imported(notification.hash, ¬ification.header); + prune_imported(&*thread_client, &*thread_txpool, notification.hash); + Ok(()) }); if let Err(e) = core.run(events) { @@ -336,6 +338,23 @@ impl Service { } } +fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) { + for extrinsic in extrinsics { + let hash: _ = hashing::blake2_256(&extrinsic.encode()).into(); + pool.remove(&hash, true); + } +} + +/// Produce a task which prunes any finalized transactions from the pool. +pub fn prune_imported(client: &Client, pool: &Mutex, hash: HeaderHash) { + let id = BlockId::Hash(hash); + match client.body(&id) { + Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]), + Ok(None) => warn!("Missing imported block {:?}", hash), + Err(e) => warn!("Failed to fetch block: {:?}", e), + } +} + impl Drop for Service { fn drop(&mut self) { self.client.stop_notifications();