Introduce thread pool for transaction validation. (#4051)

This commit is contained in:
Tomasz Drwięga
2019-11-08 21:34:30 +01:00
committed by Gavin Wood
parent a8ce80b72d
commit c3f6e5bd40
3 changed files with 132 additions and 6 deletions
+32 -4
View File
@@ -17,11 +17,17 @@
//! Chain api required for the transaction pool.
use std::{
sync::Arc,
marker::PhantomData,
pin::Pin,
sync::Arc,
};
use client::{runtime_api::TaggedTransactionQueue, blockchain::HeaderBackend};
use codec::Encode;
use futures::{
channel::oneshot,
executor::{ThreadPool, ThreadPoolBuilder},
future::Future,
};
use txpool;
use primitives::{
H256,
@@ -39,6 +45,7 @@ use crate::error;
/// The transaction pool logic
pub struct FullChainApi<T, Block> {
client: Arc<T>,
pool: ThreadPool,
_marker: PhantomData<Block>,
}
@@ -49,6 +56,11 @@ impl<T, Block> FullChainApi<T, Block> where
pub fn new(client: Arc<T>) -> Self {
FullChainApi {
client,
pool: ThreadPoolBuilder::new()
.pool_size(2)
.name_prefix("txpool-verifier")
.create()
.expect("Failed to spawn verifier threads, that are critical for node operation."),
_marker: Default::default()
}
}
@@ -56,20 +68,36 @@ impl<T, Block> FullChainApi<T, Block> where
impl<T, Block> txpool::ChainApi for FullChainApi<T, Block> where
Block: traits::Block<Hash=H256>,
T: traits::ProvideRuntimeApi + HeaderBackend<Block>,
T: traits::ProvideRuntimeApi + HeaderBackend<Block> + 'static,
T::Api: TaggedTransactionQueue<Block>
{
type Block = Block;
type Hash = H256;
type Error = error::Error;
type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
type ValidationFuture = Pin<Box<dyn Future<Output=error::Result<TransactionValidity>> + Send>>;
fn validate_transaction(
&self,
at: &BlockId<Self::Block>,
uxt: txpool::ExtrinsicFor<Self>,
) -> Self::ValidationFuture {
futures::future::ready(self.client.runtime_api().validate_transaction(at, uxt).map_err(Into::into))
let (tx, rx) = oneshot::channel();
let client = self.client.clone();
let at = at.clone();
self.pool.spawn_ok(async move {
let res = client.runtime_api().validate_transaction(&at, uxt).map_err(Into::into);
if let Err(e) = tx.send(res) {
log::warn!("Unable to send a validate transaction result: {:?}", e);
}
});
Box::pin(async move {
match rx.await {
Ok(r) => r,
Err(e) => Err(client::error::Error::Msg(format!("{}", e)))?,
}
})
}
fn block_id_to_number(&self, at: &BlockId<Self::Block>) -> error::Result<Option<txpool::NumberFor<Self>>> {