diff --git a/cumulus/Cargo.lock b/cumulus/Cargo.lock index 64466bf64c..49ad1cfe86 100644 --- a/cumulus/Cargo.lock +++ b/cumulus/Cargo.lock @@ -474,6 +474,8 @@ name = "cumulus-consensus" version = "0.1.0" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-codec 2.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-primitives 0.1.0 (git+https://github.com/paritytech/polkadot?branch=rh-update-substrate)", "polkadot-service 0.3.0 (git+https://github.com/paritytech/polkadot?branch=rh-update-substrate)", "sr-primitives 0.1.0 (git+https://github.com/paritytech/substrate)", diff --git a/cumulus/consensus/Cargo.toml b/cumulus/consensus/Cargo.toml index 4c3adb8df1..437e21e848 100644 --- a/cumulus/consensus/Cargo.toml +++ b/cumulus/consensus/Cargo.toml @@ -18,4 +18,6 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = # other deps futures = "0.1.21" -tokio = "0.1.8" \ No newline at end of file +tokio = "0.1.8" +parity-codec = "2.0" +log = "0.4" \ No newline at end of file diff --git a/cumulus/consensus/src/lib.rs b/cumulus/consensus/src/lib.rs index ed109e9574..a791ff36ca 100644 --- a/cumulus/consensus/src/lib.rs +++ b/cumulus/consensus/src/lib.rs @@ -14,10 +14,109 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } +use substrate_client::{backend::Backend, Client, BlockchainEvents}; +use substrate_client::error::{Error as ClientError, Result as ClientResult}; +use sr_primitives::traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}; +use polkadot_primitives::{BlockNumber as PBlockNumber, Hash as PHash, parachain::Id as ParaId}; + +use futures::prelude::*; +use parity_codec::Decode; +use log::warn; + +use std::sync::Arc; + +/// Helper for the local client. +pub trait LocalClient { + /// The block type of the local client. + type Block: BlockT; + + /// Mark the given block as the best block. + /// Returns `false` if the block is not known. + fn mark_best(&self, hash: ::Hash) -> ClientResult; + + /// Finalize the given block. + /// Returns `false` if the block is not known. + fn finalize(&self, hash: ::Hash) -> ClientResult; } + +/// Errors that can occur while following the polkadot relay-chain. +#[derive(Debug)] +pub enum Error

{ + /// An underlying client error. + Client(ClientError), + /// Polkadot client error. + Polkadot(P), + /// Head data returned was not for our parachain. + InvalidHeadData, +} + +/// A parachain head update. +pub struct HeadUpdate { + /// The relay-chain's block hash where the parachain head updated. + pub relay_hash: PHash, + /// The relay-chain's block number where the parachain head updated. + pub relay_number: PBlockNumber, + /// The parachain head-data. + pub head_data: Vec, +} + +/// Helper for the Polkadot client. +pub trait PolkadotClient { + /// The error type for interacting with the Polkadot client. + type Error: std::fmt::Debug + Send; + + /// A stream that yields updates to the parachain head. + type HeadUpdates: Stream + Send; + /// A stream that yields finalized head-data for a certain parachain. + type Finalized: Stream,Error=Self::Error> + Send; + + /// Get a stream of head updates. + fn head_updates(&self, para_id: ParaId) -> Self::HeadUpdates; + /// Get a stream of finalized heads. + fn finalized_heads(&self, para_id: ParaId) -> Self::Finalized; +} + +/// Spawns a future that follows the Polkadot relay chain for the given parachain. +pub fn follow_polkadot<'a, L: 'a, P: 'a>(para_id: ParaId, local: Arc, polkadot: Arc

) + -> impl Future + Send + 'a + where + L: LocalClient + Send + Sync, + P: PolkadotClient + Send + Sync, +{ + let head_updates = polkadot.head_updates(para_id); + let finalized_heads = polkadot.finalized_heads(para_id); + + let follow_best = { + let local = local.clone(); + + head_updates + .map_err(Error::Polkadot) + .and_then(|update| { + ::Header::decode(&mut &update.head_data[..]) + .ok_or_else(|| Error::InvalidHeadData) + }) + .for_each(move |p_head| { + let _synced = local.mark_best(p_head.hash()).map_err(Error::Client)?; + Ok(()) + }) + }; + + let follow_finalized = { + let local = local.clone(); + + finalized_heads + .map_err(Error::Polkadot) + .and_then(|head_data| { + ::Header::decode(&mut &head_data[..]) + .ok_or_else(|| Error::InvalidHeadData) + }) + .for_each(move |p_head| { + let _synced = local.finalize(p_head.hash()).map_err(Error::Client)?; + Ok(()) + }) + }; + + follow_best.join(follow_finalized) + .map_err(|e| warn!("Could not follow relay-chain: {:?}", e)) + .map(|((), ())| ()) +} \ No newline at end of file