Parachain Consensus abstractions (#329)

* Move consensus to consensus-common crate

* Move the parachain consensus out of the collator

* Add first relay chain consensus stuff

* Remove some warnings

* Fix more stuff

* Fix collator test

* Change `ParachainConsensus` to take a mutable self

* Make everything compile

* Feedback
This commit is contained in:
Bastian Köcher
2021-02-16 12:45:30 +01:00
committed by GitHub
parent 5f5df0485a
commit d3f9c7db38
15 changed files with 707 additions and 551 deletions
+35 -15
View File
@@ -1085,44 +1085,41 @@ dependencies = [
name = "cumulus-client-collator"
version = "0.1.0"
dependencies = [
"async-trait",
"cumulus-client-consensus-common",
"cumulus-client-network",
"cumulus-primitives-core",
"cumulus-primitives-parachain-inherent",
"cumulus-test-client",
"cumulus-test-runtime",
"env_logger",
"futures 0.3.12",
"log",
"parity-scale-codec",
"parking_lot 0.9.0",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-overseer",
"polkadot-parachain",
"polkadot-primitives",
"polkadot-service",
"polkadot-test-client",
"sc-cli",
"sc-client-api",
"sp-api",
"sp-blockchain",
"sp-consensus",
"sp-core",
"sp-inherents",
"sp-io",
"sp-keyring",
"sp-runtime",
"sp-state-machine",
"substrate-test-client",
"tracing",
]
[[package]]
name = "cumulus-client-consensus"
name = "cumulus-client-consensus-common"
version = "0.1.0"
dependencies = [
"async-trait",
"cumulus-test-client",
"cumulus-test-runtime",
"dyn-clone",
"futures 0.3.12",
"futures-timer 3.0.2",
"parity-scale-codec",
@@ -1137,11 +1134,36 @@ dependencies = [
"sp-inherents",
"sp-runtime",
"sp-tracing",
"sp-trie",
"substrate-prometheus-endpoint",
"tokio 0.1.22",
"tracing",
]
[[package]]
name = "cumulus-client-consensus-relay-chain"
version = "0.1.0"
dependencies = [
"async-trait",
"cumulus-client-consensus-common",
"cumulus-primitives-core",
"cumulus-primitives-parachain-inherent",
"futures 0.3.12",
"parity-scale-codec",
"parking_lot 0.9.0",
"polkadot-service",
"sc-client-api",
"sp-api",
"sp-block-builder",
"sp-blockchain",
"sp-consensus",
"sp-core",
"sp-inherents",
"sp-runtime",
"substrate-prometheus-endpoint",
"tracing",
]
[[package]]
name = "cumulus-client-network"
version = "0.1.0"
@@ -1174,11 +1196,10 @@ name = "cumulus-client-service"
version = "0.1.0"
dependencies = [
"cumulus-client-collator",
"cumulus-client-consensus",
"cumulus-client-consensus-common",
"cumulus-primitives-core",
"futures 0.3.12",
"parity-scale-codec",
"polkadot-overseer",
"polkadot-primitives",
"polkadot-service",
"sc-chain-spec",
@@ -1189,7 +1210,6 @@ dependencies = [
"sp-blockchain",
"sp-consensus",
"sp-core",
"sp-inherents",
"sp-runtime",
"tracing",
]
@@ -1388,7 +1408,7 @@ dependencies = [
name = "cumulus-test-service"
version = "0.1.0"
dependencies = [
"cumulus-client-consensus",
"cumulus-client-consensus-relay-chain",
"cumulus-client-network",
"cumulus-client-service",
"cumulus-primitives-core",
@@ -6914,7 +6934,7 @@ version = "0.1.0"
dependencies = [
"assert_cmd",
"cumulus-client-collator",
"cumulus-client-consensus",
"cumulus-client-consensus-relay-chain",
"cumulus-client-network",
"cumulus-client-service",
"cumulus-primitives-core",
@@ -10121,7 +10141,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04f8ab788026715fa63b31960869617cba39117e520eb415b0139543e325ab59"
dependencies = [
"cfg-if 0.1.10",
"rand 0.7.3",
"rand 0.6.5",
"static_assertions",
]
+2 -1
View File
@@ -1,6 +1,7 @@
[workspace]
members = [
"client/consensus",
"client/consensus/common",
"client/consensus/relay-chain",
"client/network",
"client/service",
"pallets/parachain-system",
+6 -11
View File
@@ -10,15 +10,10 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot dependencies
polkadot-service = { git = "https://github.com/paritytech/polkadot", features = [ "real-overseer" ], branch = "master" }
polkadot-parachain = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" }
@@ -26,16 +21,19 @@ polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", bran
# Cumulus dependencies
cumulus-client-network = { path = "../network" }
cumulus-client-consensus-common = { path = "../consensus/common" }
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inherent" }
# Other dependencies
log = "0.4.8"
codec = { package = "parity-scale-codec", version = "2.0.0", features = [ "derive" ] }
futures = { version = "0.3.1", features = ["compat"] }
parking_lot = "0.9"
tracing = "0.1.22"
[dev-dependencies]
# Polkadot dependencies
polkadot-node-subsystem-test-helpers = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Cumulus dependencies
cumulus-test-runtime = { path = "../../test/runtime" }
cumulus-test-client = { path = "../../test/client" }
@@ -46,9 +44,6 @@ sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "mas
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
substrate-test-client = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot dependencies
polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-node-subsystem-test-helpers = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Other dependencies
env_logger = "0.7.1"
async-trait = "0.1.42"
+128 -348
View File
@@ -20,219 +20,131 @@ use cumulus_client_network::WaitToAnnounce;
use cumulus_primitives_core::{
well_known_keys, OutboundHrmpMessage, ParachainBlockData, PersistedValidationData,
};
use cumulus_primitives_parachain_inherent::ParachainInherentData;
use sc_client_api::{BlockBackend, StateBackend};
use sp_consensus::{
BlockImport, BlockImportParams, BlockOrigin, BlockStatus, Environment, Error as ConsensusError,
ForkChoiceStrategy, Proposal, Proposer, RecordProof,
};
use sc_client_api::BlockBackend;
use sp_consensus::BlockStatus;
use sp_core::traits::SpawnNamed;
use sp_inherents::{InherentData, InherentDataProviders};
use sp_runtime::{
generic::BlockId,
traits::{BlakeTwo256, Block as BlockT, Header as HeaderT, Zero},
traits::{Block as BlockT, Header as HeaderT, Zero},
};
use sp_state_machine::InspectState;
use cumulus_client_consensus_common::ParachainConsensus;
use polkadot_node_primitives::{Collation, CollationGenerationConfig, CollationResult};
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
use polkadot_overseer::OverseerHandler;
use polkadot_primitives::v1::{
Block as PBlock, BlockData, BlockNumber as PBlockNumber, CollatorPair, Hash as PHash, HeadData,
Id as ParaId, PoV, UpwardMessage,
BlockData, BlockNumber as PBlockNumber, CollatorPair, Hash as PHash, HeadData, Id as ParaId,
PoV, UpwardMessage,
};
use polkadot_service::RuntimeApiCollection;
use codec::{Decode, Encode};
use log::{debug, error, info, trace};
use futures::{channel::oneshot, FutureExt};
use futures::{channel::oneshot, prelude::*};
use std::{marker::PhantomData, sync::Arc, time::Duration};
use std::sync::Arc;
use parking_lot::Mutex;
type TransactionFor<E, Block> =
<<E as Environment<Block>>::Proposer as Proposer<Block>>::Transaction;
/// The logging target.
const LOG_TARGET: &str = "cumulus-collator";
/// The implementation of the Cumulus `Collator`.
pub struct Collator<Block: BlockT, PF, BI, BS, Backend, PBackend, PClient, PBackend2> {
para_id: ParaId,
proposer_factory: Arc<Mutex<PF>>,
_phantom: PhantomData<(Block, PBackend)>,
inherent_data_providers: InherentDataProviders,
block_import: Arc<Mutex<BI>>,
pub struct Collator<Block: BlockT, BS, Backend> {
block_status: Arc<BS>,
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
backend: Arc<Backend>,
polkadot_client: Arc<PClient>,
polkadot_backend: Arc<PBackend2>,
}
impl<Block: BlockT, PF, BI, BS, Backend, PBackend, PClient, PBackend2> Clone
for Collator<Block, PF, BI, BS, Backend, PBackend, PClient, PBackend2>
{
impl<Block: BlockT, BS, Backend> Clone for Collator<Block, BS, Backend> {
fn clone(&self) -> Self {
Self {
para_id: self.para_id.clone(),
proposer_factory: self.proposer_factory.clone(),
inherent_data_providers: self.inherent_data_providers.clone(),
_phantom: PhantomData,
block_import: self.block_import.clone(),
block_status: self.block_status.clone(),
wait_to_announce: self.wait_to_announce.clone(),
backend: self.backend.clone(),
polkadot_client: self.polkadot_client.clone(),
polkadot_backend: self.polkadot_backend.clone(),
parachain_consensus: self.parachain_consensus.clone(),
}
}
}
impl<Block, PF, BI, BS, Backend, PBackend, PApi, PClient, PBackend2>
Collator<Block, PF, BI, BS, Backend, PBackend, PClient, PBackend2>
impl<Block, BS, Backend> Collator<Block, BS, Backend>
where
Block: BlockT,
PF: Environment<Block> + 'static + Send,
PF::Proposer: Send,
BI: BlockImport<
Block,
Error = ConsensusError,
Transaction = <PF::Proposer as Proposer<Block>>::Transaction,
> + Send
+ Sync
+ 'static,
BS: BlockBackend<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
PBackend: sc_client_api::Backend<PBlock> + 'static,
PBackend::State: StateBackend<BlakeTwo256>,
PApi: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_service::AbstractClient<PBlock, PBackend, Api = PApi> + 'static,
PBackend2: sc_client_api::Backend<PBlock> + 'static,
PBackend2::State: StateBackend<BlakeTwo256>,
{
/// Create a new instance.
fn new(
para_id: ParaId,
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
block_import: BI,
block_status: Arc<BS>,
spawner: Arc<dyn SpawnNamed + Send + Sync>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
backend: Arc<Backend>,
polkadot_client: Arc<PClient>,
polkadot_backend: Arc<PBackend2>,
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
) -> Self {
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block)));
Self {
para_id,
proposer_factory: Arc::new(Mutex::new(proposer_factory)),
inherent_data_providers,
_phantom: PhantomData,
block_import: Arc::new(Mutex::new(block_import)),
block_status,
wait_to_announce,
backend,
polkadot_client,
polkadot_backend,
parachain_consensus,
}
}
/// Get the inherent data with validation function parameters injected
fn inherent_data(
&mut self,
validation_data: &PersistedValidationData,
relay_parent: PHash,
) -> Option<InherentData> {
let mut inherent_data = self
.inherent_data_providers
.create_inherent_data()
.map_err(|e| {
error!(
target: LOG_TARGET,
"Failed to create inherent data: {:?}", e,
)
})
.ok()?;
let parachain_inherent_data = ParachainInherentData::create_at(
relay_parent,
&*self.polkadot_client,
&*self.polkadot_backend,
validation_data,
self.para_id,
)?;
inherent_data
.put_data(
cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER,
&parachain_inherent_data,
)
.map_err(|e| {
error!(
target: LOG_TARGET,
"Failed to put the system inherent into inherent data: {:?}", e,
)
})
.ok()?;
Some(inherent_data)
}
/// Checks the status of the given block hash in the Parachain.
///
/// Returns `true` if the block could be found and is good to be build on.
fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
match self.block_status.block_status(&BlockId::Hash(hash)) {
Ok(BlockStatus::Queued) => {
debug!(
tracing::debug!(
target: LOG_TARGET,
"Skipping candidate production, because block `{:?}` is still queued for import.",
hash,
block_hash = ?hash,
"Skipping candidate production, because block is still queued for import.",
);
false
}
Ok(BlockStatus::InChainWithState) => true,
Ok(BlockStatus::InChainPruned) => {
error!(
tracing::error!(
target: LOG_TARGET,
"Skipping candidate production, because block `{:?}` is already pruned!", hash,
"Skipping candidate production, because block `{:?}` is already pruned!",
hash,
);
false
}
Ok(BlockStatus::KnownBad) => {
error!(
tracing::error!(
target: LOG_TARGET,
"Block `{}` is tagged as known bad and is included in the relay chain! Skipping candidate production!",
hash,
block_hash = ?hash,
"Block is tagged as known bad and is included in the relay chain! Skipping candidate production!",
);
false
}
Ok(BlockStatus::Unknown) => {
if header.number().is_zero() {
error!(
tracing::error!(
target: LOG_TARGET,
"Could not find the header `{:?}` of the genesis block in the database!",
hash,
block_hash = ?hash,
"Could not find the header of the genesis block in the database!",
);
} else {
debug!(
tracing::debug!(
target: LOG_TARGET,
"Skipping candidate production, because block `{:?}` is unknown.", hash,
block_hash = ?hash,
"Skipping candidate production, because block is unknown.",
);
}
false
}
Err(e) => {
error!(
tracing::error!(
target: LOG_TARGET,
"Failed to get block status of `{:?}`: {:?}", hash, e,
block_hash = ?hash,
error = ?e,
"Failed to get block status.",
);
false
}
@@ -252,9 +164,10 @@ where
let state = match self.backend.state_at(BlockId::Hash(block_hash)) {
Ok(state) => state,
Err(e) => {
error!(
tracing::error!(
target: LOG_TARGET,
"Failed to get state of the freshly built block: {:?}", e
error = ?e,
"Failed to get state of the freshly built block.",
);
return None;
}
@@ -266,9 +179,10 @@ where
match upward_messages.map(|v| Vec::<UpwardMessage>::decode(&mut &v[..])) {
Some(Ok(msgs)) => msgs,
Some(Err(e)) => {
error!(
tracing::error!(
target: LOG_TARGET,
"Failed to decode upward messages from the build block: {:?}", e
error = ?e,
"Failed to decode upward messages from the build block.",
);
return None;
}
@@ -283,9 +197,10 @@ where
match processed_downward_messages.map(|v| u32::decode(&mut &v[..])) {
Some(Ok(processed_cnt)) => processed_cnt,
Some(Err(e)) => {
error!(
tracing::error!(
target: LOG_TARGET,
"Failed to decode the count of processed downward messages: {:?}", e
error = ?e,
"Failed to decode the count of processed downward message.",
);
return None;
}
@@ -298,9 +213,10 @@ where
{
Some(Ok(horizontal_messages)) => horizontal_messages,
Some(Err(e)) => {
error!(
tracing::error!(
target: LOG_TARGET,
"Failed to decode the horizontal messages: {:?}", e
error = ?e,
"Failed to decode the horizontal messages.",
);
return None;
}
@@ -311,9 +227,10 @@ where
let hrmp_watermark = match hrmp_watermark.map(|v| PBlockNumber::decode(&mut &v[..])) {
Some(Ok(hrmp_watermark)) => hrmp_watermark,
Some(Err(e)) => {
error!(
tracing::error!(
target: LOG_TARGET,
"Failed to decode the HRMP watermark: {:?}", e
error = ?e,
"Failed to decode the HRMP watermark."
);
return None;
}
@@ -343,14 +260,19 @@ where
relay_parent: PHash,
validation_data: PersistedValidationData,
) -> Option<CollationResult> {
trace!(target: LOG_TARGET, "Producing candidate");
tracing::trace!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
"Producing candidate",
);
let last_head = match Block::Header::decode(&mut &validation_data.parent_head.0[..]) {
Ok(x) => x,
Err(e) => {
error!(
tracing::error!(
target: LOG_TARGET,
"Could not decode the head data: {:?}", e
error = ?e,
"Could not decode the head data."
);
return None;
}
@@ -361,78 +283,24 @@ where
return None;
}
info!(
tracing::info!(
target: LOG_TARGET,
"Starting collation for relay parent {:?} on parent {:?}.",
relay_parent,
last_head_hash,
relay_parent = ?relay_parent,
at = ?last_head_hash,
"Starting collation.",
);
let proposer_future = self.proposer_factory.lock().init(&last_head);
let candidate = self
.parachain_consensus
.produce_candidate(&last_head, relay_parent, &validation_data)
.await?;
let proposer = proposer_future
.await
.map_err(|e| error!(target: LOG_TARGET, "Could not create proposer: {:?}", e,))
.ok()?;
let inherent_data = self.inherent_data(&validation_data, relay_parent)?;
let Proposal {
block,
storage_changes,
proof,
} = proposer
.propose(
inherent_data,
Default::default(),
//TODO: Fix this.
Duration::from_millis(500),
RecordProof::Yes,
)
.await
.map_err(|e| error!(target: LOG_TARGET, "Proposing failed: {:?}", e,))
.ok()?;
let proof = match proof {
Some(proof) => proof,
None => {
error!(
target: LOG_TARGET,
"Proposer did not return the requested proof.",
);
return None;
}
};
let (header, extrinsics) = block.deconstruct();
let block_hash = header.hash();
let (header, extrinsics) = candidate.block.deconstruct();
// Create the parachain block data for the validators.
let b = ParachainBlockData::<Block>::new(header.clone(), extrinsics, proof);
let b = ParachainBlockData::<Block>::new(header, extrinsics, candidate.proof);
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
block_import_params.body = Some(b.extrinsics().to_vec());
// Best block is determined by the relay chain.
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false));
block_import_params.storage_changes = Some(storage_changes);
if let Err(err) = self
.block_import
.lock()
.import_block(block_import_params, Default::default())
{
error!(
target: LOG_TARGET,
"Error importing build block (at {:?}): {:?}",
b.header().parent_hash(),
err,
);
return None;
}
trace!(
tracing::debug!(
target: LOG_TARGET,
"PoV size {{ header: {}kb, extrinsics: {}kb, storage_proof: {}kb }}",
b.header().encode().len() as f64 / 1024f64,
@@ -440,6 +308,7 @@ where
b.storage_proof().encode().len() as f64 / 1024f64,
);
let block_hash = b.header().hash();
let collation = self.build_collation(b, block_hash, validation_data.relay_parent_number)?;
let pov_hash = collation.proof_of_validity.hash();
@@ -449,9 +318,11 @@ where
.lock()
.wait_to_announce(block_hash, pov_hash, signed_stmt_recv);
info!(
tracing::info!(
target: LOG_TARGET,
"Produced proof-of-validity candidate {:?} from block {:?}.", pov_hash, block_hash,
pov_hash = ?pov_hash,
?block_hash,
"Produced proof-of-validity candidate.",
);
Some(CollationResult {
@@ -462,75 +333,41 @@ where
}
/// Parameters for [`start_collator`].
pub struct StartCollatorParams<Block: BlockT, PF, BI, Backend, BS, Spawner, PClient, PBackend> {
pub proposer_factory: PF,
pub inherent_data_providers: InherentDataProviders,
pub struct StartCollatorParams<Block: BlockT, Backend, BS, Spawner> {
pub para_id: ParaId,
pub backend: Arc<Backend>,
pub block_import: BI,
pub block_status: Arc<BS>,
pub announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
pub overseer_handler: OverseerHandler,
pub spawner: Spawner,
pub para_id: ParaId,
pub key: CollatorPair,
pub polkadot_client: Arc<PClient>,
pub polkadot_backend: Arc<PBackend>,
pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,
}
pub async fn start_collator<
Block: BlockT,
PF,
BI,
Backend,
BS,
Spawner,
PClient,
PBackend,
PBackend2,
PApi,
>(
/// Start the collator.
pub async fn start_collator<Block, Backend, BS, Spawner>(
StartCollatorParams {
proposer_factory,
inherent_data_providers,
backend,
block_import,
para_id,
block_status,
announce_block,
mut overseer_handler,
spawner,
para_id,
key,
polkadot_client,
polkadot_backend,
}: StartCollatorParams<Block, PF, BI, Backend, BS, Spawner, PClient, PBackend2>,
) -> Result<(), String>
where
PF: Environment<Block> + Send + 'static,
BI: BlockImport<Block, Error = sp_consensus::Error, Transaction = TransactionFor<PF, Block>>
+ Send
+ Sync
+ 'static,
parachain_consensus,
backend,
}: StartCollatorParams<Block, Backend, BS, Spawner>,
) where
Block: BlockT,
Backend: sc_client_api::Backend<Block> + 'static,
BS: BlockBackend<Block> + Send + Sync + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
PBackend: sc_client_api::Backend<PBlock> + 'static,
PBackend::State: StateBackend<BlakeTwo256>,
PApi: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_service::AbstractClient<PBlock, PBackend, Api = PApi> + 'static,
PBackend2: sc_client_api::Backend<PBlock> + 'static,
PBackend2::State: StateBackend<BlakeTwo256>,
{
let collator = Collator::new(
para_id,
proposer_factory,
inherent_data_providers,
block_import,
block_status,
Arc::new(spawner),
announce_block,
backend,
polkadot_client,
polkadot_backend,
parachain_consensus,
);
let config = CollationGenerationConfig {
@@ -551,87 +388,53 @@ where
overseer_handler
.send_msg(CollatorProtocolMessage::CollateOn(para_id))
.await;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::{pin::Pin, time::Duration};
use sp_core::{testing::TaskExecutor, Pair};
use sp_inherents::InherentData;
use sp_runtime::traits::DigestFor;
use cumulus_client_consensus_common::ParachainCandidate;
use cumulus_test_client::{
Client, DefaultTestClientBuilderExt, InitBlockBuilder, TestClientBuilder,
TestClientBuilderExt,
Client, ClientBlockImportExt, DefaultTestClientBuilderExt, InitBlockBuilder,
TestClientBuilder, TestClientBuilderExt,
};
use cumulus_test_runtime::{Block, Header};
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use futures::{channel::mpsc, executor::block_on, StreamExt};
use polkadot_node_subsystem_test_helpers::ForwardSubsystem;
use polkadot_overseer::{AllSubsystems, Overseer};
use sp_consensus::BlockOrigin;
use sp_core::{testing::TaskExecutor, Pair};
use futures::{channel::mpsc, executor::block_on, future};
#[derive(Debug)]
struct Error;
impl From<sp_consensus::Error> for Error {
fn from(_: sp_consensus::Error) -> Self {
unimplemented!("Not required in tests")
}
}
struct DummyFactory(Arc<Client>);
impl Environment<Block> for DummyFactory {
type Proposer = DummyProposer;
type Error = Error;
type CreateProposer = Pin<
Box<dyn Future<Output = Result<Self::Proposer, Self::Error>> + Send + Unpin + 'static>,
>;
fn init(&mut self, header: &Header) -> Self::CreateProposer {
Box::pin(future::ready(Ok(DummyProposer {
client: self.0.clone(),
header: header.clone(),
})))
}
}
struct DummyProposer {
#[derive(Clone)]
struct DummyParachainConsensus {
client: Arc<Client>,
header: Header,
}
impl Proposer<Block> for DummyProposer {
type Error = Error;
type Proposal = future::Ready<Result<Proposal<Block, Self::Transaction>, Error>>;
type Transaction = sc_client_api::TransactionFor<cumulus_test_client::Backend, Block>;
#[async_trait::async_trait]
impl ParachainConsensus<Block> for DummyParachainConsensus {
async fn produce_candidate(
&mut self,
parent: &Header,
_: PHash,
validation_data: &PersistedValidationData,
) -> Option<ParachainCandidate<Block>> {
let block_id = BlockId::Hash(parent.hash());
let builder = self.client.init_block_builder_at(
&block_id,
Some(validation_data.clone()),
Default::default(),
);
fn propose(
self,
_: InherentData,
_: DigestFor<Block>,
_: Duration,
_: RecordProof,
) -> Self::Proposal {
let block_id = BlockId::Hash(self.header.hash());
let builder = self
.client
.init_block_builder_at(&block_id, None, Default::default());
let (block, _, proof) = builder.build().expect("Creates block").into_inner();
let (block, storage_changes, proof) =
builder.build().expect("Creates block").into_inner();
self.client
.import(BlockOrigin::Own, block.clone())
.expect("Imports the block");
future::ready(Ok(Proposal {
Some(ParachainCandidate {
block,
storage_changes,
proof,
}))
proof: proof.expect("Proof is returned"),
})
}
}
@@ -656,43 +459,19 @@ mod tests {
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
let (polkadot_client, polkadot_backend, relay_parent) = {
// Create a polkadot client with a block imported.
use polkadot_test_client::{
ClientBlockImportExt as _, DefaultTestClientBuilderExt as _,
InitPolkadotBlockBuilder as _, TestClientBuilderExt as _,
};
let client_builder = polkadot_test_client::TestClientBuilder::new();
let polkadot_backend = client_builder.backend();
let mut client = client_builder.build();
let block_builder = client.init_polkadot_block_builder();
let block = block_builder.build().expect("Finalizes the block").block;
let hash = block.header().hash();
client
.import_as_best(BlockOrigin::Own, block)
.expect("Imports the block");
(client, polkadot_backend, hash)
};
let collator_start =
start_collator::<_, _, _, _, _, _, _, polkadot_service::FullBackend, _, _>(
StartCollatorParams {
proposer_factory: DummyFactory(client.clone()),
inherent_data_providers: Default::default(),
backend,
block_import: client.clone(),
block_status: client.clone(),
announce_block: Arc::new(announce_block),
overseer_handler: handler,
spawner,
para_id,
key: CollatorPair::generate().0,
polkadot_client: Arc::new(polkadot_client),
polkadot_backend,
},
);
block_on(collator_start).expect("Should start collator");
let collator_start = start_collator(StartCollatorParams {
backend,
block_status: client.clone(),
announce_block: Arc::new(announce_block),
overseer_handler: handler,
spawner,
para_id,
key: CollatorPair::generate().0,
parachain_consensus: Box::new(DummyParachainConsensus {
client: client.clone(),
}),
});
block_on(collator_start);
let msg = block_on(sub_rx.into_future())
.0
@@ -704,6 +483,7 @@ mod tests {
let mut validation_data = PersistedValidationData::default();
validation_data.parent_head = header.encode().into();
let relay_parent = Default::default();
let collation = block_on((config.collator)(relay_parent, &validation_data))
.expect("Collation is build")
@@ -1,6 +1,6 @@
[package]
name = "cumulus-client-consensus"
description = "Proxy Polkadot's consensus as a consensus engine for Substrate"
name = "cumulus-client-consensus-common"
description = "Cumulus specific common consensus implementations"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
@@ -15,6 +15,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master
sp-block-builder = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot deps
@@ -26,14 +27,16 @@ futures = { version = "0.3.8", features = ["compat"] }
tokio = "0.1.22"
codec = { package = "parity-scale-codec", version = "2.0.0", features = [ "derive" ] }
tracing = "0.1.22"
async-trait = "0.1.42"
dyn-clone = "1.0.4"
[dev-dependencies]
# Substrate deps
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Cumulus dependencies
cumulus-test-runtime = { path = "../../test/runtime" }
cumulus-test-client = { path = "../../test/client" }
cumulus-test-runtime = { path = "../../../test/runtime" }
cumulus-test-client = { path = "../../../test/client" }
# Other deps
futures-timer = "3.0.2"
@@ -29,7 +29,8 @@ use sp_runtime::{
};
use polkadot_primitives::v1::{
Block as PBlock, Id as ParaId, OccupiedCoreAssumption, ParachainHost,
Block as PBlock, Hash as PHash, Id as ParaId, OccupiedCoreAssumption, ParachainHost,
PersistedValidationData,
};
use codec::Decode;
@@ -37,8 +38,6 @@ use futures::{future, select, FutureExt, Stream, StreamExt};
use std::{marker::PhantomData, sync::Arc};
pub mod import_queue;
/// Errors that can occur while following the polkadot relay-chain.
#[derive(Debug)]
pub enum Error {
@@ -511,6 +510,52 @@ where
}
}
/// The result of [`ParachainConsensus::produce_candidate`].
pub struct ParachainCandidate<B> {
/// The block that was build for this candidate.
pub block: B,
/// The proof that was recorded while building the block.
pub proof: sp_trie::StorageProof,
}
/// A specific parachain consensus implementation that can be used by a collator to produce candidates.
///
/// The collator will call [`Self::produce_candidate`] every time there is a free core for the parachain
/// this collator is collating for. It is the job of the consensus implementation to decide if this
/// specific collator should build candidate for the given relay chain block. The consensus
/// implementation could for example check if this specific collator is part of the validator.
#[async_trait::async_trait]
pub trait ParachainConsensus<B: BlockT>: Send + Sync + dyn_clone::DynClone {
/// Produce a new candidate at the given parent block.
///
/// Should return `None` if the consensus implementation decided that it shouldn't build a
/// candidate or if there occurred any error.
///
/// # NOTE
///
/// It is expected that the block is already imported when the future resolves.
async fn produce_candidate(
&mut self,
parent: &B::Header,
relay_parent: PHash,
validation_data: &PersistedValidationData,
) -> Option<ParachainCandidate<B>>;
}
dyn_clone::clone_trait_object!(<B> ParachainConsensus<B> where B: BlockT);
#[async_trait::async_trait]
impl<B: BlockT> ParachainConsensus<B> for Box<dyn ParachainConsensus<B> + Send + Sync> {
async fn produce_candidate(
&mut self,
parent: &B::Header,
relay_parent: PHash,
validation_data: &PersistedValidationData,
) -> Option<ParachainCandidate<B>> {
(*self).produce_candidate(parent, relay_parent, validation_data).await
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -0,0 +1,33 @@
[package]
name = "cumulus-client-consensus-relay-chain"
description = "The relay-chain provided consensus algorithm"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
# Substrate deps
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-block-builder = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot dependencies
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master", features = [ "real-overseer" ] }
# Cumulus dependencies
cumulus-client-consensus-common = { path = "../common" }
cumulus-primitives-core = { path = "../../../primitives/core" }
cumulus-primitives-parachain-inherent = { path = "../../../primitives/parachain-inherent" }
# Other deps
futures = { version = "0.3.8", features = ["compat"] }
codec = { package = "parity-scale-codec", version = "2.0.0", features = [ "derive" ] }
tracing = "0.1.22"
async-trait = "0.1.42"
parking_lot = "0.9"
@@ -0,0 +1,364 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
//! The relay-chain provided consensus algoritm for parachains.
//!
//! This is the simplest consensus algorithm you can use when developing a parachain. It is a
//! permission-less consensus algorithm that doesn't require any staking or similar to join as a
//! collator. In this algorithm the consensus is provided by the relay-chain. This works in the
//! following way.
//!
//! 1. Each node that sees itself as a collator is free to build a parachain candidate.
//!
//! 2. This parachain candidate is send to the parachain validators that are part of the relay chain.
//!
//! 3. The parachain validators validate at most X different parachain candidates, where X is the
//! total number of parachain validators.
//!
//! 4. The parachain candidate that is backed by the most validators is choosen by the relay-chain
//! block producer to be added as backed candidate on chain.
//!
//! 5. After the parachain candidate got backed and included, all collators start at 1.
use cumulus_client_consensus_common::{ParachainCandidate, ParachainConsensus};
use cumulus_primitives_core::{
relay_chain::v1::{Block as PBlock, Hash as PHash, ParachainHost},
ParaId, PersistedValidationData,
};
use cumulus_primitives_parachain_inherent::ParachainInherentData;
pub use import_queue::import_queue;
use parking_lot::Mutex;
use polkadot_service::ClientHandle;
use sc_client_api::Backend;
use sp_api::ProvideRuntimeApi;
use sp_consensus::{
BlockImport, BlockImportParams, BlockOrigin, Environment, ForkChoiceStrategy, Proposal,
Proposer, RecordProof,
};
use sp_inherents::{InherentData, InherentDataProviders};
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT};
use std::{marker::PhantomData, sync::Arc, time::Duration};
mod import_queue;
const LOG_TARGET: &str = "cumulus-consensus-relay-chain";
/// The implementation of the relay-chain provided consensus for parachains.
pub struct RelayChainConsensus<B, PF, BI, RClient, RBackend> {
para_id: ParaId,
_phantom: PhantomData<B>,
proposer_factory: Arc<Mutex<PF>>,
inherent_data_providers: InherentDataProviders,
block_import: Arc<Mutex<BI>>,
relay_chain_client: Arc<RClient>,
relay_chain_backend: Arc<RBackend>,
}
impl<B, PF, BI, RClient, RBackend> Clone for RelayChainConsensus<B, PF, BI, RClient, RBackend> {
fn clone(&self) -> Self {
Self {
para_id: self.para_id,
_phantom: PhantomData,
proposer_factory: self.proposer_factory.clone(),
inherent_data_providers: self.inherent_data_providers.clone(),
block_import: self.block_import.clone(),
relay_chain_backend: self.relay_chain_backend.clone(),
relay_chain_client: self.relay_chain_client.clone(),
}
}
}
impl<B, PF, BI, RClient, RBackend> RelayChainConsensus<B, PF, BI, RClient, RBackend>
where
B: BlockT,
RClient: ProvideRuntimeApi<PBlock>,
RClient::Api: ParachainHost<PBlock>,
RBackend: Backend<PBlock>,
{
/// Create a new instance of relay-chain provided consensus.
pub fn new(
para_id: ParaId,
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
block_import: BI,
polkadot_client: Arc<RClient>,
polkadot_backend: Arc<RBackend>,
) -> Self {
Self {
para_id,
proposer_factory: Arc::new(Mutex::new(proposer_factory)),
inherent_data_providers,
block_import: Arc::new(Mutex::new(block_import)),
relay_chain_backend: polkadot_backend,
relay_chain_client: polkadot_client,
_phantom: PhantomData,
}
}
/// Get the inherent data with validation function parameters injected
fn inherent_data(
&self,
validation_data: &PersistedValidationData,
relay_parent: PHash,
) -> Option<InherentData> {
let mut inherent_data = self
.inherent_data_providers
.create_inherent_data()
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to create inherent data.",
)
})
.ok()?;
let parachain_inherent_data = ParachainInherentData::create_at(
relay_parent,
&*self.relay_chain_client,
&*self.relay_chain_backend,
validation_data,
self.para_id,
)?;
inherent_data
.put_data(
cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER,
&parachain_inherent_data,
)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to put the system inherent into inherent data.",
)
})
.ok()?;
Some(inherent_data)
}
}
#[async_trait::async_trait]
impl<B, PF, BI, RClient, RBackend> ParachainConsensus<B>
for RelayChainConsensus<B, PF, BI, RClient, RBackend>
where
B: BlockT,
RClient: ProvideRuntimeApi<PBlock> + Send + Sync,
RClient::Api: ParachainHost<PBlock>,
RBackend: Backend<PBlock>,
BI: BlockImport<B> + Send + Sync,
PF: Environment<B> + Send + Sync,
PF::Proposer: Proposer<B, Transaction = BI::Transaction>,
{
async fn produce_candidate(
&mut self,
parent: &B::Header,
relay_parent: PHash,
validation_data: &PersistedValidationData,
) -> Option<ParachainCandidate<B>> {
let proposer_future = self.proposer_factory.lock().init(&parent);
let proposer = proposer_future
.await
.map_err(
|e| tracing::error!(target: LOG_TARGET, error = ?e, "Could not create proposer."),
)
.ok()?;
let inherent_data = self.inherent_data(&validation_data, relay_parent)?;
let Proposal {
block,
storage_changes,
proof,
} = proposer
.propose(
inherent_data,
Default::default(),
//TODO: Fix this.
Duration::from_millis(500),
RecordProof::Yes,
)
.await
.map_err(|e| tracing::error!(target: LOG_TARGET, error = ?e, "Proposing failed."))
.ok()?;
let proof = match proof {
Some(proof) => proof,
None => {
tracing::error!(
target: LOG_TARGET,
"Proposer did not return the requested proof.",
);
return None;
}
};
let (header, extrinsics) = block.clone().deconstruct();
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
block_import_params.body = Some(extrinsics);
// Best block is determined by the relay chain.
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(false));
block_import_params.storage_changes = Some(storage_changes);
if let Err(err) = self
.block_import
.lock()
.import_block(block_import_params, Default::default())
{
tracing::error!(
target: LOG_TARGET,
at = ?parent.hash(),
error = ?err,
"Error importing build block.",
);
return None;
}
Some(ParachainCandidate { block, proof })
}
}
/// Paramaters of [`build_relay_chain_consensus`].
pub struct BuildRelayChainConsensusParams<PF, BI, RBackend> {
pub para_id: ParaId,
pub proposer_factory: PF,
pub inherent_data_providers: InherentDataProviders,
pub block_import: BI,
pub relay_chain_client: polkadot_service::Client,
pub relay_chain_backend: Arc<RBackend>,
}
/// Build the [`RelayChainConsensus`].
///
/// Returns a boxed [`ParachainConsensus`].
pub fn build_relay_chain_consensus<Block, PF, BI, RBackend>(
BuildRelayChainConsensusParams {
para_id,
proposer_factory,
inherent_data_providers,
block_import,
relay_chain_client,
relay_chain_backend,
}: BuildRelayChainConsensusParams<PF, BI, RBackend>,
) -> Box<dyn ParachainConsensus<Block>>
where
Block: BlockT,
PF: Environment<Block> + Send + Sync + 'static,
PF::Proposer: Proposer<Block, Transaction = BI::Transaction>,
BI: BlockImport<Block> + Send + Sync + 'static,
RBackend: Backend<PBlock> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<RBackend, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
{
RelayChainConsensusBuilder::new(
para_id,
proposer_factory,
block_import,
inherent_data_providers,
relay_chain_client,
relay_chain_backend,
)
.build()
}
/// Relay chain consensus builder.
///
/// Builds a [`RelayChainConsensus`] for a parachain. As this requires
/// a concrete relay chain client instance, the builder takes a [`polkadot_service::Client`]
/// that wraps this concrete instanace. By using [`polkadot_service::ExecuteWithClient`]
/// the builder gets access to this concrete instance.
struct RelayChainConsensusBuilder<Block, PF, BI, RBackend> {
para_id: ParaId,
_phantom: PhantomData<Block>,
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
block_import: BI,
relay_chain_backend: Arc<RBackend>,
relay_chain_client: polkadot_service::Client,
}
impl<Block, PF, BI, RBackend> RelayChainConsensusBuilder<Block, PF, BI, RBackend>
where
Block: BlockT,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<RBackend, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
PF: Environment<Block> + Send + Sync + 'static,
PF::Proposer: Proposer<Block, Transaction = BI::Transaction>,
BI: BlockImport<Block> + Send + Sync + 'static,
RBackend: Backend<PBlock> + 'static,
{
/// Create a new instance of the builder.
fn new(
para_id: ParaId,
proposer_factory: PF,
block_import: BI,
inherent_data_providers: InherentDataProviders,
relay_chain_client: polkadot_service::Client,
relay_chain_backend: Arc<RBackend>,
) -> Self {
Self {
para_id,
_phantom: PhantomData,
proposer_factory,
block_import,
inherent_data_providers,
relay_chain_backend,
relay_chain_client,
}
}
/// Build the relay chain consensus.
fn build(self) -> Box<dyn ParachainConsensus<Block>> {
self.relay_chain_client.clone().execute_with(self)
}
}
impl<Block, PF, BI, RBackend> polkadot_service::ExecuteWithClient
for RelayChainConsensusBuilder<Block, PF, BI, RBackend>
where
Block: BlockT,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sc_client_api::StateBackendFor<RBackend, PBlock>: sc_client_api::StateBackend<HashFor<PBlock>>,
PF: Environment<Block> + Send + Sync + 'static,
PF::Proposer: Proposer<Block, Transaction = BI::Transaction>,
BI: BlockImport<Block> + Send + Sync + 'static,
RBackend: Backend<PBlock> + 'static,
{
type Output = Box<dyn ParachainConsensus<Block>>;
fn execute_with_client<PClient, Api, PBackend>(self, client: Arc<PClient>) -> Self::Output
where
<Api as sp_api::ApiExt<PBlock>>::StateBackend: sp_api::StateBackend<HashFor<PBlock>>,
PBackend: Backend<PBlock>,
PBackend::State: sp_api::StateBackend<sp_runtime::traits::BlakeTwo256>,
Api: polkadot_service::RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: polkadot_service::AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
Box::new(RelayChainConsensus::new(
self.para_id,
self.proposer_factory,
self.inherent_data_providers,
self.block_import,
client.clone(),
self.relay_chain_backend,
))
}
}
+1 -3
View File
@@ -6,7 +6,7 @@ edition = "2018"
[dependencies]
# Cumulus dependencies
cumulus-client-consensus = { path = "../consensus" }
cumulus-client-consensus-common = { path = "../consensus/common" }
cumulus-client-collator = { path = "../collator" }
cumulus-primitives-core = { path = "../../primitives/core" }
@@ -19,13 +19,11 @@ sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "mast
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
# Polkadot dependencies
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" }
polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Other deps
futures = "0.3.6"
+32 -129
View File
@@ -18,53 +18,38 @@
//!
//! Provides functions for starting a collator node or a normal full node.
use cumulus_client_consensus_common::ParachainConsensus;
use cumulus_primitives_core::ParaId;
use futures::{Future, FutureExt};
use polkadot_overseer::OverseerHandler;
use futures::FutureExt;
use polkadot_primitives::v1::{Block as PBlock, CollatorId, CollatorPair};
use polkadot_service::{AbstractClient, Client as PClient, ClientHandle, RuntimeApiCollection};
use sc_client_api::{
Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, StateBackend, UsageProvider,
Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, UsageProvider,
};
use sc_service::{error::Result as ServiceResult, Configuration, Role, TaskManager};
use sp_blockchain::HeaderBackend;
use sp_consensus::{BlockImport, Environment, Error as ConsensusError, Proposer};
use sp_consensus::BlockImport;
use sp_core::traits::SpawnNamed;
use sp_inherents::InherentDataProviders;
use sp_runtime::traits::{BlakeTwo256, Block as BlockT};
use std::{marker::PhantomData, sync::Arc};
pub mod genesis;
/// Polkadot full node handles.
type PFullNode<C> = polkadot_service::NewFull<C>;
/// Relay chain full node handles.
type RFullNode<C> = polkadot_service::NewFull<C>;
/// Parameters given to [`start_collator`].
pub struct StartCollatorParams<
'a,
Block: BlockT,
PF,
BI,
BS,
Client,
Backend,
Spawner,
PClient,
PBackend,
> {
pub proposer_factory: PF,
pub inherent_data_providers: InherentDataProviders,
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, Backend, Spawner, RClient> {
pub backend: Arc<Backend>,
pub block_import: BI,
pub block_status: Arc<BS>,
pub client: Arc<Client>,
pub announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
pub spawner: Spawner,
pub para_id: ParaId,
pub collator_key: CollatorPair,
pub polkadot_full_node: PFullNode<PClient>,
pub relay_chain_full_node: RFullNode<RClient>,
pub task_manager: &'a mut TaskManager,
pub polkadot_backend: Arc<PBackend>,
pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,
}
/// Start a collator node for a parachain.
@@ -72,33 +57,22 @@ pub struct StartCollatorParams<
/// A collator is similar to a validator in a normal blockchain.
/// It is responsible for producing blocks and sending the blocks to a
/// parachain validator for validation and inclusion into the relay chain.
pub async fn start_collator<'a, Block, PF, BI, BS, Client, Backend, Spawner, PClient, PBackend>(
pub async fn start_collator<'a, Block, BS, Client, Backend, Spawner, RClient>(
StartCollatorParams {
proposer_factory,
inherent_data_providers,
backend,
block_import,
block_status,
client,
announce_block,
spawner,
para_id,
collator_key,
polkadot_full_node,
task_manager,
polkadot_backend,
}: StartCollatorParams<'a, Block, PF, BI, BS, Client, Backend, Spawner, PClient, PBackend>,
relay_chain_full_node,
parachain_consensus,
}: StartCollatorParams<'a, Block, BS, Client, Backend, Spawner, RClient>,
) -> sc_service::error::Result<()>
where
Block: BlockT,
PF: Environment<Block> + Send + 'static,
BI: BlockImport<
Block,
Error = ConsensusError,
Transaction = <PF::Proposer as Proposer<Block>>::Transaction,
> + Send
+ Sync
+ 'static,
BS: BlockBackend<Block> + Send + Sync + 'static,
Client: Finalizer<Block, Backend>
+ UsageProvider<Block>
@@ -111,11 +85,9 @@ where
for<'b> &'b Client: BlockImport<Block>,
Backend: BackendT<Block> + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
PClient: ClientHandle,
PBackend: BackendT<PBlock> + 'static,
PBackend::State: StateBackend<BlakeTwo256>,
RClient: ClientHandle,
{
polkadot_full_node.client.execute_with(StartConsensus {
relay_chain_full_node.client.execute_with(StartConsensus {
para_id,
announce_block: announce_block.clone(),
client: client.clone(),
@@ -123,99 +95,30 @@ where
_phantom: PhantomData,
})?;
polkadot_full_node
.client
.execute_with(StartCollator {
proposer_factory,
inherent_data_providers,
backend,
announce_block,
overseer_handler: polkadot_full_node
.overseer_handler
.ok_or_else(|| "Polkadot full node did not provided an `OverseerHandler`!")?,
spawner,
para_id,
collator_key,
block_import,
block_status,
polkadot_backend,
})
.await?;
cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams {
backend,
block_status,
announce_block,
overseer_handler: relay_chain_full_node
.overseer_handler
.ok_or_else(|| "Polkadot full node did not provided an `OverseerHandler`!")?,
spawner,
para_id,
key: collator_key,
parachain_consensus,
})
.await;
task_manager.add_child(polkadot_full_node.task_manager);
task_manager.add_child(relay_chain_full_node.task_manager);
Ok(())
}
struct StartCollator<Block: BlockT, Backend, PF, BI, BS, Spawner, PBackend> {
proposer_factory: PF,
inherent_data_providers: InherentDataProviders,
backend: Arc<Backend>,
block_import: BI,
block_status: Arc<BS>,
announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
overseer_handler: OverseerHandler,
spawner: Spawner,
para_id: ParaId,
collator_key: CollatorPair,
polkadot_backend: Arc<PBackend>,
}
impl<Block, Backend, PF, BI, BS, Spawner, PBackend2> polkadot_service::ExecuteWithClient
for StartCollator<Block, Backend, PF, BI, BS, Spawner, PBackend2>
where
Block: BlockT,
PF: Environment<Block> + Send + 'static,
BI: BlockImport<
Block,
Error = ConsensusError,
Transaction = <PF::Proposer as Proposer<Block>>::Transaction,
> + Send
+ Sync
+ 'static,
BS: BlockBackend<Block> + Send + Sync + 'static,
Backend: BackendT<Block> + 'static,
Spawner: SpawnNamed + Clone + Send + Sync + 'static,
PBackend2: sc_client_api::Backend<PBlock> + 'static,
PBackend2::State: sp_api::StateBackend<BlakeTwo256>,
{
type Output = std::pin::Pin<Box<dyn Future<Output = ServiceResult<()>>>>;
fn execute_with_client<PClient, Api, PBackend>(self, client: Arc<PClient>) -> Self::Output
where
<Api as sp_api::ApiExt<PBlock>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
PBackend: sc_client_api::Backend<PBlock> + 'static,
PBackend::State: sp_api::StateBackend<BlakeTwo256>,
Api: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
async move {
cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams {
proposer_factory: self.proposer_factory,
inherent_data_providers: self.inherent_data_providers,
backend: self.backend,
block_import: self.block_import,
block_status: self.block_status,
announce_block: self.announce_block,
overseer_handler: self.overseer_handler,
spawner: self.spawner,
para_id: self.para_id,
key: self.collator_key,
polkadot_client: client,
polkadot_backend: self.polkadot_backend,
})
.await
.map_err(Into::into)
}
.boxed()
}
}
/// Parameters given to [`start_full_node`].
pub struct StartFullNodeParams<'a, Block: BlockT, Client, PClient> {
pub para_id: ParaId,
pub client: Arc<Client>,
pub polkadot_full_node: PFullNode<PClient>,
pub polkadot_full_node: RFullNode<PClient>,
pub task_manager: &'a mut TaskManager,
pub announce_block: Arc<dyn Fn(Block::Hash, Vec<u8>) + Send + Sync>,
}
@@ -291,7 +194,7 @@ where
Api: RuntimeApiCollection<StateBackend = PBackend::State>,
PClient: AbstractClient<PBlock, PBackend, Api = Api> + 'static,
{
let consensus = cumulus_client_consensus::run_parachain_consensus(
let consensus = cumulus_client_consensus_common::run_parachain_consensus(
self.para_id,
self.client,
client,
@@ -330,7 +233,7 @@ pub fn prepare_node_config(mut parachain_config: Configuration) -> Configuration
pub fn build_polkadot_full_node(
config: Configuration,
collator_id: CollatorId,
) -> Result<PFullNode<PClient>, polkadot_service::Error> {
) -> Result<RFullNode<PClient>, polkadot_service::Error> {
let is_light = matches!(config.role, Role::Light);
if is_light {
Err(polkadot_service::Error::Sub(
+1 -1
View File
@@ -56,7 +56,7 @@ sp-offchain = { git = "https://github.com/paritytech/substrate", branch = "maste
jsonrpc-core = "15.1.0"
# Cumulus dependencies
cumulus-client-consensus = { path = "../client/consensus" }
cumulus-client-consensus-relay-chain = { path = "../client/consensus/relay-chain" }
cumulus-client-collator = { path = "../client/collator" }
cumulus-client-service = { path = "../client/service" }
cumulus-client-network = { path = "../client/network" }
+12 -7
View File
@@ -19,6 +19,7 @@ use cumulus_primitives_core::ParaId;
use cumulus_client_service::{
prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams,
};
use cumulus_client_consensus_relay_chain::{build_relay_chain_consensus, BuildRelayChainConsensusParams};
use parachain_runtime::RuntimeApi;
use polkadot_primitives::v0::CollatorPair;
use rococo_parachain_primitives::Block;
@@ -70,7 +71,7 @@ pub fn new_partial(
client.clone(),
);
let import_queue = cumulus_client_consensus::import_queue::import_queue(
let import_queue = cumulus_client_consensus_relay_chain::import_queue(
client.clone(),
client.clone(),
inherent_data_providers.clone(),
@@ -188,22 +189,26 @@ where
);
let spawner = task_manager.spawn_handle();
let polkadot_backend = polkadot_full_node.backend.clone();
let parachain_consensus = build_relay_chain_consensus(BuildRelayChainConsensusParams {
para_id: id,
proposer_factory,
inherent_data_providers: params.inherent_data_providers,
block_import: client.clone(),
relay_chain_client: polkadot_full_node.client.clone(),
relay_chain_backend: polkadot_full_node.backend.clone(),
});
let params = StartCollatorParams {
para_id: id,
block_import: client.clone(),
proposer_factory,
inherent_data_providers: params.inherent_data_providers,
block_status: client.clone(),
announce_block,
client: client.clone(),
task_manager: &mut task_manager,
collator_key,
polkadot_full_node,
relay_chain_full_node: polkadot_full_node,
spawner,
backend,
polkadot_backend,
parachain_consensus,
};
start_collator(params).await?;
+1 -1
View File
@@ -43,7 +43,7 @@ polkadot-test-service = { git = "https://github.com/paritytech/polkadot", branch
polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" }
# Cumulus
cumulus-client-consensus = { path = "../../client/consensus" }
cumulus-client-consensus-relay-chain = { path = "../../client/consensus/relay-chain" }
cumulus-client-network = { path = "../../client/network" }
cumulus-client-service = { path = "../../client/service" }
cumulus-primitives-core = { path = "../../primitives/core" }
+37 -28
View File
@@ -27,10 +27,10 @@ pub use genesis::*;
use core::future::Future;
use cumulus_client_network::BlockAnnounceValidator;
use cumulus_primitives_core::ParaId;
use cumulus_client_service::{
prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams,
};
use cumulus_primitives_core::ParaId;
use cumulus_test_runtime::{NodeBlock as Block, RuntimeApi};
use polkadot_primitives::v1::CollatorPair;
use sc_client_api::execution_extensions::ExecutionStrategies;
@@ -39,8 +39,8 @@ pub use sc_executor::NativeExecutor;
use sc_network::{config::TransportConfig, multiaddr, NetworkService};
use sc_service::{
config::{
DatabaseConfig, KeystoreConfig, MultiaddrWithPeerId, NetworkConfiguration,
OffchainWorkerConfig, KeepBlocks, TransactionStorageMode, PruningMode, WasmExecutionMethod,
DatabaseConfig, KeepBlocks, KeystoreConfig, MultiaddrWithPeerId, NetworkConfiguration,
OffchainWorkerConfig, PruningMode, TransactionStorageMode, WasmExecutionMethod,
},
BasePath, ChainSpec, Configuration, Error as ServiceError, PartialComponents, Role,
RpcHandlers, TFullBackend, TFullClient, TaskExecutor, TaskManager,
@@ -93,7 +93,7 @@ pub fn new_partial(
client.clone(),
);
let import_queue = cumulus_client_consensus::import_queue::import_queue(
let import_queue = cumulus_client_consensus_relay_chain::import_queue(
client.clone(),
client.clone(),
inherent_data_providers.clone(),
@@ -123,7 +123,7 @@ pub fn new_partial(
async fn start_node_impl<RB>(
parachain_config: Configuration,
collator_key: CollatorPair,
polkadot_config: Configuration,
relay_chain_config: Configuration,
para_id: ParaId,
is_collator: bool,
rpc_ext_builder: RB,
@@ -155,8 +155,8 @@ where
let transaction_pool = params.transaction_pool.clone();
let mut task_manager = params.task_manager;
let polkadot_full_node = polkadot_test_service::new_full(
polkadot_config,
let relay_chain_full_node = polkadot_test_service::new_full(
relay_chain_config,
polkadot_service::IsCollator::Yes(collator_key.public()),
)
.map_err(|e| match e {
@@ -167,11 +167,11 @@ where
let client = params.client.clone();
let backend = params.backend.clone();
let block_announce_validator = BlockAnnounceValidator::new(
polkadot_full_node.client.clone(),
relay_chain_full_node.client.clone(),
para_id,
Box::new(polkadot_full_node.network.clone()),
polkadot_full_node.backend.clone(),
polkadot_full_node.client.clone(),
Box::new(relay_chain_full_node.network.clone()),
relay_chain_full_node.backend.clone(),
relay_chain_full_node.client.clone(),
);
let block_announce_validator_builder = move |_| Box::new(block_announce_validator) as Box<_>;
@@ -214,7 +214,6 @@ where
Arc::new(move |hash, data| network.announce_block(hash, Some(data)))
};
let polkadot_full_node = polkadot_full_node.with_client(polkadot_test_service::TestClient);
if is_collator {
let proposer_factory = sc_basic_authorship::ProposerFactory::new(
task_manager.spawn_handle(),
@@ -222,13 +221,20 @@ where
transaction_pool,
prometheus_registry.as_ref(),
);
let polkadot_backend = polkadot_full_node.backend.clone();
let params = StartCollatorParams {
let parachain_consensus = cumulus_client_consensus_relay_chain::RelayChainConsensus::new(
para_id,
proposer_factory,
inherent_data_providers: params.inherent_data_providers,
params.inherent_data_providers,
client.clone(),
relay_chain_full_node.client.clone(),
relay_chain_full_node.backend.clone(),
);
let relay_chain_full_node =
relay_chain_full_node.with_client(polkadot_test_service::TestClient);
let params = StartCollatorParams {
backend: params.backend,
block_import: client.clone(),
block_status: client.clone(),
announce_block,
client: client.clone(),
@@ -236,18 +242,21 @@ where
task_manager: &mut task_manager,
para_id,
collator_key,
polkadot_full_node,
polkadot_backend,
parachain_consensus: Box::new(parachain_consensus),
relay_chain_full_node,
};
start_collator(params).await?;
} else {
let relay_chain_full_node =
relay_chain_full_node.with_client(polkadot_test_service::TestClient);
let params = StartFullNodeParams {
client: client.clone(),
announce_block,
task_manager: &mut task_manager,
para_id,
polkadot_full_node,
polkadot_full_node: relay_chain_full_node,
};
start_full_node(params)?;
@@ -281,9 +290,9 @@ pub async fn run_test_node(
task_executor: TaskExecutor,
key: Sr25519Keyring,
parachain_storage_update_func: impl Fn(),
polkadot_storage_update_func: impl Fn(),
relay_chain_storage_update_func: impl Fn(),
parachain_boot_nodes: Vec<MultiaddrWithPeerId>,
polkadot_boot_nodes: Vec<MultiaddrWithPeerId>,
relay_chain_boot_nodes: Vec<MultiaddrWithPeerId>,
para_id: ParaId,
is_collator: bool,
) -> CumulusTestNode {
@@ -297,22 +306,22 @@ pub async fn run_test_node(
is_collator,
)
.expect("could not generate Configuration");
let mut polkadot_config = polkadot_test_service::node_config(
polkadot_storage_update_func,
let mut relay_chain_config = polkadot_test_service::node_config(
relay_chain_storage_update_func,
task_executor.clone(),
key,
polkadot_boot_nodes,
relay_chain_boot_nodes,
false,
);
polkadot_config.network.node_name =
format!("{} (relay chain)", polkadot_config.network.node_name);
relay_chain_config.network.node_name =
format!("{} (relay chain)", relay_chain_config.network.node_name);
let multiaddr = parachain_config.network.listen_addresses[0].clone();
let (task_manager, client, network, rpc_handlers) = start_node_impl(
parachain_config,
collator_key,
polkadot_config,
relay_chain_config,
para_id,
is_collator,
|_| Default::default(),