diff --git a/cumulus/consensus/src/lib.rs b/cumulus/consensus/src/lib.rs index 45e68ab661..bc5c2b117f 100644 --- a/cumulus/consensus/src/lib.rs +++ b/cumulus/consensus/src/lib.rs @@ -14,19 +14,26 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -use substrate_client::{backend::{Backend, Finalizer}, CallExecutor, Client, BlockchainEvents}; -use substrate_client::error::{Error as ClientError, Result as ClientResult}; +use substrate_client::{ + backend::{Backend, Finalizer}, CallExecutor, Client, BlockchainEvents, + error::{Error as ClientError, Result as ClientResult}, +}; use substrate_primitives::{Blake2Hasher, H256}; -use sr_primitives::generic::BlockId; -use sr_primitives::traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}; -use polkadot_primitives::{Hash as PHash, Block as PBlock}; -use polkadot_primitives::parachain::{Id as ParaId, ParachainHost}; +use sr_primitives::{ + generic::BlockId, + traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}, +}; +use substrate_consensus_common::{Error as ConsensusError, SelectChain as SelectChainT}; + +use polkadot_primitives::{ + Hash as PHash, Block as PBlock, parachain::{Id as ParaId, ParachainHost}, +}; use futures::{Stream, StreamExt, TryStreamExt, future, Future, TryFutureExt, FutureExt}; -use codec::{Encode, Decode}; +use codec::Decode; use log::warn; -use std::sync::Arc; +use std::{sync::Arc, marker::PhantomData}; /// Helper for the local client. pub trait LocalClient { @@ -66,6 +73,13 @@ pub trait PolkadotClient: Clone { /// Get a stream of finalized heads. fn finalized_heads(&self, para_id: ParaId) -> ClientResult; + + /// Returns the parachain head for the given `para_id` at the given block id. + fn parachain_head_at( + &self, + at: &BlockId, + para_id: ParaId, + ) -> ClientResult>>; } /// Spawns a future that follows the Polkadot relay chain for the given parachain. @@ -116,20 +130,11 @@ impl LocalClient for Client where } } -fn parachain_key(para_id: ParaId) -> substrate_primitives::storage::StorageKey { - const PREFIX: &[u8] = &*b"Parachains Heads"; - para_id.using_encoded(|s| { - let mut v = PREFIX.to_vec(); - v.extend(s); - substrate_primitives::storage::StorageKey(v) - }) -} - impl PolkadotClient for Arc> where B: Backend + Send + Sync + 'static, E: CallExecutor + Send + Sync + 'static, - RA: ProvideRuntimeApi + Send + Sync + 'static, - RA::Api: ParachainHost, + Client: ProvideRuntimeApi + Send + Sync + 'static, + as ProvideRuntimeApi>::Api: ParachainHost, { type Error = ClientError; @@ -137,17 +142,100 @@ impl PolkadotClient for Arc> where fn finalized_heads(&self, para_id: ParaId) -> ClientResult { let polkadot = self.clone(); - let parachain_key = parachain_key(para_id); let s = self.finality_notification_stream() .filter_map(move |n| future::ready( - polkadot.storage(&BlockId::hash(n.hash), ¶chain_key) - .ok() - .and_then(|d| d.map(|d| d.0)), + polkadot.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().and_then(|h| h), ), ); Ok(Box::new(s)) } + + fn parachain_head_at( + &self, + at: &BlockId, + para_id: ParaId, + ) -> ClientResult>> { + self.runtime_api().parachain_status(at, para_id).map(|s| s.map(|s| s.head_data.0)) + } +} + +/// Select chain implementation for parachains. +/// +/// The actual behavior of the implementation depends on the select chain implementation used by +/// Polkadot. +pub struct SelectChain { + polkadot_client: PC, + polkadot_select_chain: SC, + para_id: ParaId, + _marker: PhantomData, +} + +impl SelectChain { + /// Create new instance of `Self`. + /// + /// - `para_id`: The id of the parachain. + /// - `polkadot_client`: The client of the Polkadot node. + /// - `polkadot_select_chain`: The Polkadot select chain implementation. + pub fn new(para_id: ParaId, polkadot_client: PC, polkadot_select_chain: SC) -> Self { + Self { + polkadot_client, + polkadot_select_chain, + para_id, + _marker: PhantomData, + } + } +} + +impl Clone for SelectChain { + fn clone(&self) -> Self { + Self { + polkadot_client: self.polkadot_client.clone(), + polkadot_select_chain: self.polkadot_select_chain.clone(), + para_id: self.para_id, + _marker: PhantomData, + } + } +} + +impl SelectChainT for SelectChain where + Block: BlockT, + PC: PolkadotClient + Clone + Send + Sync, + PC::Error: ToString, + SC: SelectChainT, +{ + fn leaves(&self) -> Result::Hash>, ConsensusError> { + let leaves = self.polkadot_select_chain.leaves()?; + leaves.into_iter() + .filter_map(|l| + self.polkadot_client + .parachain_head_at(&BlockId::Hash(l), self.para_id) + .map(|h| h.and_then(|d| <::Hash>::decode(&mut &d[..]).ok())) + .transpose() + ) + .collect::, _>>() + .map_err(|e| ConsensusError::ChainLookup(e.to_string())) + } + + fn best_chain(&self) -> Result<::Header, ConsensusError> { + let best_chain = self.polkadot_select_chain.best_chain()?; + let para_best_chain = self.polkadot_client + .parachain_head_at(&BlockId::Hash(best_chain.hash()), self.para_id) + .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?; + + match para_best_chain { + Some(best) => Decode::decode(&mut &best[..]) + .map_err(|e| + ConsensusError::ChainLookup( + format!("Error decoding parachain head: {}", e.what()), + ), + ), + None => Err( + ConsensusError::ChainLookup( + "Could not find parachain head for best relay chain!".into()), + ) + } + } }