implement PolkadotClient for polkadot clients without parachain key

This commit is contained in:
Robert Habermeier
2019-01-21 21:47:21 -03:00
parent bf49f5624a
commit 6397c786ff
3 changed files with 481 additions and 144 deletions
+416 -134
View File
File diff suppressed because it is too large Load Diff
+3 -2
View File
@@ -13,8 +13,9 @@ substrate-primitives = { git = "https://github.com/paritytech/substrate" }
sr-primitives = { git = "https://github.com/paritytech/substrate" } sr-primitives = { git = "https://github.com/paritytech/substrate" }
# polkadot deps # polkadot deps
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "rh-update-substrate" } polkadot-service = { git = "https://github.com/paritytech/polkadot" }
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "rh-update-substrate" } polkadot-primitives = { git = "https://github.com/paritytech/polkadot" }
polkadot-runtime = { git = "https://github.com/paritytech/polkadot" }
# other deps # other deps
futures = "0.1.21" futures = "0.1.21"
+62 -8
View File
@@ -19,9 +19,11 @@ use substrate_client::error::{Error as ClientError, Result as ClientResult, Erro
use substrate_primitives::{Blake2Hasher, H256}; use substrate_primitives::{Blake2Hasher, H256};
use sr_primitives::generic::BlockId; use sr_primitives::generic::BlockId;
use sr_primitives::traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}; use sr_primitives::traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi};
use polkadot_primitives::{BlockNumber as PBlockNumber, Hash as PHash, parachain::Id as ParaId}; use polkadot_primitives::{Hash as PHash, Block as PBlock};
use polkadot_primitives::parachain::{Id as ParaId, ParachainHost};
use futures::prelude::*; use futures::prelude::*;
use futures::stream;
use parity_codec::Decode; use parity_codec::Decode;
use log::warn; use log::warn;
@@ -56,14 +58,13 @@ pub enum Error<P> {
pub struct HeadUpdate { pub struct HeadUpdate {
/// The relay-chain's block hash where the parachain head updated. /// The relay-chain's block hash where the parachain head updated.
pub relay_hash: PHash, pub relay_hash: PHash,
/// The relay-chain's block number where the parachain head updated.
pub relay_number: PBlockNumber,
/// The parachain head-data. /// The parachain head-data.
pub head_data: Vec<u8>, pub head_data: Vec<u8>,
} }
/// Helper for the Polkadot client. /// Helper for the Polkadot client. This is expected to be a lightweight handle
pub trait PolkadotClient { /// like an `Arc`.
pub trait PolkadotClient: Clone {
/// The error type for interacting with the Polkadot client. /// The error type for interacting with the Polkadot client.
type Error: std::fmt::Debug + Send; type Error: std::fmt::Debug + Send;
@@ -79,7 +80,7 @@ pub trait PolkadotClient {
} }
/// Spawns a future that follows the Polkadot relay chain for the given parachain. /// Spawns a future that follows the Polkadot relay chain for the given parachain.
pub fn follow_polkadot<'a, L: 'a, P: 'a>(para_id: ParaId, local: Arc<L>, polkadot: Arc<P>) pub fn follow_polkadot<'a, L: 'a, P: 'a>(para_id: ParaId, local: Arc<L>, polkadot: P)
-> impl Future<Item=(),Error=()> + Send + 'a -> impl Future<Item=(),Error=()> + Send + 'a
where where
L: LocalClient + Send + Sync, L: LocalClient + Send + Sync,
@@ -94,9 +95,10 @@ pub fn follow_polkadot<'a, L: 'a, P: 'a>(para_id: ParaId, local: Arc<L>, polkado
head_updates head_updates
.map_err(Error::Polkadot) .map_err(Error::Polkadot)
.and_then(|update| { .and_then(|update| {
<L::Block as BlockT>::Header::decode(&mut &update.head_data[..]) Option<<L::Block as BlockT>::Header>::decode(&mut &update.head_data[..])
.ok_or_else(|| Error::InvalidHeadData) .ok_or_else(|| Error::InvalidHeadData)
}) })
.filter_map(|h| h)
.for_each(move |p_head| { .for_each(move |p_head| {
let _synced = local.mark_best(p_head.hash()).map_err(Error::Client)?; let _synced = local.mark_best(p_head.hash()).map_err(Error::Client)?;
Ok(()) Ok(())
@@ -109,9 +111,10 @@ pub fn follow_polkadot<'a, L: 'a, P: 'a>(para_id: ParaId, local: Arc<L>, polkado
finalized_heads finalized_heads
.map_err(Error::Polkadot) .map_err(Error::Polkadot)
.and_then(|head_data| { .and_then(|head_data| {
<L::Block as BlockT>::Header::decode(&mut &head_data[..]) Option<<L::Block as BlockT>::Header>::decode(&mut &head_data[..])
.ok_or_else(|| Error::InvalidHeadData) .ok_or_else(|| Error::InvalidHeadData)
}) })
.filter_map(|h| h)
.for_each(move |p_head| { .for_each(move |p_head| {
let _synced = local.finalize(p_head.hash()).map_err(Error::Client)?; let _synced = local.finalize(p_head.hash()).map_err(Error::Client)?;
Ok(()) Ok(())
@@ -143,4 +146,55 @@ impl<B, E, Block, RA> LocalClient for Client<B, E, Block, RA> where
} }
} }
} }
}
fn parachain_key(_para_id: ParaId) -> substrate_primitives::storage::StorageKey {
unimplemented!()
}
impl<B, E, RA> PolkadotClient for Arc<Client<B, E, PBlock, RA>> where
B: Backend<PBlock, Blake2Hasher> + Send + Sync + 'static,
E: CallExecutor<PBlock, Blake2Hasher> + Send + Sync + 'static,
RA: ProvideRuntimeApi + Send + Sync + 'static,
RA::Api: ParachainHost<PBlock>,
{
type Error = ClientError;
type HeadUpdates = Box<Stream<Item=HeadUpdate,Error=Self::Error> + Send>;
type Finalized = Box<Stream<Item=Vec<u8>,Error=Self::Error> + Send>;
fn head_updates(&self, para_id: ParaId) -> Self::HeadUpdates {
let parachain_key = parachain_key(para_id);
let stream = stream::once(self.storage_changes_notification_stream(Some(&[parachain_key.clone()])))
.map(|s| s.map_err(|()| panic!("unbounded receivers never yield errors; qed")))
.flatten();
let s = stream.filter_map(move |(hash, changes)| {
let head_data = changes.iter()
.filter_map(|(k, v)| if k == &parachain_key { Some(v) } else { None })
.next();
match head_data {
Some(Some(head_data)) => Some(HeadUpdate {
relay_hash: hash,
head_data: head_data.0.clone(),
}),
Some(None) | None => None,
}
});
Box::new(s)
}
fn finalized_heads(&self, para_id: ParaId) -> Self::Finalized {
let polkadot = self.clone();
let parachain_key = parachain_key(para_id);
let s = self.finality_notification_stream()
.map_err(|()| panic!("unbounded receivers never yield errors; qed"))
.and_then(move |n| polkadot.storage(&BlockId::hash(n.hash), &parachain_key))
.filter_map(|d| d.map(|d| d.0));
Box::new(s)
}
} }