Try to fix the build

This commit is contained in:
Bastian Köcher
2019-09-29 02:05:49 +02:00
parent 9af76cc73d
commit 1ceecff053
10 changed files with 2344 additions and 1446 deletions
+49 -45
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
use substrate_client::{backend::Backend, CallExecutor, Client, BlockchainEvents};
use substrate_client::{backend::{Backend, Finalizer}, CallExecutor, Client, BlockchainEvents};
use substrate_client::error::{Error as ClientError, Result as ClientResult};
use substrate_primitives::{Blake2Hasher, H256};
use sr_primitives::generic::BlockId;
@@ -22,8 +22,10 @@ use sr_primitives::traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeAp
use polkadot_primitives::{Hash as PHash, Block as PBlock};
use polkadot_primitives::parachain::{Id as ParaId, ParachainHost};
use futures::{prelude::*, stream};
use parity_codec::{Encode, Decode};
use futures03::{
Stream, stream, StreamExt, TryStreamExt, TryStream, future, Future, TryFutureExt, FutureExt,
};
use codec::{Encode, Decode};
use log::warn;
use std::sync::Arc;
@@ -44,11 +46,9 @@ pub trait LocalClient {
/// Errors that can occur while following the polkadot relay-chain.
#[derive(Debug)]
pub enum Error<P> {
pub enum Error {
/// An underlying client error.
Client(ClientError),
/// Polkadot client error.
Polkadot(P),
/// Head data returned was not for our parachain.
InvalidHeadData,
}
@@ -68,38 +68,37 @@ pub trait PolkadotClient: Clone {
type Error: std::fmt::Debug + Send;
/// A stream that yields updates to the parachain head.
type HeadUpdates: Stream<Item=HeadUpdate, Error=Self::Error> + Send;
type HeadUpdates: Stream<Item = HeadUpdate> + Send;
/// A stream that yields finalized head-data for a certain parachain.
type Finalized: Stream<Item=Vec<u8>, Error=Self::Error> + Send;
type Finalized: Stream<Item = Vec<u8>> + Send;
/// Get a stream of head updates.
fn head_updates(&self, para_id: ParaId) -> Self::HeadUpdates;
fn head_updates(&self, para_id: ParaId) -> ClientResult<Self::HeadUpdates>;
/// Get a stream of finalized heads.
fn finalized_heads(&self, para_id: ParaId) -> Self::Finalized;
fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::Finalized>;
}
/// Spawns a future that follows the Polkadot relay chain for the given parachain.
pub fn follow_polkadot<'a, L: 'a, P: 'a>(para_id: ParaId, local: Arc<L>, polkadot: P)
-> impl Future<Item=(),Error=()> + Send + 'a
-> ClientResult<impl Future<Output = ()> + Send + 'a>
where
L: LocalClient + Send + Sync,
P: PolkadotClient + Send + Sync,
{
let head_updates = polkadot.head_updates(para_id);
let finalized_heads = polkadot.finalized_heads(para_id);
let head_updates = polkadot.head_updates(para_id)?;
let finalized_heads = polkadot.finalized_heads(para_id)?;
let follow_best = {
let local = local.clone();
head_updates
.map_err(Error::Polkadot)
.and_then(|update| -> Result<Option<<L::Block as BlockT>::Header>, _> {
Decode::decode(&mut &update.head_data[..]).ok_or_else(|| Error::InvalidHeadData)
.map(|update| {
<Option<<L::Block as BlockT>::Header>>::decode(&mut &update.head_data[..])
.map_err(|_| Error::InvalidHeadData)
})
.filter_map(|h| h)
.for_each(move |p_head| {
let _synced = local.mark_best(p_head.hash()).map_err(Error::Client)?;
Ok(())
.try_filter_map(|h| future::ready(Ok(h)))
.try_for_each(move |p_head| {
future::ready(local.mark_best(p_head.hash()).map_err(Error::Client).map(|_| ()))
})
};
@@ -107,20 +106,21 @@ pub fn follow_polkadot<'a, L: 'a, P: 'a>(para_id: ParaId, local: Arc<L>, polkado
let local = local.clone();
finalized_heads
.map_err(Error::Polkadot)
.and_then(|head_data| -> Result<Option<<L::Block as BlockT>::Header>, _> {
Decode::decode(&mut &head_data[..]).ok_or_else(|| Error::InvalidHeadData)
.map(|head_data| {
<Option<<L::Block as BlockT>::Header>>::decode(&mut &head_data[..])
.map_err(|_| Error::InvalidHeadData)
})
.filter_map(|h| h)
.for_each(move |p_head| {
let _synced = local.finalize(p_head.hash()).map_err(Error::Client)?;
Ok(())
.try_filter_map(|h| future::ready(Ok(h)))
.try_for_each(move |p_head| {
future::ready(local.finalize(p_head.hash()).map_err(Error::Client).map(|_| ()))
})
};
follow_best.join(follow_finalized)
.map_err(|e| warn!("Could not follow relay-chain: {:?}", e))
.map(|((), ())| ())
Ok(
future::try_join(follow_best, follow_finalized)
.map_err(|e| warn!("Could not follow relay-chain: {:?}", e))
.map(|_| ())
)
}
impl<B, E, Block, RA> LocalClient for Client<B, E, Block, RA> where
@@ -168,41 +168,45 @@ impl<B, E, RA> PolkadotClient for Arc<Client<B, E, PBlock, RA>> where
{
type Error = ClientError;
type HeadUpdates = Box<dyn Stream<Item=HeadUpdate, Error=Self::Error> + Send>;
type Finalized = Box<dyn Stream<Item=Vec<u8>, Error=Self::Error> + Send>;
type HeadUpdates = Box<dyn Stream<Item=HeadUpdate> + Send + Unpin>;
type Finalized = Box<dyn Stream<Item=Vec<u8>> + Send + Unpin>;
fn head_updates(&self, para_id: ParaId) -> Self::HeadUpdates {
fn head_updates(&self, para_id: ParaId) -> ClientResult<Self::HeadUpdates> {
let parachain_key = parachain_key(para_id);
let stream = stream::once(self.storage_changes_notification_stream(Some(&[parachain_key.clone()]), None))
.map(|s| s.map_err(|()| panic!("unbounded receivers never yield errors; qed")))
.flatten();
let stream = self.storage_changes_notification_stream(Some(&[parachain_key.clone()]), None)?;
let s = stream.filter_map(move |(hash, changes)| {
let head_data = changes.iter()
.filter_map(|(_, k, v)| if k == &parachain_key { Some(v) } else { None })
.next();
match head_data {
let res = match head_data {
Some(Some(head_data)) => Some(HeadUpdate {
relay_hash: hash,
head_data: head_data.0.clone(),
}),
Some(None) | None => None,
}
};
future::ready(res)
});
Box::new(s)
Ok(Box::new(s))
}
fn finalized_heads(&self, para_id: ParaId) -> Self::Finalized {
fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::Finalized> {
let polkadot = self.clone();
let parachain_key = parachain_key(para_id);
let s = self.finality_notification_stream()
.map_err(|()| panic!("unbounded receivers never yield errors; qed"))
.and_then(move |n| polkadot.storage(&BlockId::hash(n.hash), &parachain_key))
.filter_map(|d| d.map(|d| d.0));
.filter_map(move |n|
future::ready(
polkadot.storage(&BlockId::hash(n.hash), &parachain_key)
.ok()
.and_then(|d| d.map(|d| d.0)),
),
);
Box::new(s)
Ok(Box::new(s))
}
}
}