// Copyright 2019-2021 Parity Technologies (UK) Ltd. // This file is part of Cumulus. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . //! Cumulus Collator implementation for Substrate. use cumulus_client_network::WaitToAnnounce; use cumulus_primitives_core::{CollectCollationInfo, ParachainBlockData, PersistedValidationData}; use sc_client_api::BlockBackend; use sp_api::ProvideRuntimeApi; use sp_consensus::BlockStatus; use sp_core::traits::SpawnNamed; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header as HeaderT, Zero}, }; use cumulus_client_consensus_common::ParachainConsensus; use polkadot_node_primitives::{ BlockData, Collation, CollationGenerationConfig, CollationResult, PoV, }; use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage}; use polkadot_overseer::OverseerHandler; use polkadot_primitives::v1::{CollatorPair, Hash as PHash, HeadData, Id as ParaId}; use codec::{Decode, Encode}; use futures::{channel::oneshot, FutureExt}; use parking_lot::Mutex; use std::sync::Arc; use tracing::Instrument; /// The logging target. const LOG_TARGET: &str = "cumulus-collator"; /// The implementation of the Cumulus `Collator`. pub struct Collator { block_status: Arc, parachain_consensus: Box>, wait_to_announce: Arc>>, runtime_api: Arc, } impl Clone for Collator { fn clone(&self) -> Self { Self { block_status: self.block_status.clone(), wait_to_announce: self.wait_to_announce.clone(), parachain_consensus: self.parachain_consensus.clone(), runtime_api: self.runtime_api.clone(), } } } impl Collator where Block: BlockT, BS: BlockBackend, RA: ProvideRuntimeApi, RA::Api: CollectCollationInfo, { /// Create a new instance. fn new( block_status: Arc, spawner: Arc, announce_block: Arc>) + Send + Sync>, runtime_api: Arc, parachain_consensus: Box>, ) -> Self { let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block))); Self { block_status, wait_to_announce, runtime_api, parachain_consensus, } } /// Checks the status of the given block hash in the Parachain. /// /// Returns `true` if the block could be found and is good to be build on. fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool { match self.block_status.block_status(&BlockId::Hash(hash)) { Ok(BlockStatus::Queued) => { tracing::debug!( target: LOG_TARGET, block_hash = ?hash, "Skipping candidate production, because block is still queued for import.", ); false } Ok(BlockStatus::InChainWithState) => true, Ok(BlockStatus::InChainPruned) => { tracing::error!( target: LOG_TARGET, "Skipping candidate production, because block `{:?}` is already pruned!", hash, ); false } Ok(BlockStatus::KnownBad) => { tracing::error!( target: LOG_TARGET, block_hash = ?hash, "Block is tagged as known bad and is included in the relay chain! Skipping candidate production!", ); false } Ok(BlockStatus::Unknown) => { if header.number().is_zero() { tracing::error!( target: LOG_TARGET, block_hash = ?hash, "Could not find the header of the genesis block in the database!", ); } else { tracing::debug!( target: LOG_TARGET, block_hash = ?hash, "Skipping candidate production, because block is unknown.", ); } false } Err(e) => { tracing::error!( target: LOG_TARGET, block_hash = ?hash, error = ?e, "Failed to get block status.", ); false } } } fn build_collation( &mut self, block: ParachainBlockData, block_hash: Block::Hash, ) -> Option { let block_data = BlockData(block.encode()); let header = block.into_header(); let head_data = HeadData(header.encode()); let collation_info = match self .runtime_api .runtime_api() .collect_collation_info(&BlockId::Hash(block_hash)) { Ok(ci) => ci, Err(e) => { tracing::error!( target: LOG_TARGET, error = ?e, "Failed to collect collation info.", ); return None; } }; Some(Collation { upward_messages: collation_info.upward_messages, new_validation_code: collation_info.new_validation_code, processed_downward_messages: collation_info.processed_downward_messages, horizontal_messages: collation_info.horizontal_messages, hrmp_watermark: collation_info.hrmp_watermark, head_data, proof_of_validity: PoV { block_data }, }) } async fn produce_candidate( mut self, relay_parent: PHash, validation_data: PersistedValidationData, ) -> Option { tracing::trace!( target: LOG_TARGET, relay_parent = ?relay_parent, "Producing candidate", ); let last_head = match Block::Header::decode(&mut &validation_data.parent_head.0[..]) { Ok(x) => x, Err(e) => { tracing::error!( target: LOG_TARGET, error = ?e, "Could not decode the head data." ); return None; } }; let last_head_hash = last_head.hash(); if !self.check_block_status(last_head_hash, &last_head) { return None; } tracing::info!( target: LOG_TARGET, relay_parent = ?relay_parent, at = ?last_head_hash, "Starting collation.", ); let candidate = self .parachain_consensus .produce_candidate(&last_head, relay_parent, &validation_data) .await?; let (header, extrinsics) = candidate.block.deconstruct(); // Create the parachain block data for the validators. let b = ParachainBlockData::::new(header, extrinsics, candidate.proof); tracing::debug!( target: LOG_TARGET, "PoV size {{ header: {}kb, extrinsics: {}kb, storage_proof: {}kb }}", b.header().encode().len() as f64 / 1024f64, b.extrinsics().encode().len() as f64 / 1024f64, b.storage_proof().encode().len() as f64 / 1024f64, ); let block_hash = b.header().hash(); let collation = self.build_collation(b, block_hash)?; let (result_sender, signed_stmt_recv) = oneshot::channel(); self.wait_to_announce .lock() .wait_to_announce(block_hash, signed_stmt_recv); tracing::info!( target: LOG_TARGET, ?block_hash, "Produced proof-of-validity candidate.", ); Some(CollationResult { collation, result_sender: Some(result_sender), }) } } /// Parameters for [`start_collator`]. pub struct StartCollatorParams { pub para_id: ParaId, pub runtime_api: Arc, pub block_status: Arc, pub announce_block: Arc>) + Send + Sync>, pub overseer_handler: OverseerHandler, pub spawner: Spawner, pub key: CollatorPair, pub parachain_consensus: Box>, } /// Start the collator. pub async fn start_collator( StartCollatorParams { para_id, block_status, announce_block, mut overseer_handler, spawner, key, parachain_consensus, runtime_api, }: StartCollatorParams, ) where Block: BlockT, BS: BlockBackend + Send + Sync + 'static, Spawner: SpawnNamed + Clone + Send + Sync + 'static, RA: ProvideRuntimeApi + Send + Sync + 'static, RA::Api: CollectCollationInfo, { let collator = Collator::new( block_status, Arc::new(spawner), announce_block, runtime_api, parachain_consensus, ); let span = tracing::Span::current(); let config = CollationGenerationConfig { key, para_id, collator: Box::new(move |relay_parent, validation_data| { let collator = collator.clone(); collator .produce_candidate(relay_parent, validation_data.clone()) .instrument(span.clone()) .boxed() }), }; overseer_handler .send_msg(CollationGenerationMessage::Initialize(config)) .await; overseer_handler .send_msg(CollatorProtocolMessage::CollateOn(para_id)) .await; } #[cfg(test)] mod tests { use super::*; use cumulus_client_consensus_common::ParachainCandidate; use cumulus_test_client::{ Client, ClientBlockImportExt, DefaultTestClientBuilderExt, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt, }; use cumulus_test_runtime::{Block, Header}; use futures::{channel::mpsc, executor::block_on, StreamExt}; use polkadot_node_subsystem_test_helpers::ForwardSubsystem; use polkadot_overseer::{AllSubsystems, HeadSupportsParachains, Overseer}; use sp_consensus::BlockOrigin; use sp_core::{testing::TaskExecutor, Pair}; use sp_runtime::traits::BlakeTwo256; use sp_state_machine::Backend; struct AlwaysSupportsParachains; impl HeadSupportsParachains for AlwaysSupportsParachains { fn head_supports_parachains(&self, _head: &PHash) -> bool { true } } #[derive(Clone)] struct DummyParachainConsensus { client: Arc, } #[async_trait::async_trait] impl ParachainConsensus for DummyParachainConsensus { async fn produce_candidate( &mut self, parent: &Header, _: PHash, validation_data: &PersistedValidationData, ) -> Option> { let block_id = BlockId::Hash(parent.hash()); let builder = self.client.init_block_builder_at( &block_id, Some(validation_data.clone()), Default::default(), ); let (block, _, proof) = builder.build().expect("Creates block").into_inner(); self.client .import(BlockOrigin::Own, block.clone()) .await .expect("Imports the block"); Some(ParachainCandidate { block, proof: proof.expect("Proof is returned"), }) } } #[test] fn collates_produces_a_block_and_storage_proof_does_not_contains_code() { let _ = env_logger::try_init(); let spawner = TaskExecutor::new(); let para_id = ParaId::from(100); let announce_block = |_, _| (); let client = Arc::new(TestClientBuilder::new().build()); let header = client.header(&BlockId::Number(0)).unwrap().unwrap(); let (sub_tx, sub_rx) = mpsc::channel(64); let all_subsystems = AllSubsystems::<()>::dummy().replace_collation_generation(ForwardSubsystem(sub_tx)); let (overseer, handler) = Overseer::new( Vec::new(), all_subsystems, None, AlwaysSupportsParachains, spawner.clone(), ) .expect("Creates overseer"); spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed()); let collator_start = start_collator(StartCollatorParams { runtime_api: client.clone(), block_status: client.clone(), announce_block: Arc::new(announce_block), overseer_handler: handler, spawner, para_id, key: CollatorPair::generate().0, parachain_consensus: Box::new(DummyParachainConsensus { client: client.clone(), }), }); block_on(collator_start); let msg = block_on(sub_rx.into_future()) .0 .expect("message should be send by `start_collator` above."); let config = match msg { CollationGenerationMessage::Initialize(config) => config, }; let mut validation_data = PersistedValidationData::default(); validation_data.parent_head = header.encode().into(); let relay_parent = Default::default(); let collation = block_on((config.collator)(relay_parent, &validation_data)) .expect("Collation is build") .collation; let block_data = collation.proof_of_validity.block_data; let block = ParachainBlockData::::decode(&mut &block_data.0[..]).expect("Is a valid block"); assert_eq!(1, *block.header().number()); // Ensure that we did not include `:code` in the proof. let db = block.storage_proof().clone().into_memory_db(); let backend = sp_state_machine::new_in_mem::().update_backend(*header.state_root(), db); // Should return an error, as it was not included while building the proof. assert!(backend .storage(sp_core::storage::well_known_keys::CODE) .unwrap_err() .contains("Trie lookup error: Database missing expected key")); } }