// Copyright 2019-2021 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! Parachain specific networking //! //! Provides a custom block announcement implementation for parachains //! that use the relay chain provided consensus. See [`BlockAnnounceValidator`] //! and [`WaitToAnnounce`] for more information about this implementation. use sc_client_api::{Backend, BlockchainEvents}; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_consensus::{ block_validation::{BlockAnnounceValidator as BlockAnnounceValidatorT, Validation}, SyncOracle, }; use sp_core::traits::SpawnNamed; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, HashFor, Header as HeaderT}, }; use polkadot_node_primitives::{SignedFullStatement, Statement}; use polkadot_parachain::primitives::HeadData; use polkadot_primitives::v1::{ Block as PBlock, Hash as PHash, CandidateReceipt, CompactStatement, Id as ParaId, OccupiedCoreAssumption, ParachainHost, UncheckedSigned, SigningContext, }; use polkadot_client::ClientHandle; use codec::{Decode, Encode}; use futures::{ channel::oneshot, future::{ready, FutureExt}, Future, }; use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc}; use wait_on_relay_chain_block::WaitOnRelayChainBlock; #[cfg(test)] mod tests; mod wait_on_relay_chain_block; const LOG_TARGET: &str = "sync::cumulus"; type BoxedError = Box; #[derive(Debug)] struct BlockAnnounceError(String); impl std::error::Error for BlockAnnounceError {} impl fmt::Display for BlockAnnounceError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) } } /// The data that we attach to a block announcement. /// /// This will be used to prove that a header belongs to a block that is probably being backed by /// the relay chain. #[derive(Encode, Decode, Debug)] pub struct BlockAnnounceData { receipt: CandidateReceipt, statement: UncheckedSigned, } impl BlockAnnounceData { /// Validate that the receipt, statement and announced header match. /// /// This will not check the signature, for this you should use [`BlockAnnounceData::check_signature`]. fn validate(&self, encoded_header: Vec) -> Result<(), Validation> { let candidate_hash = if let CompactStatement::Seconded(h) = self.statement.unchecked_payload() { h } else { tracing::debug!( target: LOG_TARGET, "`CompactStatement` isn't the candidate variant!", ); return Err(Validation::Failure { disconnect: true }); }; if *candidate_hash != self.receipt.hash() { tracing::debug!( target: LOG_TARGET, "Receipt candidate hash doesn't match candidate hash in statement", ); return Err(Validation::Failure { disconnect: true }); } if HeadData(encoded_header).hash() != self.receipt.descriptor.para_head { tracing::debug!( target: LOG_TARGET, "Receipt para head hash doesn't match the hash of the header in the block announcement", ); return Err(Validation::Failure { disconnect: true }); } Ok(()) } /// Check the signature of the statement. /// /// Returns an `Err(_)` if it failed. fn check_signature

