sc-consensus-manual-seal uses Finalizer trait for finalization instead of Backend. (#5469)

* manual-seal uses Finalizer trait for finalization instead of Backend

* fix tests

* use Transaction type

* refactor import_queue

* ugh

* line-width

* Update client/consensus/manual-seal/src/lib.rs

Co-Authored-By: Joshy Orndorff <JoshOrndorff@users.noreply.github.com>

* fix tests

* update docs

* Update client/consensus/manual-seal/src/seal_new_block.rs

Co-Authored-By: André Silva <123550+andresilva@users.noreply.github.com>

* Don't auto-finalize on verification

* Explicity don't finalize on import.

Co-authored-by: Joshy Orndorff <JoshOrndorff@users.noreply.github.com>
Co-authored-by: Joshy Orndorff <admin@joshyorndorff.com>
Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
Seun Lanlege
2020-04-07 19:08:54 +01:00
committed by GitHub
parent f202c58bbb
commit 6a87e1d1af
3 changed files with 80 additions and 106 deletions
@@ -23,35 +23,40 @@ use sp_runtime::{
generic::BlockId, generic::BlockId,
}; };
use std::sync::Arc; use std::sync::Arc;
use sc_client_api::backend::Backend as ClientBackend; use sc_client_api::backend::{Backend as ClientBackend, Finalizer};
use std::marker::PhantomData;
/// params for block finalization. /// params for block finalization.
pub struct FinalizeBlockParams<B: BlockT, CB> { pub struct FinalizeBlockParams<B: BlockT, F, CB> {
/// hash of the block /// hash of the block
pub hash: <B as BlockT>::Hash, pub hash: <B as BlockT>::Hash,
/// sender to report errors/success to the rpc. /// sender to report errors/success to the rpc.
pub sender: rpc::Sender<()>, pub sender: rpc::Sender<()>,
/// finalization justification /// finalization justification
pub justification: Option<Justification>, pub justification: Option<Justification>,
/// client backend /// Finalizer trait object.
pub backend: Arc<CB>, pub finalizer: Arc<F>,
/// phantom type to pin the Backend type
pub _phantom: PhantomData<CB>,
} }
/// finalizes a block in the backend with the given params. /// finalizes a block in the backend with the given params.
pub async fn finalize_block<B, CB>(params: FinalizeBlockParams<B, CB>) pub async fn finalize_block<B, F, CB>(params: FinalizeBlockParams<B, F, CB>)
where where
B: BlockT, B: BlockT,
F: Finalizer<B, CB>,
CB: ClientBackend<B>, CB: ClientBackend<B>,
{ {
let FinalizeBlockParams { let FinalizeBlockParams {
hash, hash,
mut sender, mut sender,
justification, justification,
backend: back_end, finalizer,
.. ..
} = params; } = params;
match back_end.finalize_block(BlockId::Hash(hash), justification) { match finalizer.finalize_block(BlockId::Hash(hash), justification, true) {
Err(e) => { Err(e) => {
log::warn!("Failed to finalize block {:?}", e); log::warn!("Failed to finalize block {:?}", e);
rpc::send_result(&mut sender, Err(e.into())) rpc::send_result(&mut sender, Err(e.into()))
@@ -17,66 +17,32 @@
//! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks. //! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks.
//! This is suitable for a testing environment. //! This is suitable for a testing environment.
use futures::prelude::*;
use sp_consensus::{ use sp_consensus::{
self, BlockImport, Environment, Proposer, BlockCheckParams, Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, SelectChain,
ForkChoiceStrategy, BlockImportParams, BlockOrigin, import_queue::{BasicQueue, CacheKeyId, Verifier, BoxBlockImport},
ImportResult, SelectChain,
import_queue::{
BasicQueue,
CacheKeyId,
Verifier,
BoxBlockImport,
},
}; };
use sp_blockchain::HeaderBackend;
use sp_inherents::InherentDataProviders; use sp_inherents::InherentDataProviders;
use sp_runtime::{traits::Block as BlockT, Justification}; use sp_runtime::{traits::Block as BlockT, Justification};
use sc_client_api::backend::Backend as ClientBackend; use sc_client_api::backend::{Backend as ClientBackend, Finalizer};
use futures::prelude::*;
use sc_transaction_pool::txpool; use sc_transaction_pool::txpool;
use std::collections::HashMap; use std::{sync::Arc, marker::PhantomData};
use std::sync::Arc;
pub mod rpc;
mod error; mod error;
mod finalize_block; mod finalize_block;
mod seal_new_block; mod seal_new_block;
use finalize_block::{finalize_block, FinalizeBlockParams}; pub mod rpc;
use seal_new_block::{seal_new_block, SealBlockParams};
pub use error::Error;
pub use rpc::{EngineCommand, CreatedBlock};
/// The synchronous block-import worker of the engine. use self::{
pub struct ManualSealBlockImport<I> { finalize_block::{finalize_block, FinalizeBlockParams},
inner: I, seal_new_block::{seal_new_block, SealBlockParams},
} };
pub use self::{
impl<I> From<I> for ManualSealBlockImport<I> { error::Error,
fn from(i: I) -> Self { rpc::{EngineCommand, CreatedBlock},
ManualSealBlockImport { inner: i } };
} use sc_client_api::{TransactionFor, Backend};
}
impl<B, I> BlockImport<B> for ManualSealBlockImport<I>
where
B: BlockT,
I: BlockImport<B, Transaction = ()>,
{
type Error = I::Error;
type Transaction = ();
fn check_block(&mut self, block: BlockCheckParams<B>) -> Result<ImportResult, Self::Error>
{
self.inner.check_block(block)
}
fn import_block(
&mut self,
block: BlockImportParams<B, Self::Transaction>,
cache: HashMap<CacheKeyId, Vec<u8>>,
) -> Result<ImportResult, Self::Error> {
self.inner.import_block(block, cache)
}
}
/// The verifier for the manual seal engine; instantly finalizes. /// The verifier for the manual seal engine; instantly finalizes.
struct ManualSealVerifier; struct ManualSealVerifier;
@@ -92,7 +58,7 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {
let mut import_params = BlockImportParams::new(origin, header); let mut import_params = BlockImportParams::new(origin, header);
import_params.justification = justification; import_params.justification = justification;
import_params.body = body; import_params.body = body;
import_params.finalized = true; import_params.finalized = false;
import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain); import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
Ok((import_params, None)) Ok((import_params, None))
@@ -100,37 +66,43 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {
} }
/// Instantiate the import queue for the manual seal consensus engine. /// Instantiate the import queue for the manual seal consensus engine.
pub fn import_queue<B: BlockT>(block_import: BoxBlockImport<B, ()>) -> BasicQueue<B, ()> pub fn import_queue<Block, B>(
block_import: BoxBlockImport<Block, TransactionFor<B, Block>>
) -> BasicQueue<Block, TransactionFor<B, Block>>
where
Block: BlockT,
B: Backend<Block> + 'static,
{ {
BasicQueue::new( BasicQueue::new(
ManualSealVerifier, ManualSealVerifier,
block_import, Box::new(block_import),
None, None,
None, None,
) )
} }
/// Creates the background authorship task for the manual seal engine. /// Creates the background authorship task for the manual seal engine.
pub async fn run_manual_seal<B, CB, E, A, C, S, T>( pub async fn run_manual_seal<B, CB, E, C, A, SC, S, T>(
mut block_import: BoxBlockImport<B, T>, mut block_import: BoxBlockImport<B, T>,
mut env: E, mut env: E,
backend: Arc<CB>, client: Arc<C>,
pool: Arc<txpool::Pool<A>>, pool: Arc<txpool::Pool<A>>,
mut seal_block_channel: S, mut commands_stream: S,
select_chain: C, select_chain: SC,
inherent_data_providers: InherentDataProviders, inherent_data_providers: InherentDataProviders,
) )
where where
A: txpool::ChainApi<Block=B, Hash=<B as BlockT>::Hash> + 'static,
B: BlockT + 'static, B: BlockT + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + 'static,
CB: ClientBackend<B> + 'static, CB: ClientBackend<B> + 'static,
E: Environment<B> + 'static, E: Environment<B> + 'static,
E::Error: std::fmt::Display, E::Error: std::fmt::Display,
<E::Proposer as Proposer<B>>::Error: std::fmt::Display, <E::Proposer as Proposer<B>>::Error: std::fmt::Display,
A: txpool::ChainApi<Block=B, Hash=<B as BlockT>::Hash> + 'static,
S: Stream<Item=EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static, S: Stream<Item=EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static,
C: SelectChain<B> + 'static, SC: SelectChain<B> + 'static,
{ {
while let Some(command) = seal_block_channel.next().await { while let Some(command) = commands_stream.next().await {
match command { match command {
EngineCommand::SealNewBlock { EngineCommand::SealNewBlock {
create_empty, create_empty,
@@ -149,7 +121,7 @@ pub async fn run_manual_seal<B, CB, E, A, C, S, T>(
block_import: &mut block_import, block_import: &mut block_import,
inherent_data_provider: &inherent_data_providers, inherent_data_provider: &inherent_data_providers,
pool: pool.clone(), pool: pool.clone(),
backend: backend.clone(), client: client.clone(),
} }
).await; ).await;
} }
@@ -159,7 +131,8 @@ pub async fn run_manual_seal<B, CB, E, A, C, S, T>(
hash, hash,
sender, sender,
justification, justification,
backend: backend.clone(), finalizer: client.clone(),
_phantom: PhantomData,
} }
).await ).await
} }
@@ -170,26 +143,28 @@ pub async fn run_manual_seal<B, CB, E, A, C, S, T>(
/// runs the background authorship task for the instant seal engine. /// runs the background authorship task for the instant seal engine.
/// instant-seal creates a new block for every transaction imported into /// instant-seal creates a new block for every transaction imported into
/// the transaction pool. /// the transaction pool.
pub async fn run_instant_seal<B, CB, E, A, C, T>( pub async fn run_instant_seal<B, CB, E, C, A, SC, T>(
block_import: BoxBlockImport<B, T>, block_import: BoxBlockImport<B, T>,
env: E, env: E,
backend: Arc<CB>, client: Arc<C>,
pool: Arc<txpool::Pool<A>>, pool: Arc<txpool::Pool<A>>,
select_chain: C, select_chain: SC,
inherent_data_providers: InherentDataProviders, inherent_data_providers: InherentDataProviders,
) )
where where
A: txpool::ChainApi<Block=B, Hash=<B as BlockT>::Hash> + 'static, A: txpool::ChainApi<Block=B, Hash=<B as BlockT>::Hash> + 'static,
B: BlockT + 'static, B: BlockT + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + 'static,
CB: ClientBackend<B> + 'static, CB: ClientBackend<B> + 'static,
E: Environment<B> + 'static, E: Environment<B> + 'static,
E::Error: std::fmt::Display, E::Error: std::fmt::Display,
<E::Proposer as Proposer<B>>::Error: std::fmt::Display, <E::Proposer as Proposer<B>>::Error: std::fmt::Display,
C: SelectChain<B> + 'static SC: SelectChain<B> + 'static
{ {
// instant-seal creates blocks as soon as transactions are imported // instant-seal creates blocks as soon as transactions are imported
// into the transaction pool. // into the transaction pool.
let seal_block_channel = pool.validated_pool().import_notification_stream() let commands_stream = pool.validated_pool()
.import_notification_stream()
.map(|_| { .map(|_| {
EngineCommand::SealNewBlock { EngineCommand::SealNewBlock {
create_empty: false, create_empty: false,
@@ -202,9 +177,9 @@ pub async fn run_instant_seal<B, CB, E, A, C, T>(
run_manual_seal( run_manual_seal(
block_import, block_import,
env, env,
backend, client,
pool, pool,
seal_block_channel, commands_stream,
select_chain, select_chain,
inherent_data_providers, inherent_data_providers,
).await ).await
@@ -226,9 +201,7 @@ mod tests {
use substrate_test_runtime_transaction_pool::{TestApi, uxt}; use substrate_test_runtime_transaction_pool::{TestApi, uxt};
use sp_transaction_pool::{TransactionPool, MaintainedTransactionPool, TransactionSource}; use sp_transaction_pool::{TransactionPool, MaintainedTransactionPool, TransactionSource};
use sp_runtime::generic::BlockId; use sp_runtime::generic::BlockId;
use sp_blockchain::HeaderBackend;
use sp_consensus::ImportedAux; use sp_consensus::ImportedAux;
use sc_client::LongestChain;
use sp_inherents::InherentDataProviders; use sp_inherents::InherentDataProviders;
use sc_basic_authorship::ProposerFactory; use sc_basic_authorship::ProposerFactory;
@@ -241,9 +214,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn instant_seal() { async fn instant_seal() {
let builder = TestClientBuilder::new(); let builder = TestClientBuilder::new();
let backend = builder.backend(); let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(builder.build()); let client = Arc::new(client);
let select_chain = LongestChain::new(backend.clone());
let inherent_data_providers = InherentDataProviders::new(); let inherent_data_providers = InherentDataProviders::new();
let pool = Arc::new(BasicPool::new(Options::default(), api()).0); let pool = Arc::new(BasicPool::new(Options::default(), api()).0);
let env = ProposerFactory::new( let env = ProposerFactory::new(
@@ -268,7 +240,7 @@ mod tests {
let future = run_manual_seal( let future = run_manual_seal(
Box::new(client.clone()), Box::new(client.clone()),
env, env,
backend.clone(), client.clone(),
pool.pool().clone(), pool.pool().clone(),
stream, stream,
select_chain, select_chain,
@@ -300,15 +272,14 @@ mod tests {
} }
); );
// assert that there's a new block in the db. // assert that there's a new block in the db.
assert!(backend.blockchain().header(BlockId::Number(1)).unwrap().is_some()) assert!(client.header(&BlockId::Number(1)).unwrap().is_some())
} }
#[tokio::test] #[tokio::test]
async fn manual_seal_and_finalization() { async fn manual_seal_and_finalization() {
let builder = TestClientBuilder::new(); let builder = TestClientBuilder::new();
let backend = builder.backend(); let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(builder.build()); let client = Arc::new(client);
let select_chain = LongestChain::new(backend.clone());
let inherent_data_providers = InherentDataProviders::new(); let inherent_data_providers = InherentDataProviders::new();
let pool = Arc::new(BasicPool::new(Options::default(), api()).0); let pool = Arc::new(BasicPool::new(Options::default(), api()).0);
let env = ProposerFactory::new( let env = ProposerFactory::new(
@@ -320,7 +291,7 @@ mod tests {
let future = run_manual_seal( let future = run_manual_seal(
Box::new(client.clone()), Box::new(client.clone()),
env, env,
backend.clone(), client.clone(),
pool.pool().clone(), pool.pool().clone(),
stream, stream,
select_chain, select_chain,
@@ -360,7 +331,7 @@ mod tests {
} }
); );
// assert that there's a new block in the db. // assert that there's a new block in the db.
let header = backend.blockchain().header(BlockId::Number(1)).unwrap().unwrap(); let header = client.header(&BlockId::Number(1)).unwrap().unwrap();
let (tx, rx) = futures::channel::oneshot::channel(); let (tx, rx) = futures::channel::oneshot::channel();
sink.send(EngineCommand::FinalizeBlock { sink.send(EngineCommand::FinalizeBlock {
sender: Some(tx), sender: Some(tx),
@@ -374,9 +345,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn manual_seal_fork_blocks() { async fn manual_seal_fork_blocks() {
let builder = TestClientBuilder::new(); let builder = TestClientBuilder::new();
let backend = builder.backend(); let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(builder.build()); let client = Arc::new(client);
let select_chain = LongestChain::new(backend.clone());
let inherent_data_providers = InherentDataProviders::new(); let inherent_data_providers = InherentDataProviders::new();
let pool_api = api(); let pool_api = api();
let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone()).0); let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone()).0);
@@ -389,7 +359,7 @@ mod tests {
let future = run_manual_seal( let future = run_manual_seal(
Box::new(client.clone()), Box::new(client.clone()),
env, env,
backend.clone(), client.clone(),
pool.pool().clone(), pool.pool().clone(),
stream, stream,
select_chain, select_chain,
@@ -431,12 +401,12 @@ mod tests {
} }
); );
// assert that there's a new block in the db. // assert that there's a new block in the db.
assert!(backend.blockchain().header(BlockId::Number(0)).unwrap().is_some()); assert!(client.header(&BlockId::Number(0)).unwrap().is_some());
assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 1)).await.is_ok()); assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 1)).await.is_ok());
pool.maintain(sp_transaction_pool::ChainEvent::NewBlock { pool.maintain(sp_transaction_pool::ChainEvent::NewBlock {
id: BlockId::Number(1), id: BlockId::Number(1),
header: backend.blockchain().header(BlockId::Number(1)).expect("db error").expect("imported above"), header: client.header(&BlockId::Number(1)).expect("db error").expect("imported above"),
is_new_best: true, is_new_best: true,
retracted: vec![], retracted: vec![],
}).await; }).await;
@@ -452,7 +422,7 @@ mod tests {
rx1.await.expect("should be no error receiving"), rx1.await.expect("should be no error receiving"),
Ok(_) Ok(_)
); );
assert!(backend.blockchain().header(BlockId::Number(1)).unwrap().is_some()); assert!(client.header(&BlockId::Number(1)).unwrap().is_some());
pool_api.increment_nonce(Alice.into()); pool_api.increment_nonce(Alice.into());
assert!(pool.submit_one(&BlockId::Number(2), SOURCE, uxt(Alice, 2)).await.is_ok()); assert!(pool.submit_one(&BlockId::Number(2), SOURCE, uxt(Alice, 2)).await.is_ok());
@@ -465,6 +435,6 @@ mod tests {
}).await.is_ok()); }).await.is_ok());
let imported = rx2.await.unwrap().unwrap(); let imported = rx2.await.unwrap().unwrap();
// assert that fork block is in the db // assert that fork block is in the db
assert!(backend.blockchain().header(BlockId::Hash(imported.hash)).unwrap().is_some()) assert!(client.header(&BlockId::Hash(imported.hash)).unwrap().is_some())
} }
} }
@@ -33,7 +33,6 @@ use sp_consensus::{
import_queue::BoxBlockImport, import_queue::BoxBlockImport,
}; };
use sp_blockchain::HeaderBackend; use sp_blockchain::HeaderBackend;
use sc_client_api::backend::Backend as ClientBackend;
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Duration; use std::time::Duration;
use sp_inherents::InherentDataProviders; use sp_inherents::InherentDataProviders;
@@ -42,7 +41,7 @@ use sp_inherents::InherentDataProviders;
const MAX_PROPOSAL_DURATION: u64 = 10; const MAX_PROPOSAL_DURATION: u64 = 10;
/// params for sealing a new block /// params for sealing a new block
pub struct SealBlockParams<'a, B: BlockT, C, CB, E, T, P: txpool::ChainApi> { pub struct SealBlockParams<'a, B: BlockT, SC, HB, E, T, P: txpool::ChainApi> {
/// if true, empty blocks(without extrinsics) will be created. /// if true, empty blocks(without extrinsics) will be created.
/// otherwise, will return Error::EmptyTransactionPool. /// otherwise, will return Error::EmptyTransactionPool.
pub create_empty: bool, pub create_empty: bool,
@@ -54,12 +53,12 @@ pub struct SealBlockParams<'a, B: BlockT, C, CB, E, T, P: txpool::ChainApi> {
pub sender: rpc::Sender<CreatedBlock<<B as BlockT>::Hash>>, pub sender: rpc::Sender<CreatedBlock<<B as BlockT>::Hash>>,
/// transaction pool /// transaction pool
pub pool: Arc<txpool::Pool<P>>, pub pool: Arc<txpool::Pool<P>>,
/// client backend /// header backend
pub backend: Arc<CB>, pub client: Arc<HB>,
/// Environment trait object for creating a proposer /// Environment trait object for creating a proposer
pub env: &'a mut E, pub env: &'a mut E,
/// SelectChain object /// SelectChain object
pub select_chain: &'a C, pub select_chain: &'a SC,
/// block import object /// block import object
pub block_import: &'a mut BoxBlockImport<B, T>, pub block_import: &'a mut BoxBlockImport<B, T>,
/// inherent data provider /// inherent data provider
@@ -67,24 +66,24 @@ pub struct SealBlockParams<'a, B: BlockT, C, CB, E, T, P: txpool::ChainApi> {
} }
/// seals a new block with the given params /// seals a new block with the given params
pub async fn seal_new_block<B, SC, CB, E, T, P>( pub async fn seal_new_block<B, SC, HB, E, T, P>(
SealBlockParams { SealBlockParams {
create_empty, create_empty,
finalize, finalize,
pool, pool,
parent_hash, parent_hash,
backend: back_end, client,
select_chain, select_chain,
block_import, block_import,
env, env,
inherent_data_provider, inherent_data_provider,
mut sender, mut sender,
.. ..
}: SealBlockParams<'_, B, SC, CB, E, T, P> }: SealBlockParams<'_, B, SC, HB, E, T, P>
) )
where where
B: BlockT, B: BlockT,
CB: ClientBackend<B>, HB: HeaderBackend<B>,
E: Environment<B>, E: Environment<B>,
<E as Environment<B>>::Error: std::fmt::Display, <E as Environment<B>>::Error: std::fmt::Display,
<E::Proposer as Proposer<B>>::Error: std::fmt::Display, <E::Proposer as Proposer<B>>::Error: std::fmt::Display,
@@ -101,7 +100,7 @@ pub async fn seal_new_block<B, SC, CB, E, T, P>(
// or fetch the best_block. // or fetch the best_block.
let header = match parent_hash { let header = match parent_hash {
Some(hash) => { Some(hash) => {
match back_end.blockchain().header(BlockId::Hash(hash))? { match client.header(BlockId::Hash(hash))? {
Some(header) => header, Some(header) => header,
None => return Err(Error::BlockNotFound(format!("{}", hash))), None => return Err(Error::BlockNotFound(format!("{}", hash))),
} }