manual seal is now consensus agnostic (#7010)

* manual seal is now consensus agnostic

* pr grumbles
This commit is contained in:
Seun Lanlege
2020-09-03 13:55:12 +01:00
committed by GitHub
parent 34980ec88a
commit b0ff817ba0
10 changed files with 439 additions and 88 deletions
+6
View File
@@ -6673,13 +6673,19 @@ dependencies = [
"parking_lot 0.10.2", "parking_lot 0.10.2",
"sc-basic-authorship", "sc-basic-authorship",
"sc-client-api", "sc-client-api",
"sc-consensus-babe",
"sc-consensus-epochs",
"sc-keystore",
"sc-transaction-pool", "sc-transaction-pool",
"serde", "serde",
"sp-api",
"sp-blockchain", "sp-blockchain",
"sp-consensus", "sp-consensus",
"sp-consensus-babe",
"sp-core", "sp-core",
"sp-inherents", "sp-inherents",
"sp-runtime", "sp-runtime",
"sp-timestamp",
"sp-transaction-pool", "sp-transaction-pool",
"substrate-prometheus-endpoint", "substrate-prometheus-endpoint",
"substrate-test-runtime-client", "substrate-test-runtime-client",
@@ -51,7 +51,7 @@ fn load_decode<B, T>(backend: &B, key: &[u8]) -> ClientResult<Option<T>>
} }
/// Load or initialize persistent epoch change data from backend. /// Load or initialize persistent epoch change data from backend.
pub(crate) fn load_epoch_changes<Block: BlockT, B: AuxStore>( pub fn load_epoch_changes<Block: BlockT, B: AuxStore>(
backend: &B, backend: &B,
config: &BabeGenesisConfiguration, config: &BabeGenesisConfiguration,
) -> ClientResult<SharedEpochChanges<Block, Epoch>> { ) -> ClientResult<SharedEpochChanges<Block, Epoch>> {
+3 -2
View File
@@ -126,9 +126,10 @@ use schnorrkel::SignatureError;
use codec::{Encode, Decode}; use codec::{Encode, Decode};
use sp_api::ApiExt; use sp_api::ApiExt;
mod aux_schema;
mod verification; mod verification;
mod migration; mod migration;
pub mod aux_schema;
pub mod authorship; pub mod authorship;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@@ -1051,7 +1052,7 @@ where
} }
/// Register the babe inherent data provider, if not registered already. /// Register the babe inherent data provider, if not registered already.
fn register_babe_inherent_data_provider( pub fn register_babe_inherent_data_provider(
inherent_data_providers: &InherentDataProviders, inherent_data_providers: &InherentDataProviders,
slot_duration: u64, slot_duration: u64,
) -> Result<(), sp_consensus::Error> { ) -> Result<(), sp_consensus::Error> {
@@ -22,20 +22,28 @@ parking_lot = "0.10.0"
serde = { version = "1.0", features=["derive"] } serde = { version = "1.0", features=["derive"] }
assert_matches = "1.3.0" assert_matches = "1.3.0"
sc-client-api = { path = "../../../client/api", version = "2.0.0-rc6" } sc-client-api = { path = "../../api", version = "2.0.0-rc5" }
sc-transaction-pool = { path = "../../transaction-pool", version = "2.0.0-rc6" } sc-consensus-babe = { path = "../../consensus/babe", version = "0.8.0-rc5" }
sp-blockchain = { path = "../../../primitives/blockchain", version = "2.0.0-rc6" } sc-consensus-epochs = { path = "../../consensus/epochs", version = "0.8.0-rc5" }
sp-consensus = { package = "sp-consensus", path = "../../../primitives/consensus/common", version = "0.8.0-rc6" } sp-consensus-babe = { path = "../../../primitives/consensus/babe", version = "0.8.0-rc5" }
sp-inherents = { path = "../../../primitives/inherents", version = "2.0.0-rc6" } sc-keystore = { path = "../../keystore", version = "2.0.0-rc5" }
sp-runtime = { path = "../../../primitives/runtime", version = "2.0.0-rc6" }
sp-core = { path = "../../../primitives/core", version = "2.0.0-rc6" } sc-transaction-pool = { path = "../../transaction-pool", version = "2.0.0-rc5" }
sp-transaction-pool = { path = "../../../primitives/transaction-pool", version = "2.0.0-rc6" } sp-blockchain = { path = "../../../primitives/blockchain", version = "2.0.0-rc5" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc6" } sp-consensus = { package = "sp-consensus", path = "../../../primitives/consensus/common", version = "0.8.0-rc5" }
sp-inherents = { path = "../../../primitives/inherents", version = "2.0.0-rc5" }
sp-runtime = { path = "../../../primitives/runtime", version = "2.0.0-rc5" }
sp-core = { path = "../../../primitives/core", version = "2.0.0-rc5" }
sp-api = { path = "../../../primitives/api", version = "2.0.0-rc5" }
sp-transaction-pool = { path = "../../../primitives/transaction-pool", version = "2.0.0-rc5" }
sp-timestamp = { path = "../../../primitives/timestamp", version = "2.0.0-rc6" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc5" }
[dev-dependencies] [dev-dependencies]
tokio = { version = "0.2", features = ["rt-core", "macros"] }
sc-basic-authorship = { path = "../../basic-authorship", version = "0.8.0-rc6" } sc-basic-authorship = { path = "../../basic-authorship", version = "0.8.0-rc6" }
substrate-test-runtime-client = { path = "../../../test-utils/runtime/client", version = "2.0.0-rc6" } substrate-test-runtime-client = { path = "../../../test-utils/runtime/client", version = "2.0.0-rc6" }
substrate-test-runtime-transaction-pool = { path = "../../../test-utils/runtime/transaction-pool", version = "2.0.0-rc6" } substrate-test-runtime-transaction-pool = { path = "../../../test-utils/runtime/transaction-pool", version = "2.0.0-rc6" }
tokio = { version = "0.2", features = ["rt-core", "macros"] }
env_logger = "0.7.0" env_logger = "0.7.0"
tempfile = "3.1.0" tempfile = "3.1.0"
@@ -0,0 +1,44 @@
// This file is part of Substrate.
// Copyright (C) 2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Extensions for manual seal to produce blocks valid for any runtime.
use super::Error;
use sp_runtime::traits::{Block as BlockT, DigestFor};
use sp_inherents::InherentData;
use sp_consensus::BlockImportParams;
pub mod babe;
/// Consensus data provider, manual seal uses this trait object for authoring blocks valid
/// for any runtime.
pub trait ConsensusDataProvider<B: BlockT>: Send + Sync {
/// Block import transaction type
type Transaction;
/// Attempt to create a consensus digest.
fn create_digest(&self, parent: &B::Header, inherents: &InherentData) -> Result<DigestFor<B>, Error>;
/// set up the neccessary import params.
fn append_block_import(
&self,
parent: &B::Header,
params: &mut BlockImportParams<B, Self::Transaction>,
inherents: &InherentData
) -> Result<(), Error>;
}
@@ -0,0 +1,197 @@
// This file is part of Substrate.
// Copyright (C) 2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! BABE consensus data provider
use super::ConsensusDataProvider;
use crate::Error;
use std::{
any::Any,
borrow::Cow,
sync::{Arc, atomic},
time::SystemTime,
};
use sc_client_api::AuxStore;
use sc_consensus_babe::{
Config, Epoch, authorship, CompatibleDigestItem, BabeIntermediate,
register_babe_inherent_data_provider, INTERMEDIATE_KEY,
};
use sc_consensus_epochs::{SharedEpochChanges, descendent_query};
use sc_keystore::KeyStorePtr;
use sp_api::{ProvideRuntimeApi, TransactionFor};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::BlockImportParams;
use sp_consensus_babe::{BabeApi, inherents::BabeInherentData};
use sp_inherents::{InherentDataProviders, InherentData, ProvideInherentData, InherentIdentifier};
use sp_runtime::{
traits::{DigestItemFor, DigestFor, Block as BlockT, Header as _},
generic::Digest,
};
use sp_timestamp::{InherentType, InherentError, INHERENT_IDENTIFIER};
/// Provides BABE-compatible predigests and BlockImportParams.
/// Intended for use with BABE runtimes.
pub struct BabeConsensusDataProvider<B: BlockT, C> {
/// shared reference to keystore
keystore: KeyStorePtr,
/// Shared reference to the client.
client: Arc<C>,
/// Shared epoch changes
epoch_changes: SharedEpochChanges<B, Epoch>,
/// BABE config, gotten from the runtime.
config: Config,
}
impl<B, C> BabeConsensusDataProvider<B, C>
where
B: BlockT,
C: AuxStore + ProvideRuntimeApi<B>,
C::Api: BabeApi<B, Error = sp_blockchain::Error>,
{
pub fn new(
client: Arc<C>,
keystore: KeyStorePtr,
provider: &InherentDataProviders,
epoch_changes: SharedEpochChanges<B, Epoch>,
) -> Result<Self, Error> {
let config = Config::get_or_compute(&*client)?;
let timestamp_provider = SlotTimestampProvider::new(config.slot_duration)?;
provider.register_provider(timestamp_provider)?;
register_babe_inherent_data_provider(provider, config.slot_duration)?;
Ok(Self {
config,
client,
keystore,
epoch_changes,
})
}
}
impl<B, C> ConsensusDataProvider<B> for BabeConsensusDataProvider<B, C>
where
B: BlockT,
C: AuxStore + HeaderBackend<B> + HeaderMetadata<B, Error = sp_blockchain::Error> + ProvideRuntimeApi<B>,
C::Api: BabeApi<B, Error = sp_blockchain::Error>,
{
type Transaction = TransactionFor<C, B>;
fn create_digest(&self, parent: &B::Header, inherents: &InherentData) -> Result<DigestFor<B>, Error> {
let slot_number = inherents.babe_inherent_data()?;
let epoch_changes = self.epoch_changes.lock();
let epoch_descriptor = epoch_changes
.epoch_descriptor_for_child_of(
descendent_query(&*self.client),
&parent.hash(),
parent.number().clone(),
slot_number,
)
.map_err(|e| Error::StringError(format!("failed to fetch epoch_descriptor: {}", e)))?
.ok_or_else(|| sp_consensus::Error::InvalidAuthoritiesSet)?;
let epoch = epoch_changes
.viable_epoch(
&epoch_descriptor,
|slot| Epoch::genesis(&self.config, slot),
)
.ok_or_else(|| {
log::info!(target: "babe", "create_digest: no viable_epoch :(");
sp_consensus::Error::InvalidAuthoritiesSet
})?;
// this is a dev node environment, we should always be able to claim a slot.
let (predigest, _) = authorship::claim_slot(slot_number, epoch.as_ref(), &self.keystore)
.ok_or_else(|| Error::StringError("failed to claim slot for authorship".into()))?;
Ok(Digest {
logs: vec![
<DigestItemFor<B> as CompatibleDigestItem>::babe_pre_digest(predigest),
],
})
}
fn append_block_import(
&self,
parent: &B::Header,
params: &mut BlockImportParams<B, Self::Transaction>,
inherents: &InherentData
) -> Result<(), Error> {
let slot_number = inherents.babe_inherent_data()?;
let epoch_descriptor = self.epoch_changes.lock()
.epoch_descriptor_for_child_of(
descendent_query(&*self.client),
&parent.hash(),
parent.number().clone(),
slot_number,
)
.map_err(|e| Error::StringError(format!("failed to fetch epoch data: {}", e)))?
.ok_or_else(|| sp_consensus::Error::InvalidAuthoritiesSet)?;
params.intermediates.insert(
Cow::from(INTERMEDIATE_KEY),
Box::new(BabeIntermediate::<B> { epoch_descriptor }) as Box<dyn Any>,
);
Ok(())
}
}
/// Provide duration since unix epoch in millisecond for timestamp inherent.
/// Mocks the timestamp inherent to always produce the timestamp for the next babe slot.
struct SlotTimestampProvider {
time: atomic::AtomicU64,
slot_duration: u64
}
impl SlotTimestampProvider {
/// create a new mocked time stamp provider.
fn new(slot_duration: u64) -> Result<Self, Error> {
let now = SystemTime::now();
let duration = now.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|err| Error::StringError(format!("{}", err)))?;
Ok(Self {
time: atomic::AtomicU64::new(duration.as_millis() as u64),
slot_duration,
})
}
}
impl ProvideInherentData for SlotTimestampProvider {
fn inherent_identifier(&self) -> &'static InherentIdentifier {
&INHERENT_IDENTIFIER
}
fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), sp_inherents::Error> {
// we update the time here.
let duration: InherentType = self.time.fetch_add(self.slot_duration, atomic::Ordering::SeqCst);
inherent_data.put_data(INHERENT_IDENTIFIER, &duration)?;
Ok(())
}
fn error_to_string(&self, error: &[u8]) -> Option<String> {
InherentError::try_from(&INHERENT_IDENTIFIER, error).map(|e| format!("{:?}", e))
}
}
@@ -18,6 +18,7 @@
//! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks. //! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks.
//! This is suitable for a testing environment. //! This is suitable for a testing environment.
use sp_consensus::{Error as ConsensusError, ImportResult}; use sp_consensus::{Error as ConsensusError, ImportResult};
use sp_blockchain::Error as BlockchainError; use sp_blockchain::Error as BlockchainError;
use sp_inherents::Error as InherentsError; use sp_inherents::Error as InherentsError;
+121 -41
View File
@@ -21,8 +21,9 @@
use futures::prelude::*; use futures::prelude::*;
use sp_consensus::{ use sp_consensus::{
Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, SelectChain, Environment, Proposer, SelectChain, BlockImport,
import_queue::{BasicQueue, CacheKeyId, Verifier, BoxBlockImport}, ForkChoiceStrategy, BlockImportParams, BlockOrigin,
import_queue::{Verifier, BasicQueue, CacheKeyId, BoxBlockImport},
}; };
use sp_blockchain::HeaderBackend; use sp_blockchain::HeaderBackend;
use sp_inherents::InherentDataProviders; use sp_inherents::InherentDataProviders;
@@ -34,17 +35,19 @@ use prometheus_endpoint::Registry;
mod error; mod error;
mod finalize_block; mod finalize_block;
mod seal_new_block; mod seal_block;
pub mod consensus;
pub mod rpc; pub mod rpc;
use self::{
finalize_block::{finalize_block, FinalizeBlockParams},
seal_new_block::{seal_new_block, SealBlockParams},
};
pub use self::{ pub use self::{
error::Error, error::Error,
consensus::ConsensusDataProvider,
finalize_block::{finalize_block, FinalizeBlockParams},
seal_block::{SealBlockParams, seal_block, MAX_PROPOSAL_DURATION},
rpc::{EngineCommand, CreatedBlock}, rpc::{EngineCommand, CreatedBlock},
}; };
use sp_api::{ProvideRuntimeApi, TransactionFor};
/// The verifier for the manual seal engine; instantly finalizes. /// The verifier for the manual seal engine; instantly finalizes.
struct ManualSealVerifier; struct ManualSealVerifier;
@@ -87,25 +90,83 @@ pub fn import_queue<Block, Transaction>(
) )
} }
/// Params required to start the instant sealing authorship task.
pub struct ManualSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, A: txpool::ChainApi, SC, CS> {
/// Block import instance for well. importing blocks.
pub block_import: BI,
/// The environment we are producing blocks for.
pub env: E,
/// Client instance
pub client: Arc<C>,
/// Shared reference to the transaction pool.
pub pool: Arc<txpool::Pool<A>>,
/// Stream<Item = EngineCommands>, Basically the receiving end of a channel for sending commands to
/// the authorship task.
pub commands_stream: CS,
/// SelectChain strategy.
pub select_chain: SC,
/// Digest provider for inclusion in blocks.
pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Transaction = TransactionFor<C, B>>>>,
/// Provider for inherents to include in blocks.
pub inherent_data_providers: InherentDataProviders,
}
/// Params required to start the manual sealing authorship task.
pub struct InstantSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, A: txpool::ChainApi, SC> {
/// Block import instance for well. importing blocks.
pub block_import: BI,
/// The environment we are producing blocks for.
pub env: E,
/// Client instance
pub client: Arc<C>,
/// Shared reference to the transaction pool.
pub pool: Arc<txpool::Pool<A>>,
/// SelectChain strategy.
pub select_chain: SC,
/// Digest provider for inclusion in blocks.
pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Transaction = TransactionFor<C, B>>>>,
/// Provider for inherents to include in blocks.
pub inherent_data_providers: InherentDataProviders,
}
/// Creates the background authorship task for the manual seal engine. /// Creates the background authorship task for the manual seal engine.
pub async fn run_manual_seal<B, CB, E, C, A, SC, S, T>( pub async fn run_manual_seal<B, BI, CB, E, C, A, SC, CS>(
mut block_import: BoxBlockImport<B, T>, ManualSealParams {
mut env: E, mut block_import,
client: Arc<C>, mut env,
pool: Arc<txpool::Pool<A>>, client,
mut commands_stream: S, pool,
select_chain: SC, mut commands_stream,
inherent_data_providers: InherentDataProviders, select_chain,
inherent_data_providers,
consensus_data_provider,
..
}: ManualSealParams<B, BI, E, C, A, SC, CS>
) )
where where
A: txpool::ChainApi<Block=B> + 'static, A: txpool::ChainApi<Block=B> + 'static,
B: BlockT + 'static, B: BlockT + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + 'static, BI: BlockImport<B, Error = sp_consensus::Error, Transaction = sp_api::TransactionFor<C, B>>
+ Send + Sync + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
CB: ClientBackend<B> + 'static, CB: ClientBackend<B> + 'static,
E: Environment<B> + 'static, E: Environment<B> + 'static,
E::Error: std::fmt::Display, E::Error: std::fmt::Display,
<E::Proposer as Proposer<B>>::Error: std::fmt::Display, <E::Proposer as Proposer<B>>::Error: std::fmt::Display,
S: Stream<Item=EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static, CS: Stream<Item=EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static,
SC: SelectChain<B> + 'static, SC: SelectChain<B> + 'static,
{ {
while let Some(command) = commands_stream.next().await { while let Some(command) = commands_stream.next().await {
@@ -116,7 +177,7 @@ pub async fn run_manual_seal<B, CB, E, C, A, SC, S, T>(
parent_hash, parent_hash,
sender, sender,
} => { } => {
seal_new_block( seal_block(
SealBlockParams { SealBlockParams {
sender, sender,
parent_hash, parent_hash,
@@ -126,6 +187,7 @@ pub async fn run_manual_seal<B, CB, E, C, A, SC, S, T>(
select_chain: &select_chain, select_chain: &select_chain,
block_import: &mut block_import, block_import: &mut block_import,
inherent_data_provider: &inherent_data_providers, inherent_data_provider: &inherent_data_providers,
consensus_data_provider: consensus_data_provider.as_ref().map(|p| &**p),
pool: pool.clone(), pool: pool.clone(),
client: client.clone(), client: client.clone(),
} }
@@ -149,18 +211,24 @@ pub async fn run_manual_seal<B, CB, E, C, A, SC, S, T>(
/// runs the background authorship task for the instant seal engine. /// runs the background authorship task for the instant seal engine.
/// instant-seal creates a new block for every transaction imported into /// instant-seal creates a new block for every transaction imported into
/// the transaction pool. /// the transaction pool.
pub async fn run_instant_seal<B, CB, E, C, A, SC, T>( pub async fn run_instant_seal<B, BI, CB, E, C, A, SC>(
block_import: BoxBlockImport<B, T>, InstantSealParams {
env: E, block_import,
client: Arc<C>, env,
pool: Arc<txpool::Pool<A>>, client,
select_chain: SC, pool,
inherent_data_providers: InherentDataProviders, select_chain,
consensus_data_provider,
inherent_data_providers,
..
}: InstantSealParams<B, BI, E, C, A, SC>
) )
where where
A: txpool::ChainApi<Block=B> + 'static, A: txpool::ChainApi<Block=B> + 'static,
B: BlockT + 'static, B: BlockT + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + 'static, BI: BlockImport<B, Error = sp_consensus::Error, Transaction = sp_api::TransactionFor<C, B>>
+ Send + Sync + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
CB: ClientBackend<B> + 'static, CB: ClientBackend<B> + 'static,
E: Environment<B> + 'static, E: Environment<B> + 'static,
E::Error: std::fmt::Display, E::Error: std::fmt::Display,
@@ -181,13 +249,16 @@ pub async fn run_instant_seal<B, CB, E, C, A, SC, T>(
}); });
run_manual_seal( run_manual_seal(
ManualSealParams {
block_import, block_import,
env, env,
client, client,
pool, pool,
commands_stream, commands_stream,
select_chain, select_chain,
consensus_data_provider,
inherent_data_providers, inherent_data_providers,
}
).await ).await
} }
@@ -233,7 +304,7 @@ mod tests {
// this test checks that blocks are created as soon as transactions are imported into the pool. // this test checks that blocks are created as soon as transactions are imported into the pool.
let (sender, receiver) = futures::channel::oneshot::channel(); let (sender, receiver) = futures::channel::oneshot::channel();
let mut sender = Arc::new(Some(sender)); let mut sender = Arc::new(Some(sender));
let stream = pool.pool().validated_pool().import_notification_stream() let commands_stream = pool.pool().validated_pool().import_notification_stream()
.map(move |_| { .map(move |_| {
// we're only going to submit one tx so this fn will only be called once. // we're only going to submit one tx so this fn will only be called once.
let mut_sender = Arc::get_mut(&mut sender).unwrap(); let mut_sender = Arc::get_mut(&mut sender).unwrap();
@@ -246,13 +317,16 @@ mod tests {
} }
}); });
let future = run_manual_seal( let future = run_manual_seal(
Box::new(client.clone()), ManualSealParams {
block_import: client.clone(),
env, env,
client.clone(), client: client.clone(),
pool.pool().clone(), pool: pool.pool().clone(),
stream, commands_stream,
select_chain, select_chain,
inherent_data_providers, inherent_data_providers,
consensus_data_provider: None,
}
); );
std::thread::spawn(|| { std::thread::spawn(|| {
let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut rt = tokio::runtime::Runtime::new().unwrap();
@@ -299,15 +373,18 @@ mod tests {
None, None,
); );
// this test checks that blocks are created as soon as an engine command is sent over the stream. // this test checks that blocks are created as soon as an engine command is sent over the stream.
let (mut sink, stream) = futures::channel::mpsc::channel(1024); let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);
let future = run_manual_seal( let future = run_manual_seal(
Box::new(client.clone()), ManualSealParams {
block_import: client.clone(),
env, env,
client.clone(), client: client.clone(),
pool.pool().clone(), pool: pool.pool().clone(),
stream, commands_stream,
select_chain, select_chain,
consensus_data_provider: None,
inherent_data_providers, inherent_data_providers,
}
); );
std::thread::spawn(|| { std::thread::spawn(|| {
let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut rt = tokio::runtime::Runtime::new().unwrap();
@@ -371,15 +448,18 @@ mod tests {
None, None,
); );
// this test checks that blocks are created as soon as an engine command is sent over the stream. // this test checks that blocks are created as soon as an engine command is sent over the stream.
let (mut sink, stream) = futures::channel::mpsc::channel(1024); let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);
let future = run_manual_seal( let future = run_manual_seal(
Box::new(client.clone()), ManualSealParams {
block_import: client.clone(),
env, env,
client.clone(), client: client.clone(),
pool.pool().clone(), pool: pool.pool().clone(),
stream, commands_stream,
select_chain, select_chain,
consensus_data_provider: None,
inherent_data_providers, inherent_data_providers,
}
); );
std::thread::spawn(|| { std::thread::spawn(|| {
let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut rt = tokio::runtime::Runtime::new().unwrap();
@@ -14,7 +14,8 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>. // along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! RPC interface for the ManualSeal Engine. //! RPC interface for the `ManualSeal` Engine.
use sp_consensus::ImportedAux; use sp_consensus::ImportedAux;
use jsonrpc_core::Error; use jsonrpc_core::Error;
use jsonrpc_derive::rpc; use jsonrpc_derive::rpc;
@@ -16,7 +16,7 @@
//! Block sealing utilities //! Block sealing utilities
use crate::{Error, rpc}; use crate::{Error, rpc, CreatedBlock, ConsensusDataProvider};
use std::sync::Arc; use std::sync::Arc;
use sp_runtime::{ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT}, traits::{Block as BlockT, Header as HeaderT},
@@ -24,24 +24,21 @@ use sp_runtime::{
}; };
use futures::prelude::*; use futures::prelude::*;
use sc_transaction_pool::txpool; use sc_transaction_pool::txpool;
use rpc::CreatedBlock;
use sp_consensus::{ use sp_consensus::{
self, BlockImport, Environment, Proposer, self, BlockImport, Environment, Proposer, ForkChoiceStrategy,
ForkChoiceStrategy, BlockImportParams, BlockOrigin, BlockImportParams, BlockOrigin, ImportResult, SelectChain,
ImportResult, SelectChain,
import_queue::BoxBlockImport,
}; };
use sp_blockchain::HeaderBackend; use sp_blockchain::HeaderBackend;
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Duration; use std::time::Duration;
use sp_inherents::InherentDataProviders; use sp_inherents::InherentDataProviders;
use sp_api::{ProvideRuntimeApi, TransactionFor};
/// max duration for creating a proposal in secs /// max duration for creating a proposal in secs
const MAX_PROPOSAL_DURATION: u64 = 10; pub const MAX_PROPOSAL_DURATION: u64 = 10;
/// params for sealing a new block /// params for sealing a new block
pub struct SealBlockParams<'a, B: BlockT, SC, HB, E, T, P: txpool::ChainApi> { pub struct SealBlockParams<'a, B: BlockT, BI, SC, C: ProvideRuntimeApi<B>, E, P: txpool::ChainApi> {
/// if true, empty blocks(without extrinsics) will be created. /// if true, empty blocks(without extrinsics) will be created.
/// otherwise, will return Error::EmptyTransactionPool. /// otherwise, will return Error::EmptyTransactionPool.
pub create_empty: bool, pub create_empty: bool,
@@ -54,19 +51,21 @@ pub struct SealBlockParams<'a, B: BlockT, SC, HB, E, T, P: txpool::ChainApi> {
/// transaction pool /// transaction pool
pub pool: Arc<txpool::Pool<P>>, pub pool: Arc<txpool::Pool<P>>,
/// header backend /// header backend
pub client: Arc<HB>, pub client: Arc<C>,
/// Environment trait object for creating a proposer /// Environment trait object for creating a proposer
pub env: &'a mut E, pub env: &'a mut E,
/// SelectChain object /// SelectChain object
pub select_chain: &'a SC, pub select_chain: &'a SC,
/// Digest provider for inclusion in blocks.
pub consensus_data_provider: Option<&'a dyn ConsensusDataProvider<B, Transaction = TransactionFor<C, B>>>,
/// block import object /// block import object
pub block_import: &'a mut BoxBlockImport<B, T>, pub block_import: &'a mut BI,
/// inherent data provider /// inherent data provider
pub inherent_data_provider: &'a InherentDataProviders, pub inherent_data_provider: &'a InherentDataProviders,
} }
/// seals a new block with the given params /// seals a new block with the given params
pub async fn seal_new_block<B, SC, HB, E, T, P>( pub async fn seal_block<B, BI, SC, C, E, P>(
SealBlockParams { SealBlockParams {
create_empty, create_empty,
finalize, finalize,
@@ -77,13 +76,16 @@ pub async fn seal_new_block<B, SC, HB, E, T, P>(
block_import, block_import,
env, env,
inherent_data_provider, inherent_data_provider,
consensus_data_provider: digest_provider,
mut sender, mut sender,
.. ..
}: SealBlockParams<'_, B, SC, HB, E, T, P> }: SealBlockParams<'_, B, BI, SC, C, E, P>
) )
where where
B: BlockT, B: BlockT,
HB: HeaderBackend<B>, BI: BlockImport<B, Error = sp_consensus::Error, Transaction = sp_api::TransactionFor<C, B>>
+ Send + Sync + 'static,
C: HeaderBackend<B> + ProvideRuntimeApi<B>,
E: Environment<B>, E: Environment<B>,
<E as Environment<B>>::Error: std::fmt::Display, <E as Environment<B>>::Error: std::fmt::Display,
<E::Proposer as Proposer<B>>::Error: std::fmt::Display, <E::Proposer as Proposer<B>>::Error: std::fmt::Display,
@@ -98,7 +100,7 @@ pub async fn seal_new_block<B, SC, HB, E, T, P>(
// get the header to build this new block on. // get the header to build this new block on.
// use the parent_hash supplied via `EngineCommand` // use the parent_hash supplied via `EngineCommand`
// or fetch the best_block. // or fetch the best_block.
let header = match parent_hash { let parent = match parent_hash {
Some(hash) => { Some(hash) => {
match client.header(BlockId::Hash(hash))? { match client.header(BlockId::Hash(hash))? {
Some(header) => header, Some(header) => header,
@@ -108,11 +110,18 @@ pub async fn seal_new_block<B, SC, HB, E, T, P>(
None => select_chain.best_chain()? None => select_chain.best_chain()?
}; };
let proposer = env.init(&header) let proposer = env.init(&parent)
.map_err(|err| Error::StringError(format!("{}", err))).await?; .map_err(|err| Error::StringError(format!("{}", err))).await?;
let id = inherent_data_provider.create_inherent_data()?; let id = inherent_data_provider.create_inherent_data()?;
let inherents_len = id.len(); let inherents_len = id.len();
let proposal = proposer.propose(id, Default::default(), Duration::from_secs(MAX_PROPOSAL_DURATION), false.into())
let digest = if let Some(digest_provider) = digest_provider {
digest_provider.create_digest(&parent, &id)?
} else {
Default::default()
};
let proposal = proposer.propose(id.clone(), digest, Duration::from_secs(MAX_PROPOSAL_DURATION), false.into())
.map_err(|err| Error::StringError(format!("{}", err))).await?; .map_err(|err| Error::StringError(format!("{}", err))).await?;
if proposal.block.extrinsics().len() == inherents_len && !create_empty { if proposal.block.extrinsics().len() == inherents_len && !create_empty {
@@ -125,6 +134,10 @@ pub async fn seal_new_block<B, SC, HB, E, T, P>(
params.finalized = finalize; params.finalized = finalize;
params.fork_choice = Some(ForkChoiceStrategy::LongestChain); params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
if let Some(digest_provider) = digest_provider {
digest_provider.append_block_import(&parent, &mut params, &id)?;
}
match block_import.import_block(params, HashMap::new())? { match block_import.import_block(params, HashMap::new())? {
ImportResult::Imported(aux) => { ImportResult::Imported(aux) => {
Ok(CreatedBlock { hash: <B as BlockT>::Header::hash(&header), aux }) Ok(CreatedBlock { hash: <B as BlockT>::Header::hash(&header), aux })