( self, relay_chain_client: &Arc

, ) -> Result where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, { let runtime_api = relay_chain_client.runtime_api(); let validator_index = self.statement.unchecked_validator_index(); let runtime_api_block_id = BlockId::Hash(self.receipt.descriptor.relay_parent); let session_index = match runtime_api.session_index_for_child(&runtime_api_block_id) { Ok(r) => r, Err(e) => { return Err(BlockAnnounceError(format!("{:?}", e))); } }; let signing_context = SigningContext { parent_hash: self.receipt.descriptor.relay_parent, session_index, }; // Check that the signer is a legit validator. let authorities = match runtime_api.validators(&runtime_api_block_id) { Ok(r) => r, Err(e) => { return Err(BlockAnnounceError(format!("{:?}", e))); } }; let signer = match authorities.get(validator_index.0 as usize) { Some(r) => r, None => { tracing::debug!( target: LOG_TARGET, "Block announcement justification signer is a validator index out of bound", ); return Ok(Validation::Failure { disconnect: true }); } }; // Check statement is correctly signed. if self .statement .try_into_checked(&signing_context, &signer) .is_err() { tracing::debug!( target: LOG_TARGET, "Block announcement justification signature is invalid.", ); return Ok(Validation::Failure { disconnect: true }); } Ok(Validation::Success { is_new_best: true }) } } impl TryFrom<&'_ SignedFullStatement> for BlockAnnounceData { type Error = (); fn try_from(stmt: &SignedFullStatement) -> Result { let receipt = if let Statement::Seconded(receipt) = stmt.payload() { receipt.to_plain() } else { return Err(()); }; Ok(BlockAnnounceData { receipt, statement: stmt.convert_payload().into(), }) } } /// Parachain specific block announce validator. /// /// This block announce validator is required if the parachain is running /// with the relay chain provided consensus to make sure each node only /// imports a reasonable number of blocks per round. The relay chain provided /// consensus doesn't have any authorities and so it could happen that without /// this special block announce validator a node would need to import *millions* /// of blocks per round, which is clearly not doable. /// /// To solve this problem, each block announcement is delayed until a collator /// has received a [`Statement::Seconded`] for its `PoV`. This message tells the /// collator that its `PoV` was validated successfully by a parachain validator and /// that it is very likely that this `PoV` will be included in the relay chain. Every /// collator that doesn't receive the message for its `PoV` will not announce its block. /// For more information on the block announcement, see [`WaitToAnnounce`]. /// /// For each block announcement that is received, the generic block announcement validation /// will call this validator and provides the extra data that was attached to the announcement. /// We call this extra data `justification`. /// It is expected that the attached data is a SCALE encoded [`BlockAnnounceData`]. The /// statement is checked to be a [`CompactStatement::Candidate`] and that it is signed by an active /// parachain validator. /// /// If no justification was provided we check if the block announcement is at the tip of the known /// chain. If it is at the tip, it is required to provide a justification or otherwise we reject /// it. However, if the announcement is for a block below the tip the announcement is accepted /// as it probably comes from a node that is currently syncing the chain. pub struct BlockAnnounceValidator { phantom: PhantomData, relay_chain_client: Arc, relay_chain_backend: Arc, para_id: ParaId, relay_chain_sync_oracle: Box, wait_on_relay_chain_block: WaitOnRelayChainBlock, } impl BlockAnnounceValidator { /// Create a new [`BlockAnnounceValidator`]. pub fn new( relay_chain_client: Arc, para_id: ParaId, relay_chain_sync_oracle: Box, relay_chain_backend: Arc, relay_chain_blockchain_events: Arc, ) -> Self { Self { phantom: Default::default(), relay_chain_client, para_id, relay_chain_sync_oracle, relay_chain_backend: relay_chain_backend.clone(), wait_on_relay_chain_block: WaitOnRelayChainBlock::new( relay_chain_backend, relay_chain_blockchain_events, ), } } } impl BlockAnnounceValidator where R: ProvideRuntimeApi + Send + Sync + 'static, R::Api: ParachainHost, B: Backend + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 sc_client_api::StateBackendFor: sc_client_api::StateBackend>, { /// Get the included block of the given parachain in the relay chain. fn included_block( relay_chain_client: &R, block_id: &BlockId, para_id: ParaId, ) -> Result { let validation_data = relay_chain_client .runtime_api() .persisted_validation_data(block_id, para_id, OccupiedCoreAssumption::TimedOut) .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)? .ok_or_else(|| { Box::new(BlockAnnounceError( "Could not find parachain head in relay chain".into(), )) as Box<_> })?; let para_head = Block::Header::decode(&mut &validation_data.parent_head.0[..]).map_err(|e| { Box::new(BlockAnnounceError(format!( "Failed to decode parachain head: {:?}", e ))) as Box<_> })?; Ok(para_head) } /// Get the backed block hash of the given parachain in the relay chain. fn backed_block_hash( relay_chain_client: &R, block_id: &BlockId, para_id: ParaId, ) -> Result, BoxedError> { let candidate_receipt = relay_chain_client .runtime_api() .candidate_pending_availability(block_id, para_id) .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?; Ok(candidate_receipt.map(|cr| cr.descriptor.para_head)) } /// Handle a block announcement with empty data (no statement) attached to it. fn handle_empty_block_announce_data( &self, header: Block::Header, ) -> impl Future> { let relay_chain_client = self.relay_chain_client.clone(); let relay_chain_backend = self.relay_chain_backend.clone(); let para_id = self.para_id; async move { // Check if block is equal or higher than best (this requires a justification) let relay_chain_info = relay_chain_backend.blockchain().info(); let runtime_api_block_id = BlockId::Hash(relay_chain_info.best_hash); let block_number = header.number(); let best_head = Self::included_block(&*relay_chain_client, &runtime_api_block_id, para_id)?; let known_best_number = best_head.number(); let backed_block = || Self::backed_block_hash(&*relay_chain_client, &runtime_api_block_id, para_id); if best_head == header { tracing::debug!( target: LOG_TARGET, "Announced block matches best block.", ); Ok(Validation::Success { is_new_best: true }) } else if Some(HeadData(header.encode()).hash()) == backed_block()? { tracing::debug!( target: LOG_TARGET, "Announced block matches latest backed block.", ); Ok(Validation::Success { is_new_best: true }) } else if block_number >= known_best_number { tracing::debug!( target: LOG_TARGET, "Validation failed because a justification is needed if the block at the top of the chain." ); Ok(Validation::Failure { disconnect: false }) } else { Ok(Validation::Success { is_new_best: false }) } } } } impl BlockAnnounceValidatorT for BlockAnnounceValidator where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, B: Backend + 'static, BCE: BlockchainEvents + 'static + Send + Sync, // Rust bug: https://github.com/rust-lang/rust/issues/24159 sc_client_api::StateBackendFor: sc_client_api::StateBackend>, { fn validate( &mut self, header: &Block::Header, mut data: &[u8], ) -> Pin> + Send>> { if self.relay_chain_sync_oracle.is_major_syncing() { return ready(Ok(Validation::Success { is_new_best: false })).boxed(); } if data.is_empty() { return self .handle_empty_block_announce_data(header.clone()) .boxed(); } let block_announce_data = match BlockAnnounceData::decode(&mut data) { Ok(r) => r, Err(_) => { return ready(Err(Box::new(BlockAnnounceError( "Can not decode the `BlockAnnounceData`".into(), )) as Box<_>)) .boxed() } }; let relay_chain_client = self.relay_chain_client.clone(); let header_encoded = header.encode(); let wait_on_relay_chain_block = self.wait_on_relay_chain_block.clone(); async move { if let Err(e) = block_announce_data.validate(header_encoded) { return Ok(e); } let relay_parent = block_announce_data.receipt.descriptor.relay_parent; wait_on_relay_chain_block .wait_on_relay_chain_block(relay_parent) .await .map_err(|e| Box::new(BlockAnnounceError(e.to_string())) as Box<_>)?; block_announce_data .check_signature(&relay_chain_client) .map_err(|e| Box::new(e) as Box<_>) } .boxed() } } /// Build a block announce validator instance. /// /// Returns a boxed [`BlockAnnounceValidator`]. pub fn build_block_announce_validator( relay_chain_client: polkadot_client::Client, para_id: ParaId, relay_chain_sync_oracle: Box, relay_chain_backend: Arc, ) -> Box + Send> where B: Backend + Send + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 sc_client_api::StateBackendFor: sc_client_api::StateBackend>, { BlockAnnounceValidatorBuilder::new( relay_chain_client, para_id, relay_chain_sync_oracle, relay_chain_backend, ) .build() } /// Block announce validator builder. /// /// Builds a [`BlockAnnounceValidator`] for a parachain. As this requires /// a concrete relay chain client instance, the builder takes a [`polkadot_client::Client`] /// that wraps this concrete instanace. By using [`polkadot_client::ExecuteWithClient`] /// the builder gets access to this concrete instance. struct BlockAnnounceValidatorBuilder { phantom: PhantomData, relay_chain_client: polkadot_client::Client, para_id: ParaId, relay_chain_sync_oracle: Box, relay_chain_backend: Arc, } impl BlockAnnounceValidatorBuilder where B: Backend + Send + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 sc_client_api::StateBackendFor: sc_client_api::StateBackend>, { /// Create a new instance of the builder. fn new( relay_chain_client: polkadot_client::Client, para_id: ParaId, relay_chain_sync_oracle: Box, relay_chain_backend: Arc, ) -> Self { Self { relay_chain_client, para_id, relay_chain_sync_oracle, relay_chain_backend, phantom: PhantomData, } } /// Build the block announce validator. fn build(self) -> Box + Send> { self.relay_chain_client.clone().execute_with(self) } } impl polkadot_client::ExecuteWithClient for BlockAnnounceValidatorBuilder where B: Backend + Send + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 sc_client_api::StateBackendFor: sc_client_api::StateBackend>, { type Output = Box + Send>; fn execute_with_client(self, client: Arc) -> Self::Output where >::StateBackend: sp_api::StateBackend, PBackend: Backend, PBackend::State: sp_api::StateBackend, Api: polkadot_client::RuntimeApiCollection, PClient: polkadot_client::AbstractClient + 'static, { Box::new(BlockAnnounceValidator::new( client.clone(), self.para_id, self.relay_chain_sync_oracle, self.relay_chain_backend, client, )) } } /// Wait before announcing a block that a candidate message has been received for this block, then /// add this message as justification for the block announcement. /// /// This object will spawn a new task every time the method `wait_to_announce` is called and cancel /// the previous task running. pub struct WaitToAnnounce { spawner: Arc, announce_block: Arc>) + Send + Sync>, } impl WaitToAnnounce { /// Create the `WaitToAnnounce` object pub fn new( spawner: Arc, announce_block: Arc>) + Send + Sync>, ) -> WaitToAnnounce { WaitToAnnounce { spawner, announce_block, } } /// Wait for a candidate message for the block, then announce the block. The candidate /// message will be added as justification to the block announcement. pub fn wait_to_announce( &mut self, block_hash: ::Hash, signed_stmt_recv: oneshot::Receiver, ) { let announce_block = self.announce_block.clone(); self.spawner.spawn( "cumulus-wait-to-announce", async move { tracing::debug!( target: "cumulus-network", "waiting for announce block in a background task...", ); wait_to_announce::(block_hash, announce_block, signed_stmt_recv).await; tracing::debug!( target: "cumulus-network", "block announcement finished", ); } .boxed(), ); } } async fn wait_to_announce( block_hash: ::Hash, announce_block: Arc>) + Send + Sync>, signed_stmt_recv: oneshot::Receiver, ) { let statement = match signed_stmt_recv.await { Ok(s) => s, Err(_) => { tracing::debug!( target: "cumulus-network", block = ?block_hash, "Wait to announce stopped, because sender was dropped.", ); return; } }; if let Ok(data) = BlockAnnounceData::try_from(&statement) { announce_block(block_hash, Some(data.encode())); } else { tracing::debug!( target: "cumulus-network", statement = ?statement, block = ?block_hash, "Received invalid statement while waiting to announce block.", ); } }