Implement block import

This commit is contained in:
Bastian Köcher
2019-10-22 15:26:11 +02:00
parent 649f1b9a10
commit 074e9e7de6
3 changed files with 98 additions and 20 deletions
+73 -15
View File
@@ -19,7 +19,10 @@
use cumulus_runtime::ParachainBlockData;
use sr_primitives::traits::{Block as BlockT, Header as HeaderT};
use consensus_common::{Environment, Proposer};
use consensus_common::{
BlockImport, Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin,
Error as ConsensusError,
};
use inherents::InherentDataProviders;
use substrate_primitives::Blake2Hasher;
@@ -53,44 +56,49 @@ struct HeadData<Block: BlockT> {
}
/// The implementation of the Cumulus `Collator`.
pub struct Collator<Block, PF> {
pub struct Collator<Block, PF, BI> {
proposer_factory: Arc<Mutex<PF>>,
_phantom: PhantomData<Block>,
inherent_data_providers: InherentDataProviders,
collator_network: Arc<dyn CollatorNetwork>,
block_import: Arc<Mutex<BI>>,
}
impl<Block: BlockT, PF: Environment<Block>> Collator<Block, PF> {
impl<Block, PF, BI> Collator<Block, PF, BI> {
/// Create a new instance.
fn new(
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
collator_network: Arc<dyn CollatorNetwork>,
block_import: BI,
) -> Self {
Self {
proposer_factory: Arc::new(Mutex::new(proposer_factory)),
inherent_data_providers,
_phantom: PhantomData,
collator_network,
block_import: Arc::new(Mutex::new(block_import)),
}
}
}
impl<Block, PF> Clone for Collator<Block, PF> {
impl<Block, PF, BI> Clone for Collator<Block, PF, BI> {
fn clone(&self) -> Self {
Self {
proposer_factory: self.proposer_factory.clone(),
inherent_data_providers: self.inherent_data_providers.clone(),
_phantom: PhantomData,
collator_network: self.collator_network.clone(),
block_import: self.block_import.clone(),
}
}
}
impl<Block, PF> ParachainContext for Collator<Block, PF>
impl<Block, PF, BI> ParachainContext for Collator<Block, PF, BI>
where
Block: BlockT,
PF: Environment<Block> + 'static + Send,
BI: BlockImport<Block, Error=ConsensusError> + Send + Sync + 'static,
{
type ProduceCandidate = Box<
dyn Future<Item=(BlockData, parachain::HeadData, OutgoingMessages), Error=InvalidHead>
@@ -107,9 +115,13 @@ impl<Block, PF> ParachainContext for Collator<Block, PF>
let factory = self.proposer_factory.clone();
let inherent_providers = self.inherent_data_providers.clone();
let block_import = self.block_import.clone();
let res = HeadData::<Block>::decode(&mut &status.head_data.0[..])
.map_err(|_| InvalidHead)
.map_err(|e| {
error!(target: "cumulus-collator", "Could not decode the head data: {:?}", e);
InvalidHead
})
.into_future()
.and_then(move |last_head| {
let parent_state_root = *last_head.header.state_root();
@@ -117,14 +129,21 @@ impl<Block, PF> ParachainContext for Collator<Block, PF>
factory.lock()
.init(&last_head.header)
.map_err(|e| {
//TODO: Do we want to return the real error?
error!("Could not create proposer: {:?}", e);
error!(
target: "cumulus-collator",
"Could not create proposer: {:?}",
e,
);
InvalidHead
})
.and_then(|mut proposer| {
let inherent_data = inherent_providers.create_inherent_data()
.map_err(|e| {
error!("Failed to create inherent data: {:?}", e);
error!(
target: "cumulus-collator",
"Failed to create inherent data: {:?}",
e,
);
InvalidHead
})?;
@@ -136,7 +155,11 @@ impl<Block, PF> ParachainContext for Collator<Block, PF>
true,
)
.map_err(|e| {
error!("Proposing failed: {:?}", e);
error!(
target: "cumulus-collator",
"Proposing failed: {:?}",
e,
);
InvalidHead
})
.and_then(move |(block, proof)| {
@@ -155,7 +178,10 @@ impl<Block, PF> ParachainContext for Collator<Block, PF>
)
}
None => {
error!("Proposer did not return the requested proof.");
error!(
target: "cumulus-collator",
"Proposer did not return the requested proof.",
);
Err(InvalidHead)
}
};
@@ -168,6 +194,36 @@ impl<Block, PF> ParachainContext for Collator<Block, PF>
})
})
.flatten()
.and_then(move |b| {
let block_import_params = BlockImportParams {
origin: BlockOrigin::Own,
header: b.header().clone(),
justification: None,
post_digests: vec![],
body: Some(b.extrinsics().to_vec()),
finalized: false,
auxiliary: vec![], // block-weight is written in block import.
// TODO: block-import handles fork choice and this shouldn't even have the
// option to specify one.
// https://github.com/paritytech/substrate/issues/3623
fork_choice: ForkChoiceStrategy::LongestChain,
};
if let Err(err) = block_import.lock().import_block(
block_import_params,
Default::default(),
) {
error!(
target: "cumulus-collator",
"Error importing build block (at {:?}): {:?}",
b.header().parent_hash(),
err,
);
Err(InvalidHead)
} else {
Ok(b)
}
})
.map(|b| {
let block_data = BlockData(b.encode());
let head_data = HeadData::<Block> { header: b.into_header() };
@@ -201,7 +257,7 @@ impl<Block, SP> CollatorBuilder<Block, SP> {
}
impl<Block: BlockT, SP: SetupParachain<Block>> BuildParachainContext for CollatorBuilder<Block, SP> {
type ParachainContext = Collator<Block, SP::ProposerFactory>;
type ParachainContext = Collator<Block, SP::ProposerFactory, SP::BlockImport>;
fn build<B, E>(
self,
@@ -213,11 +269,11 @@ impl<Block: BlockT, SP: SetupParachain<Block>> BuildParachainContext for Collato
B: substrate_client::backend::Backend<PBlock, Blake2Hasher> + 'static,
E: substrate_client::CallExecutor<PBlock, Blake2Hasher> + Clone + Send + Sync + 'static
{
let (proposer_factory, inherent_data_providers) = self.setup_parachain
let (proposer_factory, block_import, inherent_data_providers) = self.setup_parachain
.setup_parachain(client, task_executor)
.map_err(|e| error!("Error setting up the parachain: {}", e))?;
Ok(Collator::new(proposer_factory, inherent_data_providers, network))
Ok(Collator::new(proposer_factory, inherent_data_providers, network, block_import))
}
}
@@ -225,13 +281,15 @@ impl<Block: BlockT, SP: SetupParachain<Block>> BuildParachainContext for Collato
pub trait SetupParachain<Block: BlockT>: Send {
/// The proposer factory of the parachain to build blocks.
type ProposerFactory: Environment<Block> + Send + 'static;
/// The block import for importing the blocks build by the collator.
type BlockImport: BlockImport<Block, Error=ConsensusError> + Send + Sync + 'static;
/// Setup the parachain.
fn setup_parachain<P: cumulus_consensus::PolkadotClient>(
self,
polkadot_client: P,
task_executor: TaskExecutor,
) -> Result<(Self::ProposerFactory, InherentDataProviders), String>;
) -> Result<(Self::ProposerFactory, Self::BlockImport, InherentDataProviders), String>;
}
/// Run a collator with the given proposer factory.
+10
View File
@@ -64,4 +64,14 @@ impl<B: BlockT> ParachainBlockData<B> {
pub fn into_header(self) -> B::Header {
self.header
}
/// Returns the header.
pub fn header(&self) -> &B::Header {
&self.header
}
/// Returns the extrinsics.
pub fn extrinsics(&self) -> &[B::Extrinsic] {
&self.extrinsics
}
}
+15 -5
View File
@@ -91,32 +91,42 @@ pub fn run_collator<C: Send + Default + 'static, E: crate::cli::IntoExit + Send
};
let on_exit = service.on_exit();
let block_import = service.client();
let setup_parachain = SetupParachain { service, inherent_data_providers, proposer_factory, exit };
let setup_parachain = SetupParachain {
service,
inherent_data_providers,
proposer_factory,
exit,
block_import,
};
cumulus_collator::run_collator(setup_parachain, crate::PARA_ID, on_exit, key, version)
}
struct SetupParachain<S, PF, E> {
struct SetupParachain<S, PF, E, BI> {
service: S,
proposer_factory: PF,
exit: E,
inherent_data_providers: InherentDataProviders,
block_import: BI,
}
impl<S, PF, E> cumulus_collator::SetupParachain<Block> for SetupParachain<S, PF, E>
impl<S, PF, E, BI> cumulus_collator::SetupParachain<Block> for SetupParachain<S, PF, E, BI>
where
S: AbstractService,
E: Send + crate::cli::IntoExit,
PF: consensus_common::Environment<Block> + Send + 'static,
BI: consensus_common::BlockImport<Block, Error=consensus_common::Error> + Send + Sync + 'static,
{
type ProposerFactory = PF;
type BlockImport = BI;
fn setup_parachain<P: cumulus_consensus::PolkadotClient>(
self,
polkadot_client: P,
task_executor: polkadot_collator::TaskExecutor,
) -> Result<(Self::ProposerFactory, InherentDataProviders), String> {
) -> Result<(Self::ProposerFactory, Self::BlockImport, InherentDataProviders), String> {
let client = self.service.client();
let follow = match cumulus_consensus::follow_polkadot(crate::PARA_ID, client, polkadot_client) {
@@ -139,6 +149,6 @@ impl<S, PF, E> cumulus_collator::SetupParachain<Block> for SetupParachain<S, PF,
),
).map_err(|_| "Could not spawn parachain server!")?;
Ok((self.proposer_factory, self.inherent_data_providers))
Ok((self.proposer_factory, self.block_import, self.inherent_data_providers))
}
}