Move block proposing execution to dedicated blocking threadpool (#4202)

* Move block proposing execution to dedicated blocking threadpool

* typo: move comments around

* Fix tests
This commit is contained in:
Wei Tang
2019-11-26 20:00:00 +01:00
committed by GitHub
parent c7d9c58ce8
commit b362be3ff3
3 changed files with 72 additions and 34 deletions
@@ -17,7 +17,6 @@
//! A consensus proposer for "basic" chains which use the primitive inherent-data.
// FIXME #1021 move this into substrate-consensus-common
//
use std::{time, sync::Arc};
use client_api::{error, CallExecutor};
@@ -45,10 +44,47 @@ pub struct ProposerFactory<C, A> where A: txpool::ChainApi {
pub transaction_pool: Arc<TransactionPool<A>>,
}
impl<B, E, Block, RA, A> ProposerFactory<SubstrateClient<B, E, Block, RA>, A>
where
A: txpool::ChainApi<Block=Block> + 'static,
B: client_api::backend::Backend<Block, Blake2Hasher> + Send + Sync + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone + 'static,
Block: BlockT<Hash=H256>,
RA: Send + Sync + 'static,
SubstrateClient<B, E, Block, RA>: ProvideRuntimeApi,
<SubstrateClient<B, E, Block, RA> as ProvideRuntimeApi>::Api:
BlockBuilderApi<Block, Error = error::Error>,
{
pub fn init_with_now(
&mut self,
parent_header: &<Block as BlockT>::Header,
now: Box<dyn Fn() -> time::Instant + Send + Sync>,
) -> Result<Proposer<Block, SubstrateClient<B, E, Block, RA>, A>, error::Error> {
let parent_hash = parent_header.hash();
let id = BlockId::hash(parent_hash);
info!("Starting consensus session on top of parent {:?}", parent_hash);
let proposer = Proposer {
inner: Arc::new(ProposerInner {
client: self.client.clone(),
parent_hash,
parent_id: id,
parent_number: *parent_header.number(),
transaction_pool: self.transaction_pool.clone(),
now,
}),
};
Ok(proposer)
}
}
impl<B, E, Block, RA, A> consensus_common::Environment<Block> for
ProposerFactory<SubstrateClient<B, E, Block, RA>, A>
where
A: txpool::ChainApi<Block=Block>,
A: txpool::ChainApi<Block=Block> + 'static,
B: client_api::backend::Backend<Block, Blake2Hasher> + Send + Sync + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone + 'static,
Block: BlockT<Hash=H256>,
@@ -64,39 +100,29 @@ where
&mut self,
parent_header: &<Block as BlockT>::Header,
) -> Result<Self::Proposer, error::Error> {
let parent_hash = parent_header.hash();
let id = BlockId::hash(parent_hash);
info!("Starting consensus session on top of parent {:?}", parent_hash);
let proposer = Proposer {
client: self.client.clone(),
parent_hash,
parent_id: id,
parent_number: *parent_header.number(),
transaction_pool: self.transaction_pool.clone(),
now: Box::new(time::Instant::now),
};
Ok(proposer)
self.init_with_now(parent_header, Box::new(time::Instant::now))
}
}
/// The proposer logic.
pub struct Proposer<Block: BlockT, C, A: txpool::ChainApi> {
inner: Arc<ProposerInner<Block, C, A>>,
}
/// Proposer inner, to wrap parameters under Arc.
struct ProposerInner<Block: BlockT, C, A: txpool::ChainApi> {
client: Arc<C>,
parent_hash: <Block as BlockT>::Hash,
parent_id: BlockId<Block>,
parent_number: <<Block as BlockT>::Header as HeaderT>::Number,
transaction_pool: Arc<TransactionPool<A>>,
now: Box<dyn Fn() -> time::Instant>,
now: Box<dyn Fn() -> time::Instant + Send + Sync>,
}
impl<B, E, Block, RA, A> consensus_common::Proposer<Block> for
Proposer<Block, SubstrateClient<B, E, Block, RA>, A>
where
A: txpool::ChainApi<Block=Block>,
A: txpool::ChainApi<Block=Block> + 'static,
B: client_api::backend::Backend<Block, Blake2Hasher> + Send + Sync + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone + 'static,
Block: BlockT<Hash=H256>,
@@ -105,7 +131,7 @@ where
<SubstrateClient<B, E, Block, RA> as ProvideRuntimeApi>::Api:
BlockBuilderApi<Block, Error = error::Error>,
{
type Create = futures::future::Ready<Result<Block, error::Error>>;
type Create = tokio_executor::blocking::Blocking<Result<Block, error::Error>>;
type Error = error::Error;
fn propose(
@@ -114,14 +140,17 @@ where
inherent_digests: DigestFor<Block>,
max_duration: time::Duration,
) -> Self::Create {
// leave some time for evaluation and block finalization (33%)
let deadline = (self.now)() + max_duration - max_duration / 3;
futures::future::ready(self.propose_with(inherent_data, inherent_digests, deadline))
let inner = self.inner.clone();
tokio_executor::blocking::run(move || {
// leave some time for evaluation and block finalization (33%)
let deadline = (inner.now)() + max_duration - max_duration / 3;
inner.propose_with(inherent_data, inherent_digests, deadline)
})
}
}
impl<Block, B, E, RA, A> Proposer<Block, SubstrateClient<B, E, Block, RA>, A> where
A: txpool::ChainApi<Block=Block>,
impl<Block, B, E, RA, A> ProposerInner<Block, SubstrateClient<B, E, Block, RA>, A> where
A: txpool::ChainApi<Block=Block> + 'static,
B: client_api::backend::Backend<Block, Blake2Hasher> + Send + Sync + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone + 'static,
Block: BlockT<Hash=H256>,
@@ -232,8 +261,8 @@ impl<Block, B, E, RA, A> Proposer<Block, SubstrateClient<B, E, Block, RA>, A> wh
mod tests {
use super::*;
use std::cell::RefCell;
use consensus_common::{Environment, Proposer};
use parking_lot::Mutex;
use consensus_common::Proposer;
use test_client::{self, runtime::{Extrinsic, Transfer}, AccountKeyring};
fn extrinsic(nonce: u64) -> Extrinsic {
@@ -261,16 +290,19 @@ mod tests {
transaction_pool: txpool.clone(),
};
let mut proposer = proposer_factory.init(
let cell = Mutex::new(time::Instant::now());
let mut proposer = proposer_factory.init_with_now(
&client.header(&BlockId::number(0)).unwrap().unwrap(),
Box::new(move || {
let mut value = cell.lock();
let old = *value;
let new = old + time::Duration::from_secs(2);
*value = new;
old
})
).unwrap();
// when
let cell = RefCell::new(time::Instant::now());
proposer.now = Box::new(move || {
let new = *cell.borrow() + time::Duration::from_secs(2);
cell.replace(new)
});
let deadline = time::Duration::from_secs(3);
let block = futures::executor::block_on(proposer.propose(Default::default(), Default::default(), deadline))
.unwrap();