mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 05:51:02 +00:00
Consensus utilities and rearchitecture for more dynamic collators (#2382)
* implement a proposer utility for consensus * tidy up deps of new proposer crate * implement a collator-service crate * rewrite cumulus-collator to use new service struct * implement a module for relay-chain-driven collators * adapt start_collator to use the new relay_chain_driven module * move collator-service to a public submodule * create an interface trait for the proposer * begin aura reimplementation * address review comments * update substrrate git ref * update polkadot-primitives refs * rough draft of aura collation using standalone fns * add a ServiceInterface * port aura reimpl to use new service trait * add an import queue utility crate * remove import queue crate in favor of module in common * implement new verification queue for aura * implement remaining behaviors * split 'collate' into smaller functions that could be pub * add telemetry * fix doc job? * Specify async-trait patch version Co-authored-by: Bastian Köcher <git@kchr.de> * remove 'fn@' in doc string. Co-authored-by: Bastian Köcher <git@kchr.de> * update variable names to be more readable * refactor proposer errors to anyhow/thiserror * remove manual span instrumentation Co-authored-by: Bastian Köcher <git@kchr.de> * make slot_claim private * fix unused import * fmt * fmt * make clippy happy --------- Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
@@ -5,15 +5,15 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
parking_lot = "0.12.1"
|
||||
codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] }
|
||||
futures = "0.3.21"
|
||||
parking_lot = "0.12.0"
|
||||
tracing = "0.1.25"
|
||||
|
||||
# Substrate
|
||||
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
|
||||
+141
-224
@@ -16,50 +16,46 @@
|
||||
|
||||
//! Cumulus Collator implementation for Substrate.
|
||||
|
||||
use cumulus_client_network::WaitToAnnounce;
|
||||
use cumulus_primitives_core::{
|
||||
relay_chain::Hash as PHash, CollationInfo, CollectCollationInfo, ParachainBlockData,
|
||||
PersistedValidationData,
|
||||
relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData,
|
||||
};
|
||||
|
||||
use sc_client_api::BlockBackend;
|
||||
use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||
use sp_consensus::BlockStatus;
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sp_core::traits::SpawnNamed;
|
||||
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT, Zero};
|
||||
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
|
||||
|
||||
use cumulus_client_consensus_common::ParachainConsensus;
|
||||
use polkadot_node_primitives::{
|
||||
BlockData, Collation, CollationGenerationConfig, CollationResult, MaybeCompressedPoV, PoV,
|
||||
};
|
||||
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
|
||||
use polkadot_node_primitives::{CollationResult, MaybeCompressedPoV};
|
||||
use polkadot_overseer::Handle as OverseerHandle;
|
||||
use polkadot_primitives::{CollatorPair, Id as ParaId};
|
||||
|
||||
use codec::{Decode, Encode};
|
||||
use futures::{channel::oneshot, FutureExt};
|
||||
use parking_lot::Mutex;
|
||||
use futures::prelude::*;
|
||||
use std::sync::Arc;
|
||||
use tracing::Instrument;
|
||||
|
||||
use crate::service::CollatorService;
|
||||
|
||||
pub mod service;
|
||||
|
||||
/// The logging target.
|
||||
const LOG_TARGET: &str = "cumulus-collator";
|
||||
|
||||
/// The implementation of the Cumulus `Collator`.
|
||||
///
|
||||
/// Note that this implementation is soon to be deprecated and removed, and it is suggested to
|
||||
/// directly use the [`CollatorService`] instead, so consensus engine implementations
|
||||
/// live at the top level.
|
||||
pub struct Collator<Block: BlockT, BS, RA> {
|
||||
block_status: Arc<BS>,
|
||||
service: CollatorService<Block, BS, RA>,
|
||||
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
|
||||
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
|
||||
runtime_api: Arc<RA>,
|
||||
}
|
||||
|
||||
impl<Block: BlockT, BS, RA> Clone for Collator<Block, BS, RA> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
block_status: self.block_status.clone(),
|
||||
wait_to_announce: self.wait_to_announce.clone(),
|
||||
Collator {
|
||||
service: self.service.clone(),
|
||||
parachain_consensus: self.parachain_consensus.clone(),
|
||||
runtime_api: self.runtime_api.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -73,159 +69,10 @@ where
|
||||
{
|
||||
/// Create a new instance.
|
||||
fn new(
|
||||
block_status: Arc<BS>,
|
||||
spawner: Arc<dyn SpawnNamed + Send + Sync>,
|
||||
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
|
||||
runtime_api: Arc<RA>,
|
||||
collator_service: CollatorService<Block, BS, RA>,
|
||||
parachain_consensus: Box<dyn ParachainConsensus<Block>>,
|
||||
) -> Self {
|
||||
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block)));
|
||||
|
||||
Self { block_status, wait_to_announce, runtime_api, parachain_consensus }
|
||||
}
|
||||
|
||||
/// Checks the status of the given block hash in the Parachain.
|
||||
///
|
||||
/// Returns `true` if the block could be found and is good to be build on.
|
||||
fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
|
||||
match self.block_status.block_status(hash) {
|
||||
Ok(BlockStatus::Queued) => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
block_hash = ?hash,
|
||||
"Skipping candidate production, because block is still queued for import.",
|
||||
);
|
||||
false
|
||||
},
|
||||
Ok(BlockStatus::InChainWithState) => true,
|
||||
Ok(BlockStatus::InChainPruned) => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
"Skipping candidate production, because block `{:?}` is already pruned!",
|
||||
hash,
|
||||
);
|
||||
false
|
||||
},
|
||||
Ok(BlockStatus::KnownBad) => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
block_hash = ?hash,
|
||||
"Block is tagged as known bad and is included in the relay chain! Skipping candidate production!",
|
||||
);
|
||||
false
|
||||
},
|
||||
Ok(BlockStatus::Unknown) => {
|
||||
if header.number().is_zero() {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
block_hash = ?hash,
|
||||
"Could not find the header of the genesis block in the database!",
|
||||
);
|
||||
} else {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
block_hash = ?hash,
|
||||
"Skipping candidate production, because block is unknown.",
|
||||
);
|
||||
}
|
||||
false
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
block_hash = ?hash,
|
||||
error = ?e,
|
||||
"Failed to get block status.",
|
||||
);
|
||||
false
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch the collation info from the runtime.
|
||||
///
|
||||
/// Returns `Ok(Some(_))` on success, `Err(_)` on error or `Ok(None)` if the runtime api isn't implemented by the runtime.
|
||||
fn fetch_collation_info(
|
||||
&self,
|
||||
block_hash: Block::Hash,
|
||||
header: &Block::Header,
|
||||
) -> Result<Option<CollationInfo>, sp_api::ApiError> {
|
||||
let runtime_api = self.runtime_api.runtime_api();
|
||||
|
||||
let api_version =
|
||||
match runtime_api.api_version::<dyn CollectCollationInfo<Block>>(block_hash)? {
|
||||
Some(version) => version,
|
||||
None => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
"Could not fetch `CollectCollationInfo` runtime api version."
|
||||
);
|
||||
return Ok(None)
|
||||
},
|
||||
};
|
||||
|
||||
let collation_info = if api_version < 2 {
|
||||
#[allow(deprecated)]
|
||||
runtime_api
|
||||
.collect_collation_info_before_version_2(block_hash)?
|
||||
.into_latest(header.encode().into())
|
||||
} else {
|
||||
runtime_api.collect_collation_info(block_hash, header)?
|
||||
};
|
||||
|
||||
Ok(Some(collation_info))
|
||||
}
|
||||
|
||||
fn build_collation(
|
||||
&self,
|
||||
block: ParachainBlockData<Block>,
|
||||
block_hash: Block::Hash,
|
||||
pov: PoV,
|
||||
) -> Option<Collation> {
|
||||
let collation_info = self
|
||||
.fetch_collation_info(block_hash, block.header())
|
||||
.map_err(|e| {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Failed to collect collation info.",
|
||||
)
|
||||
})
|
||||
.ok()
|
||||
.flatten()?;
|
||||
|
||||
let upward_messages = collation_info
|
||||
.upward_messages
|
||||
.try_into()
|
||||
.map_err(|e| {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Number of upward messages should not be greater than `MAX_UPWARD_MESSAGE_NUM`",
|
||||
)
|
||||
})
|
||||
.ok()?;
|
||||
let horizontal_messages = collation_info
|
||||
.horizontal_messages
|
||||
.try_into()
|
||||
.map_err(|e| {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Number of horizontal messages should not be greater than `MAX_HORIZONTAL_MESSAGE_NUM`",
|
||||
)
|
||||
})
|
||||
.ok()?;
|
||||
|
||||
Some(Collation {
|
||||
upward_messages,
|
||||
new_validation_code: collation_info.new_validation_code,
|
||||
processed_downward_messages: collation_info.processed_downward_messages,
|
||||
horizontal_messages,
|
||||
hrmp_watermark: collation_info.hrmp_watermark,
|
||||
head_data: collation_info.head_data,
|
||||
proof_of_validity: MaybeCompressedPoV::Compressed(pov),
|
||||
})
|
||||
Self { service: collator_service, parachain_consensus }
|
||||
}
|
||||
|
||||
async fn produce_candidate(
|
||||
@@ -252,7 +99,7 @@ where
|
||||
};
|
||||
|
||||
let last_head_hash = last_head.hash();
|
||||
if !self.check_block_status(last_head_hash, &last_head) {
|
||||
if !self.service.check_block_status(last_head_hash, &last_head) {
|
||||
return None
|
||||
}
|
||||
|
||||
@@ -268,19 +115,9 @@ where
|
||||
.produce_candidate(&last_head, relay_parent, &validation_data)
|
||||
.await?;
|
||||
|
||||
let (header, extrinsics) = candidate.block.deconstruct();
|
||||
let block_hash = candidate.block.header().hash();
|
||||
|
||||
let compact_proof =
|
||||
match candidate.proof.into_compact_proof::<HashFor<Block>>(*last_head.state_root()) {
|
||||
Ok(proof) => proof,
|
||||
Err(e) => {
|
||||
tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e);
|
||||
return None
|
||||
},
|
||||
};
|
||||
|
||||
// Create the parachain block data for the validators.
|
||||
let b = ParachainBlockData::<Block>::new(header, extrinsics, compact_proof);
|
||||
let (collation, b) = self.service.build_collation(&last_head, block_hash, candidate)?;
|
||||
|
||||
tracing::info!(
|
||||
target: LOG_TARGET,
|
||||
@@ -290,21 +127,15 @@ where
|
||||
b.storage_proof().encode().len() as f64 / 1024f64,
|
||||
);
|
||||
|
||||
let pov =
|
||||
polkadot_node_primitives::maybe_compress_pov(PoV { block_data: BlockData(b.encode()) });
|
||||
if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
|
||||
tracing::info!(
|
||||
target: LOG_TARGET,
|
||||
"Compressed PoV size: {}kb",
|
||||
pov.block_data.0.len() as f64 / 1024f64,
|
||||
);
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
target: LOG_TARGET,
|
||||
"Compressed PoV size: {}kb",
|
||||
pov.block_data.0.len() as f64 / 1024f64,
|
||||
);
|
||||
|
||||
let block_hash = b.header().hash();
|
||||
let collation = self.build_collation(b, block_hash, pov)?;
|
||||
|
||||
let (result_sender, signed_stmt_recv) = oneshot::channel();
|
||||
|
||||
self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv);
|
||||
let result_sender = self.service.announce_with_barrier(block_hash);
|
||||
|
||||
tracing::info!(target: LOG_TARGET, ?block_hash, "Produced proof-of-validity candidate.",);
|
||||
|
||||
@@ -312,6 +143,96 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Relay-chain-driven collators are those whose block production is driven purely
|
||||
/// by new relay chain blocks and the most recently included parachain blocks
|
||||
/// within them.
|
||||
///
|
||||
/// This method of driving collators is not suited to anything but the most simple parachain
|
||||
/// consensus mechanisms, and this module may soon be deprecated.
|
||||
pub mod relay_chain_driven {
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
prelude::*,
|
||||
};
|
||||
use polkadot_node_primitives::{CollationGenerationConfig, CollationResult};
|
||||
use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};
|
||||
use polkadot_overseer::Handle as OverseerHandle;
|
||||
use polkadot_primitives::{CollatorPair, Id as ParaId};
|
||||
|
||||
use cumulus_primitives_core::{relay_chain::Hash as PHash, PersistedValidationData};
|
||||
|
||||
/// A request to author a collation, based on the advancement of the relay chain.
|
||||
///
|
||||
/// See the module docs for more info on relay-chain-driven collators.
|
||||
pub struct CollationRequest {
|
||||
relay_parent: PHash,
|
||||
pvd: PersistedValidationData,
|
||||
sender: oneshot::Sender<Option<CollationResult>>,
|
||||
}
|
||||
|
||||
impl CollationRequest {
|
||||
/// Get the relay parent of the collation request.
|
||||
pub fn relay_parent(&self) -> &PHash {
|
||||
&self.relay_parent
|
||||
}
|
||||
|
||||
/// Get the [`PersistedValidationData`] for the request.
|
||||
pub fn persisted_validation_data(&self) -> &PersistedValidationData {
|
||||
&self.pvd
|
||||
}
|
||||
|
||||
/// Complete the request with a collation, if any.
|
||||
pub fn complete(self, collation: Option<CollationResult>) {
|
||||
let _ = self.sender.send(collation);
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize the collator with Polkadot's collation-generation
|
||||
/// subsystem, returning a stream of collation requests to handle.
|
||||
pub async fn init(
|
||||
key: CollatorPair,
|
||||
para_id: ParaId,
|
||||
overseer_handle: OverseerHandle,
|
||||
) -> mpsc::Receiver<CollationRequest> {
|
||||
let mut overseer_handle = overseer_handle;
|
||||
|
||||
let (stream_tx, stream_rx) = mpsc::channel(0);
|
||||
let config = CollationGenerationConfig {
|
||||
key,
|
||||
para_id,
|
||||
collator: Box::new(move |relay_parent, validation_data| {
|
||||
// Cloning the channel on each usage effectively makes the channel
|
||||
// unbounded. The channel is actually bounded by the block production
|
||||
// and consensus systems of Polkadot, which limits the amount of possible
|
||||
// blocks.
|
||||
let mut stream_tx = stream_tx.clone();
|
||||
let validation_data = validation_data.clone();
|
||||
Box::pin(async move {
|
||||
let (this_tx, this_rx) = oneshot::channel();
|
||||
let request =
|
||||
CollationRequest { relay_parent, pvd: validation_data, sender: this_tx };
|
||||
|
||||
if stream_tx.send(request).await.is_err() {
|
||||
return None
|
||||
}
|
||||
|
||||
this_rx.await.ok().flatten()
|
||||
})
|
||||
}),
|
||||
};
|
||||
|
||||
overseer_handle
|
||||
.send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
|
||||
.await;
|
||||
|
||||
overseer_handle
|
||||
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
|
||||
.await;
|
||||
|
||||
stream_rx
|
||||
}
|
||||
}
|
||||
|
||||
/// Parameters for [`start_collator`].
|
||||
pub struct StartCollatorParams<Block: BlockT, RA, BS, Spawner> {
|
||||
pub para_id: ParaId,
|
||||
@@ -330,7 +251,7 @@ pub async fn start_collator<Block, RA, BS, Spawner>(
|
||||
para_id,
|
||||
block_status,
|
||||
announce_block,
|
||||
mut overseer_handle,
|
||||
overseer_handle,
|
||||
spawner,
|
||||
key,
|
||||
parachain_consensus,
|
||||
@@ -343,34 +264,28 @@ pub async fn start_collator<Block, RA, BS, Spawner>(
|
||||
RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||
RA::Api: CollectCollationInfo<Block>,
|
||||
{
|
||||
let collator = Collator::new(
|
||||
block_status,
|
||||
Arc::new(spawner),
|
||||
announce_block,
|
||||
runtime_api,
|
||||
parachain_consensus,
|
||||
);
|
||||
let collator_service =
|
||||
CollatorService::new(block_status, Arc::new(spawner.clone()), announce_block, runtime_api);
|
||||
|
||||
let span = tracing::Span::current();
|
||||
let config = CollationGenerationConfig {
|
||||
key,
|
||||
para_id,
|
||||
collator: Box::new(move |relay_parent, validation_data| {
|
||||
let collator = collator.clone();
|
||||
collator
|
||||
.produce_candidate(relay_parent, validation_data.clone())
|
||||
.instrument(span.clone())
|
||||
.boxed()
|
||||
}),
|
||||
};
|
||||
let collator = Collator::new(collator_service, parachain_consensus);
|
||||
|
||||
overseer_handle
|
||||
.send_msg(CollationGenerationMessage::Initialize(config), "StartCollator")
|
||||
.await;
|
||||
let mut request_stream = relay_chain_driven::init(key, para_id, overseer_handle).await;
|
||||
|
||||
overseer_handle
|
||||
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")
|
||||
.await;
|
||||
let collation_future = Box::pin(async move {
|
||||
while let Some(request) = request_stream.next().await {
|
||||
let collation = collator
|
||||
.clone()
|
||||
.produce_candidate(
|
||||
*request.relay_parent(),
|
||||
request.persisted_validation_data().clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
request.complete(collation);
|
||||
}
|
||||
});
|
||||
|
||||
spawner.spawn("cumulus-relay-driven-collator", None, collation_future);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -378,12 +293,14 @@ mod tests {
|
||||
use super::*;
|
||||
use async_trait::async_trait;
|
||||
use cumulus_client_consensus_common::ParachainCandidate;
|
||||
use cumulus_primitives_core::ParachainBlockData;
|
||||
use cumulus_test_client::{
|
||||
Client, ClientBlockImportExt, DefaultTestClientBuilderExt, InitBlockBuilder,
|
||||
TestClientBuilder, TestClientBuilderExt,
|
||||
};
|
||||
use cumulus_test_runtime::{Block, Header};
|
||||
use futures::{channel::mpsc, executor::block_on, StreamExt};
|
||||
use polkadot_node_subsystem::messages::CollationGenerationMessage;
|
||||
use polkadot_node_subsystem_test_helpers::ForwardSubsystem;
|
||||
use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};
|
||||
use sp_consensus::BlockOrigin;
|
||||
|
||||
@@ -0,0 +1,318 @@
|
||||
// Copyright 2023 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Cumulus.
|
||||
|
||||
// Cumulus is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Cumulus is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! The Cumulus [`CollatorService`] is a utility struct for performing common
|
||||
//! operations used in parachain consensus/authoring.
|
||||
|
||||
use cumulus_client_network::WaitToAnnounce;
|
||||
use cumulus_primitives_core::{CollationInfo, CollectCollationInfo, ParachainBlockData};
|
||||
|
||||
use sc_client_api::BlockBackend;
|
||||
use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||
use sp_consensus::BlockStatus;
|
||||
use sp_core::traits::SpawnNamed;
|
||||
use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT, Zero};
|
||||
|
||||
use cumulus_client_consensus_common::ParachainCandidate;
|
||||
use polkadot_node_primitives::{
|
||||
BlockData, Collation, CollationSecondedSignal, MaybeCompressedPoV, PoV,
|
||||
};
|
||||
|
||||
use codec::Encode;
|
||||
use futures::channel::oneshot;
|
||||
use parking_lot::Mutex;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// The logging target.
|
||||
const LOG_TARGET: &str = "cumulus-collator";
|
||||
|
||||
/// Utility functions generally applicable to writing collators for Cumulus.
|
||||
pub trait ServiceInterface<Block: BlockT> {
|
||||
/// Checks the status of the given block hash in the Parachain.
|
||||
///
|
||||
/// Returns `true` if the block could be found and is good to be build on.
|
||||
fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool;
|
||||
|
||||
/// Build a full [`Collation`] from a given [`ParachainCandidate`]. This requires
|
||||
/// that the underlying block has been fully imported into the underlying client,
|
||||
/// as implementations will fetch underlying runtime API data.
|
||||
///
|
||||
/// This also returns the unencoded parachain block data, in case that is desired.
|
||||
fn build_collation(
|
||||
&self,
|
||||
parent_header: &Block::Header,
|
||||
block_hash: Block::Hash,
|
||||
candidate: ParachainCandidate<Block>,
|
||||
) -> Option<(Collation, ParachainBlockData<Block>)>;
|
||||
|
||||
/// Inform networking systems that the block should be announced after an appropriate
|
||||
/// signal has been received. This returns the sending half of the signal.
|
||||
fn announce_with_barrier(
|
||||
&self,
|
||||
block_hash: Block::Hash,
|
||||
) -> oneshot::Sender<CollationSecondedSignal>;
|
||||
}
|
||||
|
||||
/// The [`CollatorService`] provides common utilities for parachain consensus and authoring.
|
||||
///
|
||||
/// This includes logic for checking the block status of arbitrary parachain headers
|
||||
/// gathered from the relay chain state, creating full [`Collation`]s to be shared with validators,
|
||||
/// and distributing new parachain blocks along the network.
|
||||
pub struct CollatorService<Block: BlockT, BS, RA> {
|
||||
block_status: Arc<BS>,
|
||||
wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
|
||||
runtime_api: Arc<RA>,
|
||||
}
|
||||
|
||||
impl<Block: BlockT, BS, RA> Clone for CollatorService<Block, BS, RA> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
block_status: self.block_status.clone(),
|
||||
wait_to_announce: self.wait_to_announce.clone(),
|
||||
runtime_api: self.runtime_api.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block, BS, RA> CollatorService<Block, BS, RA>
|
||||
where
|
||||
Block: BlockT,
|
||||
BS: BlockBackend<Block>,
|
||||
RA: ProvideRuntimeApi<Block>,
|
||||
RA::Api: CollectCollationInfo<Block>,
|
||||
{
|
||||
/// Create a new instance.
|
||||
pub fn new(
|
||||
block_status: Arc<BS>,
|
||||
spawner: Arc<dyn SpawnNamed + Send + Sync>,
|
||||
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
|
||||
runtime_api: Arc<RA>,
|
||||
) -> Self {
|
||||
let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block)));
|
||||
|
||||
Self { block_status, wait_to_announce, runtime_api }
|
||||
}
|
||||
|
||||
/// Checks the status of the given block hash in the Parachain.
|
||||
///
|
||||
/// Returns `true` if the block could be found and is good to be build on.
|
||||
pub fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
|
||||
match self.block_status.block_status(hash) {
|
||||
Ok(BlockStatus::Queued) => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
block_hash = ?hash,
|
||||
"Skipping candidate production, because block is still queued for import.",
|
||||
);
|
||||
false
|
||||
},
|
||||
Ok(BlockStatus::InChainWithState) => true,
|
||||
Ok(BlockStatus::InChainPruned) => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
"Skipping candidate production, because block `{:?}` is already pruned!",
|
||||
hash,
|
||||
);
|
||||
false
|
||||
},
|
||||
Ok(BlockStatus::KnownBad) => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
block_hash = ?hash,
|
||||
"Block is tagged as known bad and is included in the relay chain! Skipping candidate production!",
|
||||
);
|
||||
false
|
||||
},
|
||||
Ok(BlockStatus::Unknown) => {
|
||||
if header.number().is_zero() {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
block_hash = ?hash,
|
||||
"Could not find the header of the genesis block in the database!",
|
||||
);
|
||||
} else {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
block_hash = ?hash,
|
||||
"Skipping candidate production, because block is unknown.",
|
||||
);
|
||||
}
|
||||
false
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
block_hash = ?hash,
|
||||
error = ?e,
|
||||
"Failed to get block status.",
|
||||
);
|
||||
false
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch the collation info from the runtime.
|
||||
///
|
||||
/// Returns `Ok(Some(_))` on success, `Err(_)` on error or `Ok(None)` if the runtime api isn't implemented by the runtime.
|
||||
pub fn fetch_collation_info(
|
||||
&self,
|
||||
block_hash: Block::Hash,
|
||||
header: &Block::Header,
|
||||
) -> Result<Option<CollationInfo>, sp_api::ApiError> {
|
||||
let runtime_api = self.runtime_api.runtime_api();
|
||||
|
||||
let api_version =
|
||||
match runtime_api.api_version::<dyn CollectCollationInfo<Block>>(block_hash)? {
|
||||
Some(version) => version,
|
||||
None => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
"Could not fetch `CollectCollationInfo` runtime api version."
|
||||
);
|
||||
return Ok(None)
|
||||
},
|
||||
};
|
||||
|
||||
let collation_info = if api_version < 2 {
|
||||
#[allow(deprecated)]
|
||||
runtime_api
|
||||
.collect_collation_info_before_version_2(block_hash)?
|
||||
.into_latest(header.encode().into())
|
||||
} else {
|
||||
runtime_api.collect_collation_info(block_hash, header)?
|
||||
};
|
||||
|
||||
Ok(Some(collation_info))
|
||||
}
|
||||
|
||||
/// Build a full [`Collation`] from a given [`ParachainCandidate`]. This requires
|
||||
/// that the underlying block has been fully imported into the underlying client,
|
||||
/// as it fetches underlying runtime API data.
|
||||
///
|
||||
/// This also returns the unencoded parachain block data, in case that is desired.
|
||||
pub fn build_collation(
|
||||
&self,
|
||||
parent_header: &Block::Header,
|
||||
block_hash: Block::Hash,
|
||||
candidate: ParachainCandidate<Block>,
|
||||
) -> Option<(Collation, ParachainBlockData<Block>)> {
|
||||
let (header, extrinsics) = candidate.block.deconstruct();
|
||||
|
||||
let compact_proof = match candidate
|
||||
.proof
|
||||
.into_compact_proof::<HashFor<Block>>(*parent_header.state_root())
|
||||
{
|
||||
Ok(proof) => proof,
|
||||
Err(e) => {
|
||||
tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e);
|
||||
return None
|
||||
},
|
||||
};
|
||||
|
||||
// Create the parachain block data for the validators.
|
||||
let collation_info = self
|
||||
.fetch_collation_info(block_hash, &header)
|
||||
.map_err(|e| {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Failed to collect collation info.",
|
||||
)
|
||||
})
|
||||
.ok()
|
||||
.flatten()?;
|
||||
|
||||
let block_data = ParachainBlockData::<Block>::new(header, extrinsics, compact_proof);
|
||||
|
||||
let pov = polkadot_node_primitives::maybe_compress_pov(PoV {
|
||||
block_data: BlockData(block_data.encode()),
|
||||
});
|
||||
|
||||
let upward_messages = collation_info
|
||||
.upward_messages
|
||||
.try_into()
|
||||
.map_err(|e| {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Number of upward messages should not be greater than `MAX_UPWARD_MESSAGE_NUM`",
|
||||
)
|
||||
})
|
||||
.ok()?;
|
||||
let horizontal_messages = collation_info
|
||||
.horizontal_messages
|
||||
.try_into()
|
||||
.map_err(|e| {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
error = ?e,
|
||||
"Number of horizontal messages should not be greater than `MAX_HORIZONTAL_MESSAGE_NUM`",
|
||||
)
|
||||
})
|
||||
.ok()?;
|
||||
|
||||
let collation = Collation {
|
||||
upward_messages,
|
||||
new_validation_code: collation_info.new_validation_code,
|
||||
processed_downward_messages: collation_info.processed_downward_messages,
|
||||
horizontal_messages,
|
||||
hrmp_watermark: collation_info.hrmp_watermark,
|
||||
head_data: collation_info.head_data,
|
||||
proof_of_validity: MaybeCompressedPoV::Compressed(pov),
|
||||
};
|
||||
|
||||
Some((collation, block_data))
|
||||
}
|
||||
|
||||
/// Inform the networking systems that the block should be announced after an appropriate
|
||||
/// signal has been received. This returns the sending half of the signal.
|
||||
pub fn announce_with_barrier(
|
||||
&self,
|
||||
block_hash: Block::Hash,
|
||||
) -> oneshot::Sender<CollationSecondedSignal> {
|
||||
let (result_sender, signed_stmt_recv) = oneshot::channel();
|
||||
self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv);
|
||||
result_sender
|
||||
}
|
||||
}
|
||||
|
||||
impl<Block, BS, RA> ServiceInterface<Block> for CollatorService<Block, BS, RA>
|
||||
where
|
||||
Block: BlockT,
|
||||
BS: BlockBackend<Block>,
|
||||
RA: ProvideRuntimeApi<Block>,
|
||||
RA::Api: CollectCollationInfo<Block>,
|
||||
{
|
||||
fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
|
||||
CollatorService::check_block_status(self, hash, header)
|
||||
}
|
||||
|
||||
fn build_collation(
|
||||
&self,
|
||||
parent_header: &Block::Header,
|
||||
block_hash: Block::Hash,
|
||||
candidate: ParachainCandidate<Block>,
|
||||
) -> Option<(Collation, ParachainBlockData<Block>)> {
|
||||
CollatorService::build_collation(self, parent_header, block_hash, candidate)
|
||||
}
|
||||
|
||||
fn announce_with_barrier(
|
||||
&self,
|
||||
block_hash: Block::Hash,
|
||||
) -> oneshot::Sender<CollationSecondedSignal> {
|
||||
CollatorService::announce_with_barrier(self, block_hash)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user