Make everything compile and fix tests

This commit is contained in:
Bastian Köcher
2020-01-14 22:22:21 +01:00
parent 1418b842aa
commit 828447d49c
26 changed files with 5214 additions and 3536 deletions
+272 -196
View File
@@ -18,34 +18,32 @@
use cumulus_runtime::ParachainBlockData;
use sr_primitives::traits::{Block as BlockT, Header as HeaderT};
use consensus_common::{
BlockImport, Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin,
Error as ConsensusError,
use sp_consensus::{
BlockImport, BlockImportParams, BlockOrigin, Environment, Error as ConsensusError,
ForkChoiceStrategy, Proposal, Proposer, RecordProof,
};
use inherents::InherentDataProviders;
use substrate_primitives::Blake2Hasher;
use sp_inherents::InherentDataProviders;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use polkadot_collator::{
InvalidHead, ParachainContext, BuildParachainContext, Network as CollatorNetwork, VersionInfo,
TaskExecutor, PolkadotClient,
BuildParachainContext, InvalidHead, Network as CollatorNetwork, ParachainContext,
PolkadotClient,
};
use polkadot_primitives::{
Hash as PHash, Block as PBlock,
parachain::{
self, BlockData, Message, Id as ParaId, OutgoingMessages, Status as ParachainStatus,
CollatorPair,
}
self, BlockData, CollatorPair, Id as ParaId, Message, OutgoingMessages,
Status as ParachainStatus,
},
Block as PBlock, Hash as PHash,
};
use codec::{Decode, Encode};
use log::{error, trace};
use futures03::{TryFutureExt, future};
use futures::{Future, future::IntoFuture};
use futures::{future, task::Spawn, Future, FutureExt, TryFutureExt};
use std::{sync::Arc, marker::PhantomData, time::Duration, fmt::Debug};
use std::{fmt::Debug, marker::PhantomData, sync::Arc, time::Duration};
use parking_lot::Mutex;
@@ -95,17 +93,25 @@ impl<Block, PF, BI> Clone for Collator<Block, PF, BI> {
}
impl<Block, PF, BI> ParachainContext for Collator<Block, PF, BI>
where
Block: BlockT,
PF: Environment<Block> + 'static + Send,
BI: BlockImport<Block, Error=ConsensusError> + Send + Sync + 'static,
where
Block: BlockT,
PF: Environment<Block> + 'static + Send,
PF::Proposer: Send,
BI: BlockImport<
Block,
Error = ConsensusError,
Transaction = <PF::Proposer as Proposer<Block>>::Transaction,
> + Send
+ Sync
+ 'static,
{
type ProduceCandidate = Box<
dyn Future<Item=(BlockData, parachain::HeadData, OutgoingMessages), Error=InvalidHead>
dyn Future<Output = Result<(BlockData, parachain::HeadData, OutgoingMessages), InvalidHead>>
+ Send
+ Unpin,
>;
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
fn produce_candidate<I: IntoIterator<Item = (ParaId, Message)>>(
&mut self,
_relay_chain_parent: PHash,
status: ParachainStatus,
@@ -117,124 +123,145 @@ impl<Block, PF, BI> ParachainContext for Collator<Block, PF, BI>
let inherent_providers = self.inherent_data_providers.clone();
let block_import = self.block_import.clone();
let res = HeadData::<Block>::decode(&mut &status.head_data.0[..])
.map_err(|e| {
let res = future::ready(
HeadData::<Block>::decode(&mut &status.head_data.0[..]).map_err(|e| {
error!(target: "cumulus-collator", "Could not decode the head data: {:?}", e);
InvalidHead
})
.into_future()
.and_then(move |last_head| {
let parent_state_root = *last_head.header.state_root();
}),
)
.and_then(move |last_head| {
let parent_state_root = *last_head.header.state_root();
factory.lock()
.init(&last_head.header)
.map_err(|e| {
error!(
target: "cumulus-collator",
"Could not create proposer: {:?}",
e,
);
InvalidHead
})
.and_then(|mut proposer| {
let inherent_data = inherent_providers.create_inherent_data()
.map_err(|e| {
error!(
target: "cumulus-collator",
"Failed to create inherent data: {:?}",
e,
);
InvalidHead
})?;
let future = proposer.propose(
inherent_data,
Default::default(),
//TODO: Fix this.
Duration::from_secs(6),
true,
)
let proposer_inherent_data = factory
.lock()
.init(&last_head.header)
.map_err(|e| {
error!(
target: "cumulus-collator",
"Could not create proposer: {:?}",
e,
);
InvalidHead
})
.and_then(|proposer| {
inherent_providers
.create_inherent_data()
.map_err(|e| {
error!(
target: "cumulus-collator",
"Proposing failed: {:?}",
target: "cumulus-collator",
"Failed to create inherent data: {:?}",
e,
);
InvalidHead
})
.and_then(move |(block, proof)| {
let res = match proof {
Some(proof) => {
let (header, extrinsics) = block.deconstruct();
.map(move |inherent_data| (proposer, inherent_data))
});
// Create the parachain block data for the validators.
Ok(
ParachainBlockData::<Block>::new(
header,
extrinsics,
proof,
parent_state_root,
)
)
}
None => {
error!(
target: "cumulus-collator",
"Proposer did not return the requested proof.",
);
Err(InvalidHead)
}
};
future::ready(proposer_inherent_data).and_then(move |(mut proposer, inherent_data)| {
proposer
.propose(
inherent_data,
Default::default(),
//TODO: Fix this.
Duration::from_secs(6),
RecordProof::Yes,
)
.map_err(|e| {
error!(
target: "cumulus-collator",
"Proposing failed: {:?}",
e,
);
InvalidHead
})
.and_then(move |proposal| {
let Proposal {
block,
storage_changes,
proof,
} = proposal;
let res = match proof {
Some(proof) => {
let (header, extrinsics) = block.deconstruct();
future::ready(res)
})
.compat();
// Create the parachain block data for the validators.
Ok((
ParachainBlockData::<Block>::new(
header,
extrinsics,
proof.iter_nodes().collect(),
parent_state_root,
),
storage_changes,
))
}
None => {
error!(
target: "cumulus-collator",
"Proposer did not return the requested proof.",
);
Err(InvalidHead)
}
};
Ok(future)
future::ready(res)
})
})
.flatten()
.and_then(move |b| {
let block_import_params = BlockImportParams {
origin: BlockOrigin::Own,
header: b.header().clone(),
justification: None,
post_digests: vec![],
body: Some(b.extrinsics().to_vec()),
finalized: false,
auxiliary: vec![], // block-weight is written in block import.
// TODO: block-import handles fork choice and this shouldn't even have the
// option to specify one.
// https://github.com/paritytech/substrate/issues/3623
fork_choice: ForkChoiceStrategy::LongestChain,
};
})
.and_then(move |(b, storage_changes)| {
let block_import_params = BlockImportParams {
origin: BlockOrigin::Own,
header: b.header().clone(),
justification: None,
post_digests: vec![],
body: Some(b.extrinsics().to_vec()),
finalized: false,
auxiliary: vec![], // block-weight is written in block import.
// TODO: block-import handles fork choice and this shouldn't even have the
// option to specify one.
// https://github.com/paritytech/substrate/issues/3623
fork_choice: ForkChoiceStrategy::LongestChain,
allow_missing_state: false,
import_existing: false,
storage_changes: Some(storage_changes),
};
if let Err(err) = block_import.lock().import_block(
block_import_params,
Default::default(),
) {
error!(
target: "cumulus-collator",
"Error importing build block (at {:?}): {:?}",
b.header().parent_hash(),
err,
);
Err(InvalidHead)
} else {
Ok(b)
}
})
.map(|b| {
let block_data = BlockData(b.encode());
let head_data = HeadData::<Block> { header: b.into_header() };
let messages = OutgoingMessages { outgoing_messages: Vec::new() };
let res = if let Err(err) = block_import
.lock()
.import_block(block_import_params, Default::default())
{
error!(
target: "cumulus-collator",
"Error importing build block (at {:?}): {:?}",
b.header().parent_hash(),
err,
);
Err(InvalidHead)
} else {
Ok(b)
};
(block_data, parachain::HeadData(head_data.encode()), messages)
})
.then(|r| {
trace!(target: "cumulus-collator", "Produced candidate: {:?}", r);
r
});
future::ready(res)
})
.map_ok(|b| {
let block_data = BlockData(b.encode());
let head_data = HeadData::<Block> {
header: b.into_header(),
};
let messages = OutgoingMessages {
outgoing_messages: Vec::new(),
};
(
block_data,
parachain::HeadData(head_data.encode()),
messages,
)
})
.then(|r| {
trace!(target: "cumulus-collator", "Produced candidate: {:?}", r);
future::ready(r)
});
Box::new(res)
}
@@ -256,24 +283,44 @@ impl<Block, SP> CollatorBuilder<Block, SP> {
}
}
impl<Block: BlockT, SP: SetupParachain<Block>> BuildParachainContext for CollatorBuilder<Block, SP> {
impl<Block: BlockT, SP: SetupParachain<Block>> BuildParachainContext for CollatorBuilder<Block, SP>
where
<SP::ProposerFactory as Environment<Block>>::Proposer: Send,
{
type ParachainContext = Collator<Block, SP::ProposerFactory, SP::BlockImport>;
fn build<B, E>(
fn build<B, E, R, Spawner, Extrinsic>(
self,
client: Arc<PolkadotClient<B, E>>,
task_executor: TaskExecutor,
client: Arc<PolkadotClient<B, E, R>>,
spawner: Spawner,
network: Arc<dyn CollatorNetwork>,
) -> Result<Self::ParachainContext, ()>
where
B: substrate_client::backend::Backend<PBlock, Blake2Hasher> + 'static,
E: substrate_client::CallExecutor<PBlock, Blake2Hasher> + Clone + Send + Sync + 'static
where
PolkadotClient<B, E, R>: sp_api::ProvideRuntimeApi<PBlock>,
<PolkadotClient<B, E, R> as sp_api::ProvideRuntimeApi<PBlock>>::Api:
polkadot_service::RuntimeApiCollection<Extrinsic>,
E: sc_client::CallExecutor<PBlock> + Clone + Send + Sync + 'static,
Spawner: Spawn + Clone + Send + Sync + 'static,
Extrinsic: codec::Codec + Send + Sync + 'static,
<<PolkadotClient<B, E, R> as sp_api::ProvideRuntimeApi<PBlock>>::Api as sp_api::ApiExt<
PBlock,
>>::StateBackend: sp_api::StateBackend<sp_core::Blake2Hasher>,
R: Send + Sync + 'static,
B: sc_client_api::Backend<PBlock> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
B::State: sp_api::StateBackend<sp_core::Blake2Hasher>,
{
let (proposer_factory, block_import, inherent_data_providers) = self.setup_parachain
.setup_parachain(client, task_executor)
let (proposer_factory, block_import, inherent_data_providers) = self
.setup_parachain
.setup_parachain(client, spawner)
.map_err(|e| error!("Error setting up the parachain: {}", e))?;
Ok(Collator::new(proposer_factory, inherent_data_providers, network, block_import))
Ok(Collator::new(
proposer_factory,
inherent_data_providers,
network,
block_import,
))
}
}
@@ -282,14 +329,32 @@ pub trait SetupParachain<Block: BlockT>: Send {
/// The proposer factory of the parachain to build blocks.
type ProposerFactory: Environment<Block> + Send + 'static;
/// The block import for importing the blocks build by the collator.
type BlockImport: BlockImport<Block, Error=ConsensusError> + Send + Sync + 'static;
type BlockImport: BlockImport<
Block,
Error = ConsensusError,
Transaction = <<Self::ProposerFactory as Environment<Block>>::Proposer as Proposer<
Block,
>>::Transaction,
> + Send
+ Sync
+ 'static;
/// Setup the parachain.
fn setup_parachain<P: cumulus_consensus::PolkadotClient>(
fn setup_parachain<P, SP>(
self,
polkadot_client: P,
task_executor: TaskExecutor,
) -> Result<(Self::ProposerFactory, Self::BlockImport, InherentDataProviders), String>;
spawner: SP,
) -> Result<
(
Self::ProposerFactory,
Self::BlockImport,
InherentDataProviders,
),
String,
>
where
P: cumulus_consensus::PolkadotClient,
SP: Spawn + Clone + Send + Sync + 'static;
}
/// Run a collator with the given proposer factory.
@@ -298,16 +363,16 @@ pub fn run_collator<Block, SP, E>(
para_id: ParaId,
exit: E,
key: Arc<CollatorPair>,
version: VersionInfo,
) -> Result<(), cli::error::Error>
configuration: polkadot_collator::Configuration,
) -> Result<(), sc_cli::error::Error>
where
Block: BlockT,
SP: SetupParachain<Block> + Send + 'static,
E: IntoFuture<Item=(), Error=()>,
E::Future: Send + Clone + Sync + 'static,
<<SP as SetupParachain<Block>>::ProposerFactory as Environment<Block>>::Proposer: Send,
E: Future<Output = ()> + Unpin + Send + Clone + Sync + 'static,
{
let builder = CollatorBuilder::new(setup_parachain);
polkadot_collator::run_collator(builder, para_id, exit, key, version)
polkadot_collator::run_collator(builder, para_id, exit, key, configuration)
}
#[cfg(test)]
@@ -315,26 +380,31 @@ mod tests {
use super::*;
use std::time::Duration;
use polkadot_collator::{collate, RelayChainContext, PeerId, CollatorId, SignedStatement};
use polkadot_primitives::parachain::{ConsolidatedIngress, HeadData, FeeSchedule};
use polkadot_collator::{collate, CollatorId, PeerId, RelayChainContext, SignedStatement};
use polkadot_primitives::parachain::{ConsolidatedIngress, FeeSchedule, HeadData};
use keyring::Sr25519Keyring;
use sr_primitives::{generic::BlockId, traits::{DigestFor, Header as HeaderT}};
use inherents::InherentData;
use substrate_client::error::Result as ClientResult;
use sp_blockchain::Result as ClientResult;
use sp_inherents::InherentData;
use sp_keyring::Sr25519Keyring;
use sp_runtime::{
generic::BlockId,
traits::{DigestFor, Header as HeaderT},
};
use sp_state_machine::StorageProof;
use substrate_test_client::{NativeExecutor, WasmExecutionMethod::Interpreted};
use test_client::{
Client, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
};
use test_runtime::{Block, Header};
use test_client::{Client, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt};
use futures03::future;
use futures::Stream;
#[derive(Debug)]
struct Error;
impl From<consensus_common::Error> for Error {
fn from(_: consensus_common::Error) -> Self {
impl From<sp_consensus::Error> for Error {
fn from(_: sp_consensus::Error) -> Self {
unimplemented!("Not required in tests")
}
}
@@ -354,14 +424,15 @@ mod tests {
impl Proposer<Block> for DummyProposer {
type Error = Error;
type Proposal = future::Ready<Result<(Block, Option<Vec<Vec<u8>>>), Error>>;
type Proposal = future::Ready<Result<Proposal<Block, Self::Transaction>, Error>>;
type Transaction = sc_client_api::TransactionFor<test_client::Backend, Block>;
fn propose(
&mut self,
_: InherentData,
digest : DigestFor<Block>,
digest: DigestFor<Block>,
_: Duration,
_: bool,
_: RecordProof,
) -> Self::Proposal {
let header = Header::new(
1337,
@@ -371,22 +442,25 @@ mod tests {
digest,
);
future::ready(Ok((Block::new(header, Vec::new()), Some(Default::default()))))
future::ready(Ok(Proposal {
block: Block::new(header, Vec::new()),
storage_changes: Default::default(),
proof: Some(StorageProof::empty()),
}))
}
}
struct DummyCollatorNetwork;
impl CollatorNetwork for DummyCollatorNetwork {
fn collator_id_to_peer_id(&self, _: CollatorId) ->
Box<dyn Future<Item=Option<PeerId>, Error=()> + Send>
{
fn collator_id_to_peer_id(
&self,
_: CollatorId,
) -> Box<dyn Future<Output = Option<PeerId>> + Send> {
unimplemented!("Not required in tests")
}
fn checked_statements(&self, _: PHash) ->
Box<dyn Stream<Item=SignedStatement, Error=()>>
{
fn checked_statements(&self, _: PHash) -> Box<dyn Stream<Item = SignedStatement>> {
unimplemented!("Not required in tests")
}
}
@@ -395,10 +469,10 @@ mod tests {
impl RelayChainContext for DummyRelayChainContext {
type Error = Error;
type FutureEgress = Result<ConsolidatedIngress, Self::Error>;
type FutureEgress = future::Ready<Result<ConsolidatedIngress, Error>>;
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
Ok(ConsolidatedIngress(Vec::new()))
future::ready(Ok(ConsolidatedIngress(Vec::new())))
}
}
@@ -407,7 +481,7 @@ mod tests {
impl cumulus_consensus::PolkadotClient for DummyPolkadotClient {
type Error = Error;
type Finalized = Box<dyn futures03::Stream<Item=Vec<u8>> + Send + Unpin>;
type Finalized = Box<dyn futures::Stream<Item = Vec<u8>> + Send + Unpin>;
fn finalized_heads(&self, _: ParaId) -> ClientResult<Self::Finalized> {
unimplemented!("Not required in tests")
@@ -417,7 +491,7 @@ mod tests {
&self,
_: &BlockId<PBlock>,
_: ParaId,
) -> ClientResult<Option<Vec<u8>>>{
) -> ClientResult<Option<Vec<u8>>> {
unimplemented!("Not required in tests")
}
}
@@ -428,45 +502,45 @@ mod tests {
type ProposerFactory = DummyFactory;
type BlockImport = Client;
fn setup_parachain<P: cumulus_consensus::PolkadotClient>(
fn setup_parachain<P, SP>(
self,
_: P,
_: TaskExecutor,
) -> Result<(Self::ProposerFactory, Self::BlockImport, InherentDataProviders), String> {
Ok((DummyFactory, TestClientBuilder::new().build(), InherentDataProviders::default()))
}
}
type BoxFuture = Box<dyn Future<Item = (), Error = ()> + Send>;
struct DummyFutureExecutor;
impl futures::future::Executor<BoxFuture>
for DummyFutureExecutor
{
fn execute(
&self,
_: BoxFuture,
) -> Result<(), futures::future::ExecuteError<BoxFuture>> {
unimplemented!("Not required in tests")
_: SP,
) -> Result<
(
Self::ProposerFactory,
Self::BlockImport,
InherentDataProviders,
),
String,
> {
Ok((
DummyFactory,
TestClientBuilder::new().build(),
InherentDataProviders::default(),
))
}
}
#[test]
fn collates_produces_a_block() {
let _ = env_logger::try_init();
let spawner = futures::executor::ThreadPool::new().unwrap();
let builder = CollatorBuilder::new(DummySetup);
let context = builder.build(
Arc::new(
substrate_test_client::TestClientBuilder::<_, _, ()>::default()
.build_with_native_executor(
Some(NativeExecutor::<polkadot_executor::Executor>::new(Interpreted, None)),
).0
),
Arc::new(DummyFutureExecutor),
Arc::new(DummyCollatorNetwork),
).expect("Creates parachain context");
let context = builder
.build::<_, _, polkadot_service::polkadot_runtime::RuntimeApi, _, _>(
Arc::new(
substrate_test_client::TestClientBuilder::<_, _, ()>::default()
.build_with_native_executor(Some(NativeExecutor::<
polkadot_service::PolkadotExecutor,
>::new(Interpreted, None)))
.0,
),
spawner,
Arc::new(DummyCollatorNetwork),
)
.expect("Creates parachain context");
let id = ParaId::from(100);
let header = Header::new(
@@ -491,7 +565,9 @@ mod tests {
DummyRelayChainContext,
context,
Arc::new(Sr25519Keyring::Alice.pair().into()),
).wait().unwrap().0;
);
let collation = futures::executor::block_on(collation).unwrap().0;
let block_data = collation.pov.block_data;