// Copyright 2019 Parity Technologies (UK) Ltd. // This file is part of Cumulus. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . //! Cumulus Collator implementation for Substrate. use cumulus_runtime::ParachainBlockData; use sp_consensus::{ BlockImport, BlockImportParams, BlockOrigin, Environment, Error as ConsensusError, ForkChoiceStrategy, Proposal, Proposer, RecordProof, }; use sp_inherents::InherentDataProviders; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use polkadot_collator::{ BuildParachainContext, InvalidHead, Network as CollatorNetwork, ParachainContext, PolkadotClient, }; use polkadot_primitives::{ parachain::{ self, BlockData, CollatorPair, Id as ParaId, Message, OutgoingMessages, Status as ParachainStatus, }, Block as PBlock, Hash as PHash, }; use codec::{Decode, Encode}; use log::{error, trace}; use futures::{task::Spawn, Future, future}; use std::{fmt::Debug, marker::PhantomData, sync::Arc, time::Duration, pin::Pin}; use parking_lot::Mutex; /// The head data of the parachain, stored in the relay chain. #[derive(Decode, Encode, Debug)] struct HeadData { header: Block::Header, } /// The implementation of the Cumulus `Collator`. pub struct Collator { proposer_factory: Arc>, _phantom: PhantomData, inherent_data_providers: InherentDataProviders, collator_network: Arc, block_import: Arc>, } impl Collator { /// Create a new instance. fn new( proposer_factory: PF, inherent_data_providers: InherentDataProviders, collator_network: Arc, block_import: BI, ) -> Self { Self { proposer_factory: Arc::new(Mutex::new(proposer_factory)), inherent_data_providers, _phantom: PhantomData, collator_network, block_import: Arc::new(Mutex::new(block_import)), } } } impl Clone for Collator { fn clone(&self) -> Self { Self { proposer_factory: self.proposer_factory.clone(), inherent_data_providers: self.inherent_data_providers.clone(), _phantom: PhantomData, collator_network: self.collator_network.clone(), block_import: self.block_import.clone(), } } } impl ParachainContext for Collator where Block: BlockT, PF: Environment + 'static + Send, PF::Proposer: Send, BI: BlockImport< Block, Error = ConsensusError, Transaction = >::Transaction, > + Send + Sync + 'static, { type ProduceCandidate = Pin> + Send, >>; fn produce_candidate>( &mut self, _relay_chain_parent: PHash, status: ParachainStatus, _: I, ) -> Self::ProduceCandidate { let factory = self.proposer_factory.clone(); let inherent_providers = self.inherent_data_providers.clone(); let block_import = self.block_import.clone(); trace!(target: "cumulus-collator", "Producing candidate"); let last_head = match HeadData::::decode(&mut &status.head_data.0[..]) { Ok(x) => x, Err(e) => { error!(target: "cumulus-collator", "Could not decode the head data: {:?}", e); return Box::pin(future::ready(Err(InvalidHead))); } }; let proposer_future = factory .lock() .init(&last_head.header); Box::pin(async move { let parent_state_root = *last_head.header.state_root(); let mut proposer = proposer_future .await .map_err(|e| { error!( target: "cumulus-collator", "Could not create proposer: {:?}", e, ); InvalidHead })?; let inherent_data = inherent_providers .create_inherent_data() .map_err(|e| { error!( target: "cumulus-collator", "Failed to create inherent data: {:?}", e, ); InvalidHead })?; let Proposal { block, storage_changes, proof, } = proposer .propose( inherent_data, Default::default(), //TODO: Fix this. Duration::from_secs(6), RecordProof::Yes, ) .await .map_err(|e| { error!( target: "cumulus-collator", "Proposing failed: {:?}", e, ); InvalidHead })?; let proof = proof .ok_or_else(|| { error!( target: "cumulus-collator", "Proposer did not return the requested proof.", ); InvalidHead })?; let (header, extrinsics) = block.deconstruct(); // Create the parachain block data for the validators. let b = ParachainBlockData::::new( header, extrinsics, proof.iter_nodes().collect(), parent_state_root, ); let block_import_params = BlockImportParams { origin: BlockOrigin::Own, header: b.header().clone(), justification: None, post_digests: vec![], body: Some(b.extrinsics().to_vec()), finalized: false, auxiliary: vec![], // block-weight is written in block import. // TODO: block-import handles fork choice and this shouldn't even have the // option to specify one. // https://github.com/paritytech/substrate/issues/3623 fork_choice: ForkChoiceStrategy::LongestChain, allow_missing_state: false, import_existing: false, storage_changes: Some(storage_changes), }; if let Err(err) = block_import .lock() .import_block(block_import_params, Default::default()) { error!( target: "cumulus-collator", "Error importing build block (at {:?}): {:?}", b.header().parent_hash(), err, ); return Err(InvalidHead); } let block_data = BlockData(b.encode()); let head_data = HeadData:: { header: b.into_header(), }; let messages = OutgoingMessages { outgoing_messages: Vec::new(), }; let candidate = ( block_data, parachain::HeadData(head_data.encode()), messages, ); trace!(target: "cumulus-collator", "Produced candidate: {:?}", candidate); Ok(candidate) }) } } /// Implements `BuildParachainContext` to build a collator instance. struct CollatorBuilder { setup_parachain: SP, _marker: PhantomData, } impl CollatorBuilder { /// Create a new instance of self. fn new(setup_parachain: SP) -> Self { Self { setup_parachain, _marker: PhantomData, } } } impl> BuildParachainContext for CollatorBuilder where >::Proposer: Send, { type ParachainContext = Collator; fn build( self, client: Arc>, spawner: Spawner, network: Arc, ) -> Result where PolkadotClient: sp_api::ProvideRuntimeApi, as sp_api::ProvideRuntimeApi>::Api: polkadot_service::RuntimeApiCollection, E: sc_client::CallExecutor + Clone + Send + Sync + 'static, Spawner: Spawn + Clone + Send + Sync + 'static, Extrinsic: codec::Codec + Send + Sync + 'static, < as sp_api::ProvideRuntimeApi>::Api as sp_api::ApiExt< PBlock, >>::StateBackend: sp_api::StateBackend, R: Send + Sync + 'static, B: sc_client_api::Backend + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 B::State: sp_api::StateBackend, { let (proposer_factory, block_import, inherent_data_providers) = self .setup_parachain .setup_parachain(client, spawner) .map_err(|e| error!("Error setting up the parachain: {}", e))?; Ok(Collator::new( proposer_factory, inherent_data_providers, network, block_import, )) } } /// Something that can setup a parachain. pub trait SetupParachain: Send { /// The proposer factory of the parachain to build blocks. type ProposerFactory: Environment + Send + 'static; /// The block import for importing the blocks build by the collator. type BlockImport: BlockImport< Block, Error = ConsensusError, Transaction = <>::Proposer as Proposer< Block, >>::Transaction, > + Send + Sync + 'static; /// Setup the parachain. fn setup_parachain( self, polkadot_client: P, spawner: SP, ) -> Result< ( Self::ProposerFactory, Self::BlockImport, InherentDataProviders, ), String, > where P: cumulus_consensus::PolkadotClient, SP: Spawn + Clone + Send + Sync + 'static; } /// Run a collator with the given proposer factory. pub fn run_collator( setup_parachain: SP, para_id: ParaId, exit: E, key: Arc, configuration: polkadot_collator::Configuration, ) -> Result<(), sc_cli::error::Error> where Block: BlockT, SP: SetupParachain + Send + 'static, <>::ProposerFactory as Environment>::Proposer: Send, E: Future + Unpin + Send + Clone + Sync + 'static, { let builder = CollatorBuilder::new(setup_parachain); polkadot_collator::run_collator(builder, para_id, exit, key, configuration) } #[cfg(test)] mod tests { use super::*; use std::time::Duration; use polkadot_collator::{collate, CollatorId, PeerId, RelayChainContext, SignedStatement}; use polkadot_primitives::parachain::{ConsolidatedIngress, FeeSchedule, HeadData}; use sp_blockchain::Result as ClientResult; use sp_inherents::InherentData; use sp_keyring::Sr25519Keyring; use sp_runtime::{ generic::BlockId, traits::{DigestFor, Header as HeaderT}, }; use sp_state_machine::StorageProof; use substrate_test_client::{NativeExecutor, WasmExecutionMethod::Interpreted}; use test_client::{ Client, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt, }; use test_runtime::{Block, Header}; use futures::{Stream, future}; #[derive(Debug)] struct Error; impl From for Error { fn from(_: sp_consensus::Error) -> Self { unimplemented!("Not required in tests") } } struct DummyFactory; impl Environment for DummyFactory { type Proposer = DummyProposer; type Error = Error; type CreateProposer = Pin> + Send + Unpin + 'static >>; fn init(&mut self, _: &Header) -> Self::CreateProposer { Box::pin(future::ready(Ok(DummyProposer))) } } struct DummyProposer; impl Proposer for DummyProposer { type Error = Error; type Proposal = future::Ready, Error>>; type Transaction = sc_client_api::TransactionFor; fn propose( &mut self, _: InherentData, digest: DigestFor, _: Duration, _: RecordProof, ) -> Self::Proposal { let header = Header::new( 1337, Default::default(), Default::default(), Default::default(), digest, ); future::ready(Ok(Proposal { block: Block::new(header, Vec::new()), storage_changes: Default::default(), proof: Some(StorageProof::empty()), })) } } struct DummyCollatorNetwork; impl CollatorNetwork for DummyCollatorNetwork { fn collator_id_to_peer_id( &self, _: CollatorId, ) -> Box> + Send> { unimplemented!("Not required in tests") } fn checked_statements(&self, _: PHash) -> Box> { unimplemented!("Not required in tests") } } struct DummyRelayChainContext; impl RelayChainContext for DummyRelayChainContext { type Error = Error; type FutureEgress = future::Ready>; fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress { future::ready(Ok(ConsolidatedIngress(Vec::new()))) } } #[derive(Clone)] struct DummyPolkadotClient; impl cumulus_consensus::PolkadotClient for DummyPolkadotClient { type Error = Error; type Finalized = Box> + Send + Unpin>; fn finalized_heads(&self, _: ParaId) -> ClientResult { unimplemented!("Not required in tests") } fn parachain_head_at( &self, _: &BlockId, _: ParaId, ) -> ClientResult>> { unimplemented!("Not required in tests") } } struct DummySetup; impl SetupParachain for DummySetup { type ProposerFactory = DummyFactory; type BlockImport = Client; fn setup_parachain( self, _: P, _: SP, ) -> Result< ( Self::ProposerFactory, Self::BlockImport, InherentDataProviders, ), String, > { Ok(( DummyFactory, TestClientBuilder::new().build(), InherentDataProviders::default(), )) } } #[test] fn collates_produces_a_block() { let _ = env_logger::try_init(); let spawner = futures::executor::ThreadPool::new().unwrap(); let builder = CollatorBuilder::new(DummySetup); let context = builder .build::<_, _, polkadot_service::polkadot_runtime::RuntimeApi, _, _>( Arc::new( substrate_test_client::TestClientBuilder::<_, _, ()>::default() .build_with_native_executor(Some(NativeExecutor::< polkadot_service::PolkadotExecutor, >::new(Interpreted, None))) .0, ), spawner, Arc::new(DummyCollatorNetwork), ) .expect("Creates parachain context"); let id = ParaId::from(100); let header = Header::new( 0, Default::default(), Default::default(), Default::default(), Default::default(), ); let collation = collate( Default::default(), id, ParachainStatus { head_data: HeadData(header.encode()), balance: 10, fee_schedule: FeeSchedule { base: 0, per_byte: 1, }, }, DummyRelayChainContext, context, Arc::new(Sr25519Keyring::Alice.pair().into()), ); let collation = futures::executor::block_on(collation).unwrap().0; let block_data = collation.pov.block_data; let block = Block::decode(&mut &block_data.0[..]).expect("Is a valid block"); assert_eq!(1337, *block.header().number()); } }