mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-01 15:57:55 +00:00
Merge pull request #11 from paritytech/bkchr-collator
Implement Collator
This commit is contained in:
+1
-1
@@ -58,7 +58,7 @@ test-linux-stable:
|
||||
# but still want to have debug assertions.
|
||||
RUSTFLAGS: -Cdebug-assertions=y
|
||||
script:
|
||||
- time cargo test --all --release --frozen |
|
||||
- time cargo test --all --release --locked |
|
||||
tee output.log
|
||||
- sccache -s
|
||||
after_script:
|
||||
|
||||
Generated
+1709
-1140
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,27 @@
|
||||
[package]
|
||||
name = "cumulus-collator"
|
||||
version = "0.1.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
# Substrate dependencies
|
||||
sr-primitives = { package = "sr-primitives", git = "https://github.com/paritytech/substrate", branch = "bkchr-cumulus-branch" }
|
||||
consensus-common = { package = "substrate-consensus-common", git = "https://github.com/paritytech/substrate", branch = "bkchr-cumulus-branch" }
|
||||
inherents = { package = "substrate-inherents", git = "https://github.com/paritytech/substrate", branch = "bkchr-cumulus-branch" }
|
||||
cli = { package = "substrate-cli", git = "https://github.com/paritytech/substrate", branch = "bkchr-cumulus-branch" }
|
||||
|
||||
# Polkadot dependencies
|
||||
polkadot-collator = { git = "https://github.com/paritytech/polkadot", branch = "bkchr-cumulus-branch" }
|
||||
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "bkchr-cumulus-branch" }
|
||||
|
||||
# other deps
|
||||
log = "0.4.8"
|
||||
codec = { package = "parity-scale-codec", version = "1.0.6", features = [ "derive" ] }
|
||||
futures = "0.1.29"
|
||||
futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] }
|
||||
parking_lot = "0.9"
|
||||
|
||||
[dev-dependencies]
|
||||
test-runtime = { package = "cumulus-test-runtime", path = "../test/runtime" }
|
||||
keyring = { package = "substrate-keyring", git = "https://github.com/paritytech/substrate", branch = "bkchr-cumulus-branch" }
|
||||
@@ -0,0 +1,330 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Cumulus Collator implementation for Substrate.
|
||||
|
||||
use sr_primitives::traits::Block as BlockT;
|
||||
use consensus_common::{Environment, Proposer};
|
||||
use inherents::InherentDataProviders;
|
||||
|
||||
use polkadot_collator::{
|
||||
InvalidHead, ParachainContext, BuildParachainContext, Network as CollatorNetwork, VersionInfo,
|
||||
};
|
||||
use polkadot_primitives::{
|
||||
Hash,
|
||||
parachain::{
|
||||
self, BlockData, Message, Id as ParaId, OutgoingMessages, Status as ParachainStatus,
|
||||
CollatorPair,
|
||||
}
|
||||
};
|
||||
|
||||
use codec::{Decode, Encode};
|
||||
|
||||
use log::error;
|
||||
|
||||
use futures03::TryFutureExt;
|
||||
use futures::{Future, future::IntoFuture};
|
||||
|
||||
use std::{sync::Arc, marker::PhantomData, time::Duration};
|
||||
|
||||
use parking_lot::Mutex;
|
||||
|
||||
/// The head data of the parachain, stored in the relay chain.
|
||||
#[derive(Decode, Encode, Debug)]
|
||||
struct HeadData<Block: BlockT> {
|
||||
header: Block::Header,
|
||||
}
|
||||
|
||||
/// The implementation of the Cumulus `Collator`.
|
||||
pub struct Collator<Block, PF> {
|
||||
proposer_factory: Arc<Mutex<PF>>,
|
||||
_phantom: PhantomData<Block>,
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
collator_network: Arc<dyn CollatorNetwork>,
|
||||
}
|
||||
|
||||
impl<Block: BlockT, PF: Environment<Block>> Collator<Block, PF> {
|
||||
/// Create a new instance.
|
||||
fn new(
|
||||
proposer_factory: PF,
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
collator_network: Arc<dyn CollatorNetwork>,
|
||||
) -> Self {
|
||||
Self {
|
||||
proposer_factory: Arc::new(Mutex::new(proposer_factory)),
|
||||
inherent_data_providers,
|
||||
_phantom: PhantomData,
|
||||
collator_network,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block, PF> Clone for Collator<Block, PF> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
proposer_factory: self.proposer_factory.clone(),
|
||||
inherent_data_providers: self.inherent_data_providers.clone(),
|
||||
_phantom: PhantomData,
|
||||
collator_network: self.collator_network.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block, PF> ParachainContext for Collator<Block, PF> where
|
||||
Block: BlockT,
|
||||
PF: Environment<Block> + 'static + Send + Sync,
|
||||
PF::Error: std::fmt::Debug,
|
||||
PF::Proposer: Send + Sync,
|
||||
<PF::Proposer as Proposer<Block>>::Create: Unpin + Send + Sync,
|
||||
{
|
||||
type ProduceCandidate = Box<
|
||||
dyn Future<Item=(BlockData, parachain::HeadData, OutgoingMessages), Error=InvalidHead>
|
||||
+ Send + Sync
|
||||
>;
|
||||
|
||||
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
|
||||
&self,
|
||||
_relay_chain_parent: Hash,
|
||||
status: ParachainStatus,
|
||||
_: I,
|
||||
) -> Self::ProduceCandidate {
|
||||
let factory = self.proposer_factory.clone();
|
||||
let inherent_providers = self.inherent_data_providers.clone();
|
||||
|
||||
let res = HeadData::<Block>::decode(&mut &status.head_data.0[..])
|
||||
.map_err(|_| InvalidHead)
|
||||
.into_future()
|
||||
.and_then(move |last_head|
|
||||
factory.lock()
|
||||
.init(&last_head.header)
|
||||
.map_err(|e| {
|
||||
//TODO: Do we want to return the real error?
|
||||
error!("Could not create proposer: {:?}", e);
|
||||
InvalidHead
|
||||
})
|
||||
)
|
||||
.and_then(move |proposer|
|
||||
inherent_providers.create_inherent_data()
|
||||
.map(|id| (proposer, id))
|
||||
.map_err(|e| {
|
||||
error!("Failed to create inherent data: {:?}", e);
|
||||
InvalidHead
|
||||
})
|
||||
)
|
||||
.and_then(|(mut proposer, inherent_data)| {
|
||||
proposer.propose(
|
||||
inherent_data,
|
||||
Default::default(),
|
||||
//TODO: Fix this.
|
||||
Duration::from_secs(6),
|
||||
)
|
||||
.map_err(|e| {
|
||||
error!("Proposing failed: {:?}", e);
|
||||
InvalidHead
|
||||
})
|
||||
.compat()
|
||||
})
|
||||
.map(|b| {
|
||||
let block_data = BlockData(b.encode());
|
||||
let head_data = HeadData::<Block> { header: b.deconstruct().0 };
|
||||
let messages = OutgoingMessages { outgoing_messages: Vec::new() };
|
||||
|
||||
(block_data, parachain::HeadData(head_data.encode()), messages)
|
||||
});
|
||||
|
||||
Box::new(res)
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements `BuildParachainContext` to build a collator instance.
|
||||
struct CollatorBuilder<Block, PF> {
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
proposer_factory: PF,
|
||||
_phantom: PhantomData<Block>,
|
||||
}
|
||||
|
||||
impl<Block, PF> CollatorBuilder<Block, PF> {
|
||||
/// Create a new instance of self.
|
||||
fn new(proposer_factory: PF, inherent_data_providers: InherentDataProviders) -> Self {
|
||||
Self {
|
||||
inherent_data_providers,
|
||||
proposer_factory,
|
||||
_phantom: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block, PF> BuildParachainContext for CollatorBuilder<Block, PF> where
|
||||
Block: BlockT,
|
||||
PF: Environment<Block> + 'static + Send + Sync,
|
||||
PF::Error: std::fmt::Debug,
|
||||
PF::Proposer: Send + Sync,
|
||||
<PF::Proposer as Proposer<Block>>::Create: Unpin + Send + Sync,
|
||||
{
|
||||
type ParachainContext = Collator<Block, PF>;
|
||||
|
||||
fn build(self, network: Arc<dyn CollatorNetwork>) -> Result<Self::ParachainContext, ()> {
|
||||
Ok(Collator::new(self.proposer_factory, self.inherent_data_providers, network))
|
||||
}
|
||||
}
|
||||
|
||||
/// Run a collator with the given proposer factory.
|
||||
pub fn run_collator<Block, PF, E, I>(
|
||||
proposer_factory: PF,
|
||||
inherent_data_providers: InherentDataProviders,
|
||||
para_id: ParaId,
|
||||
exit: E,
|
||||
key: Arc<CollatorPair>,
|
||||
version: VersionInfo,
|
||||
) -> Result<(), cli::error::Error>
|
||||
where
|
||||
Block: BlockT,
|
||||
PF: Environment<Block> + 'static + Send + Sync,
|
||||
PF::Error: std::fmt::Debug,
|
||||
PF::Proposer: Send + Sync,
|
||||
<PF::Proposer as Proposer<Block>>::Create: Unpin + Send + Sync,
|
||||
E: IntoFuture<Item=(), Error=()>,
|
||||
E::Future: Send + Clone + Sync + 'static,
|
||||
{
|
||||
let builder = CollatorBuilder::new(proposer_factory, inherent_data_providers);
|
||||
polkadot_collator::run_collator(builder, para_id, exit, key, version)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::time::Duration;
|
||||
|
||||
use polkadot_collator::{collate, RelayChainContext, PeerId, CollatorId, SignedStatement};
|
||||
use polkadot_primitives::parachain::{ConsolidatedIngress, HeadData, FeeSchedule};
|
||||
|
||||
use keyring::Sr25519Keyring;
|
||||
use sr_primitives::traits::{DigestFor, Header as HeaderT};
|
||||
use inherents::InherentData;
|
||||
|
||||
use test_runtime::{Block, Header};
|
||||
|
||||
use futures03::future;
|
||||
use futures::Stream;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Error;
|
||||
|
||||
impl From<consensus_common::Error> for Error {
|
||||
fn from(_: consensus_common::Error) -> Self {
|
||||
unimplemented!("Not required in tests")
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyFactory;
|
||||
|
||||
impl Environment<Block> for DummyFactory {
|
||||
type Proposer = DummyProposer;
|
||||
type Error = Error;
|
||||
|
||||
fn init(&mut self, _: &Header) -> Result<Self::Proposer, Self::Error> {
|
||||
Ok(DummyProposer)
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyProposer;
|
||||
|
||||
impl Proposer<Block> for DummyProposer {
|
||||
type Error = Error;
|
||||
type Create = future::Ready<Result<Block, Error>>;
|
||||
|
||||
fn propose(
|
||||
&mut self,
|
||||
_: InherentData,
|
||||
digest : DigestFor<Block>,
|
||||
_: Duration,
|
||||
) -> Self::Create {
|
||||
let header = Header::new(
|
||||
1337,
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
digest,
|
||||
);
|
||||
|
||||
future::ready(Ok(Block::new(header, Vec::new())))
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyCollatorNetwork;
|
||||
|
||||
impl CollatorNetwork for DummyCollatorNetwork {
|
||||
fn collator_id_to_peer_id(&self, _: CollatorId) ->
|
||||
Box<dyn Future<Item=Option<PeerId>, Error=()> + Send>
|
||||
{
|
||||
unimplemented!("Not required in tests")
|
||||
}
|
||||
|
||||
fn checked_statements(&self, _: Hash) ->
|
||||
Box<dyn Stream<Item=SignedStatement, Error=()>>
|
||||
{
|
||||
unimplemented!("Not required in tests")
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyRelayChainContext;
|
||||
|
||||
impl RelayChainContext for DummyRelayChainContext {
|
||||
type Error = Error;
|
||||
type FutureEgress = Result<ConsolidatedIngress, Self::Error>;
|
||||
|
||||
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
|
||||
Ok(ConsolidatedIngress(Vec::new()))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collates_produces_a_block() {
|
||||
let builder = CollatorBuilder::new(DummyFactory, InherentDataProviders::new());
|
||||
let context = builder.build(Arc::new(DummyCollatorNetwork)).expect("Creates parachain context");
|
||||
|
||||
let id = ParaId::from(100);
|
||||
let header = Header::new(
|
||||
0,
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
);
|
||||
|
||||
let collation = collate(
|
||||
Default::default(),
|
||||
id,
|
||||
ParachainStatus {
|
||||
head_data: HeadData(header.encode()),
|
||||
balance: 10,
|
||||
fee_schedule: FeeSchedule {
|
||||
base: 0,
|
||||
per_byte: 1,
|
||||
},
|
||||
},
|
||||
DummyRelayChainContext,
|
||||
context,
|
||||
Arc::new(Sr25519Keyring::Alice.pair().into()),
|
||||
).wait().unwrap().0;
|
||||
|
||||
let block_data = collation.pov.block_data;
|
||||
|
||||
let block = Block::decode(&mut &block_data.0[..]).expect("Is a valid block");
|
||||
|
||||
assert_eq!(1337, *block.header().number());
|
||||
}
|
||||
}
|
||||
@@ -17,7 +17,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch =
|
||||
polkadot-runtime = { git = "https://github.com/paritytech/polkadot", branch = "bkchr-cumulus-branch" }
|
||||
|
||||
# other deps
|
||||
futures = "0.1.21"
|
||||
tokio = "0.1.8"
|
||||
parity-codec = "3.5"
|
||||
futures = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] }
|
||||
tokio = "0.1.22"
|
||||
codec = { package = "parity-scale-codec", version = "1.0.5", features = [ "derive" ] }
|
||||
log = "0.4"
|
||||
|
||||
+30
-85
@@ -14,7 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use substrate_client::{backend::Backend, CallExecutor, Client, BlockchainEvents};
|
||||
use substrate_client::{backend::{Backend, Finalizer}, CallExecutor, Client, BlockchainEvents};
|
||||
use substrate_client::error::{Error as ClientError, Result as ClientResult};
|
||||
use substrate_primitives::{Blake2Hasher, H256};
|
||||
use sr_primitives::generic::BlockId;
|
||||
@@ -22,8 +22,8 @@ use sr_primitives::traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeAp
|
||||
use polkadot_primitives::{Hash as PHash, Block as PBlock};
|
||||
use polkadot_primitives::parachain::{Id as ParaId, ParachainHost};
|
||||
|
||||
use futures::{prelude::*, stream};
|
||||
use parity_codec::{Encode, Decode};
|
||||
use futures::{Stream, StreamExt, TryStreamExt, future, Future, TryFutureExt, FutureExt};
|
||||
use codec::{Encode, Decode};
|
||||
use log::warn;
|
||||
|
||||
use std::sync::Arc;
|
||||
@@ -33,10 +33,6 @@ pub trait LocalClient {
|
||||
/// The block type of the local client.
|
||||
type Block: BlockT;
|
||||
|
||||
/// Mark the given block as the best block.
|
||||
/// Returns `false` if the block is not known.
|
||||
fn mark_best(&self, hash: <Self::Block as BlockT>::Hash) -> ClientResult<bool>;
|
||||
|
||||
/// Finalize the given block.
|
||||
/// Returns `false` if the block is not known.
|
||||
fn finalize(&self, hash: <Self::Block as BlockT>::Hash) -> ClientResult<bool>;
|
||||
@@ -44,11 +40,9 @@ pub trait LocalClient {
|
||||
|
||||
/// Errors that can occur while following the polkadot relay-chain.
|
||||
#[derive(Debug)]
|
||||
pub enum Error<P> {
|
||||
pub enum Error {
|
||||
/// An underlying client error.
|
||||
Client(ClientError),
|
||||
/// Polkadot client error.
|
||||
Polkadot(P),
|
||||
/// Head data returned was not for our parachain.
|
||||
InvalidHeadData,
|
||||
}
|
||||
@@ -67,60 +61,41 @@ pub trait PolkadotClient: Clone {
|
||||
/// The error type for interacting with the Polkadot client.
|
||||
type Error: std::fmt::Debug + Send;
|
||||
|
||||
/// A stream that yields updates to the parachain head.
|
||||
type HeadUpdates: Stream<Item=HeadUpdate, Error=Self::Error> + Send;
|
||||
/// A stream that yields finalized head-data for a certain parachain.
|
||||
type Finalized: Stream<Item=Vec<u8>, Error=Self::Error> + Send;
|
||||
type Finalized: Stream<Item = Vec<u8>> + Send;
|
||||
|
||||
/// Get a stream of head updates.
|
||||
fn head_updates(&self, para_id: ParaId) -> Self::HeadUpdates;
|
||||
/// Get a stream of finalized heads.
|
||||
fn finalized_heads(&self, para_id: ParaId) -> Self::Finalized;
|
||||
fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::Finalized>;
|
||||
}
|
||||
|
||||
/// Spawns a future that follows the Polkadot relay chain for the given parachain.
|
||||
pub fn follow_polkadot<'a, L: 'a, P: 'a>(para_id: ParaId, local: Arc<L>, polkadot: P)
|
||||
-> impl Future<Item=(),Error=()> + Send + 'a
|
||||
-> ClientResult<impl Future<Output = ()> + Send + 'a>
|
||||
where
|
||||
L: LocalClient + Send + Sync,
|
||||
P: PolkadotClient + Send + Sync,
|
||||
{
|
||||
let head_updates = polkadot.head_updates(para_id);
|
||||
let finalized_heads = polkadot.finalized_heads(para_id);
|
||||
|
||||
let follow_best = {
|
||||
let local = local.clone();
|
||||
|
||||
head_updates
|
||||
.map_err(Error::Polkadot)
|
||||
.and_then(|update| -> Result<Option<<L::Block as BlockT>::Header>, _> {
|
||||
Decode::decode(&mut &update.head_data[..]).ok_or_else(|| Error::InvalidHeadData)
|
||||
})
|
||||
.filter_map(|h| h)
|
||||
.for_each(move |p_head| {
|
||||
let _synced = local.mark_best(p_head.hash()).map_err(Error::Client)?;
|
||||
Ok(())
|
||||
})
|
||||
};
|
||||
let finalized_heads = polkadot.finalized_heads(para_id)?;
|
||||
|
||||
let follow_finalized = {
|
||||
let local = local.clone();
|
||||
|
||||
finalized_heads
|
||||
.map_err(Error::Polkadot)
|
||||
.and_then(|head_data| -> Result<Option<<L::Block as BlockT>::Header>, _> {
|
||||
Decode::decode(&mut &head_data[..]).ok_or_else(|| Error::InvalidHeadData)
|
||||
.map(|head_data| {
|
||||
<Option<<L::Block as BlockT>::Header>>::decode(&mut &head_data[..])
|
||||
.map_err(|_| Error::InvalidHeadData)
|
||||
})
|
||||
.filter_map(|h| h)
|
||||
.for_each(move |p_head| {
|
||||
let _synced = local.finalize(p_head.hash()).map_err(Error::Client)?;
|
||||
Ok(())
|
||||
.try_filter_map(|h| future::ready(Ok(h)))
|
||||
.try_for_each(move |p_head| {
|
||||
future::ready(local.finalize(p_head.hash()).map_err(Error::Client).map(|_| ()))
|
||||
})
|
||||
};
|
||||
|
||||
follow_best.join(follow_finalized)
|
||||
.map_err(|e| warn!("Could not follow relay-chain: {:?}", e))
|
||||
.map(|((), ())| ())
|
||||
Ok(
|
||||
follow_finalized
|
||||
.map_err(|e| warn!("Could not follow relay-chain: {:?}", e))
|
||||
.map(|_| ())
|
||||
)
|
||||
}
|
||||
|
||||
impl<B, E, Block, RA> LocalClient for Client<B, E, Block, RA> where
|
||||
@@ -130,16 +105,6 @@ impl<B, E, Block, RA> LocalClient for Client<B, E, Block, RA> where
|
||||
{
|
||||
type Block = Block;
|
||||
|
||||
fn mark_best(&self, hash: <Self::Block as BlockT>::Hash) -> ClientResult<bool> {
|
||||
match self.set_head(BlockId::hash(hash)) {
|
||||
Ok(()) => Ok(true),
|
||||
Err(e) => match e {
|
||||
ClientError::UnknownBlock(_) => Ok(false),
|
||||
_ => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn finalize(&self, hash: <Self::Block as BlockT>::Hash) -> ClientResult<bool> {
|
||||
match self.finalize_block(BlockId::hash(hash), None, true) {
|
||||
Ok(()) => Ok(true),
|
||||
@@ -168,41 +133,21 @@ impl<B, E, RA> PolkadotClient for Arc<Client<B, E, PBlock, RA>> where
|
||||
{
|
||||
type Error = ClientError;
|
||||
|
||||
type HeadUpdates = Box<dyn Stream<Item=HeadUpdate, Error=Self::Error> + Send>;
|
||||
type Finalized = Box<dyn Stream<Item=Vec<u8>, Error=Self::Error> + Send>;
|
||||
type Finalized = Box<dyn Stream<Item=Vec<u8>> + Send + Unpin>;
|
||||
|
||||
fn head_updates(&self, para_id: ParaId) -> Self::HeadUpdates {
|
||||
let parachain_key = parachain_key(para_id);
|
||||
let stream = stream::once(self.storage_changes_notification_stream(Some(&[parachain_key.clone()]), None))
|
||||
.map(|s| s.map_err(|()| panic!("unbounded receivers never yield errors; qed")))
|
||||
.flatten();
|
||||
|
||||
let s = stream.filter_map(move |(hash, changes)| {
|
||||
let head_data = changes.iter()
|
||||
.filter_map(|(_, k, v)| if k == ¶chain_key { Some(v) } else { None })
|
||||
.next();
|
||||
|
||||
match head_data {
|
||||
Some(Some(head_data)) => Some(HeadUpdate {
|
||||
relay_hash: hash,
|
||||
head_data: head_data.0.clone(),
|
||||
}),
|
||||
Some(None) | None => None,
|
||||
}
|
||||
});
|
||||
|
||||
Box::new(s)
|
||||
}
|
||||
|
||||
fn finalized_heads(&self, para_id: ParaId) -> Self::Finalized {
|
||||
fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::Finalized> {
|
||||
let polkadot = self.clone();
|
||||
let parachain_key = parachain_key(para_id);
|
||||
|
||||
let s = self.finality_notification_stream()
|
||||
.map_err(|()| panic!("unbounded receivers never yield errors; qed"))
|
||||
.and_then(move |n| polkadot.storage(&BlockId::hash(n.hash), ¶chain_key))
|
||||
.filter_map(|d| d.map(|d| d.0));
|
||||
.filter_map(move |n|
|
||||
future::ready(
|
||||
polkadot.storage(&BlockId::hash(n.hash), ¶chain_key)
|
||||
.ok()
|
||||
.and_then(|d| d.map(|d| d.0)),
|
||||
),
|
||||
);
|
||||
|
||||
Box::new(s)
|
||||
Ok(Box::new(s))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+8
-13
@@ -5,6 +5,13 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
# Other dependencies
|
||||
codec = { package = "parity-scale-codec", version = "1.0.5", default-features = false, features = [ "derive" ] }
|
||||
memory-db = { version = "0.15.2", default-features = false }
|
||||
hash-db = { version = "0.15.2", default-features = false }
|
||||
trie-db = { version = "0.15.2", default-features = false }
|
||||
hashbrown = "0.6.1"
|
||||
|
||||
# Substrate dependencies
|
||||
rstd = { package = "sr-std", git = "https://github.com/paritytech/substrate", default-features = false, branch = "bkchr-cumulus-branch" }
|
||||
runtime-primitives = { package = "sr-primitives", git = "https://github.com/paritytech/substrate", default-features = false, branch = "bkchr-cumulus-branch" }
|
||||
@@ -14,14 +21,7 @@ executive = { package = "srml-executive", git = "https://github.com/paritytech/s
|
||||
substrate-trie = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "bkchr-cumulus-branch" }
|
||||
|
||||
# Polkadot dependencies
|
||||
parachain = { package = "polkadot-parachain", git = "https://github.com/paritytech/polkadot", default-features = false, branch = "bkchr-cumulus-branch" }
|
||||
|
||||
# Other deps
|
||||
codec = { package = "parity-codec", version = "3.5.1", default-features = false, features = [ "derive" ] }
|
||||
memory-db = { version = "0.12.2", default-features = false }
|
||||
hash-db = { version = "0.12.2", default-features = false }
|
||||
trie-db = { version = "0.12.2", default-features = false }
|
||||
hashbrown = "0.5.0"
|
||||
parachain = { package = "polkadot-parachain", git = "https://github.com/paritytech/polkadot", branch = "bkchr-cumulus-branch", default-features = false, features = [ "wasm-api" ] }
|
||||
|
||||
[dev-dependencies]
|
||||
keyring = { package = "substrate-keyring", git = "https://github.com/paritytech/substrate", branch = "bkchr-cumulus-branch" }
|
||||
@@ -44,8 +44,3 @@ std = [
|
||||
"substrate-trie/std",
|
||||
"parachain/std",
|
||||
]
|
||||
no_std = [
|
||||
"hashbrown/nightly",
|
||||
"rio/wasm-nice-panic-message",
|
||||
"parachain/wasm-api",
|
||||
]
|
||||
|
||||
@@ -17,16 +17,15 @@
|
||||
//! The actual implementation of the validate block functionality.
|
||||
|
||||
use crate::WitnessData;
|
||||
use runtime_primitives::traits::{
|
||||
Block as BlockT, Header as HeaderT, Hash as HashT
|
||||
};
|
||||
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
|
||||
use executive::ExecuteBlock;
|
||||
use primitives::{Blake2Hasher, H256};
|
||||
|
||||
use substrate_trie::{MemoryDB, read_trie_value, delta_trie_root};
|
||||
use substrate_trie::{MemoryDB, read_trie_value, delta_trie_root, Layout};
|
||||
|
||||
use rstd::{slice, ptr, cmp, vec::Vec, boxed::Box, mem};
|
||||
|
||||
use hash_db::HashDB;
|
||||
use hash_db::{HashDB, EMPTY_PREFIX};
|
||||
|
||||
use parachain::ValidationParams;
|
||||
|
||||
@@ -36,9 +35,6 @@ const STORAGE_SET_EXPECT: &str =
|
||||
"`STORAGE` needs to be set before calling this function.";
|
||||
const STORAGE_ROOT_LEN: usize = 32;
|
||||
|
||||
/// Extract the hashing algorithm type from the given block type.
|
||||
type HashingOf<B> = <<B as BlockT>::Header as HeaderT>::Hashing;
|
||||
|
||||
/// Abstract the storage into a trait without `Block` generic.
|
||||
trait Storage {
|
||||
/// Retrieve the value for the given key.
|
||||
@@ -56,7 +52,7 @@ trait Storage {
|
||||
|
||||
/// Validate a given parachain block on a validator.
|
||||
#[doc(hidden)]
|
||||
pub fn validate_block<B: BlockT, E: ExecuteBlock<B>>(
|
||||
pub fn validate_block<B: BlockT<Hash = H256>, E: ExecuteBlock<B>>(
|
||||
params: ValidationParams,
|
||||
) {
|
||||
use codec::Decode;
|
||||
@@ -94,12 +90,12 @@ pub fn validate_block<B: BlockT, E: ExecuteBlock<B>>(
|
||||
/// The storage implementation used when validating a block that is using the
|
||||
/// witness data as source.
|
||||
struct WitnessStorage<B: BlockT> {
|
||||
witness_data: MemoryDB<<HashingOf<B> as HashT>::Hasher>,
|
||||
witness_data: MemoryDB<Blake2Hasher>,
|
||||
overlay: hashbrown::HashMap<Vec<u8>, Option<Vec<u8>>>,
|
||||
storage_root: B::Hash,
|
||||
}
|
||||
|
||||
impl<B: BlockT> WitnessStorage<B> {
|
||||
impl<B: BlockT<Hash = H256>> WitnessStorage<B> {
|
||||
/// Initialize from the given witness data and storage root.
|
||||
///
|
||||
/// Returns an error if given storage root was not found in the witness data.
|
||||
@@ -108,9 +104,9 @@ impl<B: BlockT> WitnessStorage<B> {
|
||||
storage_root: B::Hash,
|
||||
) -> Result<Self, &'static str> {
|
||||
let mut db = MemoryDB::default();
|
||||
data.into_iter().for_each(|i| { db.insert(&[], &i); });
|
||||
data.into_iter().for_each(|i| { db.insert(EMPTY_PREFIX, &i); });
|
||||
|
||||
if !db.contains(&storage_root, &[]) {
|
||||
if !db.contains(&storage_root, EMPTY_PREFIX) {
|
||||
return Err("Witness data does not contain given storage root.")
|
||||
}
|
||||
|
||||
@@ -122,10 +118,10 @@ impl<B: BlockT> WitnessStorage<B> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BlockT> Storage for WitnessStorage<B> {
|
||||
impl<B: BlockT<Hash = H256>> Storage for WitnessStorage<B> {
|
||||
fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
|
||||
self.overlay.get(key).cloned().or_else(|| {
|
||||
read_trie_value(
|
||||
read_trie_value::<Layout<Blake2Hasher>, _>(
|
||||
&self.witness_data,
|
||||
&self.storage_root,
|
||||
key,
|
||||
@@ -142,7 +138,7 @@ impl<B: BlockT> Storage for WitnessStorage<B> {
|
||||
}
|
||||
|
||||
fn storage_root(&mut self) -> [u8; STORAGE_ROOT_LEN] {
|
||||
let root = match delta_trie_root(
|
||||
let root = match delta_trie_root::<Layout<Blake2Hasher>, _, _, _, _>(
|
||||
&mut self.witness_data,
|
||||
self.storage_root.clone(),
|
||||
self.overlay.drain()
|
||||
@@ -234,4 +230,4 @@ unsafe fn ext_storage_root(result: *mut u8) {
|
||||
let res = STORAGE.as_mut().expect(STORAGE_SET_EXPECT).storage_root();
|
||||
let result = slice::from_raw_parts_mut(result, STORAGE_ROOT_LEN);
|
||||
result.copy_from_slice(&res);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,3 +9,6 @@ test-client = { package = "substrate-test-client", git = "https://github.com/par
|
||||
runtime = { package = "cumulus-test-runtime", path = "../runtime" }
|
||||
runtime_primitives = { package = "sr-primitives", git = "https://github.com/paritytech/substrate", branch = "bkchr-cumulus-branch" }
|
||||
primitives = { package = "substrate-primitives", git = "https://github.com/paritytech/substrate", branch = "bkchr-cumulus-branch" }
|
||||
keyring = { package = "substrate-keyring", git = "https://github.com/paritytech/substrate", branch = "bkchr-cumulus-branch" }
|
||||
codec = { package = "parity-scale-codec", version = "1.0.5", default-features = false, features = [ "derive" ] }
|
||||
|
||||
|
||||
+31
-18
@@ -20,7 +20,8 @@ pub use test_client::*;
|
||||
pub use runtime;
|
||||
use runtime::{Block, genesismap::{GenesisConfig, additional_storage_with_genesis}};
|
||||
use runtime_primitives::traits::{Hash as HashT, Header as HeaderT, Block as BlockT};
|
||||
use primitives::storage::well_known_keys;
|
||||
use primitives::{storage::well_known_keys, sr25519};
|
||||
use keyring::{Sr25519Keyring, AccountKeyring};
|
||||
|
||||
mod local_executor {
|
||||
use test_client::executor::native_executor_instance;
|
||||
@@ -28,7 +29,6 @@ mod local_executor {
|
||||
pub LocalExecutor,
|
||||
runtime::api::dispatch,
|
||||
runtime::native_version,
|
||||
runtime::WASM_BINARY
|
||||
);
|
||||
}
|
||||
|
||||
@@ -58,16 +58,23 @@ pub struct GenesisParameters {
|
||||
|
||||
impl test_client::GenesisInit for GenesisParameters {
|
||||
fn genesis_storage(&self) -> (StorageOverlay, ChildrenStorageOverlay) {
|
||||
use codec::Encode;
|
||||
let mut storage = genesis_config(self.support_changes_trie).genesis_map();
|
||||
storage.insert(well_known_keys::CODE.to_vec(), runtime::WASM_BINARY.to_vec());
|
||||
storage.0.insert(well_known_keys::CODE.to_vec(), runtime::WASM_BINARY.to_vec());
|
||||
|
||||
let state_root = <<<Block as BlockT>::Header as HeaderT>::Hashing as HashT>::trie_root(
|
||||
storage.clone().into_iter()
|
||||
let child_roots = storage.1.iter().map(|(sk, child_map)| {
|
||||
let state_root = <<<runtime::Block as BlockT>::Header as HeaderT>::Hashing as HashT>::trie_root(
|
||||
child_map.clone().into_iter().collect()
|
||||
);
|
||||
(sk.clone(), state_root.encode())
|
||||
});
|
||||
let state_root = <<<runtime::Block as BlockT>::Header as HeaderT>::Hashing as HashT>::trie_root(
|
||||
storage.0.clone().into_iter().chain(child_roots).collect()
|
||||
);
|
||||
let block: runtime::Block = client::genesis::construct_genesis_block(state_root);
|
||||
storage.extend(additional_storage_with_genesis(&block));
|
||||
storage.0.extend(additional_storage_with_genesis(&block));
|
||||
|
||||
(storage, Default::default())
|
||||
storage
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,15 +116,21 @@ impl DefaultTestClientBuilderExt for TestClientBuilder {
|
||||
}
|
||||
|
||||
fn genesis_config(support_changes_trie: bool) -> GenesisConfig {
|
||||
GenesisConfig::new(support_changes_trie, vec![
|
||||
AuthorityKeyring::Alice.into(),
|
||||
AuthorityKeyring::Bob.into(),
|
||||
AuthorityKeyring::Charlie.into(),
|
||||
], vec![
|
||||
AccountKeyring::Alice.into(),
|
||||
AccountKeyring::Bob.into(),
|
||||
AccountKeyring::Charlie.into(),
|
||||
],
|
||||
1000
|
||||
GenesisConfig::new(
|
||||
support_changes_trie,
|
||||
vec![
|
||||
sr25519::Public::from(Sr25519Keyring::Alice).into(),
|
||||
sr25519::Public::from(Sr25519Keyring::Bob).into(),
|
||||
sr25519::Public::from(Sr25519Keyring::Charlie).into(),
|
||||
],
|
||||
vec![
|
||||
AccountKeyring::Alice.into(),
|
||||
AccountKeyring::Bob.into(),
|
||||
AccountKeyring::Charlie.into(),
|
||||
],
|
||||
1000,
|
||||
None,
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ runtime = { package = "cumulus-runtime", path = "../../runtime", default-feature
|
||||
substrate-test-runtime = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "bkchr-cumulus-branch" }
|
||||
|
||||
[build-dependencies]
|
||||
wasm-builder-runner = { git = "https://github.com/paritytech/substrate", branch = "bkchr-cumulus-branch" }
|
||||
wasm-builder-runner = { package = "substrate-wasm-builder-runner", version = " 1.0.2" }
|
||||
|
||||
[features]
|
||||
default = ["std"]
|
||||
@@ -18,7 +18,3 @@ std = [
|
||||
"runtime/std",
|
||||
"substrate-test-runtime/std",
|
||||
]
|
||||
no_std = [
|
||||
"runtime/no_std",
|
||||
"substrate-test-runtime/no_std",
|
||||
]
|
||||
|
||||
@@ -17,11 +17,5 @@
|
||||
use wasm_builder_runner::{build_current_project, WasmBuilderSource};
|
||||
|
||||
fn main() {
|
||||
build_current_project(
|
||||
"wasm_binary.rs",
|
||||
WasmBuilderSource::Git {
|
||||
repo: "https://github.com/paritytech/substrate",
|
||||
rev: "c7fa536d85df5d6a0fc5cdc3f82b6e5a1a2db640",
|
||||
}
|
||||
);
|
||||
}
|
||||
build_current_project("wasm_binary.rs", WasmBuilderSource::Crates("1.0.7"));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user