mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 09:21:04 +00:00
Consensus utilities and rearchitecture for more dynamic collators (#2382)
* implement a proposer utility for consensus * tidy up deps of new proposer crate * implement a collator-service crate * rewrite cumulus-collator to use new service struct * implement a module for relay-chain-driven collators * adapt start_collator to use the new relay_chain_driven module * move collator-service to a public submodule * create an interface trait for the proposer * begin aura reimplementation * address review comments * update substrrate git ref * update polkadot-primitives refs * rough draft of aura collation using standalone fns * add a ServiceInterface * port aura reimpl to use new service trait * add an import queue utility crate * remove import queue crate in favor of module in common * implement new verification queue for aura * implement remaining behaviors * split 'collate' into smaller functions that could be pub * add telemetry * fix doc job? * Specify async-trait patch version Co-authored-by: Bastian Köcher <git@kchr.de> * remove 'fn@' in doc string. Co-authored-by: Bastian Köcher <git@kchr.de> * update variable names to be more readable * refactor proposer errors to anyhow/thiserror * remove manual span instrumentation Co-authored-by: Bastian Köcher <git@kchr.de> * make slot_claim private * fix unused import * fmt * fmt * make clippy happy --------- Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Generated
+25
@@ -2225,10 +2225,17 @@ name = "cumulus-client-consensus-aura"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"cumulus-client-collator",
|
||||||
"cumulus-client-consensus-common",
|
"cumulus-client-consensus-common",
|
||||||
|
"cumulus-client-consensus-proposer",
|
||||||
"cumulus-primitives-core",
|
"cumulus-primitives-core",
|
||||||
|
"cumulus-primitives-parachain-inherent",
|
||||||
|
"cumulus-relay-chain-interface",
|
||||||
"futures",
|
"futures",
|
||||||
"parity-scale-codec",
|
"parity-scale-codec",
|
||||||
|
"polkadot-node-primitives",
|
||||||
|
"polkadot-overseer",
|
||||||
|
"polkadot-primitives",
|
||||||
"sc-client-api",
|
"sc-client-api",
|
||||||
"sc-consensus",
|
"sc-consensus",
|
||||||
"sc-consensus-aura",
|
"sc-consensus-aura",
|
||||||
@@ -2244,6 +2251,8 @@ dependencies = [
|
|||||||
"sp-inherents",
|
"sp-inherents",
|
||||||
"sp-keystore",
|
"sp-keystore",
|
||||||
"sp-runtime",
|
"sp-runtime",
|
||||||
|
"sp-state-machine",
|
||||||
|
"sp-timestamp",
|
||||||
"substrate-prometheus-endpoint",
|
"substrate-prometheus-endpoint",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
@@ -2268,12 +2277,28 @@ dependencies = [
|
|||||||
"schnellru",
|
"schnellru",
|
||||||
"sp-blockchain",
|
"sp-blockchain",
|
||||||
"sp-consensus",
|
"sp-consensus",
|
||||||
|
"sp-core",
|
||||||
"sp-runtime",
|
"sp-runtime",
|
||||||
"sp-tracing",
|
"sp-tracing",
|
||||||
"sp-trie",
|
"sp-trie",
|
||||||
|
"substrate-prometheus-endpoint",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cumulus-client-consensus-proposer"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"async-trait",
|
||||||
|
"cumulus-primitives-parachain-inherent",
|
||||||
|
"sp-consensus",
|
||||||
|
"sp-inherents",
|
||||||
|
"sp-runtime",
|
||||||
|
"sp-state-machine",
|
||||||
|
"thiserror",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cumulus-client-consensus-relay-chain"
|
name = "cumulus-client-consensus-relay-chain"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ members = [
|
|||||||
"client/cli",
|
"client/cli",
|
||||||
"client/consensus/aura",
|
"client/consensus/aura",
|
||||||
"client/consensus/common",
|
"client/consensus/common",
|
||||||
|
"client/consensus/proposer",
|
||||||
"client/consensus/relay-chain",
|
"client/consensus/relay-chain",
|
||||||
"client/network",
|
"client/network",
|
||||||
"client/pov-recovery",
|
"client/pov-recovery",
|
||||||
|
|||||||
@@ -5,15 +5,15 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
parking_lot = "0.12.1"
|
||||||
codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] }
|
codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] }
|
||||||
futures = "0.3.21"
|
futures = "0.3.21"
|
||||||
parking_lot = "0.12.0"
|
|
||||||
tracing = "0.1.25"
|
tracing = "0.1.25"
|
||||||
|
|
||||||
# Substrate
|
# Substrate
|
||||||
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
|
||||||
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
|
||||||
|
|||||||
+141
-224
@@ -16,50 +16,46 @@
|
|||||||
|
|
||||||
//! Cumulus Collator implementation for Substrate.
|
//! Cumulus Collator implementation for Substrate.
|
||||||
|
|
||||||
use cumulus_client_network::WaitToAnnounce;
|
|
||||||
use cumulus_primitives_core::{
|
use cumulus_primitives_core::{
|
||||||
relay_chain::Hash as PHash, CollationInfo, CollectCollationInfo, ParachainBlockData,
|
relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData,
|
||||||
PersistedValidationData,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use sc_client_api::BlockBackend;
|
use sc_client_api::BlockBackend;
|
||||||
use sp_api::{ApiExt, ProvideRuntimeApi};
|
use sp_api::ProvideRuntimeApi;
|
||||||
use sp_consensus::BlockStatus;
|
|
||||||
use sp_core::traits::SpawnNamed;
|
use sp_core::traits::SpawnNamed;
|
||||||
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT, Zero};
|
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
|
||||||
|
|
||||||
use cumulus_client_consensus_common::ParachainConsensus;
|
use cumulus_client_consensus_common::ParachainConsensus;
|
||||||
use polkadot_node_primitives::{
|
use polkadot_node_primitives::{CollationResult, MaybeCompressedPoV};
|
||||||
BlockData, Collation, CollationGenerationConfig, CollationResult, MaybeCompressedPoV, PoV,
|
|
||||||
};
|
|
||||||
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
|
|
||||||
use polkadot_overseer::Handle as OverseerHandle;
|
use polkadot_overseer::Handle as OverseerHandle;
|
||||||
use polkadot_primitives::{CollatorPair, Id as ParaId};
|
use polkadot_primitives::{CollatorPair, Id as ParaId};
|
||||||
|
|
||||||
use codec::{Decode, Encode};
|
use codec::{Decode, Encode};
|
||||||
use futures::{channel::oneshot, FutureExt};
|
use futures::prelude::*;
|
||||||
use parking_lot::Mutex;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tracing::Instrument;
|
|
||||||
|
use crate::service::CollatorService;
|
||||||
|
|
||||||
|
pub mod service;
|
||||||
|
|
||||||
/// The logging target.
|
/// The logging target.
|
||||||
const LOG_TARGET: &str = "cumulus-collator";
|
const LOG_TARGET: &str = "cumulus-collator";
|
||||||
|
|
||||||
/// The implementation of the Cumulus `Collator`.
|
/// The implementation of the Cumulus `Collator`.
|
||||||
|
///
|
||||||
|
/// Note that this implementation is soon to be deprecated and removed, and it is suggested to
|
||||||
|
/// directly use the [`CollatorService`] instead, so consensus engine implementations
|
||||||
|
/// live at the top level.
|
||||||
pub struct Collator<Block: BlockT, BS, RA> {
|
pub struct Collator<Block: BlockT, BS, RA> {
|
||||||
block_status: Arc<BS>,
|
service: CollatorService<Block, BS, RA>,
|
||||||
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
|
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
|
||||||
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
|
|
||||||
runtime_api: Arc<RA>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Block: BlockT, BS, RA> Clone for Collator<Block, BS, RA> {
|
impl<Block: BlockT, BS, RA> Clone for Collator<Block, BS, RA> {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Collator {
|
||||||
block_status: self.block_status.clone(),
|
service: self.service.clone(),
|
||||||
wait_to_announce: self.wait_to_announce.clone(),
|
|
||||||
parachain_consensus: self.parachain_consensus.clone(),
|
parachain_consensus: self.parachain_consensus.clone(),
|
||||||
runtime_api: self.runtime_api.clone(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -73,159 +69,10 @@ where
|
|||||||
{
|
{
|
||||||
/// Create a new instance.
|
/// Create a new instance.
|
||||||
fn new(
|
fn new(
|
||||||
block_status: Arc<BS>,
|
collator_service: CollatorService<Block, BS, RA>,
|
||||||
spawner: Arc<dyn SpawnNamed + Send + Sync>,
|
|
||||||
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
|
|
||||||
runtime_api: Arc<RA>,
|
|
||||||
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
|
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block)));
|
Self { service: collator_service, parachain_consensus }
|
||||||
|
|
||||||
Self { block_status, wait_to_announce, runtime_api, parachain_consensus }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Checks the status of the given block hash in the Parachain.
|
|
||||||
///
|
|
||||||
/// Returns `true` if the block could be found and is good to be build on.
|
|
||||||
fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
|
|
||||||
match self.block_status.block_status(hash) {
|
|
||||||
Ok(BlockStatus::Queued) => {
|
|
||||||
tracing::debug!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
block_hash = ?hash,
|
|
||||||
"Skipping candidate production, because block is still queued for import.",
|
|
||||||
);
|
|
||||||
false
|
|
||||||
},
|
|
||||||
Ok(BlockStatus::InChainWithState) => true,
|
|
||||||
Ok(BlockStatus::InChainPruned) => {
|
|
||||||
tracing::error!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
"Skipping candidate production, because block `{:?}` is already pruned!",
|
|
||||||
hash,
|
|
||||||
);
|
|
||||||
false
|
|
||||||
},
|
|
||||||
Ok(BlockStatus::KnownBad) => {
|
|
||||||
tracing::error!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
block_hash = ?hash,
|
|
||||||
"Block is tagged as known bad and is included in the relay chain! Skipping candidate production!",
|
|
||||||
);
|
|
||||||
false
|
|
||||||
},
|
|
||||||
Ok(BlockStatus::Unknown) => {
|
|
||||||
if header.number().is_zero() {
|
|
||||||
tracing::error!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
block_hash = ?hash,
|
|
||||||
"Could not find the header of the genesis block in the database!",
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
tracing::debug!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
block_hash = ?hash,
|
|
||||||
"Skipping candidate production, because block is unknown.",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
false
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
block_hash = ?hash,
|
|
||||||
error = ?e,
|
|
||||||
"Failed to get block status.",
|
|
||||||
);
|
|
||||||
false
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Fetch the collation info from the runtime.
|
|
||||||
///
|
|
||||||
/// Returns `Ok(Some(_))` on success, `Err(_)` on error or `Ok(None)` if the runtime api isn't implemented by the runtime.
|
|
||||||
fn fetch_collation_info(
|
|
||||||
&self,
|
|
||||||
block_hash: Block::Hash,
|
|
||||||
header: &Block::Header,
|
|
||||||
) -> Result<Option<CollationInfo>, sp_api::ApiError> {
|
|
||||||
let runtime_api = self.runtime_api.runtime_api();
|
|
||||||
|
|
||||||
let api_version =
|
|
||||||
match runtime_api.api_version::<dyn CollectCollationInfo<Block>>(block_hash)? {
|
|
||||||
Some(version) => version,
|
|
||||||
None => {
|
|
||||||
tracing::error!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
"Could not fetch `CollectCollationInfo` runtime api version."
|
|
||||||
);
|
|
||||||
return Ok(None)
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
let collation_info = if api_version < 2 {
|
|
||||||
#[allow(deprecated)]
|
|
||||||
runtime_api
|
|
||||||
.collect_collation_info_before_version_2(block_hash)?
|
|
||||||
.into_latest(header.encode().into())
|
|
||||||
} else {
|
|
||||||
runtime_api.collect_collation_info(block_hash, header)?
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Some(collation_info))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn build_collation(
|
|
||||||
&self,
|
|
||||||
block: ParachainBlockData<Block>,
|
|
||||||
block_hash: Block::Hash,
|
|
||||||
pov: PoV,
|
|
||||||
) -> Option<Collation> {
|
|
||||||
let collation_info = self
|
|
||||||
.fetch_collation_info(block_hash, block.header())
|
|
||||||
.map_err(|e| {
|
|
||||||
tracing::error!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
error = ?e,
|
|
||||||
"Failed to collect collation info.",
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.ok()
|
|
||||||
.flatten()?;
|
|
||||||
|
|
||||||
let upward_messages = collation_info
|
|
||||||
.upward_messages
|
|
||||||
.try_into()
|
|
||||||
.map_err(|e| {
|
|
||||||
tracing::error!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
error = ?e,
|
|
||||||
"Number of upward messages should not be greater than `MAX_UPWARD_MESSAGE_NUM`",
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.ok()?;
|
|
||||||
let horizontal_messages = collation_info
|
|
||||||
.horizontal_messages
|
|
||||||
.try_into()
|
|
||||||
.map_err(|e| {
|
|
||||||
tracing::error!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
error = ?e,
|
|
||||||
"Number of horizontal messages should not be greater than `MAX_HORIZONTAL_MESSAGE_NUM`",
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.ok()?;
|
|
||||||
|
|
||||||
Some(Collation {
|
|
||||||
upward_messages,
|
|
||||||
new_validation_code: collation_info.new_validation_code,
|
|
||||||
processed_downward_messages: collation_info.processed_downward_messages,
|
|
||||||
horizontal_messages,
|
|
||||||
hrmp_watermark: collation_info.hrmp_watermark,
|
|
||||||
head_data: collation_info.head_data,
|
|
||||||
proof_of_validity: MaybeCompressedPoV::Compressed(pov),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn produce_candidate(
|
async fn produce_candidate(
|
||||||
@@ -252,7 +99,7 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
let last_head_hash = last_head.hash();
|
let last_head_hash = last_head.hash();
|
||||||
if !self.check_block_status(last_head_hash, &last_head) {
|
if !self.service.check_block_status(last_head_hash, &last_head) {
|
||||||
return None
|
return None
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -268,19 +115,9 @@ where
|
|||||||
.produce_candidate(&last_head, relay_parent, &validation_data)
|
.produce_candidate(&last_head, relay_parent, &validation_data)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let (header, extrinsics) = candidate.block.deconstruct();
|
let block_hash = candidate.block.header().hash();
|
||||||
|
|
||||||
let compact_proof =
|
let (collation, b) = self.service.build_collation(&last_head, block_hash, candidate)?;
|
||||||
match candidate.proof.into_compact_proof::<HashFor<Block>>(*last_head.state_root()) {
|
|
||||||
Ok(proof) => proof,
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e);
|
|
||||||
return None
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
// Create the parachain block data for the validators.
|
|
||||||
let b = ParachainBlockData::<Block>::new(header, extrinsics, compact_proof);
|
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
target: LOG_TARGET,
|
target: LOG_TARGET,
|
||||||
@@ -290,21 +127,15 @@ where
|
|||||||
b.storage_proof().encode().len() as f64 / 1024f64,
|
b.storage_proof().encode().len() as f64 / 1024f64,
|
||||||
);
|
);
|
||||||
|
|
||||||
let pov =
|
if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
|
||||||
polkadot_node_primitives::maybe_compress_pov(PoV { block_data: BlockData(b.encode()) });
|
tracing::info!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
"Compressed PoV size: {}kb",
|
||||||
|
pov.block_data.0.len() as f64 / 1024f64,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
tracing::info!(
|
let result_sender = self.service.announce_with_barrier(block_hash);
|
||||||
target: LOG_TARGET,
|
|
||||||
"Compressed PoV size: {}kb",
|
|
||||||
pov.block_data.0.len() as f64 / 1024f64,
|
|
||||||
);
|
|
||||||
|
|
||||||
let block_hash = b.header().hash();
|
|
||||||
let collation = self.build_collation(b, block_hash, pov)?;
|
|
||||||
|
|
||||||
let (result_sender, signed_stmt_recv) = oneshot::channel();
|
|
||||||
|
|
||||||
self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv);
|
|
||||||
|
|
||||||
tracing::info!(target: LOG_TARGET, ?block_hash, "Produced proof-of-validity candidate.",);
|
tracing::info!(target: LOG_TARGET, ?block_hash, "Produced proof-of-validity candidate.",);
|
||||||
|
|
||||||
@@ -312,6 +143,96 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Relay-chain-driven collators are those whose block production is driven purely
|
||||||
|
/// by new relay chain blocks and the most recently included parachain blocks
|
||||||
|
/// within them.
|
||||||
|
///
|
||||||
|
/// This method of driving collators is not suited to anything but the most simple parachain
|
||||||
|
/// consensus mechanisms, and this module may soon be deprecated.
|
||||||
|
pub mod relay_chain_driven {
|
||||||
|
use futures::{
|
||||||
|
channel::{mpsc, oneshot},
|
||||||
|
prelude::*,
|
||||||
|
};
|
||||||
|
use polkadot_node_primitives::{CollationGenerationConfig, CollationResult};
|
||||||
|
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
|
||||||
|
use polkadot_overseer::Handle as OverseerHandle;
|
||||||
|
use polkadot_primitives::{CollatorPair, Id as ParaId};
|
||||||
|
|
||||||
|
use cumulus_primitives_core::{relay_chain::Hash as PHash, PersistedValidationData};
|
||||||
|
|
||||||
|
/// A request to author a collation, based on the advancement of the relay chain.
|
||||||
|
///
|
||||||
|
/// See the module docs for more info on relay-chain-driven collators.
|
||||||
|
pub struct CollationRequest {
|
||||||
|
relay_parent: PHash,
|
||||||
|
pvd: PersistedValidationData,
|
||||||
|
sender: oneshot::Sender<Option<CollationResult>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CollationRequest {
|
||||||
|
/// Get the relay parent of the collation request.
|
||||||
|
pub fn relay_parent(&self) -> &PHash {
|
||||||
|
&self.relay_parent
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the [`PersistedValidationData`] for the request.
|
||||||
|
pub fn persisted_validation_data(&self) -> &PersistedValidationData {
|
||||||
|
&self.pvd
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Complete the request with a collation, if any.
|
||||||
|
pub fn complete(self, collation: Option<CollationResult>) {
|
||||||
|
let _ = self.sender.send(collation);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initialize the collator with Polkadot's collation-generation
|
||||||
|
/// subsystem, returning a stream of collation requests to handle.
|
||||||
|
pub async fn init(
|
||||||
|
key: CollatorPair,
|
||||||
|
para_id: ParaId,
|
||||||
|
overseer_handle: OverseerHandle,
|
||||||
|
) -> mpsc::Receiver<CollationRequest> {
|
||||||
|
let mut overseer_handle = overseer_handle;
|
||||||
|
|
||||||
|
let (stream_tx, stream_rx) = mpsc::channel(0);
|
||||||
|
let config = CollationGenerationConfig {
|
||||||
|
key,
|
||||||
|
para_id,
|
||||||
|
collator: Box::new(move |relay_parent, validation_data| {
|
||||||
|
// Cloning the channel on each usage effectively makes the channel
|
||||||
|
// unbounded. The channel is actually bounded by the block production
|
||||||
|
// and consensus systems of Polkadot, which limits the amount of possible
|
||||||
|
// blocks.
|
||||||
|
let mut stream_tx = stream_tx.clone();
|
||||||
|
let validation_data = validation_data.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
let (this_tx, this_rx) = oneshot::channel();
|
||||||
|
let request =
|
||||||
|
CollationRequest { relay_parent, pvd: validation_data, sender: this_tx };
|
||||||
|
|
||||||
|
if stream_tx.send(request).await.is_err() {
|
||||||
|
return None
|
||||||
|
}
|
||||||
|
|
||||||
|
this_rx.await.ok().flatten()
|
||||||
|
})
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
overseer_handle
|
||||||
|
.send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
|
||||||
|
.await;
|
||||||
|
|
||||||
|
overseer_handle
|
||||||
|
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
|
||||||
|
.await;
|
||||||
|
|
||||||
|
stream_rx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Parameters for [`start_collator`].
|
/// Parameters for [`start_collator`].
|
||||||
pub struct StartCollatorParams<Block: BlockT, RA, BS, Spawner> {
|
pub struct StartCollatorParams<Block: BlockT, RA, BS, Spawner> {
|
||||||
pub para_id: ParaId,
|
pub para_id: ParaId,
|
||||||
@@ -330,7 +251,7 @@ pub async fn start_collator<Block, RA, BS, Spawner>(
|
|||||||
para_id,
|
para_id,
|
||||||
block_status,
|
block_status,
|
||||||
announce_block,
|
announce_block,
|
||||||
mut overseer_handle,
|
overseer_handle,
|
||||||
spawner,
|
spawner,
|
||||||
key,
|
key,
|
||||||
parachain_consensus,
|
parachain_consensus,
|
||||||
@@ -343,34 +264,28 @@ pub async fn start_collator<Block, RA, BS, Spawner>(
|
|||||||
RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||||
RA::Api: CollectCollationInfo<Block>,
|
RA::Api: CollectCollationInfo<Block>,
|
||||||
{
|
{
|
||||||
let collator = Collator::new(
|
let collator_service =
|
||||||
block_status,
|
CollatorService::new(block_status, Arc::new(spawner.clone()), announce_block, runtime_api);
|
||||||
Arc::new(spawner),
|
|
||||||
announce_block,
|
|
||||||
runtime_api,
|
|
||||||
parachain_consensus,
|
|
||||||
);
|
|
||||||
|
|
||||||
let span = tracing::Span::current();
|
let collator = Collator::new(collator_service, parachain_consensus);
|
||||||
let config = CollationGenerationConfig {
|
|
||||||
key,
|
|
||||||
para_id,
|
|
||||||
collator: Box::new(move |relay_parent, validation_data| {
|
|
||||||
let collator = collator.clone();
|
|
||||||
collator
|
|
||||||
.produce_candidate(relay_parent, validation_data.clone())
|
|
||||||
.instrument(span.clone())
|
|
||||||
.boxed()
|
|
||||||
}),
|
|
||||||
};
|
|
||||||
|
|
||||||
overseer_handle
|
let mut request_stream = relay_chain_driven::init(key, para_id, overseer_handle).await;
|
||||||
.send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
|
|
||||||
.await;
|
|
||||||
|
|
||||||
overseer_handle
|
let collation_future = Box::pin(async move {
|
||||||
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
|
while let Some(request) = request_stream.next().await {
|
||||||
.await;
|
let collation = collator
|
||||||
|
.clone()
|
||||||
|
.produce_candidate(
|
||||||
|
*request.relay_parent(),
|
||||||
|
request.persisted_validation_data().clone(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
request.complete(collation);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
spawner.spawn("cumulus-relay-driven-collator", None, collation_future);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -378,12 +293,14 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use cumulus_client_consensus_common::ParachainCandidate;
|
use cumulus_client_consensus_common::ParachainCandidate;
|
||||||
|
use cumulus_primitives_core::ParachainBlockData;
|
||||||
use cumulus_test_client::{
|
use cumulus_test_client::{
|
||||||
Client, ClientBlockImportExt, DefaultTestClientBuilderExt, InitBlockBuilder,
|
Client, ClientBlockImportExt, DefaultTestClientBuilderExt, InitBlockBuilder,
|
||||||
TestClientBuilder, TestClientBuilderExt,
|
TestClientBuilder, TestClientBuilderExt,
|
||||||
};
|
};
|
||||||
use cumulus_test_runtime::{Block, Header};
|
use cumulus_test_runtime::{Block, Header};
|
||||||
use futures::{channel::mpsc, executor::block_on, StreamExt};
|
use futures::{channel::mpsc, executor::block_on, StreamExt};
|
||||||
|
use polkadot_node_subsystem::messages::CollationGenerationMessage;
|
||||||
use polkadot_node_subsystem_test_helpers::ForwardSubsystem;
|
use polkadot_node_subsystem_test_helpers::ForwardSubsystem;
|
||||||
use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
|
use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
|
||||||
use sp_consensus::BlockOrigin;
|
use sp_consensus::BlockOrigin;
|
||||||
|
|||||||
@@ -0,0 +1,318 @@
|
|||||||
|
// Copyright 2023 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Cumulus.
|
||||||
|
|
||||||
|
// Cumulus is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Cumulus is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! The Cumulus [`CollatorService`] is a utility struct for performing common
|
||||||
|
//! operations used in parachain consensus/authoring.
|
||||||
|
|
||||||
|
use cumulus_client_network::WaitToAnnounce;
|
||||||
|
use cumulus_primitives_core::{CollationInfo, CollectCollationInfo, ParachainBlockData};
|
||||||
|
|
||||||
|
use sc_client_api::BlockBackend;
|
||||||
|
use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||||
|
use sp_consensus::BlockStatus;
|
||||||
|
use sp_core::traits::SpawnNamed;
|
||||||
|
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT, Zero};
|
||||||
|
|
||||||
|
use cumulus_client_consensus_common::ParachainCandidate;
|
||||||
|
use polkadot_node_primitives::{
|
||||||
|
BlockData, Collation, CollationSecondedSignal, MaybeCompressedPoV, PoV,
|
||||||
|
};
|
||||||
|
|
||||||
|
use codec::Encode;
|
||||||
|
use futures::channel::oneshot;
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
/// The logging target.
|
||||||
|
const LOG_TARGET: &str = "cumulus-collator";
|
||||||
|
|
||||||
|
/// Utility functions generally applicable to writing collators for Cumulus.
|
||||||
|
pub trait ServiceInterface<Block: BlockT> {
|
||||||
|
/// Checks the status of the given block hash in the Parachain.
|
||||||
|
///
|
||||||
|
/// Returns `true` if the block could be found and is good to be build on.
|
||||||
|
fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool;
|
||||||
|
|
||||||
|
/// Build a full [`Collation`] from a given [`ParachainCandidate`]. This requires
|
||||||
|
/// that the underlying block has been fully imported into the underlying client,
|
||||||
|
/// as implementations will fetch underlying runtime API data.
|
||||||
|
///
|
||||||
|
/// This also returns the unencoded parachain block data, in case that is desired.
|
||||||
|
fn build_collation(
|
||||||
|
&self,
|
||||||
|
parent_header: &Block::Header,
|
||||||
|
block_hash: Block::Hash,
|
||||||
|
candidate: ParachainCandidate<Block>,
|
||||||
|
) -> Option<(Collation, ParachainBlockData<Block>)>;
|
||||||
|
|
||||||
|
/// Inform networking systems that the block should be announced after an appropriate
|
||||||
|
/// signal has been received. This returns the sending half of the signal.
|
||||||
|
fn announce_with_barrier(
|
||||||
|
&self,
|
||||||
|
block_hash: Block::Hash,
|
||||||
|
) -> oneshot::Sender<CollationSecondedSignal>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The [`CollatorService`] provides common utilities for parachain consensus and authoring.
|
||||||
|
///
|
||||||
|
/// This includes logic for checking the block status of arbitrary parachain headers
|
||||||
|
/// gathered from the relay chain state, creating full [`Collation`]s to be shared with validators,
|
||||||
|
/// and distributing new parachain blocks along the network.
|
||||||
|
pub struct CollatorService<Block: BlockT, BS, RA> {
|
||||||
|
block_status: Arc<BS>,
|
||||||
|
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
|
||||||
|
runtime_api: Arc<RA>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Block: BlockT, BS, RA> Clone for CollatorService<Block, BS, RA> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
block_status: self.block_status.clone(),
|
||||||
|
wait_to_announce: self.wait_to_announce.clone(),
|
||||||
|
runtime_api: self.runtime_api.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Block, BS, RA> CollatorService<Block, BS, RA>
|
||||||
|
where
|
||||||
|
Block: BlockT,
|
||||||
|
BS: BlockBackend<Block>,
|
||||||
|
RA: ProvideRuntimeApi<Block>,
|
||||||
|
RA::Api: CollectCollationInfo<Block>,
|
||||||
|
{
|
||||||
|
/// Create a new instance.
|
||||||
|
pub fn new(
|
||||||
|
block_status: Arc<BS>,
|
||||||
|
spawner: Arc<dyn SpawnNamed + Send + Sync>,
|
||||||
|
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
|
||||||
|
runtime_api: Arc<RA>,
|
||||||
|
) -> Self {
|
||||||
|
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block)));
|
||||||
|
|
||||||
|
Self { block_status, wait_to_announce, runtime_api }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks the status of the given block hash in the Parachain.
|
||||||
|
///
|
||||||
|
/// Returns `true` if the block could be found and is good to be build on.
|
||||||
|
pub fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
|
||||||
|
match self.block_status.block_status(hash) {
|
||||||
|
Ok(BlockStatus::Queued) => {
|
||||||
|
tracing::debug!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
block_hash = ?hash,
|
||||||
|
"Skipping candidate production, because block is still queued for import.",
|
||||||
|
);
|
||||||
|
false
|
||||||
|
},
|
||||||
|
Ok(BlockStatus::InChainWithState) => true,
|
||||||
|
Ok(BlockStatus::InChainPruned) => {
|
||||||
|
tracing::error!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
"Skipping candidate production, because block `{:?}` is already pruned!",
|
||||||
|
hash,
|
||||||
|
);
|
||||||
|
false
|
||||||
|
},
|
||||||
|
Ok(BlockStatus::KnownBad) => {
|
||||||
|
tracing::error!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
block_hash = ?hash,
|
||||||
|
"Block is tagged as known bad and is included in the relay chain! Skipping candidate production!",
|
||||||
|
);
|
||||||
|
false
|
||||||
|
},
|
||||||
|
Ok(BlockStatus::Unknown) => {
|
||||||
|
if header.number().is_zero() {
|
||||||
|
tracing::error!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
block_hash = ?hash,
|
||||||
|
"Could not find the header of the genesis block in the database!",
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
tracing::debug!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
block_hash = ?hash,
|
||||||
|
"Skipping candidate production, because block is unknown.",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
false
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
block_hash = ?hash,
|
||||||
|
error = ?e,
|
||||||
|
"Failed to get block status.",
|
||||||
|
);
|
||||||
|
false
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch the collation info from the runtime.
|
||||||
|
///
|
||||||
|
/// Returns `Ok(Some(_))` on success, `Err(_)` on error or `Ok(None)` if the runtime api isn't implemented by the runtime.
|
||||||
|
pub fn fetch_collation_info(
|
||||||
|
&self,
|
||||||
|
block_hash: Block::Hash,
|
||||||
|
header: &Block::Header,
|
||||||
|
) -> Result<Option<CollationInfo>, sp_api::ApiError> {
|
||||||
|
let runtime_api = self.runtime_api.runtime_api();
|
||||||
|
|
||||||
|
let api_version =
|
||||||
|
match runtime_api.api_version::<dyn CollectCollationInfo<Block>>(block_hash)? {
|
||||||
|
Some(version) => version,
|
||||||
|
None => {
|
||||||
|
tracing::error!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
"Could not fetch `CollectCollationInfo` runtime api version."
|
||||||
|
);
|
||||||
|
return Ok(None)
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let collation_info = if api_version < 2 {
|
||||||
|
#[allow(deprecated)]
|
||||||
|
runtime_api
|
||||||
|
.collect_collation_info_before_version_2(block_hash)?
|
||||||
|
.into_latest(header.encode().into())
|
||||||
|
} else {
|
||||||
|
runtime_api.collect_collation_info(block_hash, header)?
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Some(collation_info))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build a full [`Collation`] from a given [`ParachainCandidate`]. This requires
|
||||||
|
/// that the underlying block has been fully imported into the underlying client,
|
||||||
|
/// as it fetches underlying runtime API data.
|
||||||
|
///
|
||||||
|
/// This also returns the unencoded parachain block data, in case that is desired.
|
||||||
|
pub fn build_collation(
|
||||||
|
&self,
|
||||||
|
parent_header: &Block::Header,
|
||||||
|
block_hash: Block::Hash,
|
||||||
|
candidate: ParachainCandidate<Block>,
|
||||||
|
) -> Option<(Collation, ParachainBlockData<Block>)> {
|
||||||
|
let (header, extrinsics) = candidate.block.deconstruct();
|
||||||
|
|
||||||
|
let compact_proof = match candidate
|
||||||
|
.proof
|
||||||
|
.into_compact_proof::<HashFor<Block>>(*parent_header.state_root())
|
||||||
|
{
|
||||||
|
Ok(proof) => proof,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e);
|
||||||
|
return None
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
// Create the parachain block data for the validators.
|
||||||
|
let collation_info = self
|
||||||
|
.fetch_collation_info(block_hash, &header)
|
||||||
|
.map_err(|e| {
|
||||||
|
tracing::error!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
error = ?e,
|
||||||
|
"Failed to collect collation info.",
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.ok()
|
||||||
|
.flatten()?;
|
||||||
|
|
||||||
|
let block_data = ParachainBlockData::<Block>::new(header, extrinsics, compact_proof);
|
||||||
|
|
||||||
|
let pov = polkadot_node_primitives::maybe_compress_pov(PoV {
|
||||||
|
block_data: BlockData(block_data.encode()),
|
||||||
|
});
|
||||||
|
|
||||||
|
let upward_messages = collation_info
|
||||||
|
.upward_messages
|
||||||
|
.try_into()
|
||||||
|
.map_err(|e| {
|
||||||
|
tracing::error!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
error = ?e,
|
||||||
|
"Number of upward messages should not be greater than `MAX_UPWARD_MESSAGE_NUM`",
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.ok()?;
|
||||||
|
let horizontal_messages = collation_info
|
||||||
|
.horizontal_messages
|
||||||
|
.try_into()
|
||||||
|
.map_err(|e| {
|
||||||
|
tracing::error!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
error = ?e,
|
||||||
|
"Number of horizontal messages should not be greater than `MAX_HORIZONTAL_MESSAGE_NUM`",
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.ok()?;
|
||||||
|
|
||||||
|
let collation = Collation {
|
||||||
|
upward_messages,
|
||||||
|
new_validation_code: collation_info.new_validation_code,
|
||||||
|
processed_downward_messages: collation_info.processed_downward_messages,
|
||||||
|
horizontal_messages,
|
||||||
|
hrmp_watermark: collation_info.hrmp_watermark,
|
||||||
|
head_data: collation_info.head_data,
|
||||||
|
proof_of_validity: MaybeCompressedPoV::Compressed(pov),
|
||||||
|
};
|
||||||
|
|
||||||
|
Some((collation, block_data))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Inform the networking systems that the block should be announced after an appropriate
|
||||||
|
/// signal has been received. This returns the sending half of the signal.
|
||||||
|
pub fn announce_with_barrier(
|
||||||
|
&self,
|
||||||
|
block_hash: Block::Hash,
|
||||||
|
) -> oneshot::Sender<CollationSecondedSignal> {
|
||||||
|
let (result_sender, signed_stmt_recv) = oneshot::channel();
|
||||||
|
self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv);
|
||||||
|
result_sender
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Block, BS, RA> ServiceInterface<Block> for CollatorService<Block, BS, RA>
|
||||||
|
where
|
||||||
|
Block: BlockT,
|
||||||
|
BS: BlockBackend<Block>,
|
||||||
|
RA: ProvideRuntimeApi<Block>,
|
||||||
|
RA::Api: CollectCollationInfo<Block>,
|
||||||
|
{
|
||||||
|
fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
|
||||||
|
CollatorService::check_block_status(self, hash, header)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_collation(
|
||||||
|
&self,
|
||||||
|
parent_header: &Block::Header,
|
||||||
|
block_hash: Block::Hash,
|
||||||
|
candidate: ParachainCandidate<Block>,
|
||||||
|
) -> Option<(Collation, ParachainBlockData<Block>)> {
|
||||||
|
CollatorService::build_collation(self, parent_header, block_hash, candidate)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn announce_with_barrier(
|
||||||
|
&self,
|
||||||
|
block_hash: Block::Hash,
|
||||||
|
) -> oneshot::Sender<CollationSecondedSignal> {
|
||||||
|
CollatorService::announce_with_barrier(self, block_hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -27,8 +27,19 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
|||||||
sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
sp-timestamp = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
|
||||||
# Cumulus
|
# Cumulus
|
||||||
cumulus-client-consensus-common = { path = "../common" }
|
cumulus-client-consensus-common = { path = "../common" }
|
||||||
|
cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
|
||||||
|
cumulus-client-consensus-proposer = { path = "../proposer" }
|
||||||
cumulus-primitives-core = { path = "../../../primitives/core" }
|
cumulus-primitives-core = { path = "../../../primitives/core" }
|
||||||
|
cumulus-primitives-parachain-inherent = { path = "../../../primitives/parachain-inherent" }
|
||||||
|
cumulus-client-collator = { path = "../../collator" }
|
||||||
|
|
||||||
|
# Polkadot
|
||||||
|
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||||
|
polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||||
|
polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||||
|
|||||||
@@ -50,6 +50,8 @@ pub use import_queue::{build_verifier, import_queue, BuildVerifierParams, Import
|
|||||||
pub use sc_consensus_aura::{slot_duration, AuraVerifier, BuildAuraWorkerParams, SlotProportion};
|
pub use sc_consensus_aura::{slot_duration, AuraVerifier, BuildAuraWorkerParams, SlotProportion};
|
||||||
pub use sc_consensus_slots::InherentDataProviderExt;
|
pub use sc_consensus_slots::InherentDataProviderExt;
|
||||||
|
|
||||||
|
pub mod unstable_reimpl;
|
||||||
|
|
||||||
const LOG_TARGET: &str = "aura::cumulus";
|
const LOG_TARGET: &str = "aura::cumulus";
|
||||||
|
|
||||||
/// The implementation of the AURA consensus for parachains.
|
/// The implementation of the AURA consensus for parachains.
|
||||||
|
|||||||
@@ -0,0 +1,529 @@
|
|||||||
|
// Copyright 2023 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Cumulus.
|
||||||
|
|
||||||
|
// Cumulus is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Cumulus is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! The AuRa consensus algorithm for parachains.
|
||||||
|
//!
|
||||||
|
//! This extends the Substrate provided AuRa consensus implementation to make it compatible for
|
||||||
|
//! parachains. This provides the option to run a "bare" relay-chain driven Aura implementation,
|
||||||
|
//! but also exposes the core functionalities separately to be composed into more complex implementations.
|
||||||
|
//!
|
||||||
|
//! For more information about AuRa, the Substrate crate should be checked.
|
||||||
|
|
||||||
|
use codec::{Decode, Encode};
|
||||||
|
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
|
||||||
|
use cumulus_client_consensus_common::{ParachainBlockImportMarker, ParachainCandidate};
|
||||||
|
use cumulus_client_consensus_proposer::ProposerInterface;
|
||||||
|
use cumulus_primitives_core::{
|
||||||
|
relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData,
|
||||||
|
};
|
||||||
|
use cumulus_primitives_parachain_inherent::ParachainInherentData;
|
||||||
|
use cumulus_relay_chain_interface::RelayChainInterface;
|
||||||
|
|
||||||
|
use polkadot_node_primitives::{CollationResult, MaybeCompressedPoV};
|
||||||
|
use polkadot_overseer::Handle as OverseerHandle;
|
||||||
|
use polkadot_primitives::{CollatorPair, Id as ParaId};
|
||||||
|
|
||||||
|
use futures::prelude::*;
|
||||||
|
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
|
||||||
|
use sc_consensus::{
|
||||||
|
import_queue::{BasicQueue, Verifier as VerifierT},
|
||||||
|
BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction,
|
||||||
|
};
|
||||||
|
use sc_consensus_aura::standalone as aura_internal;
|
||||||
|
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
|
||||||
|
use sp_api::ProvideRuntimeApi;
|
||||||
|
use sp_application_crypto::AppPublic;
|
||||||
|
use sp_block_builder::BlockBuilder as BlockBuilderApi;
|
||||||
|
use sp_blockchain::HeaderBackend;
|
||||||
|
use sp_consensus::{error::Error as ConsensusError, BlockOrigin, SyncOracle};
|
||||||
|
use sp_consensus_aura::{AuraApi, Slot, SlotDuration};
|
||||||
|
use sp_core::crypto::Pair;
|
||||||
|
use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
|
||||||
|
use sp_keystore::KeystorePtr;
|
||||||
|
use sp_runtime::{
|
||||||
|
generic::Digest,
|
||||||
|
traits::{Block as BlockT, HashFor, Header as HeaderT, Member},
|
||||||
|
};
|
||||||
|
use sp_state_machine::StorageChanges;
|
||||||
|
use std::{convert::TryFrom, error::Error, fmt::Debug, hash::Hash, sync::Arc, time::Duration};
|
||||||
|
|
||||||
|
/// Parameters for [`run_bare_relay_driven`].
|
||||||
|
pub struct Params<BI, CIDP, Client, RClient, SO, Proposer, CS> {
|
||||||
|
pub create_inherent_data_providers: CIDP,
|
||||||
|
pub block_import: BI,
|
||||||
|
pub para_client: Arc<Client>,
|
||||||
|
pub relay_client: Arc<RClient>,
|
||||||
|
pub sync_oracle: SO,
|
||||||
|
pub keystore: KeystorePtr,
|
||||||
|
pub key: CollatorPair,
|
||||||
|
pub para_id: ParaId,
|
||||||
|
pub overseer_handle: OverseerHandle,
|
||||||
|
pub slot_duration: SlotDuration,
|
||||||
|
pub proposer: Proposer,
|
||||||
|
pub collator_service: CS,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run bare Aura consensus as a relay-chain-driven collator.
|
||||||
|
pub async fn run_bare_relay_driven<Block, P, BI, CIDP, Client, RClient, SO, Proposer, CS>(
|
||||||
|
params: Params<BI, CIDP, Client, RClient, SO, Proposer, CS>,
|
||||||
|
) where
|
||||||
|
Block: BlockT,
|
||||||
|
Client: ProvideRuntimeApi<Block>
|
||||||
|
+ BlockOf
|
||||||
|
+ AuxStore
|
||||||
|
+ HeaderBackend<Block>
|
||||||
|
+ BlockBackend<Block>
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
Client::Api: AuraApi<Block, P::Public> + CollectCollationInfo<Block>,
|
||||||
|
RClient: RelayChainInterface,
|
||||||
|
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
|
||||||
|
BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
|
||||||
|
SO: SyncOracle + Send + Sync + Clone + 'static,
|
||||||
|
Proposer: ProposerInterface<Block, Transaction = BI::Transaction>,
|
||||||
|
Proposer::Transaction: Sync,
|
||||||
|
CS: CollatorServiceInterface<Block>,
|
||||||
|
P: Pair + Send + Sync,
|
||||||
|
P::Public: AppPublic + Hash + Member + Encode + Decode,
|
||||||
|
P::Signature: TryFrom<Vec<u8>> + Hash + Member + Encode + Decode,
|
||||||
|
{
|
||||||
|
let mut proposer = params.proposer;
|
||||||
|
let mut block_import = params.block_import;
|
||||||
|
|
||||||
|
let mut collation_requests = cumulus_client_collator::relay_chain_driven::init(
|
||||||
|
params.key,
|
||||||
|
params.para_id,
|
||||||
|
params.overseer_handle,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
while let Some(request) = collation_requests.next().await {
|
||||||
|
macro_rules! reject_with_error {
|
||||||
|
($err:expr) => {{
|
||||||
|
request.complete(None);
|
||||||
|
tracing::error!(target: crate::LOG_TARGET, err = ?{ $err });
|
||||||
|
continue;
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
let validation_data = request.persisted_validation_data();
|
||||||
|
|
||||||
|
let parent_header = match Block::Header::decode(&mut &validation_data.parent_head.0[..]) {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(e) => reject_with_error!(e),
|
||||||
|
};
|
||||||
|
|
||||||
|
let parent_hash = parent_header.hash();
|
||||||
|
|
||||||
|
if !params.collator_service.check_block_status(parent_hash, &parent_header) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
let claim = match claim_slot::<_, _, P>(
|
||||||
|
&*params.para_client,
|
||||||
|
parent_hash,
|
||||||
|
params.slot_duration,
|
||||||
|
¶ms.keystore,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(None) => continue,
|
||||||
|
Ok(Some(c)) => c,
|
||||||
|
Err(e) => reject_with_error!(e),
|
||||||
|
};
|
||||||
|
|
||||||
|
let (parachain_inherent_data, other_inherent_data) = match create_inherent_data(
|
||||||
|
*request.relay_parent(),
|
||||||
|
&validation_data,
|
||||||
|
parent_hash,
|
||||||
|
params.para_id,
|
||||||
|
¶ms.relay_client,
|
||||||
|
¶ms.create_inherent_data_providers,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(e) => reject_with_error!(e),
|
||||||
|
};
|
||||||
|
|
||||||
|
let proposal = match proposer
|
||||||
|
.propose(
|
||||||
|
&parent_header,
|
||||||
|
¶chain_inherent_data,
|
||||||
|
other_inherent_data,
|
||||||
|
Digest { logs: vec![claim.pre_digest] },
|
||||||
|
// TODO [https://github.com/paritytech/cumulus/issues/2439]
|
||||||
|
// We should call out to a pluggable interface that provides
|
||||||
|
// the proposal duration.
|
||||||
|
Duration::from_millis(500),
|
||||||
|
// Set the block limit to 50% of the maximum PoV size.
|
||||||
|
//
|
||||||
|
// TODO: If we got benchmarking that includes the proof size,
|
||||||
|
// we should be able to use the maximum pov size.
|
||||||
|
Some((validation_data.max_pov_size / 2) as usize),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => reject_with_error!(e),
|
||||||
|
};
|
||||||
|
|
||||||
|
let sealed_importable = match seal::<_, _, P>(
|
||||||
|
proposal.block,
|
||||||
|
proposal.storage_changes,
|
||||||
|
&claim.author_pub,
|
||||||
|
¶ms.keystore,
|
||||||
|
) {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => reject_with_error!(e),
|
||||||
|
};
|
||||||
|
|
||||||
|
let post_hash = sealed_importable.post_hash();
|
||||||
|
let block = Block::new(
|
||||||
|
sealed_importable.post_header(),
|
||||||
|
sealed_importable
|
||||||
|
.body
|
||||||
|
.as_ref()
|
||||||
|
.expect("body always created with this `propose` fn; qed")
|
||||||
|
.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Err(e) = block_import.import_block(sealed_importable).await {
|
||||||
|
reject_with_error!(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = if let Some((collation, b)) = params.collator_service.build_collation(
|
||||||
|
&parent_header,
|
||||||
|
post_hash,
|
||||||
|
ParachainCandidate { block, proof: proposal.proof },
|
||||||
|
) {
|
||||||
|
tracing::info!(
|
||||||
|
target: crate::LOG_TARGET,
|
||||||
|
"PoV size {{ header: {}kb, extrinsics: {}kb, storage_proof: {}kb }}",
|
||||||
|
b.header().encode().len() as f64 / 1024f64,
|
||||||
|
b.extrinsics().encode().len() as f64 / 1024f64,
|
||||||
|
b.storage_proof().encode().len() as f64 / 1024f64,
|
||||||
|
);
|
||||||
|
|
||||||
|
if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
|
||||||
|
tracing::info!(
|
||||||
|
target: crate::LOG_TARGET,
|
||||||
|
"Compressed PoV size: {}kb",
|
||||||
|
pov.block_data.0.len() as f64 / 1024f64,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let result_sender = params.collator_service.announce_with_barrier(post_hash);
|
||||||
|
Some(CollationResult { collation, result_sender: Some(result_sender) })
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
request.complete(response);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn slot_now(slot_duration: SlotDuration) -> Slot {
|
||||||
|
let timestamp = sp_timestamp::InherentDataProvider::from_system_time().timestamp();
|
||||||
|
Slot::from_timestamp(timestamp, slot_duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A claim on an Aura slot.
|
||||||
|
struct SlotClaim<Pub> {
|
||||||
|
author_pub: Pub,
|
||||||
|
pre_digest: sp_runtime::DigestItem,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn claim_slot<B, C, P>(
|
||||||
|
client: &C,
|
||||||
|
parent_hash: B::Hash,
|
||||||
|
slot_duration: SlotDuration,
|
||||||
|
keystore: &KeystorePtr,
|
||||||
|
) -> Result<Option<SlotClaim<P::Public>>, Box<dyn Error>>
|
||||||
|
where
|
||||||
|
B: BlockT,
|
||||||
|
C: ProvideRuntimeApi<B> + Send + Sync + 'static,
|
||||||
|
C::Api: AuraApi<B, P::Public>,
|
||||||
|
P: Pair,
|
||||||
|
P::Public: Encode + Decode,
|
||||||
|
P::Signature: Encode + Decode,
|
||||||
|
{
|
||||||
|
// load authorities
|
||||||
|
let authorities = client.runtime_api().authorities(parent_hash).map_err(Box::new)?;
|
||||||
|
|
||||||
|
// Determine the current slot.
|
||||||
|
let slot_now = slot_now(slot_duration);
|
||||||
|
|
||||||
|
// Try to claim the slot locally.
|
||||||
|
let author_pub = {
|
||||||
|
let res = aura_internal::claim_slot::<P>(slot_now, &authorities, keystore).await;
|
||||||
|
match res {
|
||||||
|
Some(p) => p,
|
||||||
|
None => return Ok(None),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Produce the pre-digest.
|
||||||
|
let pre_digest = aura_internal::pre_digest::<P>(slot_now);
|
||||||
|
|
||||||
|
Ok(Some(SlotClaim { author_pub, pre_digest }))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_inherent_data<B: BlockT>(
|
||||||
|
relay_parent: PHash,
|
||||||
|
validation_data: &PersistedValidationData,
|
||||||
|
parent_hash: B::Hash,
|
||||||
|
para_id: ParaId,
|
||||||
|
relay_chain_interface: &impl RelayChainInterface,
|
||||||
|
create_inherent_data_providers: &impl CreateInherentDataProviders<B, ()>,
|
||||||
|
) -> Result<(ParachainInherentData, InherentData), Box<dyn Error>> {
|
||||||
|
let paras_inherent_data = ParachainInherentData::create_at(
|
||||||
|
relay_parent,
|
||||||
|
relay_chain_interface,
|
||||||
|
validation_data,
|
||||||
|
para_id,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let paras_inherent_data = match paras_inherent_data {
|
||||||
|
Some(p) => p,
|
||||||
|
None =>
|
||||||
|
return Err(format!("Could not create paras inherent data at {:?}", relay_parent).into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let other_inherent_data = create_inherent_data_providers
|
||||||
|
.create_inherent_data_providers(parent_hash, ())
|
||||||
|
.map_err(|e| e as Box<dyn Error>)
|
||||||
|
.await?
|
||||||
|
.create_inherent_data()
|
||||||
|
.await
|
||||||
|
.map_err(Box::new)?;
|
||||||
|
|
||||||
|
Ok((paras_inherent_data, other_inherent_data))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn seal<B: BlockT, T, P>(
|
||||||
|
pre_sealed: B,
|
||||||
|
storage_changes: StorageChanges<T, HashFor<B>>,
|
||||||
|
author_pub: &P::Public,
|
||||||
|
keystore: &KeystorePtr,
|
||||||
|
) -> Result<BlockImportParams<B, T>, Box<dyn Error>>
|
||||||
|
where
|
||||||
|
P: Pair,
|
||||||
|
P::Signature: Encode + Decode + TryFrom<Vec<u8>>,
|
||||||
|
P::Public: AppPublic,
|
||||||
|
{
|
||||||
|
let (pre_header, body) = pre_sealed.deconstruct();
|
||||||
|
let pre_hash = pre_header.hash();
|
||||||
|
let block_number = *pre_header.number();
|
||||||
|
|
||||||
|
// seal the block.
|
||||||
|
let block_import_params = {
|
||||||
|
let seal_digest =
|
||||||
|
aura_internal::seal::<_, P>(&pre_hash, &author_pub, keystore).map_err(Box::new)?;
|
||||||
|
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, pre_header);
|
||||||
|
block_import_params.post_digests.push(seal_digest);
|
||||||
|
block_import_params.body = Some(body.clone());
|
||||||
|
block_import_params.state_action =
|
||||||
|
StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
|
||||||
|
block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
|
||||||
|
block_import_params
|
||||||
|
};
|
||||||
|
let post_hash = block_import_params.post_hash();
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
target: crate::LOG_TARGET,
|
||||||
|
"🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
|
||||||
|
block_number,
|
||||||
|
post_hash,
|
||||||
|
pre_hash,
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(block_import_params)
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Verifier<P, Client, Block, CIDP> {
|
||||||
|
client: Arc<Client>,
|
||||||
|
create_inherent_data_providers: CIDP,
|
||||||
|
slot_duration: SlotDuration,
|
||||||
|
telemetry: Option<TelemetryHandle>,
|
||||||
|
_marker: std::marker::PhantomData<(Block, P)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<P, Client, Block, CIDP> VerifierT<Block> for Verifier<P, Client, Block, CIDP>
|
||||||
|
where
|
||||||
|
P: Pair,
|
||||||
|
P::Signature: Encode + Decode,
|
||||||
|
P::Public: Encode + Decode + PartialEq + Clone + Debug,
|
||||||
|
Block: BlockT,
|
||||||
|
Client: ProvideRuntimeApi<Block> + Send + Sync,
|
||||||
|
<Client as ProvideRuntimeApi<Block>>::Api: BlockBuilderApi<Block> + AuraApi<Block, P::Public>,
|
||||||
|
|
||||||
|
CIDP: CreateInherentDataProviders<Block, ()>,
|
||||||
|
{
|
||||||
|
async fn verify(
|
||||||
|
&mut self,
|
||||||
|
mut block_params: BlockImportParams<Block, ()>,
|
||||||
|
) -> Result<BlockImportParams<Block, ()>, String> {
|
||||||
|
// Skip checks that include execution, if being told so, or when importing only state.
|
||||||
|
//
|
||||||
|
// This is done for example when gap syncing and it is expected that the block after the gap
|
||||||
|
// was checked/chosen properly, e.g. by warp syncing to this block using a finality proof.
|
||||||
|
if block_params.state_action.skip_execution_checks() || block_params.with_state() {
|
||||||
|
return Ok(block_params)
|
||||||
|
}
|
||||||
|
|
||||||
|
let post_hash = block_params.header.hash();
|
||||||
|
let parent_hash = *block_params.header.parent_hash();
|
||||||
|
|
||||||
|
// check seal and update pre-hash/post-hash
|
||||||
|
{
|
||||||
|
let authorities = aura_internal::fetch_authorities(self.client.as_ref(), parent_hash)
|
||||||
|
.map_err(|e| {
|
||||||
|
format!("Could not fetch authorities at {:?}: {}", parent_hash, e)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let slot_now = slot_now(self.slot_duration);
|
||||||
|
let res = aura_internal::check_header_slot_and_seal::<Block, P>(
|
||||||
|
slot_now,
|
||||||
|
block_params.header,
|
||||||
|
&authorities,
|
||||||
|
);
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok((pre_header, _slot, seal_digest)) => {
|
||||||
|
telemetry!(
|
||||||
|
self.telemetry;
|
||||||
|
CONSENSUS_TRACE;
|
||||||
|
"aura.checked_and_importing";
|
||||||
|
"pre_header" => ?pre_header,
|
||||||
|
);
|
||||||
|
|
||||||
|
block_params.header = pre_header;
|
||||||
|
block_params.post_digests.push(seal_digest);
|
||||||
|
block_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
|
||||||
|
block_params.post_hash = Some(post_hash);
|
||||||
|
},
|
||||||
|
Err(aura_internal::SealVerificationError::Deferred(hdr, slot)) => {
|
||||||
|
telemetry!(
|
||||||
|
self.telemetry;
|
||||||
|
CONSENSUS_DEBUG;
|
||||||
|
"aura.header_too_far_in_future";
|
||||||
|
"hash" => ?post_hash,
|
||||||
|
"a" => ?hdr,
|
||||||
|
"b" => ?slot,
|
||||||
|
);
|
||||||
|
|
||||||
|
return Err(format!(
|
||||||
|
"Rejecting block ({:?}) from future slot {:?}",
|
||||||
|
post_hash, slot
|
||||||
|
))
|
||||||
|
},
|
||||||
|
Err(e) =>
|
||||||
|
return Err(format!(
|
||||||
|
"Rejecting block ({:?}) with invalid seal ({:?})",
|
||||||
|
post_hash, e
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check inherents.
|
||||||
|
if let Some(body) = block_params.body.clone() {
|
||||||
|
let block = Block::new(block_params.header.clone(), body);
|
||||||
|
let create_inherent_data_providers = self
|
||||||
|
.create_inherent_data_providers
|
||||||
|
.create_inherent_data_providers(parent_hash, ())
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Could not create inherent data {:?}", e))?;
|
||||||
|
|
||||||
|
let inherent_data = create_inherent_data_providers
|
||||||
|
.create_inherent_data()
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Could not create inherent data {:?}", e))?;
|
||||||
|
|
||||||
|
let inherent_res = self
|
||||||
|
.client
|
||||||
|
.runtime_api()
|
||||||
|
.check_inherents_with_context(
|
||||||
|
parent_hash,
|
||||||
|
block_params.origin.into(),
|
||||||
|
block,
|
||||||
|
inherent_data,
|
||||||
|
)
|
||||||
|
.map_err(|e| format!("Unable to check block inherents {:?}", e))?;
|
||||||
|
|
||||||
|
if !inherent_res.ok() {
|
||||||
|
for (i, e) in inherent_res.into_errors() {
|
||||||
|
match create_inherent_data_providers.try_handle_error(&i, &e).await {
|
||||||
|
Some(res) => res.map_err(|e| format!("Inherent Error {:?}", e))?,
|
||||||
|
None =>
|
||||||
|
return Err(format!(
|
||||||
|
"Unknown inherent error, source {:?}",
|
||||||
|
String::from_utf8_lossy(&i[..])
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(block_params)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start an import queue for a Cumulus node which checks blocks' seals and inherent data.
|
||||||
|
///
|
||||||
|
/// Pass in only inherent data providers which don't include aura or parachain consensus inherents,
|
||||||
|
/// e.g. things like timestamp and custom inherents for the runtime.
|
||||||
|
///
|
||||||
|
/// The others are generated explicitly internally.
|
||||||
|
///
|
||||||
|
/// This should only be used for runtimes where the runtime does not check all inherents and
|
||||||
|
/// seals in `execute_block` (see <https://github.com/paritytech/cumulus/issues/2436>)
|
||||||
|
pub fn fully_verifying_import_queue<P, Client, Block: BlockT, I, CIDP>(
|
||||||
|
client: Arc<Client>,
|
||||||
|
block_import: I,
|
||||||
|
create_inherent_data_providers: CIDP,
|
||||||
|
slot_duration: SlotDuration,
|
||||||
|
spawner: &impl sp_core::traits::SpawnEssentialNamed,
|
||||||
|
registry: Option<&substrate_prometheus_endpoint::Registry>,
|
||||||
|
telemetry: Option<TelemetryHandle>,
|
||||||
|
) -> BasicQueue<Block, I::Transaction>
|
||||||
|
where
|
||||||
|
P: Pair,
|
||||||
|
P::Signature: Encode + Decode,
|
||||||
|
P::Public: Encode + Decode + PartialEq + Clone + Debug,
|
||||||
|
I: BlockImport<Block, Error = ConsensusError>
|
||||||
|
+ ParachainBlockImportMarker
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
I::Transaction: Send,
|
||||||
|
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||||
|
<Client as ProvideRuntimeApi<Block>>::Api: BlockBuilderApi<Block> + AuraApi<Block, P::Public>,
|
||||||
|
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
|
||||||
|
{
|
||||||
|
let verifier = Verifier::<P, _, _, _> {
|
||||||
|
client,
|
||||||
|
create_inherent_data_providers,
|
||||||
|
slot_duration,
|
||||||
|
telemetry,
|
||||||
|
_marker: std::marker::PhantomData,
|
||||||
|
};
|
||||||
|
|
||||||
|
BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry)
|
||||||
|
}
|
||||||
@@ -18,8 +18,10 @@ sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "mas
|
|||||||
sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
|
||||||
# Polkadot
|
# Polkadot
|
||||||
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
|
||||||
|
|||||||
@@ -0,0 +1,77 @@
|
|||||||
|
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Cumulus.
|
||||||
|
|
||||||
|
// Cumulus is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Cumulus is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! (unstable) Composable utilities for constructing import queues for parachains.
|
||||||
|
//!
|
||||||
|
//! Unlike standalone chains, parachains have the requirement that all consensus logic
|
||||||
|
//! must be checked within the runtime. This property means that work which is normally
|
||||||
|
//! done in the import queue per-block, such as checking signatures, quorums, and whether
|
||||||
|
//! inherent extrinsics were constructed faithfully do not need to be done, per se.
|
||||||
|
//!
|
||||||
|
//! It may seem that it would be beneficial for the client to do these checks regardless,
|
||||||
|
//! but in practice this means that clients would just reject blocks which are _valid_ according
|
||||||
|
//! to their Parachain Validation Function, which is the ultimate source of consensus truth.
|
||||||
|
//!
|
||||||
|
//! However, parachain runtimes expose two different access points for executing blocks
|
||||||
|
//! in full nodes versus executing those blocks in the parachain validation environment.
|
||||||
|
//! At the time of writing, the inherent and consensus checks in most Cumulus runtimes
|
||||||
|
//! are only performed during parachain validation, not full node block execution.
|
||||||
|
//!
|
||||||
|
//! See <https://github.com/paritytech/cumulus/issues/2436> for details.
|
||||||
|
|
||||||
|
use sp_consensus::error::Error as ConsensusError;
|
||||||
|
use sp_runtime::traits::Block as BlockT;
|
||||||
|
|
||||||
|
use sc_consensus::{
|
||||||
|
block_import::{BlockImport, BlockImportParams},
|
||||||
|
import_queue::{BasicQueue, Verifier},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::ParachainBlockImportMarker;
|
||||||
|
|
||||||
|
/// A [`Verifier`] for blocks which verifies absolutely nothing.
|
||||||
|
///
|
||||||
|
/// This should only be used when the runtime is responsible for checking block seals and inherents.
|
||||||
|
pub struct VerifyNothing;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<Block: BlockT> Verifier<Block> for VerifyNothing {
|
||||||
|
async fn verify(
|
||||||
|
&mut self,
|
||||||
|
params: BlockImportParams<Block, ()>,
|
||||||
|
) -> Result<BlockImportParams<Block, ()>, String> {
|
||||||
|
Ok(params)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An import queue which does no verification.
|
||||||
|
///
|
||||||
|
/// This should only be used when the runtime is responsible for checking block seals and inherents.
|
||||||
|
pub fn verify_nothing_import_queue<Block: BlockT, I>(
|
||||||
|
block_import: I,
|
||||||
|
spawner: &impl sp_core::traits::SpawnEssentialNamed,
|
||||||
|
registry: Option<&substrate_prometheus_endpoint::Registry>,
|
||||||
|
) -> BasicQueue<Block, I::Transaction>
|
||||||
|
where
|
||||||
|
I: BlockImport<Block, Error = ConsensusError>
|
||||||
|
+ ParachainBlockImportMarker
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
I::Transaction: Send,
|
||||||
|
{
|
||||||
|
BasicQueue::new(VerifyNothing, Box::new(block_import), None, spawner, registry)
|
||||||
|
}
|
||||||
@@ -32,6 +32,8 @@ pub use parachain_consensus::run_parachain_consensus;
|
|||||||
use level_monitor::LevelMonitor;
|
use level_monitor::LevelMonitor;
|
||||||
pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT};
|
pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT};
|
||||||
|
|
||||||
|
pub mod import_queue;
|
||||||
|
|
||||||
/// The result of [`ParachainConsensus::produce_candidate`].
|
/// The result of [`ParachainConsensus::produce_candidate`].
|
||||||
pub struct ParachainCandidate<B> {
|
pub struct ParachainCandidate<B> {
|
||||||
/// The block that was built for this candidate.
|
/// The block that was built for this candidate.
|
||||||
|
|||||||
@@ -0,0 +1,20 @@
|
|||||||
|
[package]
|
||||||
|
name = "cumulus-client-consensus-proposer"
|
||||||
|
description = "A Substrate `Proposer` for building parachain blocks"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["Parity Technologies <admin@parity.io>"]
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1.0"
|
||||||
|
async-trait = "0.1.68"
|
||||||
|
thiserror = "1.0.40"
|
||||||
|
|
||||||
|
# Substrate
|
||||||
|
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
|
||||||
|
# Cumulus
|
||||||
|
cumulus-primitives-parachain-inherent = { path = "../../../primitives/parachain-inherent" }
|
||||||
@@ -0,0 +1,137 @@
|
|||||||
|
// Copyright 2023 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Cumulus.
|
||||||
|
|
||||||
|
// Cumulus is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Cumulus is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! The Cumulus [`Proposer`] is a wrapper around a Substrate [`sp_consensus::Environment`]
|
||||||
|
//! for creating new parachain blocks.
|
||||||
|
//!
|
||||||
|
//! This utility is designed to be composed within any collator consensus algorithm.
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
use cumulus_primitives_parachain_inherent::ParachainInherentData;
|
||||||
|
use sp_consensus::{EnableProofRecording, Environment, Proposal, Proposer as SubstrateProposer};
|
||||||
|
use sp_inherents::InherentData;
|
||||||
|
use sp_runtime::{traits::Block as BlockT, Digest};
|
||||||
|
use sp_state_machine::StorageProof;
|
||||||
|
|
||||||
|
use std::{fmt::Debug, time::Duration};
|
||||||
|
|
||||||
|
/// Errors that can occur when proposing a parachain block.
|
||||||
|
#[derive(thiserror::Error, Debug)]
|
||||||
|
#[error(transparent)]
|
||||||
|
pub struct Error {
|
||||||
|
inner: anyhow::Error,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Error {
|
||||||
|
/// Create an error tied to the creation of a proposer.
|
||||||
|
pub fn proposer_creation(err: impl Into<anyhow::Error>) -> Self {
|
||||||
|
Error { inner: err.into().context("Proposer Creation") }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an error tied to the proposing logic itself.
|
||||||
|
pub fn proposing(err: impl Into<anyhow::Error>) -> Self {
|
||||||
|
Error { inner: err.into().context("Proposing") }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A type alias for easily referring to the type of a proposal produced by a specific
|
||||||
|
/// [`Proposer`].
|
||||||
|
pub type ProposalOf<B, T> = Proposal<B, <T as ProposerInterface<B>>::Transaction, StorageProof>;
|
||||||
|
|
||||||
|
/// An interface for proposers.
|
||||||
|
#[async_trait]
|
||||||
|
pub trait ProposerInterface<Block: BlockT> {
|
||||||
|
/// The underlying DB transaction type produced with the block proposal.
|
||||||
|
type Transaction: Default + Send + 'static;
|
||||||
|
|
||||||
|
/// Propose a collation using the supplied `InherentData` and the provided
|
||||||
|
/// `ParachainInherentData`.
|
||||||
|
///
|
||||||
|
/// Also specify any required inherent digests, the maximum proposal duration,
|
||||||
|
/// and the block size limit in bytes. See the documentation on [`sp_consensus::Proposer::propose`]
|
||||||
|
/// for more details on how to interpret these parameters.
|
||||||
|
///
|
||||||
|
/// The `InherentData` and `Digest` are left deliberately general in order to accommodate
|
||||||
|
/// all possible collator selection algorithms or inherent creation mechanisms,
|
||||||
|
/// while the `ParachainInherentData` is made explicit so it will be constructed appropriately.
|
||||||
|
///
|
||||||
|
/// If the `InherentData` passed into this function already has a `ParachainInherentData`,
|
||||||
|
/// this should throw an error.
|
||||||
|
async fn propose(
|
||||||
|
&mut self,
|
||||||
|
parent_header: &Block::Header,
|
||||||
|
paras_inherent_data: &ParachainInherentData,
|
||||||
|
other_inherent_data: InherentData,
|
||||||
|
inherent_digests: Digest,
|
||||||
|
max_duration: Duration,
|
||||||
|
block_size_limit: Option<usize>,
|
||||||
|
) -> Result<Proposal<Block, Self::Transaction, StorageProof>, Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A simple wrapper around a Substrate proposer for creating collations.
|
||||||
|
pub struct Proposer<B, T> {
|
||||||
|
inner: T,
|
||||||
|
_marker: std::marker::PhantomData<B>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B, T> Proposer<B, T> {
|
||||||
|
/// Create a new Cumulus [`Proposer`].
|
||||||
|
pub fn new(inner: T) -> Self {
|
||||||
|
Proposer { inner, _marker: std::marker::PhantomData }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<B, T> ProposerInterface<B> for Proposer<B, T>
|
||||||
|
where
|
||||||
|
B: sp_runtime::traits::Block,
|
||||||
|
T: Environment<B> + Send,
|
||||||
|
T::Error: Send + Sync + 'static,
|
||||||
|
T::Proposer: SubstrateProposer<B, ProofRecording = EnableProofRecording, Proof = StorageProof>,
|
||||||
|
<T::Proposer as SubstrateProposer<B>>::Error: Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
type Transaction = <<T as Environment<B>>::Proposer as SubstrateProposer<B>>::Transaction;
|
||||||
|
|
||||||
|
async fn propose(
|
||||||
|
&mut self,
|
||||||
|
parent_header: &B::Header,
|
||||||
|
paras_inherent_data: &ParachainInherentData,
|
||||||
|
other_inherent_data: InherentData,
|
||||||
|
inherent_digests: Digest,
|
||||||
|
max_duration: Duration,
|
||||||
|
block_size_limit: Option<usize>,
|
||||||
|
) -> Result<Proposal<B, Self::Transaction, StorageProof>, Error> {
|
||||||
|
let proposer = self
|
||||||
|
.inner
|
||||||
|
.init(parent_header)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::proposer_creation(anyhow::Error::new(e)))?;
|
||||||
|
|
||||||
|
let mut inherent_data = other_inherent_data;
|
||||||
|
inherent_data
|
||||||
|
.put_data(
|
||||||
|
cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER,
|
||||||
|
¶s_inherent_data,
|
||||||
|
)
|
||||||
|
.map_err(|e| Error::proposing(anyhow::Error::new(e)))?;
|
||||||
|
|
||||||
|
proposer
|
||||||
|
.propose(inherent_data, inherent_digests, max_duration, block_size_limit)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::proposing(anyhow::Error::new(e)).into())
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user